DiplomovaPraca/scripts/adc_scraper/scrape_adc_product_data.py
2026-05-14 12:26:11 +02:00

581 lines
20 KiB
Python

"""Scrape structured ADC product data from detail and PIL pages.
Example:
python scripts/adc_scraper/scrape_adc_product_data.py --browser --limit 10
"""
from __future__ import annotations
import argparse
import json
import re
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Iterable
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup, Tag
from tqdm import tqdm
# Root URL of the ADC website the scraped product pages belong to.
BASE_URL = "https://www.adc.sk"
# Directory that holds the default input and output files of this script.
DEFAULT_DATA_DIR = Path("data_adc_databaza/adc_scrape_2026_05_04")
# Default value of --links: JSON list of product detail URLs.
DEFAULT_LINKS = DEFAULT_DATA_DIR / "adc_product_links.json"
# Default value of --out: JSON file receiving the structured records.
DEFAULT_OUT = DEFAULT_DATA_DIR / "adc_products_structured.json"
# HTTP headers sent with every request in requests mode (see fetch_requests).
# The User-Agent entry is also reused for the Playwright page in browser mode.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "sk-SK,sk;q=0.9,cs;q=0.8,en;q=0.7",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
    "Referer": "https://www.adc.sk/databazy/produkty",
}
# Maps the exact title text of an `h4.section-product` heading on a detail page
# to the stable key used in the output. Titles that are not listed here fall
# back to normalize_key(title) in parse_detail_sections.
DETAIL_SECTION_ALIASES = {
    "Popis a určenie": "description_and_indications",
    "Použitie": "use_and_dosage",
    "Nežiaduce účinky": "side_effects",
    "Účinné látky": "active_substances",
    "Indikačná skupina": "indication_group",
    "ADC Klasifikácia produktu": "adc_classification",
    "Všeobecné informácie vzťahujúce sa k produktu": "general_product_info",
}
# Output key -> regex recognising a numbered top-level heading of a PIL text.
# split_by_numbered_pil_sections applies each pattern with re.match and
# re.IGNORECASE to every stripped, non-empty line; the first matching key wins.
PIL_SECTION_PATTERNS = {
    "what_is_it": r"^1\.\s+Čo je .+",
    "before_use": r"^2\.\s+Čo potrebujete vedieť .+",
    "how_to_use": r"^3\.\s+Ako .+",
    "side_effects": r"^4\.\s+Možné .+účinky",
    "storage": r"^5\.\s+Ako uchovávať .+",
    "package_info": r"^6\.\s+Obsah balenia .+",
}
# Output key -> list of regexes recognising a sub-heading inside the
# "before_use" PIL section. split_pil_subsections tests the keys in dictionary
# order with re.match and re.IGNORECASE; a line starts the first key for which
# any of its patterns matches.
PIL_SUBSECTION_ALIASES = {
    "contraindications": [
        r"^Neužívajte .+",
        r"^Nepoužívajte .+",
        r"^Nesmiete .+",
    ],
    "warnings": [
        r"^Upozornenia a opatrenia",
        r"^Buďte zvlášť opatrný .+",
    ],
    "interactions": [
        r"^Iné lieky a .+",
        r"^Užívanie .+ s inými liekmi",
    ],
    "pregnancy_breastfeeding": [
        r"^Tehotenstvo.*dojčenie.*",
        # NOTE: this pattern also matches every line the previous one matches.
        r"^Tehotenstvo.*",
    ],
    "driving": [
        r"^Vedenie vozidiel .+",
    ],
}
@dataclass(frozen=True)
class ProductUrls:
    """Immutable group of the three page URLs belonging to one product.

    Instances are created by product_urls(), which derives the PIL and SPC
    URLs from the detail URL.
    """

    # URL of the product detail page.
    detail_url: str
    # URL of the PIL page; parsed by parse_pil_page once fetched.
    pil_url: str
    # URL of the SPC page; only recorded in the output, never fetched here.
    spc_url: str
def clean_text(value: str) -> str:
    """Normalise whitespace in scraped text and drop advertisement markers.

    Steps, in order:
      1. non-breaking spaces become regular spaces;
      2. runs of horizontal whitespace (including ``\\r``) collapse to one space;
      3. lines consisting solely of the word "reklama" (any case) are emptied;
      4. runs of three or more newlines collapse to exactly two;
      5. leading and trailing whitespace is stripped.

    Args:
        value: Raw text, typically produced by ``Tag.get_text``.

    Returns:
        The cleaned text; an empty string if nothing but whitespace or
        advertisement markers was present.
    """
    value = value.replace("\xa0", " ")
    value = re.sub(r"[ \t\r\f\v]+", " ", value)
    # Remove ad markers BEFORE collapsing blank lines: emptying a line keeps its
    # surrounding newlines, which would otherwise leave runs of 3+ newlines
    # behind (e.g. "a\n\nreklama\n\nb" -> "a\n\n\n\nb").
    value = re.sub(r"(?im)^reklama$", "", value)
    value = re.sub(r"\n{3,}", "\n\n", value)
    return value.strip()
# Single-pass translation table: Slovak accented letters -> ASCII base letters.
_SLOVAK_TO_ASCII = str.maketrans(
    {
        "á": "a",
        "ä": "a",
        "č": "c",
        "ď": "d",
        "é": "e",
        "í": "i",
        "ľ": "l",
        "ĺ": "l",
        "ň": "n",
        "ó": "o",
        "ô": "o",
        "ŕ": "r",
        "š": "s",
        "ť": "t",
        "ú": "u",
        "ý": "y",
        "ž": "z",
    }
)


def normalize_key(value: str) -> str:
    """Turn a human-readable label into a lowercase ASCII snake_case key.

    The label is cleaned with clean_text, lower-cased, stripped of the Slovak
    diacritics listed in the translation table, and every run of characters
    outside ``a-z0-9`` is replaced by a single underscore. Leading and
    trailing underscores are removed.
    """
    lowered = clean_text(value).lower()
    ascii_only = lowered.translate(_SLOVAK_TO_ASCII)
    underscored = re.sub(r"[^a-z0-9]+", "_", ascii_only)
    return underscored.strip("_")
def product_urls(detail_url: str) -> ProductUrls:
    """Build the URL triple for a product from its detail page URL.

    The PIL and SPC URLs are obtained by replacing the ``/detail/`` path
    segment of ``detail_url`` with ``/pil/`` and ``/spc/`` respectively.
    """
    pil_url = detail_url.replace("/detail/", "/pil/")
    spc_url = detail_url.replace("/detail/", "/spc/")
    return ProductUrls(detail_url, pil_url, spc_url)
def product_id_from_url(url: str) -> str | None:
    """Return the numeric id that precedes ``.html`` in the URL path.

    Only the path component of ``url`` is inspected. Returns ``None`` when the
    path does not end in ``-<digits>.html``.
    """
    path = urlparse(url).path
    found = re.search(r"-(\d+)\.html(?:$|\?)", path)
    if found is None:
        return None
    return found.group(1)
def slug_from_url(url: str) -> str:
    """Return the last path segment of ``url`` without its ``-<id>.html`` tail.

    If the segment does not end in ``-<digits>.html`` it is returned unchanged.
    """
    parsed = urlparse(url)
    file_name = Path(parsed.path).name
    slug = re.sub(r"-\d+\.html$", "", file_name)
    return slug
def load_links(path: Path) -> list[str]:
    """Load the list of product detail URLs from a UTF-8 JSON file.

    Args:
        path: File containing a JSON array of URLs.

    Returns:
        The entries converted to ``str`` with surrounding whitespace removed.
        ``null`` entries and entries that are empty after stripping are
        skipped.

    Raises:
        ValueError: If the top-level JSON value is not a list.
    """
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, list):
        raise ValueError(f"Expected a JSON list in {path}")
    # Return the stripped value, not merely filter on it: stray whitespace in
    # the input would otherwise end up inside the URL that gets fetched. A JSON
    # null is skipped instead of becoming the literal string "None".
    stripped = (str(item).strip() for item in data if item is not None)
    return [link for link in stripped if link]
def soup_from_html(html: str) -> BeautifulSoup:
    """Parse an HTML string into a BeautifulSoup tree using the lxml parser."""
    parsed = BeautifulSoup(html, "lxml")
    return parsed
def remove_noise(root: Tag) -> None:
    """Delete non-content elements from ``root`` in place.

    Every descendant matching the selector below (scripts, navigation,
    forms, ad containers, breadcrumbs, ...) is decomposed, i.e. removed
    from the tree and destroyed.
    """
    noise_selector = (
        "script, style, noscript, nav, header, footer, iframe, form, "
        ".modal, .adbl, .ad-video-fake, .breadcrumb, .piktograms"
    )
    unwanted = root.select(noise_selector)
    for element in unwanted:
        element.decompose()
def node_text(node: Tag) -> str:
    """Return the cleaned visible text of ``node``, one text fragment per line.

    Note that ``node`` is modified: noise elements are removed from it
    (see remove_noise) before the text is extracted.
    """
    remove_noise(node)
    raw_text = node.get_text("\n", strip=True)
    return clean_text(raw_text)
def parse_json_ld_product(soup: BeautifulSoup) -> dict[str, str | None]:
    """Extract name, description and image URL from a JSON-LD Product block.

    Every ``<script type="application/ld+json">`` element is tried in document
    order; empty scripts and scripts with invalid JSON are skipped. The first
    object typed as ``Product`` wins.

    Args:
        soup: Parsed detail page.

    Returns:
        A dict with the keys ``name``, ``description`` and ``image_url``, or an
        empty dict when no Product object is found.
    """

    def is_product(item: object) -> bool:
        # JSON-LD allows "@type" to be a single string or a list of strings.
        if not isinstance(item, dict):
            return False
        declared = item.get("@type")
        if isinstance(declared, list):
            return "Product" in declared
        return declared == "Product"

    def image_url(image: object) -> str | None:
        # "image" may be a URL string, a list of images, or an ImageObject
        # dict; reduce all of them to one URL string so the declared return
        # type holds.
        if isinstance(image, list):
            image = image[0] if image else None
        if isinstance(image, dict):
            image = image.get("url") or image.get("contentUrl")
        return image if isinstance(image, str) else None

    for script in soup.find_all("script", {"type": "application/ld+json"}):
        raw = script.string or script.get_text()
        if not raw.strip():
            continue
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue
        items = data if isinstance(data, list) else [data]
        for item in items:
            if is_product(item):
                return {
                    "name": item.get("name"),
                    "description": item.get("description"),
                    "image_url": image_url(item.get("image")),
                }
    return {}
def parse_info_rows(soup: BeautifulSoup) -> dict[str, str]:
    """Collect label/value pairs from a detail page.

    Two sources are read:

    * ``.pmi-info-row`` elements. With two or more direct child tags the first
      child is the label and the remaining children form the value; otherwise
      the row text is split into fragments and the first fragment is the label.
    * Two-cell table rows whose first cell is at most 80 characters long.
      These never overwrite a key already set by an info row.

    Args:
        soup: Parsed detail page.

    Returns:
        Mapping from normalize_key(label) to the value text. Pairs whose label
        normalises to an empty string or whose value is empty are dropped.
    """
    fields: dict[str, str] = {}
    for row in soup.select(".pmi-info-row"):
        children = [child for child in row.find_all(recursive=False) if isinstance(child, Tag)]
        if len(children) >= 2:
            key = clean_text(children[0].get_text(" ", strip=True))
            value = clean_text(" ".join(child.get_text(" ", strip=True) for child in children[1:]))
        else:
            parts = [part.strip() for part in row.get_text("|", strip=True).split("|") if part.strip()]
            if len(parts) < 2:
                continue
            key, value = parts[0], " ".join(parts[1:])
        normalized = normalize_key(key)
        # A label consisting only of punctuation normalises to "", which would
        # create a meaningless empty-string key.
        if normalized and value:
            fields[normalized] = value
    for table in soup.find_all("table"):
        for tr in table.find_all("tr"):
            cells = [clean_text(c.get_text(" ", strip=True)) for c in tr.find_all(["th", "td"])]
            if len(cells) != 2 or len(cells[0]) > 80 or not cells[1]:
                continue
            normalized = normalize_key(cells[0])
            # Same guard as above: an empty first cell must not become a key.
            if normalized:
                fields.setdefault(normalized, cells[1])
    return fields
def collect_until_next_section(header: Tag) -> str:
    """Gather the text of the siblings following a section heading.

    Siblings are walked in document order until the next
    ``<h4 class="section-product">`` is reached. Non-tag siblings are ignored.
    Each tag is re-parsed into its own tree before text extraction, so the
    noise removal done by node_text does not touch the original document.
    Fragments that are empty or identical to the heading text are skipped.
    """
    heading_text = clean_text(header.get_text(" ", strip=True))
    collected: list[str] = []
    for node in header.next_siblings:
        if not isinstance(node, Tag):
            continue
        if node.name == "h4" and "section-product" in node.get("class", []):
            break
        detached = BeautifulSoup(str(node), "lxml")
        fragment = node_text(detached)
        if not fragment or fragment == heading_text:
            continue
        collected.append(fragment)
    return clean_text("\n".join(collected))
def parse_detail_sections(soup: BeautifulSoup) -> dict[str, str]:
    """Return the text of every ``h4.section-product`` section, keyed by name.

    The key is taken from DETAIL_SECTION_ALIASES when the heading title is
    listed there, otherwise it is normalize_key(title). Sections without any
    text are omitted.
    """
    collected: dict[str, str] = {}
    for heading in soup.select("h4.section-product"):
        heading_title = clean_text(heading.get_text(" ", strip=True))
        section_key = DETAIL_SECTION_ALIASES.get(heading_title, normalize_key(heading_title))
        body = collect_until_next_section(heading)
        if not body:
            continue
        collected[section_key] = body
    return collected
def parse_classification(soup: BeautifulSoup) -> list[dict[str, str]]:
    """Read the rows of the ``.classification-levels`` box.

    Each table row with at least two ``<td>`` cells yields
    ``{"code": <first cell>, "name": <second cell>}``. Returns an empty list
    when the box is absent.
    """
    container = soup.select_one(".classification-levels")
    if not container:
        return []
    rows = (
        [clean_text(cell.get_text(" ", strip=True)) for cell in row.find_all("td")]
        for row in container.find_all("tr")
    )
    return [{"code": cells[0], "name": cells[1]} for cells in rows if len(cells) >= 2]
def parse_detail_page(html: str, detail_url: str) -> dict:
    """Parse a product detail page into a structured dict.

    The product name comes from the JSON-LD block when available and falls
    back to the text of the first ``<h1>``. The id and slug are derived from
    ``detail_url``; fields, sections and classification come from the
    dedicated parse_* helpers.
    """
    soup = soup_from_html(html)
    json_ld = parse_json_ld_product(soup)
    heading = soup.find("h1")
    fields = parse_info_rows(soup)
    sections = parse_detail_sections(soup)

    name = json_ld.get("name")
    if not name:
        name = clean_text(heading.get_text(" ", strip=True)) if heading else None

    description = clean_text(str(json_ld.get("description") or ""))

    return {
        "product_id": product_id_from_url(detail_url),
        "slug": slug_from_url(detail_url),
        "name": name,
        "short_description": description or None,
        "image_url": json_ld.get("image_url"),
        "detail_fields": fields,
        "sections": sections,
        "classification": parse_classification(soup),
        "active_substances": split_list_field(sections.get("active_substances") or ""),
        "indication_group": sections.get("indication_group"),
    }
def split_list_field(value: str) -> list[str]:
    """Split a free-text list into unique, cleaned items.

    Items are separated by newlines, semicolons or commas. A comma that has a
    digit on both sides is treated as a decimal comma and does NOT separate
    items, so "paracetamol 0,5 g, kofeín" yields two items rather than three.

    Args:
        value: Text of a list-like section; may be empty.

    Returns:
        The non-empty items in first-seen order, without duplicates.
    """
    if not value:
        return []
    # `(?<!\d),|,(?!\d)` matches every comma except one between two digits.
    raw_items = re.split(r"\n|;|(?<!\d),|,(?!\d)", value)
    cleaned = (clean_text(item) for item in raw_items)
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(item for item in cleaned if item))
def extract_article_text(html: str) -> str:
    """Return the cleaned text of the main content of a page.

    The first ``<article>`` element is preferred. Without one, the text is
    taken from ``<div id="product">``, then ``<body>``, then the whole
    document, whichever exists first.
    """
    soup = soup_from_html(html)
    container = soup.find("article")
    if not container:
        container = soup.find("div", id="product") or soup.body or soup
    return node_text(container)
def split_by_numbered_pil_sections(text: str) -> dict[str, str]:
    """Cut a PIL text into its numbered top-level sections.

    Each stripped, non-empty line is tested against PIL_SECTION_PATTERNS
    (case-insensitively, first matching key wins). A section spans from its
    heading line up to, but not including, the next recognised heading, or
    to the end of the text. When a key is recognised more than once, the
    last occurrence is kept.
    """
    stripped = (raw.strip() for raw in text.splitlines())
    lines = [line for line in stripped if line]

    boundaries: list[tuple[str, int]] = []
    for index, line in enumerate(lines):
        matched_key = next(
            (
                key
                for key, pattern in PIL_SECTION_PATTERNS.items()
                if re.match(pattern, line, flags=re.IGNORECASE)
            ),
            None,
        )
        if matched_key is not None:
            boundaries.append((matched_key, index))

    # The end of each section is the start of the following one.
    ends = [index for _, index in boundaries[1:]] + [len(lines)]
    sections: dict[str, str] = {}
    for (key, begin), end in zip(boundaries, ends):
        sections[key] = clean_text("\n".join(lines[begin:end]))
    return sections
def split_pil_subsections(before_use_text: str) -> dict[str, str]:
    """Cut the "before use" PIL section into its recognised sub-sections.

    A stripped, non-empty line starts a sub-section when it matches any
    pattern of a PIL_SUBSECTION_ALIASES entry (case-insensitively, first
    matching key wins). A sub-section runs until the next recognised
    sub-heading or the end of the text; a repeated key keeps its last
    occurrence. Empty input gives an empty dict.
    """
    if not before_use_text:
        return {}

    stripped = (raw.strip() for raw in before_use_text.splitlines())
    lines = [line for line in stripped if line]

    def key_for(line: str) -> str | None:
        # First alias group with at least one matching pattern.
        for key, patterns in PIL_SUBSECTION_ALIASES.items():
            for pattern in patterns:
                if re.match(pattern, line, flags=re.IGNORECASE):
                    return key
        return None

    boundaries: list[tuple[str, int]] = []
    for index, line in enumerate(lines):
        key = key_for(line)
        if key is not None:
            boundaries.append((key, index))

    ends = [index for _, index in boundaries[1:]] + [len(lines)]
    return {
        key: clean_text("\n".join(lines[begin:end]))
        for (key, begin), end in zip(boundaries, ends)
    }
def parse_pil_page(html: str) -> dict:
    """Parse a PIL page into full text, numbered sections and sub-sections.

    Sub-sections are derived from the "before_use" section only.
    """
    full_text = extract_article_text(html)
    numbered = split_by_numbered_pil_sections(full_text)
    before_use = numbered.get("before_use", "")
    return {
        "full_text": full_text,
        "sections": numbered,
        "subsections": split_pil_subsections(before_use),
    }
def build_lightrag_text(detail: dict, pil: dict | None, urls: ProductUrls) -> str:
    """Assemble one plain-text document describing a product.

    The document consists of, in order: the product name and its two URLs,
    selected detail fields as ``key: value`` lines, selected detail sections,
    and, when ``pil`` is given, selected PIL sub-sections followed by selected
    PIL sections. Empty parts are skipped. Chunks are joined with blank
    lines and the result is passed through clean_text.
    """
    display_name = detail.get("name") or detail.get("slug") or urls.detail_url
    chunks: list[str] = [
        f"Liek: {display_name}",
        f"ADC detail URL: {urls.detail_url}",
        f"ADC PIL URL: {urls.pil_url}",
    ]

    fields = detail.get("detail_fields") or {}
    important_fields = (
        "registracne_cislo_produktu",
        "kod_statnej_autority_sukl",
        "nazov_produktu_podla_sukl",
        "aplikacna_forma",
        "vyrobca",
        "drzitel_rozhodnutia",
        "dodavatelia",
        "vydaj",
        "typ_produktu",
        "legislativne_zatriedenie",
    )
    chunks.extend(f"{key}: {fields[key]}" for key in important_fields if fields.get(key))

    def add_titled(source: dict, titled_keys: list[tuple[str, str]]) -> None:
        # Append "\n<title>\n<text>" for every key with a non-empty value.
        for key, title in titled_keys:
            body = source.get(key)
            if body:
                chunks.append(f"\n{title}\n{body}")

    add_titled(
        detail.get("sections") or {},
        [
            ("description_and_indications", "Popis a indikácie"),
            ("use_and_dosage", "Použitie a dávkovanie"),
            ("side_effects", "Nežiaduce účinky"),
            ("active_substances", "Účinné látky"),
            ("indication_group", "Indikačná skupina"),
            ("general_product_info", "Všeobecné informácie"),
        ],
    )

    if pil:
        add_titled(
            pil.get("subsections") or {},
            [
                ("contraindications", "Kontraindikácie z PIL"),
                ("warnings", "Upozornenia z PIL"),
                ("interactions", "Interakcie z PIL"),
                ("pregnancy_breastfeeding", "Tehotenstvo a dojčenie z PIL"),
                ("driving", "Vedenie vozidiel z PIL"),
            ],
        )
        add_titled(
            pil.get("sections") or {},
            [
                ("what_is_it", "Na čo sa používa z PIL"),
                ("how_to_use", "Ako užívať z PIL"),
                ("side_effects", "Vedľajšie účinky z PIL"),
            ],
        )

    return clean_text("\n\n".join(chunks))
def build_graph_hints(detail: dict, pil: dict | None) -> dict:
    """Pick the values of a record that describe graph entities and relations.

    Structured values come from the detail fields. Indications, dosage and
    side effects prefer the detail page section and fall back to the
    corresponding PIL section. Contraindications, warnings and interactions
    come from the PIL sub-sections only. ``pil`` may be ``None``.
    """
    pil_data = pil or {}
    detail_fields = detail.get("detail_fields") or {}
    detail_sections = detail.get("sections") or {}
    pil_subsections = pil_data.get("subsections") or {}
    pil_sections = pil_data.get("sections") or {}

    indications = detail_sections.get("description_and_indications") or pil_sections.get("what_is_it")
    dosage = detail_sections.get("use_and_dosage") or pil_sections.get("how_to_use")
    side_effects = detail_sections.get("side_effects") or pil_sections.get("side_effects")

    hints = {
        "drug": detail.get("name"),
        "active_substances": detail.get("active_substances") or [],
        "dosage_form": detail_fields.get("aplikacna_forma"),
        "manufacturer": detail_fields.get("vyrobca"),
        "marketing_authorization_holder": detail_fields.get("drzitel_rozhodnutia"),
        "supplier": detail_fields.get("dodavatelia"),
        "sukl_code": detail_fields.get("kod_statnej_autority_sukl"),
        "registration_number": detail_fields.get("registracne_cislo_produktu"),
        "classification_codes": detail.get("classification") or [],
        "indications_text": indications,
        "dosage_text": dosage,
        "contraindications_text": pil_subsections.get("contraindications"),
        "warnings_text": pil_subsections.get("warnings"),
        "interactions_text": pil_subsections.get("interactions"),
        "side_effects_text": side_effects,
    }
    return hints
def build_record(detail_html: str, pil_html: str | None, urls: ProductUrls) -> dict:
    """Combine the parsed detail and PIL pages into one output record.

    ``pil_html`` may be ``None`` or empty, in which case the record's ``pil``
    entry is ``None``. The record is stamped with the current UTC time at
    second precision.
    """
    detail = parse_detail_page(detail_html, urls.detail_url)
    pil = None
    if pil_html:
        pil = parse_pil_page(pil_html)
    timestamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
    url_map = {
        "detail": urls.detail_url,
        "pil": urls.pil_url,
        "spc": urls.spc_url,
    }
    return {
        "source": "adc.sk",
        "scraped_at": timestamp,
        "urls": url_map,
        "product": detail,
        "pil": pil,
        "graph_hints": build_graph_hints(detail, pil),
        "lightrag_text": build_lightrag_text(detail, pil, urls),
    }
def fetch_requests(session: requests.Session, url: str, timeout: int, retries: int) -> str:
    """Download a page with requests, retrying on request failures.

    Args:
        session: Session used for the request.
        url: Page to download.
        timeout: Per-request timeout in seconds.
        retries: Maximum number of attempts. Values below 1 are treated as 1,
            so at least one request is always made.

    Returns:
        The response body decoded with the detected encoding (UTF-8 when
        detection yields nothing).

    Raises:
        RuntimeError: If every attempt failed. The last request error is
            attached as ``__cause__``.
    """
    attempts = max(1, retries)
    last_error: Exception | None = None
    for attempt in range(1, attempts + 1):
        try:
            response = session.get(url, headers=HEADERS, timeout=timeout)
            response.raise_for_status()
        # Only network/HTTP failures are worth retrying; anything else is a
        # programming error and should surface immediately.
        except requests.RequestException as exc:
            last_error = exc
            if attempt < attempts:
                # Back off linearly: 1.5 s, 3 s, ...
                time.sleep(1.5 * attempt)
            continue
        response.encoding = response.apparent_encoding or "utf-8"
        return response.text
    raise RuntimeError(f"Failed to fetch {url}: {last_error}") from last_error
def make_requests_fetcher(timeout: int, retries: int) -> Callable[[str], str]:
    """Return a ``fetch(url)`` callable backed by one shared requests session.

    The returned callable forwards to fetch_requests with the given timeout
    and retry count.
    """
    shared_session = requests.Session()

    def fetch(url: str) -> str:
        return fetch_requests(shared_session, url, timeout, retries)

    return fetch
def make_browser_fetcher() -> tuple[Callable[[str], str], Callable[[], None]]:
    """Start headless Chromium via Playwright and return fetch/close callables.

    Returns:
        ``(fetch, close)``. ``fetch(url)`` navigates the single shared page to
        ``url`` and returns its HTML; it raises RuntimeError when there is no
        response or the HTTP status is 400 or higher. ``close()`` shuts down
        the browser and the Playwright driver and must be called when done.

    Raises:
        SystemExit: If the playwright package is not installed.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError as exc:
        raise SystemExit(
            "Playwright is not installed. Run: pip install playwright; python -m playwright install chromium"
        ) from exc
    playwright = sync_playwright().start()
    browser = None

    def close() -> None:
        # Always stop the driver, even when closing the browser fails.
        try:
            if browser is not None:
                browser.close()
        finally:
            playwright.stop()

    try:
        browser = playwright.chromium.launch(headless=True)
        page = browser.new_page(
            user_agent=HEADERS["User-Agent"],
            locale="sk-SK",
            viewport={"width": 1366, "height": 900},
        )
    except Exception:
        # Do not leak the already-started driver (and possibly the browser)
        # when start-up fails half-way, e.g. because Chromium is missing.
        close()
        raise

    def fetch(url: str) -> str:
        response = page.goto(url, wait_until="domcontentloaded", timeout=60000)
        if response is None or response.status >= 400:
            status = response.status if response else "no-response"
            raise RuntimeError(f"HTTP {status} for {url}")
        return page.content()

    return fetch, close
def iter_links(links: Iterable[str], limit: int | None) -> Iterable[str]:
    """Yield at most ``limit`` items of ``links`` in order.

    ``limit=None`` yields every item; a limit of zero or less yields nothing.
    """
    for position, link in enumerate(links):
        if limit is not None and position >= limit:
            return
        yield link
def write_records_json(
    out_path: Path,
    links: list[str],
    fetch: Callable[[str], str],
    limit: int | None,
    delay: float,
    skip_failed: bool,
) -> list[dict[str, str]]:
    """Scrape the selected products and stream them into a JSON array file.

    For every selected detail URL the detail page and the PIL page are
    fetched (with ``delay`` seconds in between), turned into a record by
    build_record and appended to ``out_path``. The file is flushed after each
    record.

    Args:
        out_path: Output file; parent directories are created as needed.
        links: Product detail URLs.
        fetch: Callable returning the HTML of a URL.
        limit: Process only the first ``limit`` links; ``None`` means all.
        delay: Pause in seconds between page loads.
        skip_failed: When true, a failing product is logged and skipped; when
            false, the exception is re-raised after being logged.

    Returns:
        One ``{"url": ..., "error": ...}`` entry per failed product.

    Raises:
        Exception: Whatever fetching or parsing raised, if ``skip_failed`` is
            false.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    failures: list[dict[str, str]] = []
    # Materialised so that tqdm knows the total.
    selected_links = list(iter_links(links, limit))
    with out_path.open("w", encoding="utf-8") as out:
        out.write("[\n")
        wrote_any = False
        try:
            for detail_url in tqdm(selected_links, desc="ADC products"):
                urls = product_urls(detail_url)
                try:
                    detail_html = fetch(urls.detail_url)
                    time.sleep(delay)
                    pil_html = fetch(urls.pil_url)
                    record = build_record(detail_html, pil_html, urls)
                except Exception as exc:
                    failures.append({"url": detail_url, "error": str(exc)})
                    tqdm.write(f"Failed product {detail_url}: {exc}")
                    if not skip_failed:
                        raise
                    # Keep the pause after failures too, so a run of failing
                    # products does not turn into back-to-back requests.
                    time.sleep(delay)
                    continue
                if wrote_any:
                    out.write(",\n")
                json.dump(record, out, ensure_ascii=False, indent=2)
                wrote_any = True
                out.flush()
                time.sleep(delay)
        finally:
            # Close the array even when the run is aborted (stop-on-fail or
            # Ctrl-C), so the records written so far remain valid JSON.
            out.write("\n]\n")
    return failures
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the scraper."""
    parser = argparse.ArgumentParser(description="Scrape ADC product detail + PIL data into structured JSON.")
    parser.add_argument("--links", type=Path, default=DEFAULT_LINKS, help="Input JSON list with detail URLs.")
    parser.add_argument("--out", type=Path, default=DEFAULT_OUT, help="Output structured JSON file.")
    parser.add_argument("--limit", type=int, default=None, help="Scrape only the first N products.")
    parser.add_argument("--delay", type=float, default=0.25, help="Delay between page loads in seconds.")
    parser.add_argument("--timeout", type=int, default=30, help="HTTP timeout in seconds for requests mode.")
    parser.add_argument("--retries", type=int, default=3, help="Retries per URL in requests mode.")
    parser.add_argument("--browser", action="store_true", help="Use Playwright Chromium. Use this if ADC returns 403.")
    parser.add_argument("--stop-on-fail", action="store_true", help="Stop on first failed product.")
    return parser


def main() -> None:
    """Command-line entry point: scrape the linked products and save them.

    Uses the Playwright fetcher with --browser and the requests fetcher
    otherwise; the browser is always closed afterwards. When products
    failed, their errors are written next to the output file with the
    suffix ``.failed.json``.
    """
    args = _build_arg_parser().parse_args()
    links = load_links(args.links)

    close_browser: Callable[[], None] | None = None
    if not args.browser:
        fetch = make_requests_fetcher(args.timeout, args.retries)
    else:
        fetch, close_browser = make_browser_fetcher()

    try:
        failures = write_records_json(
            out_path=args.out,
            links=links,
            fetch=fetch,
            limit=args.limit,
            delay=args.delay,
            skip_failed=not args.stop_on_fail,
        )
    finally:
        if close_browser:
            close_browser()

    print(f"Saved structured product data to {args.out}")
    if not failures:
        return
    failed_path = args.out.with_suffix(".failed.json")
    failed_path.write_text(json.dumps(failures, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"Failed products: {len(failures)}. Saved errors to {failed_path}")


if __name__ == "__main__":
    main()