diff --git a/RUN_INSTRUCTION.md b/RUN_INSTRUCTION.md index 57dce8f..c920a12 100644 --- a/RUN_INSTRUCTION.md +++ b/RUN_INSTRUCTION.md @@ -90,17 +90,94 @@ http://localhost:8010/health - embedding server http://localhost:9621/health - LightRAG server ``` +## Build Explicit Knowledge Graph + +This step does not require LightRAG servers. It builds a deterministic graph +directly from the structured ADC JSON. + +Test on the small sample: + +```powershell +python scripts/kg/build_adc_knowledge_graph.py ` + --input data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured_10.json ` + --out-dir outputs/knowledge_graph_sample +``` + +Build the full graph: + +```powershell +python scripts/kg/build_adc_knowledge_graph.py ` + --input data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured.json ` + --out-dir outputs/knowledge_graph_full +``` + +Generated files: + +```text +outputs/knowledge_graph_full/adc_knowledge_graph.graphml +outputs/knowledge_graph_full/adc_knowledge_triples.jsonl +outputs/knowledge_graph_full/adc_graph_stats.json +``` + +## Ingest New ADC Data Into LightRAG + +First start the servers: + +```powershell +python start_servers.py +``` + +Keep that terminal open. In a second terminal, run a dry-run: + +```powershell +python scripts/lightrag_ingest/ingest_adc_structured.py ` + --input data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured_10.json ` + --dry-run ` + --limit 5 +``` + +Upload a small clinical batch: + +```powershell +python scripts/lightrag_ingest/ingest_adc_structured.py ` + --input data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured.json ` + --limit 50 ` + --resume +``` + +Check progress and LightRAG pipeline status: + +```powershell +python scripts/lightrag_ingest/ingest_adc_structured.py --status +``` + +Continue with a larger batch: + +```powershell +python scripts/lightrag_ingest/ingest_adc_structured.py ` + --input data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured.json ` + --limit 200 ` + --resume +``` + +By default the script uploads only clinically useful records that contain at +least one of: contraindications, interactions, warnings, or side effects. To +upload every record, add: + +```powershell +--all-records +``` + ## Old Ingestion Pipeline -The folder `checkpoint_02_ingest/` contains an older ingestion pipeline that -loads data from: +Older local experiments used `checkpoint_02_ingest/` to load data from: ```powershell data_adc_databaza/cleaned_general_info_additional.json ``` -It is kept as a reference because it already contains working LightRAG upload -logic and progress tracking: +If that folder is present in your local workspace, treat it only as a reference +for older LightRAG upload logic: ```powershell python checkpoint_02_ingest/load_leaflets.py --count 50 @@ -108,13 +185,17 @@ python checkpoint_02_ingest/load_leaflets.py --status ``` Do not treat this as the final ingestion path for the new dataset. The next -step is to create a new ingestion script that reads: +current ingestion script for the new dataset reads: ```powershell data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured.json ``` -and sends each record's `lightrag_text` to LightRAG. +and sends each record's `lightrag_text` to LightRAG: + +```powershell +python scripts/lightrag_ingest/ingest_adc_structured.py --limit 50 --resume +``` ## Query LightRAG @@ -140,13 +221,13 @@ Available query modes: Avoid querying while the document pipeline is still busy. Entity extraction can take several minutes per batch depending on the LLM API and concurrency limits. -## Reset LightRAG Storage +## Reset LightRAG Storage For a Clean Rebuild Stop the servers first, then clear generated graph/vector data: ```powershell Remove-Item -LiteralPath "c:\Users\Oleh\Desktop\Diplomova praca\lightrag\rag_storage\*" -Force -python checkpoint_02_ingest/load_leaflets.py --reset +Remove-Item -LiteralPath "c:\Users\Oleh\Desktop\Diplomova praca\outputs\lightrag_ingest\adc_structured_progress.json" -Force ``` Use this only when you intentionally want to rebuild the graph. diff --git a/scripts/lightrag_ingest/__init__.py b/scripts/lightrag_ingest/__init__.py new file mode 100644 index 0000000..88b2b09 --- /dev/null +++ b/scripts/lightrag_ingest/__init__.py @@ -0,0 +1 @@ +"""LightRAG ingestion scripts for ADC structured data.""" diff --git a/scripts/lightrag_ingest/ingest_adc_structured.py b/scripts/lightrag_ingest/ingest_adc_structured.py new file mode 100644 index 0000000..dd31318 --- /dev/null +++ b/scripts/lightrag_ingest/ingest_adc_structured.py @@ -0,0 +1,244 @@ +"""Ingest the new structured ADC scrape into a running LightRAG server. + +Input records are produced by scripts/adc_scraper/scrape_adc_product_data.py. +The script sends each record's `lightrag_text` to `/documents/text` and uses +the ADC detail URL as `file_source`, so repeated runs can detect duplicates. +""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +import urllib.error +import urllib.request +from collections import Counter +from pathlib import Path +from typing import Any + + +DEFAULT_INPUT = Path("data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured.json") +DEFAULT_PROGRESS = Path("outputs/lightrag_ingest/adc_structured_progress.json") +DEFAULT_LIGHTRAG_URL = "http://localhost:9621" + +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8") + + +def load_json(path: Path) -> Any: + return json.loads(path.read_text(encoding="utf-8")) + + +def write_json(path: Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + + +def load_progress(path: Path, resume: bool) -> dict[str, Any]: + if resume and path.exists(): + return load_json(path) + return {"done": [], "failed": [], "skipped": Counter(), "last_track_ids": []} + + +def save_progress(path: Path, progress: dict[str, Any]) -> None: + payload = { + "done": sorted(set(progress.get("done", []))), + "failed": sorted(set(progress.get("failed", []))), + "skipped": dict(progress.get("skipped", {})), + "last_track_ids": progress.get("last_track_ids", [])[-50:], + } + write_json(path, payload) + + +def healthcheck(base_url: str) -> dict[str, Any]: + with urllib.request.urlopen(f"{base_url}/health", timeout=10) as response: + return json.loads(response.read().decode("utf-8")) + + +def pipeline_status(base_url: str) -> dict[str, Any]: + with urllib.request.urlopen(f"{base_url}/documents/pipeline_status", timeout=10) as response: + return json.loads(response.read().decode("utf-8")) + + +def post_text(base_url: str, text: str, file_source: str, timeout: int) -> dict[str, Any]: + payload = json.dumps({"text": text, "file_source": file_source}, ensure_ascii=False).encode("utf-8") + request = urllib.request.Request( + f"{base_url}/documents/text", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=timeout) as response: + return json.loads(response.read().decode("utf-8")) + + +def product_name(record: dict[str, Any]) -> str: + product = record.get("product") or {} + graph_hints = record.get("graph_hints") or {} + return str(graph_hints.get("drug") or product.get("name") or product.get("slug") or "unknown") + + +def source_id(record: dict[str, Any]) -> str: + urls = record.get("urls") or {} + product = record.get("product") or {} + return str(urls.get("detail") or product.get("product_id") or product_name(record)) + + +def is_clinical(record: dict[str, Any]) -> bool: + hints = record.get("graph_hints") or {} + return any( + hints.get(key) + for key in ( + "contraindications_text", + "interactions_text", + "warnings_text", + "side_effects_text", + ) + ) + + +def iter_selected_records( + records: list[dict[str, Any]], + progress: dict[str, Any], + clinical_only: bool, +) -> tuple[list[dict[str, Any]], Counter]: + done = set(progress.get("done", [])) + failed = set(progress.get("failed", [])) + skipped: Counter = Counter() + selected: list[dict[str, Any]] = [] + + for record in records: + source = source_id(record) + if source in done or source in failed: + skipped["already_processed"] += 1 + continue + if clinical_only and not is_clinical(record): + skipped["not_clinical"] += 1 + continue + if not str(record.get("lightrag_text") or "").strip(): + skipped["missing_lightrag_text"] += 1 + continue + selected.append(record) + + return selected, skipped + + +def show_status(base_url: str, progress_path: Path) -> None: + if progress_path.exists(): + progress = load_json(progress_path) + print(f"Progress file: {progress_path}") + print(f"Done: {len(progress.get('done', []))}") + print(f"Failed: {len(progress.get('failed', []))}") + print(f"Skipped: {progress.get('skipped', {})}") + else: + print(f"No progress file found: {progress_path}") + + try: + status = pipeline_status(base_url) + except Exception as exc: + print(f"LightRAG pipeline status unavailable: {exc}") + return + + print(f"Pipeline busy: {status.get('busy')}") + print(f"Latest message: {status.get('latest_message', '')}") + + +def ingest(args: argparse.Namespace) -> None: + records = load_json(args.input) + if not isinstance(records, list): + raise SystemExit(f"Input must be a JSON list: {args.input}") + + progress = load_progress(args.progress, resume=args.resume) + selected, skipped = iter_selected_records(records, progress, clinical_only=not args.all_records) + progress["skipped"] = Counter(progress.get("skipped", {})) + skipped + + if args.limit is not None: + selected = selected[: args.limit] + + print(f"Input records: {len(records)}") + print(f"Selected for upload: {len(selected)}") + print(f"Skipped now: {dict(skipped)}") + print(f"Progress: done={len(progress.get('done', []))}, failed={len(progress.get('failed', []))}") + + if args.dry_run: + for record in selected[:10]: + print(f"- {product_name(record)[:90]} | {source_id(record)}") + save_progress(args.progress, progress) + return + + health = healthcheck(args.url) + configuration = health.get("configuration") or {} + print(f"LightRAG OK: {args.url}") + if configuration: + print(f"LLM: {configuration.get('llm_model')} | Embedding: {configuration.get('embedding_model')}") + + done = set(progress.get("done", [])) + failed = set(progress.get("failed", [])) + last_track_ids = list(progress.get("last_track_ids", [])) + + for index, record in enumerate(selected, start=1): + source = source_id(record) + text = str(record.get("lightrag_text") or "").strip() + name = product_name(record) + + try: + result = post_text(args.url, text=text, file_source=source, timeout=args.timeout) + status = result.get("status", "unknown") + track_id = result.get("track_id") + if track_id: + last_track_ids.append(track_id) + if status in {"success", "duplicated"}: + done.add(source) + mark = "OK" + else: + failed.add(source) + mark = "!!" + print(f"[{index:5d}/{len(selected):5d}] {mark} {status:<10} {name[:80]}") + except urllib.error.HTTPError as exc: + failed.add(source) + detail = exc.read().decode("utf-8", errors="replace")[:300] + print(f"[{index:5d}/{len(selected):5d}] HTTP {exc.code} {name[:80]} | {detail}") + except Exception as exc: + failed.add(source) + print(f"[{index:5d}/{len(selected):5d}] ERROR {name[:80]} | {exc}") + + progress["done"] = sorted(done) + progress["failed"] = sorted(failed) + progress["last_track_ids"] = last_track_ids[-50:] + + if index % args.save_every == 0: + save_progress(args.progress, progress) + print(f"Saved progress: done={len(done)}, failed={len(failed)}") + + time.sleep(args.delay) + + save_progress(args.progress, progress) + print(f"Finished upload loop. Done={len(done)}, failed={len(failed)}") + print("LightRAG will continue extracting graph entities and relations in the background.") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Ingest structured ADC data into LightRAG.") + parser.add_argument("--input", type=Path, default=DEFAULT_INPUT) + parser.add_argument("--progress", type=Path, default=DEFAULT_PROGRESS) + parser.add_argument("--url", default=DEFAULT_LIGHTRAG_URL) + parser.add_argument("--limit", type=int, default=50) + parser.add_argument("--delay", type=float, default=0.3) + parser.add_argument("--timeout", type=int, default=60) + parser.add_argument("--save-every", type=int, default=10) + parser.add_argument("--resume", action="store_true", help="Resume from the progress file.") + parser.add_argument("--all-records", action="store_true", help="Upload all records, not only clinical ones.") + parser.add_argument("--dry-run", action="store_true", help="Show selected records without uploading.") + parser.add_argument("--status", action="store_true", help="Show local progress and LightRAG pipeline status.") + args = parser.parse_args() + + if args.status: + show_status(args.url, args.progress) + return + + ingest(args) + + +if __name__ == "__main__": + main()