# Source: DiplomovaPraca/scripts/kg/build_adc_knowledge_graph.py
# Snapshot: 2026-05-14 12:32:49 +02:00
# Size at snapshot: 335 lines, 11 KiB
# Language: Python

"""Build an explicit ADC knowledge graph from structured scraper output.
The script reads records produced by scripts/adc_scraper/scrape_adc_product_data.py
and creates:
- GraphML for graph tools / NetworkX / Gephi
- JSONL triples for inspection and downstream processing
- JSON statistics for quick validation
"""
from __future__ import annotations
import argparse
import hashlib
import json
import re
from collections import Counter
from pathlib import Path
from typing import Any
import networkx as nx
# Default value of the --input CLI option: the structured scraper output to read.
DEFAULT_INPUT = Path("data_adc_databaza/adc_scrape_2026_05_04/adc_products_structured.json")
# Default value of the --out-dir CLI option: directory that receives the GraphML, JSONL and stats files.
DEFAULT_OUT_DIR = Path("outputs/knowledge_graph")
def clean_text(value: Any) -> str:
    """Return ``value`` as a single-line, whitespace-normalised string.

    ``None`` becomes the empty string. Any other value is converted with
    ``str()``, non-breaking spaces are turned into regular spaces, every run
    of whitespace is collapsed to one space, and the ends are trimmed.
    """
    if value is None:
        return ""
    without_nbsp = str(value).replace("\xa0", " ")
    collapsed = re.sub(r"\s+", " ", without_nbsp)
    return collapsed.strip()
def slugify(value: str, fallback: str = "unknown") -> str:
    """Turn ``value`` into a lowercase identifier made of ``a-z``, ``0-9`` and ``_``.

    The text is cleaned and lowercased, every run of characters outside
    ``a-z0-9`` is replaced by a single underscore, and leading/trailing
    underscores are removed. ``fallback`` is returned when nothing is left.
    """
    lowered = clean_text(value).lower()
    slug = re.sub(r"[^a-z0-9]+", "_", lowered).strip("_")
    if slug:
        return slug
    return fallback
def short_hash(value: str, length: int = 12) -> str:
    """Return the first ``length`` hex characters of the SHA-1 digest of ``value``.

    The string is encoded as UTF-8 before hashing.
    """
    encoded = value.encode("utf-8")
    hex_digest = hashlib.sha1(encoded).hexdigest()
    return hex_digest[:length]
def node_id(kind: str, value: str) -> str:
    """Build a node identifier of the form ``<kind>:<slug>:<hash>``.

    The slug part is the slugified cleaned value capped at 80 characters;
    the hash part is ``short_hash`` of the cleaned value.
    """
    normalized = clean_text(value)
    slug_part = slugify(normalized)[:80]
    digest_part = short_hash(normalized)
    return f"{kind}:{slug_part}:{digest_part}"
def text_node_id(kind: str, drug_id: str, text: str) -> str:
    """Build a text-node identifier of the form ``<kind>:<16-char hash>``.

    The hash covers the drug id and the cleaned text joined by ``|``, so the
    identifier depends on both the owning drug and the text content.
    """
    hashed_input = drug_id + "|" + clean_text(text)
    digest = short_hash(hashed_input, 16)
    return f"{kind}:{digest}"
def add_node(graph: nx.MultiDiGraph, node: str, label: str, kind: str, **attrs: Any) -> None:
    """Insert ``node`` into ``graph`` or merge new attribute values into it.

    Args:
        graph: Graph that is mutated in place.
        node: Node identifier.
        label: Display label; stored after ``clean_text``.
        kind: Node type stored under the ``kind`` attribute.
        **attrs: Extra attributes. ``None`` values are skipped, ``int`` and
            ``float`` values are stored unchanged, anything else goes through
            ``clean_text``.
    """
    attributes: dict[str, Any] = {"label": clean_text(label), "kind": kind}
    for name, raw in attrs.items():
        if raw is None:
            continue
        attributes[name] = raw if isinstance(raw, (int, float)) else clean_text(raw)

    if not graph.has_node(node):
        graph.add_node(node, **attributes)
        return

    # For an existing node only truthy values are merged, so empty strings
    # and zeros never overwrite what is already stored.
    non_empty = {name: val for name, val in attributes.items() if val}
    graph.nodes[node].update(non_empty)
def add_edge(
    graph: nx.MultiDiGraph,
    triples: list[dict[str, str]],
    source: str,
    relation: str,
    target: str,
    source_label: str,
    target_label: str,
    evidence: str | None = None,
) -> None:
    """Add a ``relation`` edge to ``graph`` and append the matching triple.

    Args:
        graph: Graph that receives the edge ``source -> target``.
        triples: List that receives one dict per edge (mutated in place).
        source: Identifier of the subject node.
        relation: Predicate name; stored as the edge's ``relation`` attribute.
        target: Identifier of the object node.
        source_label: Human-readable subject label for the triple.
        target_label: Human-readable object label for the triple.
        evidence: Optional supporting text. On the edge it is cleaned and cut
            to 1000 characters; in the triple it is cleaned but not truncated.
    """
    attributes = {"relation": relation}
    if evidence:
        attributes["evidence"] = clean_text(evidence)[:1000]
    graph.add_edge(source, target, **attributes)

    triple = {
        "subject_id": source,
        "subject": clean_text(source_label),
        "predicate": relation,
        "object_id": target,
        "object": clean_text(target_label),
        "evidence": clean_text(evidence or ""),
    }
    triples.append(triple)
def split_active_substances(values: Any) -> list[str]:
    """Return the distinct, cleaned substance names found in ``values``.

    A list is used item by item; any other truthy value is converted to a
    string and split on commas, semicolons and newlines. Items that are empty
    after cleaning are dropped, and duplicates are removed while the order of
    first appearance is kept. A falsy input yields an empty list.
    """
    if not values:
        return []

    raw_items = values if isinstance(values, list) else re.split(r"[,;\n]", str(values))

    # A dict keeps insertion order, which gives ordered de-duplication.
    seen: dict[str, None] = {}
    for raw in raw_items:
        name = clean_text(raw)
        if name:
            seen.setdefault(name, None)
    return list(seen)
def iter_records(path: Path, limit: int | None = None) -> list[dict[str, Any]]:
    """Load the structured scraper output and return its records as a list.

    Args:
        path: UTF-8 encoded JSON file whose top-level value is a list.
        limit: Maximum number of leading records to return. ``None`` returns
            every record; ``0`` returns an empty list.

    Returns:
        The parsed records, cut to the first ``limit`` entries when a limit
        is given.

    Raises:
        ValueError: If ``limit`` is negative, or if the top-level JSON value
            is not a list.
    """
    if limit is not None and limit < 0:
        # A negative slice bound would silently drop records from the tail
        # instead of limiting the count, so reject it explicitly.
        raise ValueError(f"limit must be non-negative, got {limit}")

    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, list):
        raise ValueError(f"Expected a JSON list in {path}")
    return data if limit is None else data[:limit]
def add_text_evidence(
    graph: nx.MultiDiGraph,
    triples: list[dict[str, str]],
    drug_id: str,
    drug_label: str,
    relation: str,
    kind: str,
    label: str,
    text: str | None,
) -> None:
    """Attach a free-text section to a drug as its own node plus an edge.

    Nothing happens when ``text`` is empty after cleaning. Otherwise a node of
    type ``kind`` is created (its id is derived from the drug id and the text),
    holding the first 4000 characters of the cleaned text and the full
    character count, and the drug is linked to it with ``relation``. The
    cleaned text is also passed along as the edge's evidence.

    Args:
        graph: Graph that is mutated in place.
        triples: Triple list that receives the new edge.
        drug_id: Identifier of the drug node.
        drug_label: Human-readable drug name used in the triple.
        relation: Predicate connecting the drug to the text node.
        kind: Node type of the text node.
        label: Label of the text node.
        text: Raw section text; may be ``None``.
    """
    body = clean_text(text)
    if body:
        section_id = text_node_id(kind, drug_id, body)
        add_node(graph, section_id, label, kind, text=body[:4000], char_count=len(body))
        add_edge(graph, triples, drug_id, relation, section_id, drug_label, label, evidence=body)
def _link_entity(
    graph: nx.MultiDiGraph,
    triples: list[dict[str, str]],
    drug_id: str,
    drug_label: str,
    relation: str,
    id_prefix: str,
    kind: str,
    value: str,
    **attrs: Any,
) -> None:
    """Create or refresh the entity node for ``value`` and link the drug to it."""
    entity_id = node_id(id_prefix, value)
    add_node(graph, entity_id, value, kind, **attrs)
    add_edge(graph, triples, drug_id, relation, entity_id, drug_label, value)


def build_graph(records: list[dict[str, Any]]) -> tuple[nx.MultiDiGraph, list[dict[str, str]], dict[str, Any]]:
    """Build the knowledge graph, the triple list and summary statistics.

    For every record a ``Drug`` node is created and linked to its active
    substances, dosage form, marketing authorization holder, manufacturer,
    supplier, SUKL code, classification codes and free-text sections, all read
    from the record's ``graph_hints``. Records without a usable drug name are
    skipped and counted under ``skipped_missing_drug_name``.

    Args:
        records: Record dicts; the keys ``product``, ``graph_hints`` and
            ``urls`` are read from each one.

    Returns:
        A tuple ``(graph, triples, stats)`` where ``stats`` holds the input
        size, node/edge totals, per-kind and per-relation counts and the
        processing counters.
    """
    # (graph_hints key, relation, node kind) for the company-like entities.
    holder_specs = (
        ("marketing_authorization_holder", "HAS_MARKETING_AUTHORIZATION_HOLDER", "MAHolder"),
        ("manufacturer", "HAS_MANUFACTURER", "Manufacturer"),
        ("supplier", "HAS_SUPPLIER", "Supplier"),
    )
    # (graph_hints key, relation, node kind, node label) for free-text sections.
    text_sections = (
        ("indications_text", "HAS_INDICATION_TEXT", "IndicationText", "Indications"),
        ("dosage_text", "HAS_DOSAGE_TEXT", "DosageText", "Dosage"),
        ("contraindications_text", "HAS_CONTRAINDICATION_TEXT", "ContraindicationText", "Contraindications"),
        ("warnings_text", "HAS_WARNING_TEXT", "WarningText", "Warnings"),
        ("interactions_text", "HAS_INTERACTION_TEXT", "InteractionText", "Interactions"),
        ("side_effects_text", "HAS_SIDE_EFFECT_TEXT", "SideEffectText", "Side effects"),
    )

    graph = nx.MultiDiGraph()
    triples: list[dict[str, str]] = []
    counters: Counter[str] = Counter()

    for record in records:
        product = record.get("product") or {}
        hints = record.get("graph_hints") or {}
        detail_fields = product.get("detail_fields") or {}
        urls = record.get("urls") or {}

        drug_label = clean_text(hints.get("drug") or product.get("name") or product.get("slug"))
        if not drug_label:
            counters["skipped_missing_drug_name"] += 1
            continue

        # The first truthy candidate becomes the identity of the drug node.
        product_id = product.get("product_id")
        authority_code = detail_fields.get("kod_statnej_autority_sukl")
        detail_url = urls.get("detail")
        drug_key = product_id or authority_code or detail_url or drug_label
        drug_id = node_id("drug", str(drug_key))
        add_node(
            graph,
            drug_id,
            drug_label,
            "Drug",
            product_id=product_id,
            sukl_code=authority_code,
            detail_url=detail_url,
            pil_url=urls.get("pil"),
            dosage_form=hints.get("dosage_form"),
        )
        counters["drugs"] += 1

        for substance in split_active_substances(hints.get("active_substances")):
            _link_entity(
                graph, triples, drug_id, drug_label,
                "HAS_ACTIVE_SUBSTANCE", "substance", "ActiveSubstance", substance,
            )
            counters["active_substance_edges"] += 1

        dosage_form = clean_text(hints.get("dosage_form"))
        if dosage_form:
            _link_entity(
                graph, triples, drug_id, drug_label,
                "HAS_DOSAGE_FORM", "dosage_form", "DosageForm", dosage_form,
            )
            counters["dosage_form_edges"] += 1

        for hint_key, relation, kind in holder_specs:
            holder = clean_text(hints.get(hint_key))
            if holder:
                _link_entity(graph, triples, drug_id, drug_label, relation, kind.lower(), kind, holder)
                counters[f"{relation.lower()}_edges"] += 1

        sukl_code = clean_text(hints.get("sukl_code"))
        if sukl_code:
            _link_entity(
                graph, triples, drug_id, drug_label,
                "HAS_SUKL_CODE", "sukl_code", "SuklCode", sukl_code,
            )
            counters["sukl_code_edges"] += 1

        for entry in hints.get("classification_codes") or []:
            if not isinstance(entry, dict):
                continue
            code = clean_text(entry.get("code"))
            name = clean_text(entry.get("name"))
            if not (code or name):
                continue
            # Trimming " -" removes the dangling separator when one part is empty.
            class_label = f"{code} - {name}".strip(" -")
            _link_entity(
                graph, triples, drug_id, drug_label,
                "BELONGS_TO_CLASS", "classification", "Classification", class_label,
                code=code, name=name,
            )
            counters["classification_edges"] += 1

        for hint_key, relation, kind, section_label in text_sections:
            add_text_evidence(
                graph,
                triples,
                drug_id,
                drug_label,
                relation,
                kind,
                section_label,
                hints.get(hint_key),
            )

    kind_counts = Counter(attrs.get("kind", "Unknown") for _, attrs in graph.nodes(data=True))
    relation_counts = Counter(attrs.get("relation", "UNKNOWN") for _, _, attrs in graph.edges(data=True))
    stats = {
        "input_records": len(records),
        "nodes": graph.number_of_nodes(),
        "edges": graph.number_of_edges(),
        "node_kinds": dict(kind_counts.most_common()),
        "relations": dict(relation_counts.most_common()),
        "counters": dict(counters.most_common()),
    }
    return graph, triples, stats
def write_triples(path: Path, triples: list[dict[str, str]]) -> None:
    """Write ``triples`` to ``path`` as JSON Lines, one JSON object per line.

    The file is opened for writing as UTF-8 (replacing any existing content)
    and non-ASCII characters are written as-is rather than escaped.
    """
    with path.open("w", encoding="utf-8") as handle:
        handle.writelines(f"{json.dumps(row, ensure_ascii=False)}\n" for row in triples)
def main() -> None:
    """Command-line entry point: read records, build the graph, write outputs.

    Options are ``--input`` (structured JSON file), ``--out-dir`` (output
    directory, created if missing) and ``--limit`` (optional record cap).
    Three files are written into the output directory - GraphML, JSONL
    triples and JSON statistics - and a short summary is printed.
    """
    cli = argparse.ArgumentParser(description="Build ADC knowledge graph from structured JSON.")
    cli.add_argument("--input", type=Path, default=DEFAULT_INPUT)
    cli.add_argument("--out-dir", type=Path, default=DEFAULT_OUT_DIR)
    cli.add_argument("--limit", type=int, default=None)
    options = cli.parse_args()

    loaded = iter_records(options.input, limit=options.limit)
    graph, triples, stats = build_graph(loaded)

    # The output directory is created only after the graph was built successfully.
    out_dir = options.out_dir
    out_dir.mkdir(parents=True, exist_ok=True)
    graphml_path = out_dir / "adc_knowledge_graph.graphml"
    triples_path = out_dir / "adc_knowledge_triples.jsonl"
    stats_path = out_dir / "adc_graph_stats.json"

    nx.write_graphml(graph, graphml_path)
    write_triples(triples_path, triples)
    stats_path.write_text(json.dumps(stats, ensure_ascii=False, indent=2), encoding="utf-8")

    summary = (
        ("Input records", stats["input_records"]),
        ("Nodes", stats["nodes"]),
        ("Edges", stats["edges"]),
        ("GraphML", graphml_path),
        ("Triples", triples_path),
        ("Stats", stats_path),
    )
    for title, shown in summary:
        print(f"{title}: {shown}")
# Run the command-line entry point only when the file is executed as a script.
if __name__ == "__main__":
    main()