Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| prepare_data.py | |
| Scans local wbg_extractions, identifies docs with real _direct_judged.jsonl | |
| (not dummy), detects language, uploads English docs to HF, and generates/uploads | |
| an updated wbg_pdf_links.json. | |
| Usage: | |
| # Dry run (scan only, no uploads): | |
| uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py --dry-run | |
| # Upload missing docs + generate new pdf_links: | |
| uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py | |
| # Only generate pdf_links without uploading docs: | |
| uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py --links-only | |
| Requires: huggingface_hub, requests, langdetect | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| import requests | |
| from pathlib import Path | |
| from huggingface_hub import HfApi | |
| from langdetect import detect, LangDetectException | |
# --- Configuration -------------------------------------------------
# Token is read from the environment at import time; get_hf_token() adds
# .env-file and cached-token fallbacks on top of this.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Target Hugging Face dataset repo for all uploads.
REPO_ID = "ai4data/annotation_data"
# Local root containing doc_<N>/raw/... extraction folders.
LOCAL_BASE = Path(__file__).parent / "annotation_data" / "wbg_extractions"
# Path of the links JSON inside the HF repo (also mirrored locally on upload).
LINKS_REPO_PATH = "annotation_data/wbg_data/wbg_pdf_links.json"
def get_hf_token():
    """Return a Hugging Face token, or None if none can be found.

    Resolution order:
      1. ``HF_TOKEN`` environment variable (captured at import time),
      2. an ``HF_TOKEN=...`` line in a ``.env`` file next to this script,
      3. the huggingface_hub cached token file under ``~/.cache``.
    """
    if HF_TOKEN:
        return HF_TOKEN

    dotenv_file = Path(__file__).parent / ".env"
    if dotenv_file.exists():
        for raw_line in dotenv_file.read_text().splitlines():
            if raw_line.startswith("HF_TOKEN="):
                return raw_line.split("=", 1)[1].strip()

    cached_token = Path.home() / ".cache" / "huggingface" / "token"
    if cached_token.exists():
        return cached_token.read_text().strip()

    return None
def detect_language(doc_path):
    """
    Detect the language of an extracted document.

    Samples up to 500 chars of ``input_text`` per page from pages 2-5
    (the first page often contains abbreviation tables / currency
    equivalents that skew detection), falling back to the first 3 pages
    when the later pages yield too little text.

    Args:
        doc_path: Path to a ``*_direct_judged.jsonl`` file. Despite the
            ``.jsonl`` suffix, files may contain either a single JSON
            array or one JSON object per line; both layouts are handled.

    Returns:
        ISO 639-1 language code (e.g. 'en', 'fr', 'ar'), or "unknown"
        on any parse or detection failure.
    """
    try:
        raw = Path(doc_path).read_text()
        try:
            # Some files hold a single JSON array of page records.
            data = json.loads(raw)
        except json.JSONDecodeError:
            # True JSONL: one JSON object per line. The previous
            # whole-file json.loads() raised here and silently marked
            # every such doc "unknown".
            data = [json.loads(line) for line in raw.splitlines() if line.strip()]
        # Sample pages 2-5 to avoid abbreviation-heavy first pages.
        texts = " ".join(p.get("input_text", "")[:500] for p in data[1:5])
        if len(texts.strip()) < 50:
            # Fallback to the first 3 pages if later pages are empty.
            texts = " ".join(p.get("input_text", "")[:500] for p in data[:3])
        return detect(texts)
    except (LangDetectException, json.JSONDecodeError, FileNotFoundError):
        return "unknown"
def scan_local_docs():
    """Classify local wbg_extractions docs, detecting language for real ones.

    Returns a dict with four buckets:
      - "real":             English docs with a real _direct_judged.jsonl
      - "real_non_english": real docs whose detected language is not 'en'
      - "dummy":            docs that only have a dummy judged file
      - "no_file":          docs with neither file
    """
    doc_names = sorted(
        (name for name in os.listdir(LOCAL_BASE) if name.startswith("doc_")),
        key=lambda name: int(name.split("_")[1]),
    )
    buckets = {"real": [], "real_non_english": [], "dummy": [], "no_file": []}

    for name in doc_names:
        index = int(name.split("_")[1])
        raw_dir = LOCAL_BASE / name / "raw"
        judged_file = raw_dir / f"{name}_direct_judged.jsonl"
        dummy_file = raw_dir / f"{name}_dummy_direct_judged.jsonl"

        if judged_file.exists():
            language = detect_language(str(judged_file))
            record = {
                "name": name,
                "index": index,
                "path": str(judged_file),
                "language": language,
            }
            bucket = "real" if language == "en" else "real_non_english"
            buckets[bucket].append(record)
        elif dummy_file.exists():
            buckets["dummy"].append({"name": name, "index": index})
        else:
            buckets["no_file"].append({"name": name, "index": index})

    return buckets
def get_existing_hf_docs(api):
    """Return the set of doc folder names already present on HF.

    Lists annotation_data/wbg_extractions in the dataset repo and keeps
    the last path component of each entry. Any failure (network, auth,
    missing repo) is reported as a warning and yields an empty set so
    the caller simply treats everything as not-yet-uploaded.
    """
    try:
        tree = api.list_repo_tree(
            REPO_ID, repo_type="dataset",
            path_in_repo="annotation_data/wbg_extractions"
        )
        return {entry.path.split("/")[-1] for entry in tree if hasattr(entry, "path")}
    except Exception as e:
        print(f"   Warning: Could not list HF repo: {e}")
        return set()
def upload_docs(api, docs_to_upload, dry_run=False):
    """Upload each doc's _direct_judged.jsonl to the HF dataset repo.

    Args:
        api: HfApi client used for the uploads.
        docs_to_upload: entries with at least "name" and "path" keys.
        dry_run: when True, only print what would be uploaded.

    Returns:
        (uploaded, skipped) counts; failures are logged and counted as
        skipped rather than aborting the batch.
    """
    uploaded = 0
    skipped = 0
    for doc in docs_to_upload:
        name = doc["name"]
        repo_path = f"annotation_data/wbg_extractions/{name}/raw/{name}_direct_judged.jsonl"
        if dry_run:
            print(f"   [DRY RUN] Would upload: {doc['name']}")
            continue
        try:
            api.upload_file(
                path_or_fileobj=doc["path"],
                path_in_repo=repo_path,
                repo_id=REPO_ID,
                repo_type="dataset",
                commit_message=f"Upload {doc['name']}_direct_judged.jsonl",
            )
        except Exception as e:
            print(f"   β Failed {doc['name']}: {e}")
            skipped += 1
        else:
            print(f"   β Uploaded: {doc['name']}")
            uploaded += 1
    return uploaded, skipped
def fetch_current_links(api, token):
    """Fetch the current wbg_pdf_links.json from the HF dataset repo.

    Args:
        api: unused; kept for call-site compatibility.
        token: HF token used as a Bearer credential for the raw download.

    Returns:
        The parsed JSON list, or [] (with a warning) on any HTTP error
        or network failure.
    """
    url = f"https://huggingface.co/datasets/{REPO_ID}/raw/main/{LINKS_REPO_PATH}"
    try:
        # timeout prevents the script hanging forever on a stalled
        # connection (the original call had none).
        resp = requests.get(
            url, headers={"Authorization": f"Bearer {token}"}, timeout=30
        )
    except requests.RequestException as e:
        print(f"   Warning: Could not fetch existing links ({e})")
        return []
    if resp.status_code == 200:
        return resp.json()
    print(f"   Warning: Could not fetch existing links (HTTP {resp.status_code})")
    return []
def generate_updated_links(current_links, local_docs, token):
    """
    Build the updated wbg_pdf_links.json payload.

    For every entry in ``current_links`` the output carries:
      - src_docname: "doc_{index}"
      - has_revalidation: True iff an English _direct_judged.jsonl exists locally
      - language: detected language code, "unknown" when never detected

    ``token`` is unused; kept for call-site compatibility.
    """
    # index -> detected language, English docs defaulting to "en".
    language_by_index = {
        doc["index"]: doc.get("language", "en") for doc in local_docs["real"]
    }
    language_by_index.update(
        (doc["index"], doc.get("language", "unknown"))
        for doc in local_docs["real_non_english"]
    )
    english_indices = {doc["index"] for doc in local_docs["real"]}

    return [
        {
            "index": link["index"],
            "src_docname": f"doc_{link['index']}",
            "landing_page_url": link.get("landing_page_url", ""),
            "direct_pdf_url": link.get("direct_pdf_url", ""),
            "status": link.get("status", "unknown"),
            "has_revalidation": link["index"] in english_indices,
            "language": language_by_index.get(link["index"], "unknown"),
        }
        for link in current_links
    ]
def upload_links(api, links, dry_run=False):
    """Serialize ``links``, save a local copy, and upload it to HF.

    In dry-run mode only the would-be upload is reported; otherwise the
    JSON is written under annotation_data/wbg_data/ next to this script
    before being pushed to LINKS_REPO_PATH in the dataset repo.
    """
    serialized = json.dumps(links, indent=2)
    if dry_run:
        print(f"   [DRY RUN] Would upload updated wbg_pdf_links.json ({len(links)} entries)")
        return

    out_dir = Path(__file__).parent / "annotation_data" / "wbg_data"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / "wbg_pdf_links.json"
    out_file.write_text(serialized)
    print(f"   πΎ Saved locally: {out_file}")

    api.upload_file(
        path_or_fileobj=str(out_file),
        path_in_repo=LINKS_REPO_PATH,
        repo_id=REPO_ID,
        repo_type="dataset",
        commit_message="Update wbg_pdf_links.json with language field, exclude non-English",
    )
    print(f"   β Uploaded wbg_pdf_links.json to HF")
def main():
    """CLI entry point: scan local docs, optionally upload them, refresh links."""
    parser = argparse.ArgumentParser(description="Prepare and upload annotation data")
    parser.add_argument("--dry-run", action="store_true", help="Scan only, don't upload")
    parser.add_argument("--links-only", action="store_true",
                        help="Only generate/upload updated pdf_links, skip doc uploads")
    args = parser.parse_args()

    # Bail out early when no credential can be resolved.
    token = get_hf_token()
    if not token:
        print("β No HF_TOKEN found. Set it via environment variable or .env file.")
        sys.exit(1)
    api = HfApi(token=token)

    # 1. Scan local docs with language detection.
    print("\nπ Scanning local wbg_extractions (with language detection)...")
    scan = scan_local_docs()
    for label, bucket in (
        ("Real (English)", "real"),
        ("Real (non-English)", "real_non_english"),
        ("Dummy (skipped)", "dummy"),
        ("No file", "no_file"),
    ):
        print(f"   {label}: {len(scan[bucket])}")

    excluded = scan["real_non_english"]
    if excluded:
        print("\n   Non-English docs excluded:")
        for doc in excluded:
            print(f"      {doc['name']}: {doc['language']}")

    if not args.links_only:
        # 2. Check what's already on HF.
        print("\nπ Checking existing docs on HF...")
        existing_docs = get_existing_hf_docs(api)
        print(f"   Found {len(existing_docs)} doc folders on HF")

        # 3. Upload only English docs not yet on HF.
        pending = [d for d in scan["real"] if d["name"] not in existing_docs]
        present = [d for d in scan["real"] if d["name"] in existing_docs]
        print(f"\nπ€ English docs to upload: {len(pending)}")
        print(f"   Already on HF: {len(present)}")
        if pending:
            print("\nπ Uploading missing English docs...")
            uploaded, skipped = upload_docs(api, pending, dry_run=args.dry_run)
            if not args.dry_run:
                print(f"   Uploaded: {uploaded}, Skipped: {skipped}")

    # 4. Generate updated pdf_links.
    print("\nπ Generating updated wbg_pdf_links.json...")
    current_links = fetch_current_links(api, token)
    updated_links = generate_updated_links(current_links, scan, token)
    revalidated = sum(1 for entry in updated_links if entry["has_revalidation"])
    non_english = sum(
        1 for entry in updated_links if entry["language"] not in ("en", "unknown")
    )
    print(f"   Total entries: {len(updated_links)}")
    print(f"   English with revalidation: {revalidated}")
    print(f"   Non-English (excluded): {non_english}")

    # 5. Upload the refreshed links file.
    print("\nπ€ Uploading updated wbg_pdf_links.json...")
    upload_links(api, updated_links, dry_run=args.dry_run)
    print("\nβ Done!")
| if __name__ == "__main__": | |
| main() | |