"""Ingest the new structured ADC scrape into a running LightRAG server.

Input records are produced by ``scripts/adc_scraper/scrape_adc_product_data.py``.
The script sends each record's ``lightrag_text`` to ``/documents/text`` and uses
the ADC detail URL as ``file_source``, so repeated runs can detect duplicates.
Local progress (uploaded / failed sources) is kept in a JSON file so that an
interrupted run can be continued with ``--resume``.
"""

from __future__ import annotations

import argparse
import json
import sys
import time
import urllib.error
import urllib.request
from collections import Counter
from pathlib import Path
from typing import Any

DEFAULT_INPUT = Path("data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured.json")
DEFAULT_PROGRESS = Path("outputs/lightrag_ingest/adc_structured_progress.json")
DEFAULT_LIGHTRAG_URL = "http://localhost:9621"

# ``graph_hints`` keys whose non-empty value marks a record as clinical.
CLINICAL_HINT_KEYS = (
    "contraindications_text",
    "interactions_text",
    "warnings_text",
    "side_effects_text",
)
# Upload statuses that count as "this source is now stored in LightRAG".
SUCCESS_STATUSES = frozenset({"success", "duplicated"})
# How many of the most recent LightRAG track IDs are kept in the progress file.
MAX_TRACK_IDS = 50

if hasattr(sys.stdout, "reconfigure"):
    # Force UTF-8 so printing non-ASCII product names cannot fail on consoles
    # whose default encoding is not UTF-8.
    sys.stdout.reconfigure(encoding="utf-8")


def load_json(path: Path) -> Any:
    """Read *path* as UTF-8 text and return the parsed JSON value."""
    return json.loads(path.read_text(encoding="utf-8"))


def write_json(path: Path, data: Any) -> None:
    """Write *data* to *path* as pretty-printed UTF-8 JSON.

    Parent directories are created as needed. The content is first written to
    a sibling ``.tmp`` file which is then moved over *path*, so an interrupted
    write cannot leave a truncated, unparseable file behind.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    tmp_path.replace(path)


def load_progress(path: Path, resume: bool) -> dict[str, Any]:
    """Return the progress state to start from.

    When *resume* is true and *path* exists, the saved state is loaded;
    otherwise a fresh, empty state is returned.
    """
    if resume and path.exists():
        return load_json(path)
    return {"done": [], "failed": [], "skipped": Counter(), "last_track_ids": []}


def save_progress(path: Path, progress: dict[str, Any]) -> None:
    """Persist *progress* to *path* in a normalised, JSON-friendly form.

    ``done``/``failed`` are de-duplicated and sorted, ``skipped`` is stored as
    a plain dict and only the last ``MAX_TRACK_IDS`` track IDs are kept.
    """
    payload = {
        "done": sorted(set(progress.get("done", []))),
        "failed": sorted(set(progress.get("failed", []))),
        "skipped": dict(progress.get("skipped", {})),
        "last_track_ids": list(progress.get("last_track_ids", []))[-MAX_TRACK_IDS:],
    }
    write_json(path, payload)


def _endpoint(base_url: str, route: str) -> str:
    """Join *base_url* (with or without a trailing slash) and *route*."""
    return f"{base_url.rstrip('/')}{route}"


def _get_json(url: str, timeout: float = 10) -> Any:
    """Send a GET request to *url* and return the decoded JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def healthcheck(base_url: str) -> dict[str, Any]:
    """Return the JSON payload of LightRAG's ``/health`` endpoint."""
    return _get_json(_endpoint(base_url, "/health"))


def pipeline_status(base_url: str) -> dict[str, Any]:
    """Return the JSON payload of LightRAG's ``/documents/pipeline_status`` endpoint."""
    return _get_json(_endpoint(base_url, "/documents/pipeline_status"))


def post_text(base_url: str, text: str, file_source: str, timeout: int) -> dict[str, Any]:
    """POST one text document to LightRAG's ``/documents/text`` endpoint.

    Returns the decoded JSON response. HTTP error statuses raise
    ``urllib.error.HTTPError``; network failures raise ``URLError`` or
    another ``OSError``.
    """
    payload = json.dumps({"text": text, "file_source": file_source}, ensure_ascii=False).encode("utf-8")
    request = urllib.request.Request(
        _endpoint(base_url, "/documents/text"),
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def product_name(record: dict[str, Any]) -> str:
    """Return a display name: graph-hint drug, product name, slug, or ``"unknown"``."""
    product = record.get("product") or {}
    graph_hints = record.get("graph_hints") or {}
    return str(graph_hints.get("drug") or product.get("name") or product.get("slug") or "unknown")


def source_id(record: dict[str, Any]) -> str:
    """Return the identifier sent as ``file_source`` and tracked in progress.

    Prefers the ADC detail URL, then the product ID, then :func:`product_name`.
    """
    urls = record.get("urls") or {}
    product = record.get("product") or {}
    return str(urls.get("detail") or product.get("product_id") or product_name(record))


def is_clinical(record: dict[str, Any]) -> bool:
    """Return True if any of the ``CLINICAL_HINT_KEYS`` graph hints is non-empty."""
    hints = record.get("graph_hints") or {}
    return any(hints.get(key) for key in CLINICAL_HINT_KEYS)


def iter_selected_records(
    records: list[dict[str, Any]],
    progress: dict[str, Any],
    clinical_only: bool,
    *,
    retry_failed: bool = False,
) -> tuple[list[dict[str, Any]], Counter]:
    """Split *records* into those to upload and counts of skipped ones.

    A record is skipped (first matching reason wins) when its source is
    already in ``progress["done"]`` -- or in ``progress["failed"]`` unless
    *retry_failed* is set -- (``already_processed``), when *clinical_only* is
    set and it has no clinical hints (``not_clinical``), or when its
    ``lightrag_text`` is empty/blank (``missing_lightrag_text``).

    Returns:
        ``(selected, skipped)`` -- selected records in input order and a
        Counter of skip reasons.
    """
    done = set(progress.get("done", []))
    failed = set(progress.get("failed", []))
    processed = done if retry_failed else done | failed
    skipped: Counter = Counter()
    selected: list[dict[str, Any]] = []
    for record in records:
        if source_id(record) in processed:
            skipped["already_processed"] += 1
        elif clinical_only and not is_clinical(record):
            skipped["not_clinical"] += 1
        elif not str(record.get("lightrag_text") or "").strip():
            skipped["missing_lightrag_text"] += 1
        else:
            selected.append(record)
    return selected, skipped


def show_status(base_url: str, progress_path: Path) -> None:
    """Print the local progress summary and LightRAG's pipeline status."""
    if progress_path.exists():
        progress = load_json(progress_path)
        print(f"Progress file: {progress_path}")
        print(f"Done: {len(progress.get('done', []))}")
        print(f"Failed: {len(progress.get('failed', []))}")
        print(f"Skipped: {progress.get('skipped', {})}")
    else:
        print(f"No progress file found: {progress_path}")

    try:
        status = pipeline_status(base_url)
    except Exception as exc:  # status output is informational; never crash here
        print(f"LightRAG pipeline status unavailable: {exc}")
        return
    print(f"Pipeline busy: {status.get('busy')}")
    print(f"Latest message: {status.get('latest_message', '')}")


def _upload_record(
    record: dict[str, Any], *, url: str, timeout: int, index: int, total: int
) -> tuple[bool, Any]:
    """Upload one record, print a one-line report and return ``(ok, track_id)``.

    ``ok`` is true when LightRAG answered with a status in ``SUCCESS_STATUSES``.
    Errors are reported and turned into ``ok=False`` rather than raised, so a
    single bad record does not abort the whole run.
    """
    source = source_id(record)
    text = str(record.get("lightrag_text") or "").strip()
    name = product_name(record)[:80]
    prefix = f"[{index:5d}/{total:5d}]"
    try:
        result = post_text(url, text=text, file_source=source, timeout=timeout)
    except urllib.error.HTTPError as exc:
        try:
            detail = exc.read().decode("utf-8", errors="replace")[:300]
        except Exception:  # the error body is best-effort diagnostics only
            detail = ""
        print(f"{prefix} HTTP {exc.code} {name} | {detail}")
        return False, None
    except Exception as exc:  # timeouts, connection errors, invalid JSON, ...
        print(f"{prefix} ERROR {name} | {exc}")
        return False, None

    if not isinstance(result, dict):
        print(f"{prefix} ERROR {name} | unexpected response: {str(result)[:300]}")
        return False, None

    # str() so a null/non-string status cannot break the ":<10" format spec.
    status = str(result.get("status", "unknown"))
    ok = status in SUCCESS_STATUSES
    print(f"{prefix} {'OK' if ok else '!!'} {status:<10} {name}")
    return ok, result.get("track_id")


def _sync_and_save(
    path: Path, progress: dict[str, Any], done: set, failed: set, track_ids: list
) -> None:
    """Copy the in-memory done/failed/track-id state into *progress* and save it."""
    progress["done"] = sorted(done)
    progress["failed"] = sorted(failed)
    progress["last_track_ids"] = track_ids[-MAX_TRACK_IDS:]
    save_progress(path, progress)


def ingest(args: argparse.Namespace) -> None:
    """Upload the selected records to LightRAG and track progress on disk.

    Reads ``args.input`` (a JSON list of records), skips records that were
    already processed or filtered out, uploads at most ``args.limit`` of the
    rest, and saves progress every ``args.save_every`` uploads plus once at
    the end -- also when the loop is interrupted. With ``args.dry_run`` the
    selection is only printed; nothing is uploaded and the progress file is
    not written.

    Raises:
        SystemExit: if the input is not a JSON list or the LightRAG health
            check fails.
    """
    records = load_json(args.input)
    if not isinstance(records, list):
        raise SystemExit(f"Input must be a JSON list: {args.input}")

    progress = load_progress(args.progress, resume=args.resume)
    selected, skipped = iter_selected_records(
        records,
        progress,
        clinical_only=not args.all_records,
        retry_failed=getattr(args, "retry_failed", False),
    )
    # Store this run's skip counts instead of adding them to the saved ones:
    # every run rescans all records, so summing would count the same records
    # again on each resume.
    progress["skipped"] = skipped
    if args.limit is not None:
        selected = selected[: args.limit]

    print(f"Input records: {len(records)}")
    print(f"Selected for upload: {len(selected)}")
    print(f"Skipped now: {dict(skipped)}")
    print(f"Progress: done={len(progress.get('done', []))}, failed={len(progress.get('failed', []))}")

    if args.dry_run:
        for record in selected[:10]:
            print(f"- {product_name(record)[:90]} | {source_id(record)}")
        # Deliberately no save: without --resume the state is empty and
        # saving it would wipe an existing progress file.
        return

    if not args.resume and args.progress.exists():
        print(f"Note: --resume not given; {args.progress} will be overwritten with a fresh state.")

    try:
        health = healthcheck(args.url)
    except (OSError, ValueError) as exc:  # URLError is an OSError; bad JSON is a ValueError
        raise SystemExit(f"LightRAG health check failed at {args.url}: {exc}") from exc
    configuration = health.get("configuration") or {}
    print(f"LightRAG OK: {args.url}")
    if configuration:
        print(f"LLM: {configuration.get('llm_model')} | Embedding: {configuration.get('embedding_model')}")

    done = set(progress.get("done", []))
    failed = set(progress.get("failed", []))
    track_ids = list(progress.get("last_track_ids", []))
    save_every = max(1, args.save_every)  # avoid modulo by zero
    total = len(selected)

    try:
        for index, record in enumerate(selected, start=1):
            ok, track_id = _upload_record(
                record, url=args.url, timeout=args.timeout, index=index, total=total
            )
            if track_id:
                track_ids.append(track_id)
            source = source_id(record)
            if ok:
                done.add(source)
                failed.discard(source)  # a retried source is no longer failed
            elif source not in done:
                failed.add(source)
            if index % save_every == 0:
                _sync_and_save(args.progress, progress, done, failed, track_ids)
                print(f"Saved progress: done={len(done)}, failed={len(failed)}")
            if index < total:  # no pointless wait after the last upload
                time.sleep(args.delay)
    finally:
        # Runs on normal completion and on interruption (e.g. Ctrl+C), so
        # uploads since the last periodic save are not forgotten.
        _sync_and_save(args.progress, progress, done, failed, track_ids)

    print(f"Finished upload loop. Done={len(done)}, failed={len(failed)}")
    print("LightRAG will continue extracting graph entities and relations in the background.")


def main() -> None:
    """Parse command-line arguments and run either ``--status`` or the ingest."""
    parser = argparse.ArgumentParser(description="Ingest structured ADC data into LightRAG.")
    parser.add_argument("--input", type=Path, default=DEFAULT_INPUT)
    parser.add_argument("--progress", type=Path, default=DEFAULT_PROGRESS)
    parser.add_argument("--url", default=DEFAULT_LIGHTRAG_URL)
    parser.add_argument("--limit", type=int, default=50, help="Maximum number of records to upload in this run.")
    parser.add_argument("--delay", type=float, default=0.3, help="Seconds to wait between uploads.")
    parser.add_argument("--timeout", type=int, default=60, help="HTTP timeout per upload, in seconds.")
    parser.add_argument("--save-every", type=int, default=10, help="Save progress after this many uploads.")
    parser.add_argument("--resume", action="store_true", help="Resume from the progress file.")
    parser.add_argument(
        "--retry-failed",
        action="store_true",
        help="Upload records listed as failed in the progress file again (use with --resume).",
    )
    parser.add_argument("--all-records", action="store_true", help="Upload all records, not only clinical ones.")
    parser.add_argument("--dry-run", action="store_true", help="Show selected records without uploading.")
    parser.add_argument("--status", action="store_true", help="Show local progress and LightRAG pipeline status.")
    args = parser.parse_args()

    if args.limit < 0:
        parser.error("--limit must be >= 0")
    if args.save_every < 1:
        parser.error("--save-every must be >= 1")
    if args.delay < 0:
        parser.error("--delay must be >= 0")
    if args.timeout <= 0:
        parser.error("--timeout must be > 0")

    if args.status:
        show_status(args.url, args.progress)
        return
    ingest(args)


if __name__ == "__main__":
    main()