#!/usr/bin/env python3
"""
prepare_data.py

Scans local wbg_extractions, identifies docs with real _direct_judged.jsonl
(not dummy), detects language, uploads English docs to HF, and
generates/uploads an updated wbg_pdf_links.json.

Usage:
    # Dry run (scan only, no uploads):
    uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py --dry-run

    # Upload missing docs + generate new pdf_links:
    uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py

    # Only generate pdf_links without uploading docs:
    uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py --links-only

Requires: huggingface_hub, requests, langdetect
"""

import argparse
import json
import os
import sys
from pathlib import Path

import requests
from huggingface_hub import HfApi
from langdetect import detect, LangDetectException

# ─── Configuration ───────────────────────────────
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_ID = "ai4data/annotation_data"
LOCAL_BASE = Path(__file__).parent / "annotation_data" / "wbg_extractions"
LINKS_REPO_PATH = "annotation_data/wbg_data/wbg_pdf_links.json"

# Timeout (seconds) for plain HTTP requests so a stalled connection
# cannot hang the whole script.
HTTP_TIMEOUT = 30


def get_hf_token():
    """Get HF token from env, .env file, or cached token.

    Lookup order: HF_TOKEN environment variable (captured at import time),
    an ``HF_TOKEN=...`` line in a sibling .env file, then the huggingface_hub
    CLI token cache.

    Returns:
        The token string, or None if no token could be found.
    """
    if HF_TOKEN:
        return HF_TOKEN
    env_path = Path(__file__).parent / ".env"
    if env_path.exists():
        for line in env_path.read_text().splitlines():
            if line.startswith("HF_TOKEN="):
                return line.split("=", 1)[1].strip()
    cached = Path.home() / ".cache" / "huggingface" / "token"
    if cached.exists():
        return cached.read_text().strip()
    return None


def _load_pages(doc_path):
    """Load a judged extraction file as a list of page dicts.

    The files carry a ``.jsonl`` extension, so accept both formats seen in
    the wild: a single JSON array of page objects, and true JSON-Lines
    (one JSON object per line).  The original code only handled the array
    form, which made genuine JSONL files unparseable and silently
    classified as "unknown" language.

    Raises:
        json.JSONDecodeError: if neither format parses.
    """
    text = Path(doc_path).read_text()
    try:
        data = json.loads(text)
        # Wrap a single top-level object so callers can slice uniformly.
        return data if isinstance(data, list) else [data]
    except json.JSONDecodeError:
        # JSON-Lines fallback: one object per non-empty line.
        return [json.loads(line) for line in text.splitlines() if line.strip()]


def detect_language(doc_path):
    """
    Detect language of a document by sampling pages 2-5 (skipping first
    page which often contains abbreviation tables / currency equivalents).
    Returns ISO 639-1 language code (e.g. 'en', 'fr', 'ar'), or 'unknown'
    when the file is missing, unparseable, or detection fails.
    """
    try:
        data = _load_pages(doc_path)
        # Sample from pages 2-5 to avoid abbreviation-heavy first pages
        texts = " ".join(p.get("input_text", "")[:500] for p in data[1:5])
        if len(texts.strip()) < 50:
            # Fallback to first 3 pages if later pages are empty
            texts = " ".join(p.get("input_text", "")[:500] for p in data[:3])
        return detect(texts)
    except (LangDetectException, json.JSONDecodeError, FileNotFoundError):
        return "unknown"


def scan_local_docs():
    """Scan local wbg_extractions and classify docs with language detection.

    Returns:
        Dict with keys 'real' (English, has real judged file),
        'real_non_english', 'dummy', and 'no_file', each a list of entries.
    """
    empty = {"real": [], "real_non_english": [], "dummy": [], "no_file": []}
    # A missing extraction dir should not crash the scan — report it and
    # classify everything as absent.
    if not LOCAL_BASE.is_dir():
        print(f" Warning: local extraction dir not found: {LOCAL_BASE}")
        return empty

    docs = sorted(
        [d for d in os.listdir(LOCAL_BASE) if d.startswith("doc_")],
        key=lambda x: int(x.split("_")[1]),
    )
    results = empty
    for doc in docs:
        idx = int(doc.split("_")[1])
        raw_dir = LOCAL_BASE / doc / "raw"
        real_file = raw_dir / f"{doc}_direct_judged.jsonl"
        dummy_file = raw_dir / f"{doc}_dummy_direct_judged.jsonl"
        if real_file.exists():
            lang = detect_language(str(real_file))
            entry = {"name": doc, "index": idx, "path": str(real_file), "language": lang}
            if lang == "en":
                results["real"].append(entry)
            else:
                results["real_non_english"].append(entry)
        elif dummy_file.exists():
            results["dummy"].append({"name": doc, "index": idx})
        else:
            results["no_file"].append({"name": doc, "index": idx})
    return results


def get_existing_hf_docs(api):
    """Check which docs already have _direct_judged.jsonl on HF.

    Returns a set of doc folder names (e.g. {'doc_12', ...}); empty set on
    any listing failure so the caller falls back to uploading everything.
    """
    try:
        items = list(api.list_repo_tree(
            REPO_ID, repo_type="dataset", path_in_repo="annotation_data/wbg_extractions"
        ))
        doc_names = [item.path.split("/")[-1] for item in items if hasattr(item, "path")]
        return set(doc_names)
    except Exception as e:
        # Best effort: a listing failure is reported, not fatal.
        print(f" Warning: Could not list HF repo: {e}")
        return set()


def upload_docs(api, docs_to_upload, dry_run=False):
    """Upload _direct_judged.jsonl files to HF for docs that are missing.

    Args:
        api: Authenticated HfApi client.
        docs_to_upload: Entries from scan_local_docs()['real'].
        dry_run: If True, only print what would be uploaded.

    Returns:
        (uploaded, skipped) counts; failures are reported and counted as
        skipped rather than aborting the batch.
    """
    uploaded = 0
    skipped = 0
    for doc in docs_to_upload:
        repo_path = f"annotation_data/wbg_extractions/{doc['name']}/raw/{doc['name']}_direct_judged.jsonl"
        if dry_run:
            print(f" [DRY RUN] Would upload: {doc['name']}")
            continue
        try:
            api.upload_file(
                path_or_fileobj=doc["path"],
                path_in_repo=repo_path,
                repo_id=REPO_ID,
                repo_type="dataset",
                commit_message=f"Upload {doc['name']}_direct_judged.jsonl",
            )
            print(f" ✅ Uploaded: {doc['name']}")
            uploaded += 1
        except Exception as e:
            print(f" ❌ Failed {doc['name']}: {e}")
            skipped += 1
    return uploaded, skipped


def fetch_current_links(api, token):
    """Fetch current wbg_pdf_links.json from HF.

    Args:
        api: HfApi client (unused here; kept for signature symmetry).
        token: HF token sent as a bearer credential.

    Returns:
        Parsed list of link entries, or [] when the file cannot be fetched.
    """
    url = f"https://huggingface.co/datasets/{REPO_ID}/raw/main/{LINKS_REPO_PATH}"
    try:
        # timeout prevents an indefinite hang on a stalled connection.
        resp = requests.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            timeout=HTTP_TIMEOUT,
        )
    except requests.RequestException as e:
        # Network failure should degrade to the same empty-list fallback
        # as a bad HTTP status, not a traceback.
        print(f" Warning: Could not fetch existing links: {e}")
        return []
    if resp.status_code == 200:
        return resp.json()
    print(f" Warning: Could not fetch existing links (HTTP {resp.status_code})")
    return []


def generate_updated_links(current_links, local_docs, token):
    """
    Generate updated wbg_pdf_links.json with:
    - src_docname: doc_{index}
    - has_revalidation: true if English _direct_judged.jsonl exists
    - language: detected language code

    Args:
        current_links: Existing link entries fetched from HF.
        local_docs: Classification dict from scan_local_docs().
        token: Unused; retained for backward-compatible call signature.

    Returns:
        List of rebuilt link entries, one per input link.
    """
    # Build lookup: index → language
    lang_map = {}
    for d in local_docs["real"]:
        lang_map[d["index"]] = d.get("language", "en")
    for d in local_docs["real_non_english"]:
        lang_map[d["index"]] = d.get("language", "unknown")

    real_english_indices = {d["index"] for d in local_docs["real"]}

    updated_links = []
    for link in current_links:
        idx = link["index"]
        entry = {
            "index": idx,
            "src_docname": f"doc_{idx}",
            "landing_page_url": link.get("landing_page_url", ""),
            "direct_pdf_url": link.get("direct_pdf_url", ""),
            "status": link.get("status", "unknown"),
            "has_revalidation": idx in real_english_indices,
            "language": lang_map.get(idx, "unknown"),
        }
        updated_links.append(entry)
    return updated_links


def upload_links(api, links, dry_run=False):
    """Upload the updated wbg_pdf_links.json to HF.

    Always writes a local copy first (annotation_data/wbg_data/) so the
    generated file survives even if the upload fails.
    """
    content = json.dumps(links, indent=2)
    if dry_run:
        print(f" [DRY RUN] Would upload updated wbg_pdf_links.json ({len(links)} entries)")
        return
    local_path = Path(__file__).parent / "annotation_data" / "wbg_data"
    local_path.mkdir(parents=True, exist_ok=True)
    local_file = local_path / "wbg_pdf_links.json"
    local_file.write_text(content)
    print(f" 💾 Saved locally: {local_file}")
    api.upload_file(
        path_or_fileobj=str(local_file),
        path_in_repo=LINKS_REPO_PATH,
        repo_id=REPO_ID,
        repo_type="dataset",
        commit_message="Update wbg_pdf_links.json with language field, exclude non-English",
    )
    # Plain string: no placeholders, so no f-prefix needed.
    print(" ✅ Uploaded wbg_pdf_links.json to HF")


def main():
    """CLI entry point: scan local docs, upload missing English ones,
    then regenerate and upload wbg_pdf_links.json."""
    parser = argparse.ArgumentParser(description="Prepare and upload annotation data")
    parser.add_argument("--dry-run", action="store_true", help="Scan only, don't upload")
    parser.add_argument(
        "--links-only",
        action="store_true",
        help="Only generate/upload updated pdf_links, skip doc uploads",
    )
    args = parser.parse_args()

    token = get_hf_token()
    if not token:
        print("❌ No HF_TOKEN found. Set it via environment variable or .env file.")
        sys.exit(1)
    api = HfApi(token=token)

    # 1. Scan local docs with language detection
    print("\n📂 Scanning local wbg_extractions (with language detection)...")
    local_docs = scan_local_docs()
    print(f" Real (English): {len(local_docs['real'])}")
    print(f" Real (non-English): {len(local_docs['real_non_english'])}")
    print(f" Dummy (skipped): {len(local_docs['dummy'])}")
    print(f" No file: {len(local_docs['no_file'])}")

    if local_docs["real_non_english"]:
        print("\n Non-English docs excluded:")
        for d in local_docs["real_non_english"]:
            print(f" {d['name']}: {d['language']}")

    if not args.links_only:
        # 2. Check what's already on HF
        print("\n🔍 Checking existing docs on HF...")
        existing = get_existing_hf_docs(api)
        print(f" Found {len(existing)} doc folders on HF")

        # 3. Upload only English docs not yet on HF
        to_upload = [d for d in local_docs["real"] if d["name"] not in existing]
        already_on_hf = [d for d in local_docs["real"] if d["name"] in existing]
        print(f"\n📤 English docs to upload: {len(to_upload)}")
        print(f" Already on HF: {len(already_on_hf)}")

        if to_upload:
            print("\n🚀 Uploading missing English docs...")
            uploaded, skipped = upload_docs(api, to_upload, dry_run=args.dry_run)
            if not args.dry_run:
                print(f" Uploaded: {uploaded}, Skipped: {skipped}")

    # 4. Generate updated pdf_links
    print("\n📋 Generating updated wbg_pdf_links.json...")
    current_links = fetch_current_links(api, token)
    updated_links = generate_updated_links(current_links, local_docs, token)
    # 'entry' instead of 'l' — 'l' is too easily misread as '1'.
    with_revalidation = sum(1 for entry in updated_links if entry["has_revalidation"])
    non_english = sum(
        1 for entry in updated_links if entry["language"] not in ("en", "unknown")
    )
    print(f" Total entries: {len(updated_links)}")
    print(f" English with revalidation: {with_revalidation}")
    print(f" Non-English (excluded): {non_english}")

    # 5. Upload
    print("\n📤 Uploading updated wbg_pdf_links.json...")
    upload_links(api, updated_links, dry_run=args.dry_run)

    print("\n✅ Done!")


if __name__ == "__main__":
    main()