data-use-annotation / prepare_data.py
rafmacalaba's picture
feat: add language detection to prepare_data.py, exclude non-English docs
85eb22c
#!/usr/bin/env python3
"""
prepare_data.py
Scans local wbg_extractions, identifies docs with real _direct_judged.jsonl
(not dummy), detects language, uploads English docs to HF, and generates/uploads
an updated wbg_pdf_links.json.
Usage:
# Dry run (scan only, no uploads):
uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py --dry-run
# Upload missing docs + generate new pdf_links:
uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py
# Only generate pdf_links without uploading docs:
uv run --with huggingface_hub,requests,langdetect python3 prepare_data.py --links-only
Requires: huggingface_hub, requests, langdetect
"""
import argparse
import json
import os
import sys
import requests
from pathlib import Path
from huggingface_hub import HfApi
from langdetect import detect, LangDetectException
# ─── Configuration ───────────────────────────────
# Token taken from the environment once, at import time; None when the
# variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Hugging Face repository id; every call in this file passes repo_type="dataset".
REPO_ID = "ai4data/annotation_data"
# Local folder holding the doc_* directories, resolved relative to this script.
LOCAL_BASE = Path(__file__).parent / "annotation_data" / "wbg_extractions"
# Location of the links file inside the repository.
LINKS_REPO_PATH = "annotation_data/wbg_data/wbg_pdf_links.json"
def get_hf_token():
    """Get HF token from env, .env file, or cached token.

    Sources are tried in this order:
      1. the module-level ``HF_TOKEN`` (environment variable read at import),
      2. an ``HF_TOKEN=...`` line in a ``.env`` file next to this script,
      3. the token file at ``~/.cache/huggingface/token``.

    In the ``.env`` file, leading whitespace, a leading ``export `` and
    matching surrounding quotes are tolerated. Empty values are treated as
    missing so the search continues with the next line / next source.

    Returns:
        The token string, or None when no non-empty token is found.
    """
    if HF_TOKEN:
        return HF_TOKEN

    env_path = Path(__file__).parent / ".env"
    if env_path.exists():
        for line in env_path.read_text(encoding="utf-8").splitlines():
            entry = line.strip()
            # Accept the shell-style `export HF_TOKEN=...` form as well.
            if entry.startswith("export "):
                entry = entry[len("export "):].lstrip()
            if not entry.startswith("HF_TOKEN="):
                continue
            value = entry.split("=", 1)[1].strip()
            # dotenv files commonly quote values; the quotes are not part
            # of the token and would make authentication fail.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1].strip()
            if value:
                return value

    cached = Path.home() / ".cache" / "huggingface" / "token"
    if cached.exists():
        # An empty cache file counts as "no token".
        return cached.read_text(encoding="utf-8").strip() or None
    return None
def detect_language(doc_path):
    """
    Detect language of a document by sampling pages 2-5 (skipping first page
    which often contains abbreviation tables / currency equivalents).

    Up to 500 characters of ``input_text`` are taken from each sampled page.
    When that sample is shorter than 50 characters, the first 3 pages are
    sampled instead.

    The file may hold a single JSON array of page objects or JSON Lines
    (one page object per line).

    Returns ISO 639-1 language code (e.g. 'en', 'fr', 'ar'), or "unknown"
    when the file cannot be read or parsed, contains no usable text, or the
    detector raises LangDetectException.
    """
    try:
        # utf-8-sig also accepts plain UTF-8 and drops a BOM, which
        # json.loads would otherwise reject.
        raw = Path(doc_path).read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError):
        return "unknown"

    pages = _load_pages(raw)
    # Sample from pages 2-5 to avoid abbreviation-heavy first pages
    texts = _sample_page_text(pages[1:5])
    if len(texts.strip()) < 50:
        # Fallback to first 3 pages if later pages are empty
        texts = _sample_page_text(pages[:3])
    if not texts.strip():
        # Nothing to analyse; the detector would only raise on empty input.
        return "unknown"
    try:
        # NOTE(review): the langdetect README says results can differ between
        # runs on short/ambiguous text unless DetectorFactory.seed is set.
        return detect(texts)
    except LangDetectException:
        return "unknown"


def _load_pages(raw):
    """Parse file contents into a list of page records ([] if nothing parses)."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # Not one JSON document: treat the content as JSON Lines and keep
        # every line that parses. Presumably one page per line — confirm
        # against the producer of these files.
        data = []
        for line in raw.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                data.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    if isinstance(data, dict):
        # A lone object (e.g. single-line JSONL) is a one-page document.
        return [data]
    return data if isinstance(data, list) else []


def _sample_page_text(pages):
    """Join the first 500 chars of each page's input_text with spaces."""
    snippets = []
    for page in pages:
        text = page.get("input_text", "") if isinstance(page, dict) else ""
        # Missing, null or non-string text contributes an empty snippet
        # instead of raising.
        snippets.append(text[:500] if isinstance(text, str) else "")
    return " ".join(snippets)
def scan_local_docs():
    """Scan local wbg_extractions and classify docs with language detection.

    Every entry of LOCAL_BASE whose name starts with "doc_" is classified by
    what exists in its ``raw/`` subfolder:

      - "real": ``<doc>_direct_judged.jsonl`` exists and detect_language()
        returned "en"
      - "real_non_english": that file exists but any other language code
        (including "unknown") was returned
      - "dummy": only ``<doc>_dummy_direct_judged.jsonl`` exists
      - "no_file": neither file exists

    Entries are processed in order of their numeric index (the part between
    the first and second "_"). Entries without an integer index, such as
    "doc_notes", are skipped with a warning instead of aborting the scan.

    Returns:
        Dict mapping each category name to a list of entry dicts. "real" and
        "real_non_english" entries carry name, index, path and language;
        the other two carry name and index only.
    """
    indexed = []
    for name in os.listdir(LOCAL_BASE):
        if not name.startswith("doc_"):
            continue
        try:
            indexed.append((int(name.split("_")[1]), name))
        except ValueError:
            print(f" Warning: skipping {name!r} (no numeric index after 'doc_')")
    # Sort on the index alone so ties keep their listing order.
    indexed.sort(key=lambda pair: pair[0])

    results = {"real": [], "real_non_english": [], "dummy": [], "no_file": []}
    for idx, doc in indexed:
        raw_dir = LOCAL_BASE / doc / "raw"
        real_file = raw_dir / f"{doc}_direct_judged.jsonl"
        dummy_file = raw_dir / f"{doc}_dummy_direct_judged.jsonl"
        if real_file.exists():
            lang = detect_language(str(real_file))
            entry = {"name": doc, "index": idx, "path": str(real_file), "language": lang}
            bucket = "real" if lang == "en" else "real_non_english"
            results[bucket].append(entry)
        elif dummy_file.exists():
            results["dummy"].append({"name": doc, "index": idx})
        else:
            results["no_file"].append({"name": doc, "index": idx})
    return results
def get_existing_hf_docs(api):
    """Return the names of the entries listed under wbg_extractions on HF.

    The listing is requested for ``annotation_data/wbg_extractions`` in the
    dataset repo and the last path component of every returned entry is
    collected. No recursive flag is passed, so this looks at the entries of
    that folder itself; it does not check that a _direct_judged.jsonl file
    exists inside each one.

    Best effort: on any error a warning is printed and an empty set is
    returned.
    """
    try:
        tree = api.list_repo_tree(
            REPO_ID,
            repo_type="dataset",
            path_in_repo="annotation_data/wbg_extractions",
        )
        return {
            entry.path.split("/")[-1]
            for entry in tree
            if hasattr(entry, "path")
        }
    except Exception as e:
        print(f" Warning: Could not list HF repo: {e}")
        return set()
def upload_docs(api, docs_to_upload, dry_run=False):
    """Push each doc's _direct_judged.jsonl file to the HF dataset repo.

    Each entry of ``docs_to_upload`` is a dict with "name" and "path". With
    ``dry_run`` set, the intended upload is only printed. A failed upload is
    reported and counted, and processing continues with the next doc.

    Returns:
        Tuple ``(uploaded, skipped)`` where ``skipped`` counts failed
        uploads. Both are 0 for a dry run.
    """
    ok_count, fail_count = 0, 0
    for record in docs_to_upload:
        doc_name = record["name"]
        target = f"annotation_data/wbg_extractions/{doc_name}/raw/{doc_name}_direct_judged.jsonl"
        if not dry_run:
            try:
                api.upload_file(
                    path_or_fileobj=record["path"],
                    path_in_repo=target,
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    commit_message=f"Upload {doc_name}_direct_judged.jsonl",
                )
                print(f" βœ… Uploaded: {doc_name}")
                ok_count += 1
            except Exception as err:
                print(f" ❌ Failed {doc_name}: {err}")
                fail_count += 1
        else:
            print(f" [DRY RUN] Would upload: {doc_name}")
    return ok_count, fail_count
def fetch_current_links(api, token):
    """Fetch current wbg_pdf_links.json from HF.

    Args:
        api: Not used by this function; kept so existing callers keep working.
        token: Sent as a Bearer token in the Authorization header.

    Returns:
        The parsed JSON body on HTTP 200. For any other status a warning is
        printed and an empty list is returned.

    Connection errors, timeouts and an invalid JSON body are not caught and
    propagate to the caller.
    """
    url = f"https://huggingface.co/datasets/{REPO_ID}/raw/main/{LINKS_REPO_PATH}"
    # requests applies no timeout by default, so a stalled connection would
    # block the script forever; fail after 30 seconds instead.
    resp = requests.get(
        url,
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    if resp.status_code == 200:
        return resp.json()
    print(f" Warning: Could not fetch existing links (HTTP {resp.status_code})")
    return []
def generate_updated_links(current_links, local_docs, token):
    """
    Build the refreshed wbg_pdf_links.json entries, one per existing link.

    Each output entry carries, in this key order:
    - index: copied from the input link
    - src_docname: doc_{index}
    - landing_page_url / direct_pdf_url: copied, "" when absent
    - status: copied, "unknown" when absent
    - has_revalidation: true if the index belongs to a local "real" (English) doc
    - language: detected language code, "unknown" when no local doc matches

    Any other keys of the input link are not carried over. ``token`` is not
    used.
    """
    # index -> language; non-English docs are applied last, so they win if an
    # index appears in both lists.
    languages = {
        doc["index"]: doc.get("language", "en") for doc in local_docs["real"]
    }
    languages.update(
        (doc["index"], doc.get("language", "unknown"))
        for doc in local_docs["real_non_english"]
    )
    english_indices = {doc["index"] for doc in local_docs["real"]}

    return [
        {
            "index": link["index"],
            "src_docname": f"doc_{link['index']}",
            "landing_page_url": link.get("landing_page_url", ""),
            "direct_pdf_url": link.get("direct_pdf_url", ""),
            "status": link.get("status", "unknown"),
            "has_revalidation": link["index"] in english_indices,
            "language": languages.get(link["index"], "unknown"),
        }
        for link in current_links
    ]
def upload_links(api, links, dry_run=False):
    """Write wbg_pdf_links.json next to this script and upload it to HF.

    ``links`` is serialised with a 2-space indent before anything else
    happens. With ``dry_run`` set, only a summary line is printed and nothing
    is written or uploaded. Otherwise the file is saved under
    annotation_data/wbg_data/ beside this script and then uploaded to
    LINKS_REPO_PATH in the dataset repo. Returns None.
    """
    payload = json.dumps(links, indent=2)
    if not dry_run:
        out_dir = Path(__file__).parent / "annotation_data" / "wbg_data"
        out_dir.mkdir(parents=True, exist_ok=True)
        out_file = out_dir / "wbg_pdf_links.json"
        out_file.write_text(payload)
        print(f" πŸ’Ύ Saved locally: {out_file}")
        api.upload_file(
            path_or_fileobj=str(out_file),
            path_in_repo=LINKS_REPO_PATH,
            repo_id=REPO_ID,
            repo_type="dataset",
            commit_message="Update wbg_pdf_links.json with language field, exclude non-English",
        )
        print(" βœ… Uploaded wbg_pdf_links.json to HF")
        return
    print(f" [DRY RUN] Would upload updated wbg_pdf_links.json ({len(links)} entries)")
def main():
    """Command-line entry point: scan, upload docs, refresh the links file.

    Flags:
        --dry-run     print what would be uploaded without uploading
        --links-only  skip the document uploads, only refresh the links file

    Steps:
        1. Scan local docs and detect their language.
        2. Unless --links-only: list what is already on HF and
        3. upload the English docs that are not there yet.
        4. Fetch the current links file and build the updated entries.
        5. Save/upload the updated links file.

    Exits with status 1 when no token is available, or when no existing
    links could be fetched (see the guard in step 4).
    """
    parser = argparse.ArgumentParser(description="Prepare and upload annotation data")
    parser.add_argument("--dry-run", action="store_true", help="Scan only, don't upload")
    parser.add_argument("--links-only", action="store_true",
                        help="Only generate/upload updated pdf_links, skip doc uploads")
    args = parser.parse_args()

    token = get_hf_token()
    if not token:
        print("❌ No HF_TOKEN found. Set it via environment variable or .env file.")
        sys.exit(1)
    api = HfApi(token=token)

    # 1. Scan local docs with language detection
    print("\nπŸ“‚ Scanning local wbg_extractions (with language detection)...")
    local_docs = scan_local_docs()
    print(f" Real (English): {len(local_docs['real'])}")
    print(f" Real (non-English): {len(local_docs['real_non_english'])}")
    print(f" Dummy (skipped): {len(local_docs['dummy'])}")
    print(f" No file: {len(local_docs['no_file'])}")
    if local_docs["real_non_english"]:
        print("\n Non-English docs excluded:")
        for d in local_docs["real_non_english"]:
            print(f" {d['name']}: {d['language']}")

    if not args.links_only:
        # 2. Check what's already on HF
        print("\nπŸ” Checking existing docs on HF...")
        existing = get_existing_hf_docs(api)
        print(f" Found {len(existing)} doc folders on HF")

        # 3. Upload only English docs not yet on HF
        to_upload = [d for d in local_docs["real"] if d["name"] not in existing]
        already_on_hf = [d for d in local_docs["real"] if d["name"] in existing]
        print(f"\nπŸ“€ English docs to upload: {len(to_upload)}")
        print(f" Already on HF: {len(already_on_hf)}")
        if to_upload:
            print("\nπŸš€ Uploading missing English docs...")
            uploaded, skipped = upload_docs(api, to_upload, dry_run=args.dry_run)
            if not args.dry_run:
                print(f" Uploaded: {uploaded}, Skipped: {skipped}")

    # 4. Generate updated pdf_links
    print("\nπŸ“‹ Generating updated wbg_pdf_links.json...")
    current_links = fetch_current_links(api, token)
    if not current_links:
        # The updated file is derived entry-by-entry from the current one, so
        # an empty or failed fetch would yield [] and step 5 would replace
        # the remote links file with an empty list. Stop instead.
        print("❌ No existing wbg_pdf_links.json entries fetched; "
              "refusing to overwrite it with an empty list.")
        sys.exit(1)
    updated_links = generate_updated_links(current_links, local_docs, token)
    with_revalidation = sum(1 for link in updated_links if link["has_revalidation"])
    non_english = sum(
        1 for link in updated_links if link["language"] not in ("en", "unknown")
    )
    print(f" Total entries: {len(updated_links)}")
    print(f" English with revalidation: {with_revalidation}")
    print(f" Non-English (excluded): {non_english}")

    # 5. Upload
    print("\nπŸ“€ Uploading updated wbg_pdf_links.json...")
    upload_links(api, updated_links, dry_run=args.dry_run)
    print("\nβœ… Done!")


if __name__ == "__main__":
    main()