"""Scrape structured ADC product data from detail and PIL pages.

Example:
    python scripts/adc_scraper/scrape_adc_product_data.py --browser --limit 10
"""

from __future__ import annotations

import argparse
import json
import re
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Iterable
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup, Tag
from tqdm import tqdm

BASE_URL = "https://www.adc.sk"
DEFAULT_DATA_DIR = Path("data_adc_databaza/adc_scrape_2026_05_04")
DEFAULT_LINKS = DEFAULT_DATA_DIR / "adc_product_links.json"
DEFAULT_OUT = DEFAULT_DATA_DIR / "adc_products_structured.json"

# Browser-like headers sent with every request in requests mode.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "sk-SK,sk;q=0.9,cs;q=0.8,en;q=0.7",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
    "Referer": "https://www.adc.sk/databazy/produkty",
}

# Heading text of a detail-page section -> stable key used in the output.
DETAIL_SECTION_ALIASES = {
    "Popis a určenie": "description_and_indications",
    "Použitie": "use_and_dosage",
    "Nežiaduce účinky": "side_effects",
    "Účinné látky": "active_substances",
    "Indikačná skupina": "indication_group",
    "ADC Klasifikácia produktu": "adc_classification",
    "Všeobecné informácie vzťahujúce sa k produktu": "general_product_info",
}

# Output key -> regex matching the numbered heading line that opens that
# PIL section.
PIL_SECTION_PATTERNS = {
    "what_is_it": r"^1\.\s+Čo je .+",
    "before_use": r"^2\.\s+Čo potrebujete vedieť .+",
    "how_to_use": r"^3\.\s+Ako .+",
    "side_effects": r"^4\.\s+Možné .+účinky",
    "storage": r"^5\.\s+Ako uchovávať .+",
    "package_info": r"^6\.\s+Obsah balenia .+",
}

# Output key -> regexes matching heading lines inside the "before_use"
# PIL section.
PIL_SUBSECTION_ALIASES = {
    "contraindications": [
        r"^Neužívajte .+",
        r"^Nepoužívajte .+",
        r"^Nesmiete .+",
    ],
    "warnings": [
        r"^Upozornenia a opatrenia",
        r"^Buďte zvlášť opatrný .+",
    ],
    "interactions": [
        r"^Iné lieky a .+",
        r"^Užívanie .+ s inými liekmi",
    ],
    "pregnancy_breastfeeding": [
        r"^Tehotenstvo.*dojčenie.*",
        r"^Tehotenstvo.*",
    ],
    "driving": [
        r"^Vedenie vozidiel .+",
    ],
}

# Single-pass translation table that strips the diacritics handled by
# normalize_key (built once instead of on every call).
_DIACRITIC_TABLE = str.maketrans(
    {
        "á": "a",
        "ä": "a",
        "č": "c",
        "ď": "d",
        "é": "e",
        "í": "i",
        "ľ": "l",
        "ĺ": "l",
        "ň": "n",
        "ó": "o",
        "ô": "o",
        "ŕ": "r",
        "š": "s",
        "ť": "t",
        "ú": "u",
        "ý": "y",
        "ž": "z",
    }
)


@dataclass(frozen=True)
class ProductUrls:
    """The three URLs derived for one product."""

    detail_url: str
    pil_url: str
    spc_url: str


def clean_text(value: str) -> str:
    """Normalize whitespace in *value* and drop standalone "reklama" lines.

    Non-breaking spaces become regular spaces, runs of horizontal
    whitespace collapse to one space, lines consisting solely of
    "reklama" (case-insensitive) are emptied, runs of three or more
    newlines collapse to two, and the result is stripped.
    """
    value = value.replace("\xa0", " ")
    value = re.sub(r"[ \t\r\f\v]+", " ", value)
    # Remove the "reklama" lines BEFORE collapsing blank lines; otherwise the
    # emptied lines can leave runs of 3+ newlines behind.
    value = re.sub(r"(?im)^reklama$", "", value)
    value = re.sub(r"\n{3,}", "\n\n", value)
    return value.strip()


def normalize_key(value: str) -> str:
    """Turn a label into a lowercase ``snake_case`` ASCII key.

    The text is cleaned, lowercased, stripped of the diacritics listed in
    the translation table, and every run of characters outside ``a-z0-9``
    is replaced by a single underscore.
    """
    value = clean_text(value).lower().translate(_DIACRITIC_TABLE)
    value = re.sub(r"[^a-z0-9]+", "_", value)
    return value.strip("_")


def product_urls(detail_url: str) -> ProductUrls:
    """Derive the PIL and SPC URLs by swapping the ``/detail/`` path part."""
    return ProductUrls(
        detail_url=detail_url,
        pil_url=detail_url.replace("/detail/", "/pil/"),
        spc_url=detail_url.replace("/detail/", "/spc/"),
    )


def product_id_from_url(url: str) -> str | None:
    """Return the numeric id preceding ``.html`` in the URL path, if any."""
    match = re.search(r"-(\d+)\.html(?:$|\?)", urlparse(url).path)
    return match.group(1) if match else None


def slug_from_url(url: str) -> str:
    """Return the last path component without the ``-<id>.html`` suffix."""
    name = Path(urlparse(url).path).name
    return re.sub(r"-\d+\.html$", "", name)


def load_links(path: Path) -> list[str]:
    """Load a JSON list of detail URLs, skipping blank entries.

    Raises:
        ValueError: If the file's top-level JSON value is not a list.
    """
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, list):
        raise ValueError(f"Expected a JSON list in {path}")
    return [str(item) for item in data if str(item).strip()]


def soup_from_html(html: str) -> BeautifulSoup:
    """Parse *html* with the lxml parser."""
    return BeautifulSoup(html, "lxml")


def remove_noise(root: Tag) -> None:
    """Remove non-content elements (scripts, navigation, ads, ...) in place."""
    for tag in root.select(
        "script, style, noscript, nav, header, footer, iframe, form, "
        ".modal, .adbl, .ad-video-fake, .breadcrumb, .piktograms"
    ):
        tag.decompose()


def node_text(node: Tag) -> str:
    """Return the cleaned text of *node*, one string per line.

    Note: *node* is mutated, because noise elements are decomposed first.
    """
    remove_noise(node)
    return clean_text(node.get_text("\n", strip=True))


def parse_json_ld_product(soup: BeautifulSoup) -> dict[str, str | None]:
    """Extract name, description and image from the first JSON-LD Product.

    Scripts that are empty or contain invalid JSON are skipped. Returns an
    empty dict when no item with ``"@type": "Product"`` is found.
    """
    for script in soup.find_all("script", {"type": "application/ld+json"}):
        raw = script.string or script.get_text()
        if not raw.strip():
            continue
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue
        items = data if isinstance(data, list) else [data]
        for item in items:
            if isinstance(item, dict) and item.get("@type") == "Product":
                return {
                    "name": item.get("name"),
                    "description": item.get("description"),
                    "image_url": item.get("image"),
                }
    return {}


def parse_info_rows(soup: BeautifulSoup) -> dict[str, str]:
    """Collect key/value pairs from ``.pmi-info-row`` elements and tables.

    Keys are normalized with :func:`normalize_key`. Values from info rows
    take precedence; two-cell table rows only fill keys not yet present.
    """
    fields: dict[str, str] = {}
    for row in soup.select(".pmi-info-row"):
        children = [child for child in row.find_all(recursive=False) if isinstance(child, Tag)]
        if len(children) >= 2:
            # First child is the label, the remaining children form the value.
            key = clean_text(children[0].get_text(" ", strip=True))
            value = clean_text(" ".join(child.get_text(" ", strip=True) for child in children[1:]))
        else:
            # Fallback: split the row's text nodes and treat the first as label.
            parts = [part.strip() for part in row.get_text("|", strip=True).split("|") if part.strip()]
            if len(parts) < 2:
                continue
            key, value = parts[0], " ".join(parts[1:])
        if key and value:
            fields[normalize_key(key)] = value

    for table in soup.find_all("table"):
        for tr in table.find_all("tr"):
            cells = [clean_text(c.get_text(" ", strip=True)) for c in tr.find_all(["th", "td"])]
            # Only label/value rows; the 80-char cap presumably filters out
            # rows whose first cell is free text rather than a label.
            if len(cells) == 2 and len(cells[0]) <= 80 and cells[1]:
                fields.setdefault(normalize_key(cells[0]), cells[1])
    return fields


def collect_until_next_section(header: Tag) -> str:
    """Return the text of the siblings following *header* up to the next
    ``h4.section-product`` heading.

    Non-tag siblings are ignored, as are siblings whose text merely repeats
    the header's own text.
    """
    parts: list[str] = []
    for sibling in header.next_siblings:
        if isinstance(sibling, Tag) and sibling.name == "h4" and "section-product" in sibling.get("class", []):
            break
        if not isinstance(sibling, Tag):
            continue
        # Work on a re-parsed copy because node_text() mutates its argument.
        clone = BeautifulSoup(str(sibling), "lxml")
        text = node_text(clone)
        if text and text != clean_text(header.get_text(" ", strip=True)):
            parts.append(text)
    return clean_text("\n".join(parts))


def parse_detail_sections(soup: BeautifulSoup) -> dict[str, str]:
    """Map each non-empty ``h4.section-product`` section to its text.

    Known headings use the key from ``DETAIL_SECTION_ALIASES``; unknown
    headings fall back to their normalized title.
    """
    sections: dict[str, str] = {}
    for header in soup.select("h4.section-product"):
        title = clean_text(header.get_text(" ", strip=True))
        key = DETAIL_SECTION_ALIASES.get(title, normalize_key(title))
        text = collect_until_next_section(header)
        if text:
            sections[key] = text
    return sections


def parse_classification(soup: BeautifulSoup) -> list[dict[str, str]]:
    """Read ``code``/``name`` pairs from the ``.classification-levels`` box.

    Returns an empty list when the box is absent; rows with fewer than two
    ``td`` cells are skipped.
    """
    levels: list[dict[str, str]] = []
    box = soup.select_one(".classification-levels")
    if not box:
        return levels
    for tr in box.find_all("tr"):
        cells = [clean_text(c.get_text(" ", strip=True)) for c in tr.find_all("td")]
        if len(cells) >= 2:
            levels.append({"code": cells[0], "name": cells[1]})
    return levels


def parse_detail_page(html: str, detail_url: str) -> dict:
    """Parse a product detail page into a structured dict.

    The name comes from JSON-LD when available, otherwise from the first
    ``<h1>``; id and slug are derived from *detail_url*.
    """
    soup = soup_from_html(html)
    json_ld = parse_json_ld_product(soup)
    h1 = soup.find("h1")
    fields = parse_info_rows(soup)
    sections = parse_detail_sections(soup)

    return {
        "product_id": product_id_from_url(detail_url),
        "slug": slug_from_url(detail_url),
        "name": json_ld.get("name") or (clean_text(h1.get_text(" ", strip=True)) if h1 else None),
        "short_description": clean_text(str(json_ld.get("description") or "")) or None,
        "image_url": json_ld.get("image_url"),
        "detail_fields": fields,
        "sections": sections,
        "classification": parse_classification(soup),
        "active_substances": split_list_field(sections.get("active_substances") or ""),
        "indication_group": sections.get("indication_group"),
    }


def split_list_field(value: str) -> list[str]:
    """Split *value* on newlines, commas and semicolons.

    Items are cleaned, empty items dropped, and duplicates removed while
    keeping first-seen order.
    """
    if not value:
        return []
    items = [clean_text(item) for item in re.split(r"\n|,|;", value) if clean_text(item)]
    return list(dict.fromkeys(items))


def extract_article_text(html: str) -> str:
    """Return the text of ``<article>``, falling back to ``div#product``,
    then ``<body>``, then the whole document."""
    soup = soup_from_html(html)
    article = soup.find("article")
    if article:
        return node_text(article)
    fallback = soup.find("div", id="product") or soup.body or soup
    return node_text(fallback)


def split_by_numbered_pil_sections(text: str) -> dict[str, str]:
    """Split PIL text into sections at lines matching ``PIL_SECTION_PATTERNS``.

    Each section runs from its heading line up to the next matched heading.
    If a key matches more than once, the last occurrence wins.
    """
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    starts: list[tuple[str, int]] = []
    for idx, line in enumerate(lines):
        for key, pattern in PIL_SECTION_PATTERNS.items():
            if re.match(pattern, line, flags=re.IGNORECASE):
                starts.append((key, idx))
                break

    sections: dict[str, str] = {}
    for pos, (key, idx) in enumerate(starts):
        end = starts[pos + 1][1] if pos + 1 < len(starts) else len(lines)
        sections[key] = clean_text("\n".join(lines[idx:end]))
    return sections


def split_pil_subsections(before_use_text: str) -> dict[str, str]:
    """Split the "before use" PIL section at ``PIL_SUBSECTION_ALIASES``
    headings.

    Returns an empty dict for empty input. If a key matches more than once,
    the last occurrence wins.
    """
    if not before_use_text:
        return {}

    lines = [line.strip() for line in before_use_text.splitlines() if line.strip()]
    starts: list[tuple[str, int]] = []
    for idx, line in enumerate(lines):
        for key, patterns in PIL_SUBSECTION_ALIASES.items():
            if any(re.match(pattern, line, flags=re.IGNORECASE) for pattern in patterns):
                starts.append((key, idx))
                break

    result: dict[str, str] = {}
    for pos, (key, idx) in enumerate(starts):
        end = starts[pos + 1][1] if pos + 1 < len(starts) else len(lines)
        result[key] = clean_text("\n".join(lines[idx:end]))
    return result


def parse_pil_page(html: str) -> dict:
    """Parse a PIL page into its full text, sections and subsections."""
    text = extract_article_text(html)
    sections = split_by_numbered_pil_sections(text)
    subsections = split_pil_subsections(sections.get("before_use", ""))
    return {
        "full_text": text,
        "sections": sections,
        "subsections": subsections,
    }


def build_lightrag_text(detail: dict, pil: dict | None, urls: ProductUrls) -> str:
    """Assemble one plain-text document for a product.

    The text contains the product name and URLs, selected detail fields,
    detail-page sections and, when *pil* is given, selected PIL
    subsections and sections. Empty parts are omitted.
    """
    chunks: list[str] = []
    name = detail.get("name") or detail.get("slug") or urls.detail_url
    chunks.append(f"Liek: {name}")
    chunks.append(f"ADC detail URL: {urls.detail_url}")
    chunks.append(f"ADC PIL URL: {urls.pil_url}")

    fields = detail.get("detail_fields") or {}
    important_fields = [
        "registracne_cislo_produktu",
        "kod_statnej_autority_sukl",
        "nazov_produktu_podla_sukl",
        "aplikacna_forma",
        "vyrobca",
        "drzitel_rozhodnutia",
        "dodavatelia",
        "vydaj",
        "typ_produktu",
        "legislativne_zatriedenie",
    ]
    for key in important_fields:
        if fields.get(key):
            chunks.append(f"{key}: {fields[key]}")

    for section_key, title in [
        ("description_and_indications", "Popis a indikácie"),
        ("use_and_dosage", "Použitie a dávkovanie"),
        ("side_effects", "Nežiaduce účinky"),
        ("active_substances", "Účinné látky"),
        ("indication_group", "Indikačná skupina"),
        ("general_product_info", "Všeobecné informácie"),
    ]:
        text = (detail.get("sections") or {}).get(section_key)
        if text:
            chunks.append(f"\n{title}\n{text}")

    if pil:
        subsections = pil.get("subsections") or {}
        for key, title in [
            ("contraindications", "Kontraindikácie z PIL"),
            ("warnings", "Upozornenia z PIL"),
            ("interactions", "Interakcie z PIL"),
            ("pregnancy_breastfeeding", "Tehotenstvo a dojčenie z PIL"),
            ("driving", "Vedenie vozidiel z PIL"),
        ]:
            if subsections.get(key):
                chunks.append(f"\n{title}\n{subsections[key]}")

        for key, title in [
            ("what_is_it", "Na čo sa používa z PIL"),
            ("how_to_use", "Ako užívať z PIL"),
            ("side_effects", "Vedľajšie účinky z PIL"),
        ]:
            section_text = (pil.get("sections") or {}).get(key)
            if section_text:
                chunks.append(f"\n{title}\n{section_text}")

    # clean_text collapses the extra blank lines produced by the "\n" prefixes.
    return clean_text("\n\n".join(chunks))


def build_graph_hints(detail: dict, pil: dict | None) -> dict:
    """Flatten the entity-like facts of a product into one dict.

    Detail-page sections are preferred for the text fields; the matching
    PIL section is used as a fallback when the detail section is missing.
    """
    fields = detail.get("detail_fields") or {}
    sections = detail.get("sections") or {}
    pil_subsections = (pil or {}).get("subsections") or {}
    pil_sections = (pil or {}).get("sections") or {}

    return {
        "drug": detail.get("name"),
        "active_substances": detail.get("active_substances") or [],
        "dosage_form": fields.get("aplikacna_forma"),
        "manufacturer": fields.get("vyrobca"),
        "marketing_authorization_holder": fields.get("drzitel_rozhodnutia"),
        "supplier": fields.get("dodavatelia"),
        "sukl_code": fields.get("kod_statnej_autority_sukl"),
        "registration_number": fields.get("registracne_cislo_produktu"),
        "classification_codes": detail.get("classification") or [],
        "indications_text": sections.get("description_and_indications") or pil_sections.get("what_is_it"),
        "dosage_text": sections.get("use_and_dosage") or pil_sections.get("how_to_use"),
        "contraindications_text": pil_subsections.get("contraindications"),
        "warnings_text": pil_subsections.get("warnings"),
        "interactions_text": pil_subsections.get("interactions"),
        "side_effects_text": sections.get("side_effects") or pil_sections.get("side_effects"),
    }


def build_record(detail_html: str, pil_html: str | None, urls: ProductUrls) -> dict:
    """Build the full output record for one product.

    *pil_html* may be ``None`` or empty, in which case the ``pil`` entry is
    ``None``. The ``scraped_at`` timestamp is the current UTC time.
    """
    detail = parse_detail_page(detail_html, urls.detail_url)
    pil = parse_pil_page(pil_html) if pil_html else None
    scraped_at = datetime.now(timezone.utc).isoformat(timespec="seconds")

    return {
        "source": "adc.sk",
        "scraped_at": scraped_at,
        "urls": {
            "detail": urls.detail_url,
            "pil": urls.pil_url,
            "spc": urls.spc_url,
        },
        "product": detail,
        "pil": pil,
        "graph_hints": build_graph_hints(detail, pil),
        "lightrag_text": build_lightrag_text(detail, pil, urls),
    }


def fetch_requests(session: requests.Session, url: str, timeout: int, retries: int) -> str:
    """GET *url* and return the response body as text, retrying on failure.

    Args:
        session: Session used for the request.
        url: URL to fetch.
        timeout: Per-request timeout in seconds.
        retries: Maximum number of attempts; values below 1 are treated as 1
            so that at least one request is always made.

    Raises:
        RuntimeError: If every attempt fails; the last underlying error is
            attached as ``__cause__``.
    """
    attempts = max(1, retries)
    last_error: Exception | None = None
    for attempt in range(1, attempts + 1):
        try:
            response = session.get(url, headers=HEADERS, timeout=timeout)
            response.raise_for_status()
            response.encoding = response.apparent_encoding or "utf-8"
            return response.text
        except Exception as exc:
            # Deliberately broad: this is the retry boundary, and the final
            # failure is re-raised below with the cause attached.
            last_error = exc
            if attempt < attempts:
                # Linear back-off between attempts.
                time.sleep(1.5 * attempt)
    raise RuntimeError(f"Failed to fetch {url}: {last_error}") from last_error


def make_requests_fetcher(timeout: int, retries: int) -> Callable[[str], str]:
    """Return a ``fetch(url)`` callable backed by one shared session."""
    session = requests.Session()
    return lambda url: fetch_requests(session, url, timeout, retries)


def make_browser_fetcher() -> tuple[Callable[[str], str], Callable[[], None]]:
    """Start headless Chromium via Playwright and return ``(fetch, close)``.

    ``fetch(url)`` navigates the single shared page and returns its HTML,
    raising ``RuntimeError`` on a missing response or HTTP status >= 400.
    ``close()`` shuts down the browser and the Playwright driver.

    Raises:
        SystemExit: If Playwright is not installed.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError as exc:
        raise SystemExit(
            "Playwright is not installed. Run: pip install playwright; python -m playwright install chromium"
        ) from exc

    playwright = sync_playwright().start()
    try:
        browser = playwright.chromium.launch(headless=True)
        page = browser.new_page(
            user_agent=HEADERS["User-Agent"],
            locale="sk-SK",
            viewport={"width": 1366, "height": 900},
        )
    except Exception:
        # Do not leave the Playwright driver running when start-up fails.
        playwright.stop()
        raise

    def fetch(url: str) -> str:
        response = page.goto(url, wait_until="domcontentloaded", timeout=60000)
        if response is None or response.status >= 400:
            status = response.status if response else "no-response"
            raise RuntimeError(f"HTTP {status} for {url}")
        return page.content()

    def close() -> None:
        try:
            browser.close()
        finally:
            # Stop the driver even if closing the browser raised.
            playwright.stop()

    return fetch, close


def iter_links(links: Iterable[str], limit: int | None) -> Iterable[str]:
    """Yield at most *limit* items from *links* (all of them if ``None``)."""
    count = 0
    for link in links:
        if limit is not None and count >= limit:
            break
        count += 1
        yield link


def write_records_json(
    out_path: Path,
    links: list[str],
    fetch: Callable[[str], str],
    limit: int | None,
    delay: float,
    skip_failed: bool,
) -> list[dict[str, str]]:
    """Scrape products and stream them into *out_path* as a JSON array.

    Args:
        out_path: Output file; parent directories are created as needed.
        links: Product detail URLs.
        fetch: Callable returning the HTML for a URL.
        limit: Maximum number of products to scrape, or ``None`` for all.
        delay: Seconds to sleep between page loads.
        skip_failed: When true, a failing product is logged and skipped;
            when false, the first failure is re-raised.

    Returns:
        One ``{"url", "error"}`` dict per failed product.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    failures: list[dict[str, str]] = []
    selected_links = list(iter_links(links, limit))

    with out_path.open("w", encoding="utf-8") as out:
        out.write("[\n")
        wrote_any = False
        try:
            for detail_url in tqdm(selected_links, desc="ADC products"):
                urls = product_urls(detail_url)
                try:
                    detail_html = fetch(urls.detail_url)
                    time.sleep(delay)
                    pil_html = fetch(urls.pil_url)
                    record = build_record(detail_html, pil_html, urls)
                except Exception as exc:
                    failures.append({"url": detail_url, "error": str(exc)})
                    tqdm.write(f"Failed product {detail_url}: {exc}")
                    if not skip_failed:
                        raise
                    continue

                if wrote_any:
                    out.write(",\n")
                json.dump(record, out, ensure_ascii=False, indent=2)
                wrote_any = True
                # Flush per record so progress survives an abrupt stop.
                out.flush()
                time.sleep(delay)
        finally:
            # Always close the array so the file stays valid JSON even when
            # the run aborts (stop-on-fail, Ctrl+C, unexpected error).
            out.write("\n]\n")

    return failures


def main() -> None:
    """Command-line entry point: parse arguments, scrape, report failures."""
    parser = argparse.ArgumentParser(description="Scrape ADC product detail + PIL data into structured JSON.")
    parser.add_argument("--links", type=Path, default=DEFAULT_LINKS, help="Input JSON list with detail URLs.")
    parser.add_argument("--out", type=Path, default=DEFAULT_OUT, help="Output structured JSON file.")
    parser.add_argument("--limit", type=int, default=None, help="Scrape only the first N products.")
    parser.add_argument("--delay", type=float, default=0.25, help="Delay between page loads in seconds.")
    parser.add_argument("--timeout", type=int, default=30, help="HTTP timeout in seconds for requests mode.")
    parser.add_argument("--retries", type=int, default=3, help="Retries per URL in requests mode.")
    parser.add_argument("--browser", action="store_true", help="Use Playwright Chromium. Use this if ADC returns 403.")
    parser.add_argument("--stop-on-fail", action="store_true", help="Stop on first failed product.")
    args = parser.parse_args()

    links = load_links(args.links)
    close_browser: Callable[[], None] | None = None
    if args.browser:
        fetch, close_browser = make_browser_fetcher()
    else:
        fetch = make_requests_fetcher(args.timeout, args.retries)

    try:
        failures = write_records_json(
            out_path=args.out,
            links=links,
            fetch=fetch,
            limit=args.limit,
            delay=args.delay,
            skip_failed=not args.stop_on_fail,
        )
    finally:
        if close_browser:
            close_browser()

    print(f"Saved structured product data to {args.out}")
    if failures:
        failed_path = args.out.with_suffix(".failed.json")
        failed_path.write_text(json.dumps(failures, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"Failed products: {len(failures)}. Saved errors to {failed_path}")


if __name__ == "__main__":
    main()