From 141437e968878664625a1acd9bb701e141b45a35 Mon Sep 17 00:00:00 2001
From: Oleh Poiasnik
Date: Thu, 14 May 2026 12:32:49 +0200
Subject: [PATCH] Add ADC knowledge graph builder

---
Note: the file contents below were revised during review, so the
abbreviated blob id on the build_adc_knowledge_graph.py "index" line is
the one from the original submission.

 .gitignore                              |   1 +
 scripts/adc_scraper/requirements.txt    |   1 +
 scripts/kg/__init__.py                  |   1 +
 scripts/kg/build_adc_knowledge_graph.py | 363 ++++++++++++++++++++++++
 4 files changed, 366 insertions(+)
 create mode 100644 scripts/kg/__init__.py
 create mode 100644 scripts/kg/build_adc_knowledge_graph.py

diff --git a/.gitignore b/.gitignore
index 90ec8ea..cd79b0c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,7 @@ detail-product.html
 
 # Generated graph artifacts
 *.graphml
+outputs/
 
 # Logs
 *.log
diff --git a/scripts/adc_scraper/requirements.txt b/scripts/adc_scraper/requirements.txt
index 36f0ee0..4401529 100644
--- a/scripts/adc_scraper/requirements.txt
+++ b/scripts/adc_scraper/requirements.txt
@@ -3,3 +3,4 @@ beautifulsoup4>=4.12.0
 lxml>=5.0.0
 tqdm>=4.66.0
 playwright>=1.45.0
+networkx>=3.2.0
diff --git a/scripts/kg/__init__.py b/scripts/kg/__init__.py
new file mode 100644
index 0000000..9d8ffc5
--- /dev/null
+++ b/scripts/kg/__init__.py
@@ -0,0 +1 @@
+"""Knowledge graph construction utilities for ADC data."""
diff --git a/scripts/kg/build_adc_knowledge_graph.py b/scripts/kg/build_adc_knowledge_graph.py
new file mode 100644
index 0000000..2b56713
--- /dev/null
+++ b/scripts/kg/build_adc_knowledge_graph.py
@@ -0,0 +1,363 @@
+"""Build an explicit ADC knowledge graph from structured scraper output.
+
+The script reads records produced by scripts/adc_scraper/scrape_adc_product_data.py
+and creates:
+  - GraphML for graph tools / NetworkX / Gephi
+  - JSONL triples for inspection and downstream processing
+  - JSON statistics for quick validation
+"""
+
+from __future__ import annotations
+
+import argparse
+import hashlib
+import json
+import re
+import unicodedata
+from collections import Counter
+from pathlib import Path
+from typing import Any
+
+import networkx as nx
+
+
+DEFAULT_INPUT = Path("data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured.json")
+DEFAULT_OUT_DIR = Path("outputs/knowledge_graph")
+
+# (graph_hints key, edge relation, node kind) for organisation-like targets.
+HOLDER_RELATIONS = [
+    ("marketing_authorization_holder", "HAS_MARKETING_AUTHORIZATION_HOLDER", "MAHolder"),
+    ("manufacturer", "HAS_MANUFACTURER", "Manufacturer"),
+    ("supplier", "HAS_SUPPLIER", "Supplier"),
+]
+
+# (graph_hints key, edge relation, node kind, node label) for free-text sections.
+TEXT_SECTIONS = [
+    ("indications_text", "HAS_INDICATION_TEXT", "IndicationText", "Indications"),
+    ("dosage_text", "HAS_DOSAGE_TEXT", "DosageText", "Dosage"),
+    ("contraindications_text", "HAS_CONTRAINDICATION_TEXT", "ContraindicationText", "Contraindications"),
+    ("warnings_text", "HAS_WARNING_TEXT", "WarningText", "Warnings"),
+    ("interactions_text", "HAS_INTERACTION_TEXT", "InteractionText", "Interactions"),
+    ("side_effects_text", "HAS_SIDE_EFFECT_TEXT", "SideEffectText", "Side effects"),
+]
+
+
+def clean_text(value: Any) -> str:
+    """Return *value* as text with whitespace runs collapsed and ends trimmed.
+
+    ``None`` yields an empty string; any other value is passed through ``str``.
+    """
+    if value is None:
+        return ""
+    text = str(value).replace("\xa0", " ")
+    text = re.sub(r"\s+", " ", text)
+    return text.strip()
+
+
+def slugify(value: str, fallback: str = "unknown") -> str:
+    """Return a lowercase ``[a-z0-9_]`` slug of *value*, or *fallback* if empty.
+
+    Accented letters are folded to their base letter before other characters
+    are replaced, so names with diacritics keep all of their letters.
+    """
+    value = clean_text(value).lower()
+    # NFKD splits an accented letter into base letter + combining mark;
+    # dropping the marks leaves the plain letter for the regex below.
+    decomposed = unicodedata.normalize("NFKD", value)
+    value = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
+    value = re.sub(r"[^a-z0-9]+", "_", value)
+    value = value.strip("_")
+    return value or fallback
+
+
+def short_hash(value: str, length: int = 12) -> str:
+    """Return the first *length* hex characters of the SHA-1 digest of *value*."""
+    return hashlib.sha1(value.encode("utf-8")).hexdigest()[:length]
+
+
+def node_id(kind: str, value: str) -> str:
+    """Build a ``kind:slug:hash`` node id from *value*.
+
+    The slug is capped at 80 characters; the hash is taken over the cleaned
+    text, so values that share a slug still get different ids.
+    """
+    cleaned = clean_text(value)
+    return f"{kind}:{slugify(cleaned)[:80]}:{short_hash(cleaned)}"
+
+
+def text_node_id(kind: str, drug_id: str, text: str) -> str:
+    """Build a ``kind:hash`` id from the owning drug id and the cleaned text."""
+    return f"{kind}:{short_hash(drug_id + '|' + clean_text(text), 16)}"
+
+
+def add_node(graph: nx.MultiDiGraph, node: str, label: str, kind: str, **attrs: Any) -> None:
+    """Add *node* to *graph*, or merge attributes into it if it already exists.
+
+    ``None`` attributes are skipped. Numbers are stored unchanged and every
+    other value is stored as cleaned text. On an existing node only truthy
+    values are merged, so an empty value never overwrites a stored one.
+    """
+    payload: dict[str, Any] = {
+        "label": clean_text(label),
+        "kind": kind,
+    }
+    for key, value in attrs.items():
+        if value is not None:
+            payload[key] = clean_text(value) if not isinstance(value, (int, float)) else value
+
+    if graph.has_node(node):
+        graph.nodes[node].update({k: v for k, v in payload.items() if v})
+    else:
+        graph.add_node(node, **payload)
+
+
+def add_edge(
+    graph: nx.MultiDiGraph,
+    triples: list[dict[str, str]],
+    source: str,
+    relation: str,
+    target: str,
+    source_label: str,
+    target_label: str,
+    evidence: str | None = None,
+) -> None:
+    """Add a *relation* edge to *graph* and append the matching triple.
+
+    The edge keeps at most the first 1000 characters of *evidence*; the dict
+    appended to *triples* keeps the whole cleaned evidence text.
+    """
+    edge_attrs = {"relation": relation}
+    if evidence:
+        edge_attrs["evidence"] = clean_text(evidence)[:1000]
+    graph.add_edge(source, target, **edge_attrs)
+    triples.append(
+        {
+            "subject_id": source,
+            "subject": clean_text(source_label),
+            "predicate": relation,
+            "object_id": target,
+            "object": clean_text(target_label),
+            "evidence": clean_text(evidence or ""),
+        }
+    )
+
+
+def split_active_substances(values: Any) -> list[str]:
+    """Return the distinct cleaned substance names in *values*, in input order.
+
+    *values* may be a list, or a single string separated by commas,
+    semicolons or newlines. Entries that are empty after cleaning are dropped.
+    """
+    if not values:
+        return []
+    items = values if isinstance(values, list) else re.split(r"[,;\n]", str(values))
+    # dict.fromkeys removes duplicates while keeping first-seen order.
+    return list(dict.fromkeys(name for item in items if (name := clean_text(item))))
+
+
+def iter_records(path: Path, limit: int | None = None) -> list[dict[str, Any]]:
+    """Load the JSON list stored at *path*, keeping at most the first *limit* items.
+
+    Raises:
+        ValueError: if *limit* is negative (a negative slice bound would
+            silently drop records from the end instead of capping the
+            count), or if the file does not contain a JSON list.
+    """
+    if limit is not None and limit < 0:
+        raise ValueError(f"limit must be non-negative, got {limit}")
+    data = json.loads(path.read_text(encoding="utf-8"))
+    if not isinstance(data, list):
+        raise ValueError(f"Expected a JSON list in {path}")
+    return data if limit is None else data[:limit]
+
+
+def add_text_evidence(
+    graph: nx.MultiDiGraph,
+    triples: list[dict[str, str]],
+    drug_id: str,
+    drug_label: str,
+    relation: str,
+    kind: str,
+    label: str,
+    text: str | None,
+) -> None:
+    """Attach a free-text section to a drug as its own node, if *text* is non-empty.
+
+    The node stores at most the first 4000 characters of the cleaned text
+    plus its full character count; the edge is added through ``add_edge``
+    with the cleaned text as evidence.
+    """
+    text = clean_text(text)
+    if not text:
+        return
+    evidence_id = text_node_id(kind, drug_id, text)
+    add_node(
+        graph,
+        evidence_id,
+        label,
+        kind,
+        text=text[:4000],
+        char_count=len(text),
+    )
+    add_edge(graph, triples, drug_id, relation, evidence_id, drug_label, label, evidence=text)
+
+
+def build_graph(records: list[dict[str, Any]]) -> tuple[nx.MultiDiGraph, list[dict[str, str]], dict[str, Any]]:
+    """Build the knowledge graph, its triples and summary statistics from *records*.
+
+    Records that are not dicts, or that have no usable drug name, are skipped
+    and counted in the returned statistics.
+
+    Returns:
+        A ``(graph, triples, stats)`` tuple: the populated multigraph, one
+        dict per edge added, and a dict of node, edge and counter totals.
+    """
+    graph = nx.MultiDiGraph()
+    triples: list[dict[str, str]] = []
+    counters: Counter[str] = Counter()
+
+    for record in records:
+        # A malformed entry should not abort the whole build.
+        if not isinstance(record, dict):
+            counters["skipped_non_dict_record"] += 1
+            continue
+
+        product = record.get("product") or {}
+        graph_hints = record.get("graph_hints") or {}
+        fields = product.get("detail_fields") or {}
+        urls = record.get("urls") or {}
+
+        drug_label = clean_text(graph_hints.get("drug") or product.get("name") or product.get("slug"))
+        if not drug_label:
+            counters["skipped_missing_drug_name"] += 1
+            continue
+
+        # First available identifier wins; the label is the last resort.
+        drug_key = product.get("product_id") or fields.get("kod_statnej_autority_sukl") or urls.get("detail") or drug_label
+        drug_id = node_id("drug", str(drug_key))
+        add_node(
+            graph,
+            drug_id,
+            drug_label,
+            "Drug",
+            product_id=product.get("product_id"),
+            sukl_code=fields.get("kod_statnej_autority_sukl"),
+            detail_url=urls.get("detail"),
+            pil_url=urls.get("pil"),
+            dosage_form=graph_hints.get("dosage_form"),
+        )
+        counters["drugs"] += 1
+
+        for substance in split_active_substances(graph_hints.get("active_substances")):
+            substance_id = node_id("substance", substance)
+            add_node(graph, substance_id, substance, "ActiveSubstance")
+            add_edge(
+                graph,
+                triples,
+                drug_id,
+                "HAS_ACTIVE_SUBSTANCE",
+                substance_id,
+                drug_label,
+                substance,
+            )
+            counters["active_substance_edges"] += 1
+
+        dosage_form = clean_text(graph_hints.get("dosage_form"))
+        if dosage_form:
+            dosage_id = node_id("dosage_form", dosage_form)
+            add_node(graph, dosage_id, dosage_form, "DosageForm")
+            add_edge(graph, triples, drug_id, "HAS_DOSAGE_FORM", dosage_id, drug_label, dosage_form)
+            counters["dosage_form_edges"] += 1
+
+        for holder_key, relation, kind in HOLDER_RELATIONS:
+            value = clean_text(graph_hints.get(holder_key))
+            if not value:
+                continue
+            target_id = node_id(kind.lower(), value)
+            add_node(graph, target_id, value, kind)
+            add_edge(graph, triples, drug_id, relation, target_id, drug_label, value)
+            counters[f"{relation.lower()}_edges"] += 1
+
+        sukl_code = clean_text(graph_hints.get("sukl_code"))
+        if sukl_code:
+            sukl_id = node_id("sukl_code", sukl_code)
+            add_node(graph, sukl_id, sukl_code, "SuklCode")
+            add_edge(graph, triples, drug_id, "HAS_SUKL_CODE", sukl_id, drug_label, sukl_code)
+            counters["sukl_code_edges"] += 1
+
+        for item in graph_hints.get("classification_codes") or []:
+            if not isinstance(item, dict):
+                continue
+            code = clean_text(item.get("code"))
+            name = clean_text(item.get("name"))
+            if not code and not name:
+                continue
+            # strip() removes the dangling separator when one side is empty.
+            label = f"{code} - {name}".strip(" -")
+            class_id = node_id("classification", label)
+            add_node(graph, class_id, label, "Classification", code=code, name=name)
+            add_edge(graph, triples, drug_id, "BELONGS_TO_CLASS", class_id, drug_label, label)
+            counters["classification_edges"] += 1
+
+        for hint_key, relation, kind, label in TEXT_SECTIONS:
+            add_text_evidence(
+                graph,
+                triples,
+                drug_id,
+                drug_label,
+                relation,
+                kind,
+                label,
+                graph_hints.get(hint_key),
+            )
+
+    kind_counts = Counter(data.get("kind", "Unknown") for _, data in graph.nodes(data=True))
+    relation_counts = Counter(data.get("relation", "UNKNOWN") for _, _, data in graph.edges(data=True))
+    stats = {
+        "input_records": len(records),
+        "nodes": graph.number_of_nodes(),
+        "edges": graph.number_of_edges(),
+        "node_kinds": dict(kind_counts.most_common()),
+        "relations": dict(relation_counts.most_common()),
+        "counters": dict(counters.most_common()),
+    }
+    return graph, triples, stats
+
+
+def write_triples(path: Path, triples: list[dict[str, str]]) -> None:
+    """Write *triples* to *path* as UTF-8 JSON Lines, one triple per line."""
+    with path.open("w", encoding="utf-8") as out:
+        for triple in triples:
+            out.write(json.dumps(triple, ensure_ascii=False) + "\n")
+
+
+def main() -> None:
+    """Parse CLI arguments, build the graph and write the three output files."""
+    parser = argparse.ArgumentParser(description="Build ADC knowledge graph from structured JSON.")
+    parser.add_argument("--input", type=Path, default=DEFAULT_INPUT)
+    parser.add_argument("--out-dir", type=Path, default=DEFAULT_OUT_DIR)
+    parser.add_argument("--limit", type=int, default=None)
+    args = parser.parse_args()
+
+    records = iter_records(args.input, limit=args.limit)
+    graph, triples, stats = build_graph(records)
+
+    args.out_dir.mkdir(parents=True, exist_ok=True)
+    graphml_path = args.out_dir / "adc_knowledge_graph.graphml"
+    triples_path = args.out_dir / "adc_knowledge_triples.jsonl"
+    stats_path = args.out_dir / "adc_graph_stats.json"
+
+    nx.write_graphml(graph, graphml_path)
+    write_triples(triples_path, triples)
+    stats_path.write_text(json.dumps(stats, ensure_ascii=False, indent=2), encoding="utf-8")
+
+    print(f"Input records: {stats['input_records']}")
+    print(f"Nodes: {stats['nodes']}")
+    print(f"Edges: {stats['edges']}")
+    print(f"GraphML: {graphml_path}")
+    print(f"Triples: {triples_path}")
+    print(f"Stats: {stats_path}")
+
+
+if __name__ == "__main__":
+    main()