diff --git a/README.md b/README.md index 50c09b2..a2f51d9 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,9 @@ Agent pre manažment záverečných prác nad repozitárom `zpwiki`. -Projekt zatiaľ rieši základnú časť systému pre vyhľadávanie v Markdown súboroch zo školského repozitára záverečných prác. Cieľom je vytvoriť samostatnú službu, ktorá vie indexovať obsah `zpwiki`, vyhľadávať v ňom a neskôr sa napojí na OpenWebUI, RAG, znalostný graf a webhook synchronizáciu. +Projekt rieši základnú časť systému pre vyhľadávanie v Markdown súboroch zo školského repozitára záverečných prác. Cieľom je vytvoriť samostatnú službu, ktorá vie indexovať obsah `zpwiki`, vyhľadávať v ňom a neskôr sa napojí na OpenWebUI, RAG, znalostný graf a GraphRAG. + +Aktuálne je implementovaný prototyp, ktorý vie načítať Markdown dokumenty, spracovať ich metadata, rozdeliť ich na menšie časti, uložiť ich do SQLite databázy a sprístupniť vyhľadávanie cez FastAPI. ## Aktuálny stav @@ -13,12 +15,35 @@ Zatiaľ je implementované: 3. spracovanie položiek `taxonomy`, hlavne kategórie, tagy a autor, 4. rozdelenie dokumentov na menšie textové chunky, 5. vytvorenie SQLite indexu, -6. jednoduché fulltextové vyhľadávanie nad chunkmi, +6. jednoduché skórovacie fulltextové vyhľadávanie nad chunkmi, 7. rozlíšenie režimu vyhľadávania: 1. `person` pre mená osôb, napríklad `jan ptak`, 2. `topic` pre tematické dopyty, napríklad `rag agent` alebo `knowledge graph`, -8. FastAPI backend s endpointmi `/health` a `/search`, -9. automatická Swagger dokumentácia API. +8. FastAPI backend, +9. endpoint `GET /health`, +10. endpoint `POST /search`, +11. endpoint `POST /sync` pre manuálne spustenie reindexovania, +12. endpoint `POST /webhook/gitea` pre prijatie webhooku z Gitea, +13. overenie webhooku pomocou jednoduchého tokenu alebo HMAC podpisu, +14. automatická Swagger dokumentácia API, +15. Dockerfile a `docker-compose.yml`, +16. spustenie celého riešenia cez Docker, +17. volume mount pre priečinok `data`, +18. volume mount pre repozitár `zpwiki`. + +## Overený stav testovania + +Pri testovaní cez Docker bolo overené: + +1. FastAPI kontajner sa spustí, +2. endpoint `/health` vracia `200 OK`, +3. endpoint `/search` vracia `200 OK`, +4. endpoint `/sync` spustí reindexovanie a vracia `200 OK`, +5. endpoint `/webhook/gitea` prijme platný webhook a spustí reindexovanie, +6. Docker kontajner vidí repozitár `zpwiki` cez cestu `/zpwiki`, +7. systém načítal 114 dokumentov, +8. systém vytvoril 955 chunkov, +9. SQLite index bol vytvorený v `/app/data/zp_index.sqlite`. ## Štruktúra projektu @@ -28,17 +53,96 @@ dp-zp-agent/ │ ├── __init__.py │ └── main.py ├── scripts/ +│ ├── __init__.py +│ ├── common.py │ ├── scan_zpwiki.py │ ├── build_chunks.py │ ├── build_sqlite_index.py -│ ├── search_chunks.py -│ └── search_db.py +│ ├── rebuild_index.py +│ ├── search_db.py +│ └── search_utils.py ├── data/ +├── Dockerfile +├── docker-compose.yml ├── requirements.txt ├── .gitignore └── README.md ``` +Súbor `scripts/search_chunks.py` bol odstránený, pretože jeho funkcionalita bola duplicitná voči súboru `scripts/build_chunks.py`. + +## Popis hlavných súborov + +### `app/main.py` + +Obsahuje FastAPI aplikáciu a API endpointy: + +1. `GET /health`, +2. `POST /search`, +3. `POST /sync`, +4. `POST /webhook/gitea`. + +### `scripts/common.py` + +Obsahuje spoločné konštanty a pomocné funkcie: + +1. cesty k projektu, +2. cesta k `zpwiki`, +3. cesta k dátovým súborom, +4. čítanie a zápis JSON, +5. spracovanie YAML metadát, +6. normalizácia tagov a kategórií. + +### `scripts/scan_zpwiki.py` + +Prejde Markdown súbory v `zpwiki`, načíta metadata a uloží základné informácie do súboru: + +```text +data/documents.json +``` + +### `scripts/build_chunks.py` + +Rozdelí obsah Markdown dokumentov na menšie textové chunky a uloží ich do súboru: + +```text +data/chunks.json +``` + +### `scripts/build_sqlite_index.py` + +Vytvorí SQLite databázu: + +```text +data/zp_index.sqlite +``` + +Do databázy uloží dokumenty, chunky, tagy a kategórie. + +### `scripts/rebuild_index.py` + +Spustí celý proces naraz: + +1. načítanie dokumentov, +2. vytvorenie chunkov, +3. vytvorenie SQLite indexu. + +Voliteľne vie pred reindexovaním spustiť aj `git pull`. + +### `scripts/search_utils.py` + +Obsahuje spoločnú logiku vyhľadávania: + +1. normalizácia textu, +2. tokenizácia, +3. rozlíšenie režimu `person` a `topic`, +4. skórovanie výsledkov, +5. vyhľadávanie v SQLite databáze. + +### `scripts/search_db.py` + +Slúži na testovanie vyhľadávania z terminálu. + ## Príprava prostredia Projekt očakáva, že vedľa neho existuje naklonovaný repozitár `zpwiki`. @@ -47,10 +151,12 @@ Odporúčaná štruktúra: ```text ~/DP/ -├── zpwiki -└── zp-agent +├── zpwiki/ +└── zp-agent/ ``` +## Lokálne spustenie bez Dockeru + Vytvorenie a aktivácia Python prostredia: ```bash @@ -59,49 +165,29 @@ source .venv/bin/activate pip install -r requirements.txt ``` -## Vygenerovanie dát a indexu +Vygenerovanie dát a indexu: -Najprv sa načítajú dokumenty a metadata: +```bash +python scripts/rebuild_index.py +``` + +Alternatívne sa dá proces spustiť po krokoch: ```bash python scripts/scan_zpwiki.py -``` - -Potom sa dokumenty rozdelia na chunky: - -```bash python scripts/build_chunks.py -``` - -Nakoniec sa vytvorí SQLite index: - -```bash python scripts/build_sqlite_index.py ``` -## Testovanie vyhľadávania v termináli - -Vyhľadávanie podľa osoby: +Testovanie vyhľadávania v termináli: ```bash python scripts/search_db.py "jan ptak" -``` - -Vyhľadávanie podľa témy: - -```bash python scripts/search_db.py "rag agent" -``` - -Vyhľadávanie podľa znalostného grafu: - -```bash python scripts/search_db.py "knowledge graph" ``` -## Spustenie API - -FastAPI server sa spustí príkazom: +Spustenie API lokálne: ```bash uvicorn app.main:app --reload @@ -121,6 +207,97 @@ curl -X POST http://127.0.0.1:8000/search \ -d '{"query":"jan ptak","limit":5}' ``` +## Spustenie cez Docker + +Projekt je možné spustiť cez Docker Compose. Kontajner používa volume mount pre priečinok `data` a pre repozitár `zpwiki`. + +Build Docker image: + +```bash +docker compose build --no-cache +``` + +Spustenie kontajnera: + +```bash +docker compose up -d +``` + +Zobrazenie logov: + +```bash +docker compose logs -f zp-agent-api +``` + +Zastavenie kontajnera: + +```bash +docker compose down +``` + +## Reindexovanie cez Docker + +Celý proces indexovania je možné spustiť priamo v Docker kontajneri: + +```bash +docker compose run --rm zp-agent-api python scripts/rebuild_index.py +``` + +Tento príkaz vykoná: + +1. načítanie Markdown dokumentov, +2. extrakciu metadát, +3. rozdelenie dokumentov na chunky, +4. vytvorenie SQLite indexu. + +Po úspešnom behu vzniknú v priečinku `data` súbory: + +```text +documents.json +chunks.json +zp_index.sqlite +``` + +Kontrola dát: + +```bash +ls -lh data +``` + +## Testovanie vyhľadávania cez Docker + +```bash +docker compose run --rm zp-agent-api python scripts/search_db.py "rag agent" +``` + +```bash +docker compose run --rm zp-agent-api python scripts/search_db.py "jan ptak" +``` + +## Testovanie API cez Docker + +Health check: + +```bash +curl http://127.0.0.1:8000/health +``` + +Vyhľadávanie: + +```bash +curl -X POST http://127.0.0.1:8000/search \ + -H "Content-Type: application/json" \ + -d '{"query":"rag agent","limit":5}' +``` + +Manuálne reindexovanie cez API: + +```bash +curl -X POST http://127.0.0.1:8000/sync \ + -H "Content-Type: application/json" \ + -d '{"pull_git":false}' +``` + ## Swagger UI FastAPI automaticky generuje Swagger dokumentáciu API. @@ -131,70 +308,108 @@ Po spustení servera je dostupná na adrese: http://127.0.0.1:8000/docs ``` -V Swagger UI je možné testovať endpointy `/health` a `/search` priamo z prehliadača. +V Swagger UI je možné testovať endpointy: -## Čo ešte treba dorobiť +1. `/health`, +2. `/search`, +3. `/sync`, +4. `/webhook/gitea`. -### 1. Dockerizácia aplikácie +## Webhook pre Gitea -Treba vytvoriť: - -1. `Dockerfile`, -2. `docker-compose.yml`, -3. jednoduchý návod na spustenie cez Docker, -4. volume alebo mount pre dáta a SQLite databázu. - -Cieľ je, aby sa služba dala spustiť jedným príkazom: - -```bash -docker compose up --build -``` - -### 2. Upratanie kódu do modulov - -Aktuálne je veľká časť logiky priamo v `app/main.py`. Neskôr treba kód rozdeliť napríklad takto: - -```text -app/ -├── main.py -├── search.py -├── database.py -├── schemas.py -├── sync.py -└── webhook.py -``` - -Cieľ je, aby API, vyhľadávanie, databáza a synchronizácia neboli v jednom veľkom súbore. - -### 3. Synchronizácia so `zpwiki` - -Treba pridať mechanizmus, ktorý bude vedieť aktualizovať dáta zo školského repozitára. - -Plánované časti: - -1. skript pre `git pull`, -2. zistenie aktuálneho commitu, -3. detekcia zmenených Markdown súborov, -4. reindexovanie zmenených dokumentov, -5. uloženie stavu synchronizácie do databázy. - -### 4. Webhook endpoint pre Gitea - -Treba vytvoriť endpoint napríklad: +Aplikácia obsahuje endpoint: ```text POST /webhook/gitea ``` -Tento endpoint má: +Webhook slúži na spustenie reindexovania po zmene v repozitári. -1. prijať webhook z Gitea, -2. overiť secret alebo podpis webhooku, -3. spustiť synchronizáciu repozitára, -4. spustiť reindexovanie zmenených súborov, -5. zapísať výsledok do logu alebo tabuľky synchronizácie. +Endpoint podporuje dva spôsoby overenia: -### 5. OpenWebUI integrácia +1. jednoduchý token cez header `X-Gitea-Token`, +2. HMAC podpis cez header `X-Gitea-Signature`. + +Hodnota tajného kľúča sa nastavuje cez environment premennú: + +```text +WEBHOOK_SECRET +``` + +V `docker-compose.yml` je počas vývoja nastavené: + +```text +WEBHOOK_SECRET=dev-secret +``` + +### Test webhooku cez token + +```bash +curl -X POST http://127.0.0.1:8000/webhook/gitea \ + -H "Content-Type: application/json" \ + -H "X-Gitea-Event: push" \ + -H "X-Gitea-Token: dev-secret" \ + -d '{"repository":{"full_name":"KEMT/zpwiki"}}' +``` + +### Test webhooku cez HMAC podpis + +```bash +BODY='{"repository":{"full_name":"KEMT/zpwiki"}}' + +SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "dev-secret" -hex | sed 's/^.* //') + +curl -X POST http://127.0.0.1:8000/webhook/gitea \ + -H "Content-Type: application/json" \ + -H "X-Gitea-Event: push" \ + -H "X-Gitea-Signature: sha256=$SIG" \ + --data-raw "$BODY" +``` + +### Test neplatného tokenu + +Pri neplatnom tokene má endpoint vrátiť `401 Unauthorized`. + +```bash +curl -i -X POST http://127.0.0.1:8000/webhook/gitea \ + -H "Content-Type: application/json" \ + -H "X-Gitea-Event: push" \ + -H "X-Gitea-Token: zly-token" \ + -d '{"repository":{"full_name":"KEMT/zpwiki"}}' +``` + +## Kompletný test cez Docker + +```bash +cd ~/DP/zp-agent + +docker compose down +docker compose build --no-cache + +docker compose run --rm zp-agent-api ls /zpwiki/pages | head + +docker compose run --rm zp-agent-api python scripts/rebuild_index.py + +ls -lh data + +docker compose run --rm zp-agent-api python scripts/search_db.py "rag agent" + +docker compose up -d + +curl http://127.0.0.1:8000/health + +curl -X POST http://127.0.0.1:8000/search \ + -H "Content-Type: application/json" \ + -d '{"query":"rag agent","limit":5}' + +curl -X POST http://127.0.0.1:8000/sync \ + -H "Content-Type: application/json" \ + -d '{"pull_git":false}' +``` + +## Čo ešte treba dorobiť + +### 1. OpenWebUI integrácia Treba napojiť API na OpenWebUI. @@ -207,7 +422,7 @@ Možné riešenia: Cieľ je, aby používateľ mohol v OpenWebUI položiť otázku a agent použil vyhľadávanie nad `zpwiki`. -### 6. Embeddingy a vektorové vyhľadávanie +### 2. Embeddingy a vektorové vyhľadávanie Aktuálne vyhľadávanie je fulltextové a skórovacie. Ďalší krok je pridať embeddingy. @@ -226,7 +441,7 @@ Možné databázy: 3. ChromaDB, 4. FAISS ako jednoduchý lokálny prototyp. -### 7. RAG odpovede s citáciami +### 3. RAG odpovede s citáciami Treba doplniť generovanie odpovede pomocou jazykového modelu. @@ -240,7 +455,7 @@ Postup: Cieľ je, aby agent nehalucinoval a vedel ukázať, z ktorých dokumentov odpovedal. -### 8. Znalostný graf +### 4. Znalostný graf Treba vytvoriť štruktúrovaný graf nad dátami zo `zpwiki`. @@ -262,7 +477,7 @@ Základné vzťahy: 5. práca je podobná inej práci, 6. práca patrí do roka alebo obdobia. -### 9. GraphRAG +### 5. GraphRAG Treba prepojiť RAG a znalostný graf. @@ -274,7 +489,19 @@ GraphRAG časť má umožniť: 4. analýzu tém podľa tagov, rokov a kategórií, 5. kombináciu textového, vektorového a grafového vyhľadávania. -### 10. Vyhodnotenie systému +### 6. Čiastočné reindexovanie + +Aktuálne endpoint `/sync` a webhook spúšťajú celé reindexovanie. Neskôr treba doplniť efektívnejší spôsob synchronizácie. + +Plánované časti: + +1. zistenie aktuálneho commitu, +2. detekcia zmenených Markdown súborov, +3. reindexovanie iba zmenených dokumentov, +4. uloženie stavu synchronizácie do databázy, +5. logovanie výsledku synchronizácie. + +### 7. Vyhodnotenie systému Treba pripraviť testovaciu sadu otázok a porovnať viacero prístupov. @@ -303,7 +530,7 @@ Sledované vlastnosti: 5. čas odpovede, 6. čas reindexovania po zmene v Gite. -### 11. Dokumentácia do diplomovej práce +### 8. Dokumentácia do diplomovej práce Treba priebežne písať: @@ -320,4 +547,4 @@ Treba priebežne písať: ## Najbližší praktický krok -Najbližšie treba spraviť Docker nasadenie aktuálneho FastAPI prototypu. +Najbližšie treba pokračovať integráciou s OpenWebUI a prípravou RAG odpovedí s citáciami. Potom bude možné porovnať jednoduché fulltextové vyhľadávanie s RAG a neskôr s GraphRAG. diff --git a/app/main.py b/app/main.py index 19e1ef5..d60b52a 100644 --- a/app/main.py +++ b/app/main.py @@ -1,61 +1,34 @@ -from pathlib import Path +from __future__ import annotations + import hashlib import hmac import json import os -import re -import sqlite3 -import subprocess import sys -import time -import unicodedata -from collections import Counter +from pathlib import Path from fastapi import FastAPI, Header, HTTPException, Request from pydantic import BaseModel, Field -DB_FILE = Path("data/zp_index.sqlite") -ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", "../zpwiki")) +PROJECT_ROOT = Path(__file__).resolve().parents[1] + +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + + +from scripts.common import DB_FILE, ZPWIKI_ROOT +from scripts.rebuild_index import rebuild_index +from scripts.search_utils import search_database + + WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "dev-secret") -TECHNICAL_TERMS = { - "rag", - "agent", - "graph", - "knowledge", - "chatbot", - "nlp", - "llm", - "lm", - "openwebui", - "docker", - "webhook", - "database", - "db", - "neo4j", - "python", - "search", - "retrieval", - "generation", - "embedding", - "vector", - "vectors", - "langchain", - "graphrag", - "qa", - "question", - "answer", - "cloud", - "api", -} - - app = FastAPI( title="ZP Agent API", description="API pre vyhľadávanie v repozitári záverečných prác zpwiki.", - version="0.3.0", + version="0.4.0", ) @@ -71,277 +44,6 @@ class SyncRequest(BaseModel): ) -def normalize_text(text: str) -> str: - text = text.lower() - text = text.replace("_", " ") - text = text.replace("/", " ") - text = text.replace("-", " ") - - text = unicodedata.normalize("NFKD", text) - text = "".join(ch for ch in text if not unicodedata.combining(ch)) - - text = re.sub(r"[^a-z0-9]+", " ", text) - return text.strip() - - -def tokenize(text: str) -> list[str]: - text = normalize_text(text) - return [word for word in text.split() if len(word) >= 2] - - -def detect_search_mode(query_tokens: list[str]) -> str: - if not query_tokens: - return "topic" - - has_technical_term = any(token in TECHNICAL_TERMS for token in query_tokens) - - if len(query_tokens) == 2 and not has_technical_term: - return "person" - - return "topic" - - -def score_tokens(query_tokens: list[str], field_tokens: list[str], weight: int) -> int: - counts = Counter(field_tokens) - score = 0 - - for token in query_tokens: - score += counts.get(token, 0) * weight - - return score - - -def contains_all_tokens(query_tokens: list[str], field_tokens: list[str]) -> bool: - return all(token in field_tokens for token in query_tokens) - - -def get_tags(conn: sqlite3.Connection, chunk_id: str) -> list[str]: - rows = conn.execute( - "SELECT tag FROM chunk_tags WHERE chunk_id = ?", - (chunk_id,), - ).fetchall() - - return [row[0] for row in rows] - - -def get_categories(conn: sqlite3.Connection, chunk_id: str) -> list[str]: - rows = conn.execute( - "SELECT category FROM chunk_categories WHERE chunk_id = ?", - (chunk_id,), - ).fetchall() - - return [row[0] for row in rows] - - -def person_match(query_tokens: list[str], item: dict) -> bool: - title_tokens = tokenize(item.get("title") or "") - path_tokens = tokenize(item.get("document_path") or "") - author_tokens = tokenize(item.get("author") or "") - text_tokens = tokenize(item.get("text") or "") - - if contains_all_tokens(query_tokens, title_tokens): - return True - - if contains_all_tokens(query_tokens, path_tokens): - return True - - if contains_all_tokens(query_tokens, author_tokens): - return True - - if contains_all_tokens(query_tokens, text_tokens): - return True - - return False - - -def score_item(query: str, query_tokens: list[str], item: dict, mode: str) -> int: - title = item.get("title") or "" - path = item.get("document_path") or "" - author = item.get("author") or "" - text = item.get("text") or "" - tags = item.get("tags") or [] - categories = item.get("categories") or [] - - title_tokens = tokenize(title) - path_tokens = tokenize(path) - author_tokens = tokenize(author) - text_tokens = tokenize(text) - tag_tokens = tokenize(" ".join(tags)) - category_tokens = tokenize(" ".join(categories)) - - score = 0 - - if mode == "person": - score += score_tokens(query_tokens, title_tokens, 30) - score += score_tokens(query_tokens, path_tokens, 30) - score += score_tokens(query_tokens, author_tokens, 15) - score += score_tokens(query_tokens, text_tokens, 2) - - if contains_all_tokens(query_tokens, title_tokens): - score += 100 - - if contains_all_tokens(query_tokens, path_tokens): - score += 100 - - if contains_all_tokens(query_tokens, author_tokens): - score += 60 - - return score - - score += score_tokens(query_tokens, title_tokens, 12) - score += score_tokens(query_tokens, path_tokens, 12) - score += score_tokens(query_tokens, tag_tokens, 10) - score += score_tokens(query_tokens, category_tokens, 6) - score += score_tokens(query_tokens, author_tokens, 3) - score += score_tokens(query_tokens, text_tokens, 2) - - normalized_query = normalize_text(query) - normalized_title = normalize_text(title) - normalized_path = normalize_text(path) - - if normalized_query and normalized_query in normalized_title: - score += 30 - - if normalized_query and normalized_query in normalized_path: - score += 30 - - matched_title_tokens = sum(1 for token in query_tokens if token in title_tokens) - matched_path_tokens = sum(1 for token in query_tokens if token in path_tokens) - - if query_tokens and matched_title_tokens == len(query_tokens): - score += 25 - - if query_tokens and matched_path_tokens == len(query_tokens): - score += 25 - - return score - - -def make_source_url(document_path: str) -> str: - clean_path = document_path.replace("pages/", "").replace("/README.md", "") - return f"https://zp.kemt.fei.tuke.sk/{clean_path}" - - -def search_database(query: str, limit: int) -> tuple[str, list[dict]]: - if not DB_FILE.exists(): - raise FileNotFoundError(f"Databáza neexistuje: {DB_FILE}") - - query_tokens = tokenize(query) - mode = detect_search_mode(query_tokens) - - conn = sqlite3.connect(DB_FILE) - - rows = conn.execute(""" - SELECT chunk_id, document_path, title, author, chunk_index, text, text_length - FROM chunks - """).fetchall() - - results = [] - - for row in rows: - chunk_id, document_path, title, author, chunk_index, text, text_length = row - - item = { - "chunk_id": chunk_id, - "document_path": document_path, - "title": title, - "author": author, - "chunk_index": chunk_index, - "text": text, - "text_length": text_length, - "tags": get_tags(conn, chunk_id), - "categories": get_categories(conn, chunk_id), - } - - if mode == "person" and not person_match(query_tokens, item): - continue - - score = score_item(query, query_tokens, item, mode) - - if score > 0: - item["score"] = score - item["source_url"] = make_source_url(document_path) - results.append(item) - - conn.close() - - results.sort(key=lambda item: item["score"], reverse=True) - - return mode, results[:limit] - - -def run_command(command: list[str], cwd: Path | None = None) -> str: - result = subprocess.run( - command, - cwd=cwd, - text=True, - capture_output=True, - ) - - output = "" - - if result.stdout: - output += result.stdout - - if result.stderr: - output += result.stderr - - if result.returncode != 0: - raise RuntimeError(output.strip()) - - return output.strip() - - -def get_index_counts() -> dict: - if not DB_FILE.exists(): - return { - "documents": 0, - "chunks": 0, - "tags": 0, - "categories": 0, - } - - conn = sqlite3.connect(DB_FILE) - cursor = conn.cursor() - - counts = { - "documents": cursor.execute("SELECT COUNT(*) FROM documents").fetchone()[0], - "chunks": cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0], - "tags": cursor.execute("SELECT COUNT(*) FROM chunk_tags").fetchone()[0], - "categories": cursor.execute("SELECT COUNT(*) FROM chunk_categories").fetchone()[0], - } - - conn.close() - return counts - - -def rebuild_index(pull_git: bool = False) -> dict: - start = time.time() - logs = [] - - if pull_git: - if not ZPWIKI_ROOT.exists(): - raise RuntimeError(f"ZPWIKI_ROOT neexistuje: {ZPWIKI_ROOT}") - - if not (ZPWIKI_ROOT / ".git").exists(): - raise RuntimeError(f"Nie je to git repozitár: {ZPWIKI_ROOT}") - - logs.append(run_command(["git", "pull"], cwd=ZPWIKI_ROOT)) - - logs.append(run_command([sys.executable, "scripts/scan_zpwiki.py"])) - logs.append(run_command([sys.executable, "scripts/build_chunks.py"])) - logs.append(run_command([sys.executable, "scripts/build_sqlite_index.py"])) - - counts = get_index_counts() - duration = round(time.time() - start, 2) - - return { - "duration_seconds": duration, - "counts": counts, - "logs": logs, - } - - def verify_gitea_signature(raw_body: bytes, signature: str | None) -> bool: if not signature: return False @@ -368,7 +70,7 @@ def verify_simple_token(token: str | None) -> bool: @app.get("/health") -def health(): +def health() -> dict: return { "status": "ok", "database_exists": DB_FILE.exists(), @@ -380,9 +82,13 @@ def health(): @app.post("/search") -def search(request: SearchRequest): +def search(request: SearchRequest) -> dict: try: - mode, results = search_database(request.query, request.limit) + mode, results = search_database( + DB_FILE, + request.query, + request.limit, + ) except FileNotFoundError as error: raise HTTPException(status_code=500, detail=str(error)) from error @@ -395,7 +101,7 @@ def search(request: SearchRequest): @app.post("/sync") -def sync(request: SyncRequest): +def sync(request: SyncRequest) -> dict: try: result = rebuild_index(pull_git=request.pull_git) except RuntimeError as error: @@ -415,7 +121,7 @@ async def gitea_webhook( x_gitea_event: str | None = Header(default=None, alias="X-Gitea-Event"), x_gitea_signature: str | None = Header(default=None, alias="X-Gitea-Signature"), x_gitea_token: str | None = Header(default=None, alias="X-Gitea-Token"), -): +) -> dict: raw_body = await request.body() signature_ok = verify_gitea_signature(raw_body, x_gitea_signature) diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/build_chunks.py b/scripts/build_chunks.py index 6380c38..cd01286 100644 --- a/scripts/build_chunks.py +++ b/scripts/build_chunks.py @@ -1,52 +1,29 @@ -from pathlib import Path -import json +from __future__ import annotations + import re -import frontmatter +import sys +from pathlib import Path + from rich import print -ZPWIKI_ROOT = Path("../zpwiki") -PAGES_ROOT = ZPWIKI_ROOT / "pages" -OUTPUT_FILE = Path("data/chunks.json") +PROJECT_ROOT = Path(__file__).resolve().parents[1] + +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + + +from scripts.common import CHUNKS_FILE, PAGES_ROOT, ZPWIKI_ROOT, load_zpwiki_page, write_json + MAX_CHARS = 1200 OVERLAP_CHARS = 200 -def json_safe(value): - if value is None: - return None - - if isinstance(value, (str, int, float, bool)): - return value - - if isinstance(value, list): - return [json_safe(item) for item in value] - - if isinstance(value, dict): - return {str(key): json_safe(val) for key, val in value.items()} - - return str(value) - - -def normalize_list(value): - if value is None: - return [] - - if isinstance(value, list): - return [str(item).strip() for item in value if str(item).strip()] - - if isinstance(value, str): - return [item.strip() for item in value.split(",") if item.strip()] - - return [str(value)] - - def clean_markdown(text: str) -> str: text = text.replace("\r\n", "\n") text = re.sub(r"\n{3,}", "\n\n", text) - text = text.strip() - return text + return text.strip() def split_by_headings(text: str) -> list[str]: @@ -54,7 +31,31 @@ def split_by_headings(text: str) -> list[str]: return [part.strip() for part in parts if part.strip()] -def split_long_text(text: str, max_chars: int = MAX_CHARS, overlap: int = OVERLAP_CHARS) -> list[str]: +def find_split_position(text: str, max_chars: int) -> int: + """Nájde lepšie miesto delenia, aby chunk nekončil úplne náhodne.""" + if len(text) <= max_chars: + return len(text) + + search_area = text[:max_chars] + min_position = int(max_chars * 0.6) + + for separator in ("\n\n", "\n", ". ", " "): + position = search_area.rfind(separator) + + if position >= min_position: + return position + len(separator) + + return max_chars + + +def split_long_text( + text: str, + max_chars: int = MAX_CHARS, + overlap: int = OVERLAP_CHARS, +) -> list[str]: + if max_chars <= overlap: + raise ValueError("max_chars musí byť väčšie ako overlap") + if len(text) <= max_chars: return [text] @@ -62,122 +63,86 @@ def split_long_text(text: str, max_chars: int = MAX_CHARS, overlap: int = OVERLA start = 0 while start < len(text): - end = start + max_chars - chunk = text[start:end].strip() + remaining = text[start:] + + if len(remaining) <= max_chars: + chunk = remaining.strip() + + if chunk: + chunks.append(chunk) + + break + + split_at = find_split_position(remaining, max_chars) + chunk = remaining[:split_at].strip() if chunk: chunks.append(chunk) - if end >= len(text): - break - - start = max(0, end - overlap) + start += max(1, split_at - overlap) return chunks def chunk_markdown(text: str) -> list[str]: + """Rozdelí Markdown najprv podľa nadpisov a potom podľa dĺžky.""" text = clean_markdown(text) if not text: return [] - heading_parts = split_by_headings(text) - chunks = [] - for part in heading_parts: - if len(part) <= MAX_CHARS: - chunks.append(part) - else: - chunks.extend(split_long_text(part)) + for part in split_by_headings(text): + chunks.extend(split_long_text(part)) return chunks -def extract_document(file_path: Path) -> dict: - post = frontmatter.load(file_path) - - metadata = { - key: json_safe(value) - for key, value in post.metadata.items() - } - - taxonomy = metadata.get("taxonomy") or {} - - categories = normalize_list( - metadata.get("category") - or taxonomy.get("category") - ) - - tags = normalize_list( - metadata.get("tag") - or metadata.get("tags") - or taxonomy.get("tag") - or taxonomy.get("tags") - ) - - author = ( - metadata.get("author") - or taxonomy.get("author") - ) - - relative_path = file_path.relative_to(ZPWIKI_ROOT) - - return { - "path": str(relative_path), - "title": metadata.get("title"), - "categories": categories, - "tags": tags, - "published": metadata.get("published"), - "author": author, - "content": post.content.strip(), - "metadata": metadata, - } - - -def main(): +def build_chunks() -> list[dict]: if not PAGES_ROOT.exists(): raise SystemExit(f"Neexistuje priečinok: {PAGES_ROOT}") - markdown_files = sorted(PAGES_ROOT.glob("**/README.md")) - all_chunks = [] document_count = 0 - for file_path in markdown_files: - document = extract_document(file_path) - chunks = chunk_markdown(document["content"]) - + for file_path in sorted(PAGES_ROOT.glob("**/README.md")): + document = load_zpwiki_page(file_path) document_count += 1 - for index, chunk_text in enumerate(chunks): - all_chunks.append({ - "chunk_id": f"{document['path']}::chunk-{index}", - "document_path": document["path"], - "title": document["title"], - "categories": document["categories"], - "tags": document["tags"], - "author": document["author"], - "published": document["published"], - "chunk_index": index, - "text": chunk_text, - "text_length": len(chunk_text), - }) + for index, text in enumerate(chunk_markdown(document["content"])): + all_chunks.append( + { + "chunk_id": f"{document['path']}::chunk-{index}", + "document_path": document["path"], + "title": document["title"], + "categories": document["categories"], + "tags": document["tags"], + "author": document["author"], + "published": document["published"], + "chunk_index": index, + "text": text, + "text_length": len(text), + } + ) - OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) - - with OUTPUT_FILE.open("w", encoding="utf-8") as file: - json.dump(all_chunks, file, ensure_ascii=False, indent=2) + write_json(CHUNKS_FILE, all_chunks) + print(f"[green]ZPWIKI_ROOT:[/green] {ZPWIKI_ROOT}") print(f"[green]Dokumentov:[/green] {document_count}") print(f"[green]Chunkov:[/green] {len(all_chunks)}") - print(f"[green]Výstup uložený do:[/green] {OUTPUT_FILE}") + print(f"[green]Výstup uložený do:[/green] {CHUNKS_FILE}") if all_chunks: print("\n[bold]Ukážka prvého chunku:[/bold]") print(all_chunks[0]) + return all_chunks + + +def main() -> None: + build_chunks() + if __name__ == "__main__": main() diff --git a/scripts/build_sqlite_index.py b/scripts/build_sqlite_index.py index 4cc78c2..b39fba5 100644 --- a/scripts/build_sqlite_index.py +++ b/scripts/build_sqlite_index.py @@ -1,23 +1,34 @@ -from pathlib import Path +from __future__ import annotations + import json import sqlite3 +import sys +from pathlib import Path + from rich import print -DOCUMENTS_FILE = Path("data/documents.json") -CHUNKS_FILE = Path("data/chunks.json") -DB_FILE = Path("data/zp_index.sqlite") +PROJECT_ROOT = Path(__file__).resolve().parents[1] + +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) -def create_tables(conn: sqlite3.Connection): +from scripts.common import CHUNKS_FILE, DB_FILE, DOCUMENTS_FILE, read_json + + +def create_tables(conn: sqlite3.Connection) -> None: cursor = conn.cursor() - cursor.execute("DROP TABLE IF EXISTS chunk_tags") - cursor.execute("DROP TABLE IF EXISTS chunk_categories") - cursor.execute("DROP TABLE IF EXISTS chunks") - cursor.execute("DROP TABLE IF EXISTS documents") + cursor.executescript( + """ + PRAGMA foreign_keys = ON; + + DROP TABLE IF EXISTS chunk_tags; + DROP TABLE IF EXISTS chunk_categories; + DROP TABLE IF EXISTS chunks; + DROP TABLE IF EXISTS documents; - cursor.execute(""" CREATE TABLE documents ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT UNIQUE NOT NULL, @@ -26,10 +37,8 @@ def create_tables(conn: sqlite3.Connection): published INTEGER, content_length INTEGER, metadata_json TEXT - ) - """) + ); - cursor.execute(""" CREATE TABLE chunks ( id INTEGER PRIMARY KEY AUTOINCREMENT, chunk_id TEXT UNIQUE NOT NULL, @@ -40,127 +49,160 @@ def create_tables(conn: sqlite3.Connection): text TEXT NOT NULL, text_length INTEGER, FOREIGN KEY(document_path) REFERENCES documents(path) - ) - """) + ); - cursor.execute(""" CREATE TABLE chunk_tags ( chunk_id TEXT NOT NULL, - tag TEXT NOT NULL - ) - """) + tag TEXT NOT NULL, + UNIQUE(chunk_id, tag), + FOREIGN KEY(chunk_id) REFERENCES chunks(chunk_id) + ); - cursor.execute(""" CREATE TABLE chunk_categories ( chunk_id TEXT NOT NULL, - category TEXT NOT NULL - ) - """) + category TEXT NOT NULL, + UNIQUE(chunk_id, category), + FOREIGN KEY(chunk_id) REFERENCES chunks(chunk_id) + ); - cursor.execute("CREATE INDEX idx_documents_path ON documents(path)") - cursor.execute("CREATE INDEX idx_chunks_document_path ON chunks(document_path)") - cursor.execute("CREATE INDEX idx_chunks_title ON chunks(title)") - cursor.execute("CREATE INDEX idx_chunk_tags_tag ON chunk_tags(tag)") - cursor.execute("CREATE INDEX idx_chunk_categories_category ON chunk_categories(category)") + CREATE INDEX idx_documents_path ON documents(path); + CREATE INDEX idx_chunks_document_path ON chunks(document_path); + CREATE INDEX idx_chunks_title ON chunks(title); + CREATE INDEX idx_chunk_tags_tag ON chunk_tags(tag); + CREATE INDEX idx_chunk_categories_category ON chunk_categories(category); + """ + ) conn.commit() -def load_json(path: Path): - if not path.exists(): - raise SystemExit(f"Súbor neexistuje: {path}") - - with path.open("r", encoding="utf-8") as file: - return json.load(file) - - -def insert_documents(conn: sqlite3.Connection, documents: list[dict]): - cursor = conn.cursor() - - for doc in documents: - cursor.execute(""" - INSERT INTO documents ( - path, title, author, published, content_length, metadata_json - ) - VALUES (?, ?, ?, ?, ?, ?) - """, ( +def insert_documents(conn: sqlite3.Connection, documents: list[dict]) -> None: + rows = [ + ( doc.get("path"), doc.get("title"), doc.get("author"), 1 if doc.get("published") else 0, doc.get("content_length"), json.dumps(doc.get("metadata") or {}, ensure_ascii=False), - )) + ) + for doc in documents + ] + + conn.executemany( + """ + INSERT INTO documents ( + path, + title, + author, + published, + content_length, + metadata_json + ) + VALUES (?, ?, ?, ?, ?, ?) + """, + rows, + ) conn.commit() -def insert_chunks(conn: sqlite3.Connection, chunks: list[dict]): - cursor = conn.cursor() +def insert_chunks(conn: sqlite3.Connection, chunks: list[dict]) -> None: + chunk_rows = [] + tag_rows = [] + category_rows = [] for chunk in chunks: - cursor.execute(""" - INSERT INTO chunks ( - chunk_id, document_path, title, author, chunk_index, text, text_length + chunk_id = chunk.get("chunk_id") + + chunk_rows.append( + ( + chunk_id, + chunk.get("document_path"), + chunk.get("title"), + chunk.get("author"), + chunk.get("chunk_index"), + chunk.get("text"), + chunk.get("text_length"), ) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, ( - chunk.get("chunk_id"), - chunk.get("document_path"), - chunk.get("title"), - chunk.get("author"), - chunk.get("chunk_index"), - chunk.get("text"), - chunk.get("text_length"), - )) + ) for tag in chunk.get("tags") or []: - cursor.execute(""" - INSERT INTO chunk_tags (chunk_id, tag) - VALUES (?, ?) - """, ( - chunk.get("chunk_id"), - tag, - )) + tag_rows.append((chunk_id, tag)) for category in chunk.get("categories") or []: - cursor.execute(""" - INSERT INTO chunk_categories (chunk_id, category) - VALUES (?, ?) - """, ( - chunk.get("chunk_id"), - category, - )) + category_rows.append((chunk_id, category)) + + conn.executemany( + """ + INSERT INTO chunks ( + chunk_id, + document_path, + title, + author, + chunk_index, + text, + text_length + ) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + chunk_rows, + ) + + conn.executemany( + """ + INSERT OR IGNORE INTO chunk_tags (chunk_id, tag) + VALUES (?, ?) + """, + tag_rows, + ) + + conn.executemany( + """ + INSERT OR IGNORE INTO chunk_categories (chunk_id, category) + VALUES (?, ?) + """, + category_rows, + ) conn.commit() -def main(): - documents = load_json(DOCUMENTS_FILE) - chunks = load_json(CHUNKS_FILE) +def get_counts(conn: sqlite3.Connection) -> dict[str, int]: + cursor = conn.cursor() + + return { + "documents": cursor.execute("SELECT COUNT(*) FROM documents").fetchone()[0], + "chunks": cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0], + "tags": cursor.execute("SELECT COUNT(*) FROM chunk_tags").fetchone()[0], + "categories": cursor.execute("SELECT COUNT(*) FROM chunk_categories").fetchone()[0], + } + + +def build_database() -> dict[str, int]: + documents = read_json(DOCUMENTS_FILE) + chunks = read_json(CHUNKS_FILE) DB_FILE.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(DB_FILE) - - create_tables(conn) - insert_documents(conn, documents) - insert_chunks(conn, chunks) - - cursor = conn.cursor() - - document_count = cursor.execute("SELECT COUNT(*) FROM documents").fetchone()[0] - chunk_count = cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0] - tag_count = cursor.execute("SELECT COUNT(*) FROM chunk_tags").fetchone()[0] - category_count = cursor.execute("SELECT COUNT(*) FROM chunk_categories").fetchone()[0] - - conn.close() + with sqlite3.connect(DB_FILE) as conn: + conn.execute("PRAGMA foreign_keys = ON") + create_tables(conn) + insert_documents(conn, documents) + insert_chunks(conn, chunks) + counts = get_counts(conn) print(f"[green]SQLite index vytvorený:[/green] {DB_FILE}") - print(f"Dokumentov: {document_count}") - print(f"Chunkov: {chunk_count}") - print(f"Tag záznamov: {tag_count}") - print(f"Kategória záznamov: {category_count}") + print(f"Dokumentov: {counts['documents']}") + print(f"Chunkov: {counts['chunks']}") + print(f"Tag záznamov: {counts['tags']}") + print(f"Kategória záznamov: {counts['categories']}") + + return counts + + +def main() -> None: + build_database() if __name__ == "__main__": diff --git a/scripts/common.py b/scripts/common.py new file mode 100644 index 0000000..14cc838 --- /dev/null +++ b/scripts/common.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any + +import frontmatter + + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", str(PROJECT_ROOT.parent / "zpwiki"))).resolve() +PAGES_ROOT = ZPWIKI_ROOT / "pages" + +DATA_DIR = PROJECT_ROOT / "data" +DOCUMENTS_FILE = DATA_DIR / "documents.json" +CHUNKS_FILE = DATA_DIR / "chunks.json" +DB_FILE = DATA_DIR / "zp_index.sqlite" + + +def json_safe(value: Any) -> Any: + """Prevedie metadata do formátu vhodného pre JSON.""" + if value is None or isinstance(value, (str, int, float, bool)): + return value + + if isinstance(value, list): + return [json_safe(item) for item in value] + + if isinstance(value, dict): + return {str(key): json_safe(item) for key, item in value.items()} + + return str(value) + + +def normalize_list(value: Any) -> list[str]: + """Zjednotí hodnotu na čistý zoznam bez duplicít.""" + if value is None: + return [] + + if isinstance(value, list): + raw_items = [str(item).strip() for item in value] + elif isinstance(value, str): + raw_items = [item.strip() for item in value.split(",")] + else: + raw_items = [str(value).strip()] + + items = [] + seen = set() + + for item in raw_items: + if item and item not in seen: + items.append(item) + seen.add(item) + + return items + + +def read_json(path: Path) -> Any: + if not path.exists(): + raise FileNotFoundError(f"Súbor neexistuje: {path}") + + with path.open("r", encoding="utf-8") as file: + return json.load(file) + + +def write_json(path: Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + + with path.open("w", encoding="utf-8") as file: + json.dump(data, file, ensure_ascii=False, indent=2) + + +def load_zpwiki_page(file_path: Path) -> dict[str, Any]: + post = frontmatter.load(file_path) + + metadata = { + key: json_safe(value) + for key, value in post.metadata.items() + } + + taxonomy = metadata.get("taxonomy") or {} + + categories = normalize_list( + metadata.get("category") + or taxonomy.get("category") + ) + + tags = normalize_list( + metadata.get("tag") + or metadata.get("tags") + or taxonomy.get("tag") + or taxonomy.get("tags") + ) + + return { + "path": str(file_path.relative_to(ZPWIKI_ROOT)), + "title": metadata.get("title"), + "categories": categories, + "tags": tags, + "published": metadata.get("published"), + "author": metadata.get("author") or taxonomy.get("author"), + "taxonomy": taxonomy, + "metadata": metadata, + "content": post.content.strip(), + } diff --git a/scripts/rebuild_index.py b/scripts/rebuild_index.py index e49e58a..a6f0fec 100644 --- a/scripts/rebuild_index.py +++ b/scripts/rebuild_index.py @@ -1,23 +1,36 @@ -from pathlib import Path +from __future__ import annotations + import argparse -import os -import sqlite3 import subprocess import sys import time +from pathlib import Path + from rich import print -ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", "../zpwiki")) -DB_FILE = Path("data/zp_index.sqlite") +PROJECT_ROOT = Path(__file__).resolve().parents[1] + +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) -def run_command(command: list[str], cwd: Path | None = None) -> None: - print(f"[cyan]Spúšťam:[/cyan] {' '.join(command)}") +from scripts.build_chunks import build_chunks +from scripts.build_sqlite_index import build_database +from scripts.common import DB_FILE, ZPWIKI_ROOT +from scripts.scan_zpwiki import scan_pages + + +def git_pull(repo_path: Path = ZPWIKI_ROOT) -> None: + if not repo_path.exists(): + raise RuntimeError(f"ZPWIKI_ROOT neexistuje: {repo_path}") + + if not (repo_path / ".git").exists(): + raise RuntimeError(f"Nie je to git repozitár: {repo_path}") result = subprocess.run( - command, - cwd=cwd, + ["git", "pull"], + cwd=repo_path, text=True, capture_output=True, ) @@ -29,52 +42,37 @@ def run_command(command: list[str], cwd: Path | None = None) -> None: print(result.stderr.strip()) if result.returncode != 0: - raise RuntimeError( - f"Príkaz zlyhal: {' '.join(command)}" - ) + raise RuntimeError("Git pull zlyhal") -def git_pull() -> None: - if not ZPWIKI_ROOT.exists(): - raise RuntimeError(f"ZPWIKI_ROOT neexistuje: {ZPWIKI_ROOT}") +def rebuild_index(pull_git: bool = False) -> dict: + start = time.time() - if not (ZPWIKI_ROOT / ".git").exists(): - raise RuntimeError(f"Nie je to git repozitár: {ZPWIKI_ROOT}") + print(f"[green]ZPWIKI_ROOT:[/green] {ZPWIKI_ROOT}") - run_command(["git", "pull"], cwd=ZPWIKI_ROOT) + if pull_git: + git_pull() + documents = scan_pages() + chunks = build_chunks() + counts = build_database() -def rebuild_index() -> None: - run_command([sys.executable, "scripts/scan_zpwiki.py"]) - run_command([sys.executable, "scripts/build_chunks.py"]) - run_command([sys.executable, "scripts/build_sqlite_index.py"]) + duration = round(time.time() - start, 2) - -def get_counts() -> dict: - if not DB_FILE.exists(): - return { - "documents": 0, - "chunks": 0, - "tags": 0, - "categories": 0, - } - - conn = sqlite3.connect(DB_FILE) - cursor = conn.cursor() - - counts = { - "documents": cursor.execute("SELECT COUNT(*) FROM documents").fetchone()[0], - "chunks": cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0], - "tags": cursor.execute("SELECT COUNT(*) FROM chunk_tags").fetchone()[0], - "categories": cursor.execute("SELECT COUNT(*) FROM chunk_categories").fetchone()[0], + return { + "duration_seconds": duration, + "documents_scanned": len(documents), + "chunks_created": len(chunks), + "counts": counts, + "database_path": str(DB_FILE), } - conn.close() - return counts +def main() -> None: + parser = argparse.ArgumentParser( + description="Obnoví JSON súbory a SQLite index." + ) -def main(): - parser = argparse.ArgumentParser() parser.add_argument( "--pull", action="store_true", @@ -82,21 +80,11 @@ def main(): ) args = parser.parse_args() - - start = time.time() - - print(f"[green]ZPWIKI_ROOT:[/green] {ZPWIKI_ROOT}") - - if args.pull: - git_pull() - - rebuild_index() - - counts = get_counts() - duration = round(time.time() - start, 2) + result = rebuild_index(pull_git=args.pull) + counts = result["counts"] print("[green]Reindex hotový.[/green]") - print(f"Trvanie: {duration} s") + print(f"Trvanie: {result['duration_seconds']} s") print(f"Dokumentov: {counts['documents']}") print(f"Chunkov: {counts['chunks']}") print(f"Tag záznamov: {counts['tags']}") diff --git a/scripts/scan_zpwiki.py b/scripts/scan_zpwiki.py index 175b1e9..f64bbd0 100644 --- a/scripts/scan_zpwiki.py +++ b/scripts/scan_zpwiki.py @@ -1,141 +1,96 @@ -from pathlib import Path +from __future__ import annotations + +import sys from collections import Counter -import json -import os -import frontmatter +from pathlib import Path + from rich import print -ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", "../zpwiki")) -PAGES_ROOT = ZPWIKI_ROOT / "pages" -OUTPUT_FILE = Path("data/documents.json") +PROJECT_ROOT = Path(__file__).resolve().parents[1] + +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) -def json_safe(value): - if value is None: - return None - - if isinstance(value, (str, int, float, bool)): - return value - - if isinstance(value, list): - return [json_safe(item) for item in value] - - if isinstance(value, dict): - return {str(key): json_safe(val) for key, val in value.items()} - - return str(value) +from scripts.common import DOCUMENTS_FILE, PAGES_ROOT, ZPWIKI_ROOT, load_zpwiki_page, write_json -def normalize_list(value): - if value is None: - return [] - - if isinstance(value, list): - return [str(item).strip() for item in value if str(item).strip()] - - if isinstance(value, str): - return [item.strip() for item in value.split(",") if item.strip()] - - return [str(value)] - - -def main(): +def scan_pages() -> list[dict]: if not PAGES_ROOT.exists(): raise SystemExit(f"Neexistuje priečinok: {PAGES_ROOT}") - markdown_files = sorted(PAGES_ROOT.glob("**/README.md")) - documents = [] metadata_keys = Counter() - categories_counter = Counter() - tags_counter = Counter() - authors_counter = Counter() + categories = Counter() + tags = Counter() + authors = Counter() - for file_path in markdown_files: - post = frontmatter.load(file_path) + for file_path in sorted(PAGES_ROOT.glob("**/README.md")): + page = load_zpwiki_page(file_path) + content = page.pop("content", "") - metadata = { - key: json_safe(value) - for key, value in post.metadata.items() - } - - taxonomy = metadata.get("taxonomy") or {} - content = post.content.strip() - - for key in metadata.keys(): + for key in page["metadata"]: metadata_keys[key] += 1 - categories = normalize_list( - metadata.get("category") - or taxonomy.get("category") + for category in page["categories"]: + categories[category] += 1 + + for tag in page["tags"]: + tags[tag] += 1 + + if page.get("author"): + authors[str(page["author"])] += 1 + + documents.append( + { + **page, + "content_preview": content[:500], + "content_length": len(content), + } ) - tags = normalize_list( - metadata.get("tag") - or metadata.get("tags") - or taxonomy.get("tag") - or taxonomy.get("tags") - ) + write_json(DOCUMENTS_FILE, documents) + print_summary(documents, metadata_keys, categories, tags, authors) - author = ( - metadata.get("author") - or taxonomy.get("author") - ) + return documents - for category in categories: - categories_counter[category] += 1 - - for tag in tags: - tags_counter[tag] += 1 - - if author: - authors_counter[str(author)] += 1 - - relative_path = file_path.relative_to(ZPWIKI_ROOT) - - documents.append({ - "path": str(relative_path), - "title": metadata.get("title"), - "categories": categories, - "tags": tags, - "published": metadata.get("published"), - "author": author, - "taxonomy": taxonomy, - "metadata": metadata, - "content_preview": content[:500], - "content_length": len(content), - }) - - OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) - - with OUTPUT_FILE.open("w", encoding="utf-8") as file: - json.dump(documents, file, ensure_ascii=False, indent=2) +def print_summary( + documents: list[dict], + metadata_keys: Counter, + categories: Counter, + tags: Counter, + authors: Counter, +) -> None: print(f"[green]ZPWIKI_ROOT:[/green] {ZPWIKI_ROOT}") - print(f"[green]Našiel som README.md súborov:[/green] {len(markdown_files)}") - print(f"[green]Výstup uložený do:[/green] {OUTPUT_FILE}") + print(f"[green]Našiel som dokumentov:[/green] {len(documents)}") + print(f"[green]Výstup uložený do:[/green] {DOCUMENTS_FILE}") print("\n[bold]Najčastejšie metadata kľúče:[/bold]") for key, count in metadata_keys.most_common(30): print(f"{key}: {count}") print("\n[bold]Najčastejšie kategórie:[/bold]") - for key, count in categories_counter.most_common(30): + for key, count in categories.most_common(30): print(f"{key}: {count}") print("\n[bold]Najčastejšie tagy:[/bold]") - for key, count in tags_counter.most_common(40): + for key, count in tags.most_common(40): print(f"{key}: {count}") print("\n[bold]Najčastejší autori:[/bold]") - for key, count in authors_counter.most_common(20): + for key, count in authors.most_common(20): print(f"{key}: {count}") - print("\n[bold]Ukážka prvého dokumentu:[/bold]") if documents: + print("\n[bold]Ukážka prvého dokumentu:[/bold]") print(documents[0]) +def main() -> None: + scan_pages() + + if __name__ == "__main__": main() diff --git a/scripts/search_chunks.py b/scripts/search_chunks.py deleted file mode 100644 index 27d0ef9..0000000 --- a/scripts/search_chunks.py +++ /dev/null @@ -1,189 +0,0 @@ -from pathlib import Path -import json -import re -import os -import frontmatter -from rich import print - - -ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", "../zpwiki")) -PAGES_ROOT = ZPWIKI_ROOT / "pages" -OUTPUT_FILE = Path("data/chunks.json") - -MAX_CHARS = 1200 -OVERLAP_CHARS = 200 - - -def json_safe(value): - if value is None: - return None - - if isinstance(value, (str, int, float, bool)): - return value - - if isinstance(value, list): - return [json_safe(item) for item in value] - - if isinstance(value, dict): - return {str(key): json_safe(val) for key, val in value.items()} - - return str(value) - - -def normalize_list(value): - if value is None: - return [] - - if isinstance(value, list): - return [str(item).strip() for item in value if str(item).strip()] - - if isinstance(value, str): - return [item.strip() for item in value.split(",") if item.strip()] - - return [str(value)] - - -def clean_markdown(text: str) -> str: - text = text.replace("\r\n", "\n") - text = re.sub(r"\n{3,}", "\n\n", text) - text = text.strip() - return text - - -def split_by_headings(text: str) -> list[str]: - parts = re.split(r"(?m)(?=^#{1,6}\s+)", text) - return [part.strip() for part in parts if part.strip()] - - -def split_long_text( - text: str, - max_chars: int = MAX_CHARS, - overlap: int = OVERLAP_CHARS, -) -> list[str]: - if len(text) <= max_chars: - return [text] - - chunks = [] - start = 0 - - while start < len(text): - end = start + max_chars - chunk = text[start:end].strip() - - if chunk: - chunks.append(chunk) - - if end >= len(text): - break - - start = max(0, end - overlap) - - return chunks - - -def chunk_markdown(text: str) -> list[str]: - text = clean_markdown(text) - - if not text: - return [] - - heading_parts = split_by_headings(text) - - chunks = [] - - for part in heading_parts: - if len(part) <= MAX_CHARS: - chunks.append(part) - else: - chunks.extend(split_long_text(part)) - - return chunks - - -def extract_document(file_path: Path) -> dict: - post = frontmatter.load(file_path) - - metadata = { - key: json_safe(value) - for key, value in post.metadata.items() - } - - taxonomy = metadata.get("taxonomy") or {} - - categories = normalize_list( - metadata.get("category") - or taxonomy.get("category") - ) - - tags = normalize_list( - metadata.get("tag") - or metadata.get("tags") - or taxonomy.get("tag") - or taxonomy.get("tags") - ) - - author = ( - metadata.get("author") - or taxonomy.get("author") - ) - - relative_path = file_path.relative_to(ZPWIKI_ROOT) - - return { - "path": str(relative_path), - "title": metadata.get("title"), - "categories": categories, - "tags": tags, - "published": metadata.get("published"), - "author": author, - "content": post.content.strip(), - "metadata": metadata, - } - - -def main(): - if not PAGES_ROOT.exists(): - raise SystemExit(f"Neexistuje priečinok: {PAGES_ROOT}") - - markdown_files = sorted(PAGES_ROOT.glob("**/README.md")) - - all_chunks = [] - document_count = 0 - - for file_path in markdown_files: - document = extract_document(file_path) - chunks = chunk_markdown(document["content"]) - - document_count += 1 - - for index, chunk_text in enumerate(chunks): - all_chunks.append({ - "chunk_id": f"{document['path']}::chunk-{index}", - "document_path": document["path"], - "title": document["title"], - "categories": document["categories"], - "tags": document["tags"], - "author": document["author"], - "published": document["published"], - "chunk_index": index, - "text": chunk_text, - "text_length": len(chunk_text), - }) - - OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) - - with OUTPUT_FILE.open("w", encoding="utf-8") as file: - json.dump(all_chunks, file, ensure_ascii=False, indent=2) - - print(f"[green]ZPWIKI_ROOT:[/green] {ZPWIKI_ROOT}") - print(f"[green]Dokumentov:[/green] {document_count}") - print(f"[green]Chunkov:[/green] {len(all_chunks)}") - print(f"[green]Výstup uložený do:[/green] {OUTPUT_FILE}") - - if all_chunks: - print("\n[bold]Ukážka prvého chunku:[/bold]") - print(all_chunks[0]) - - -if __name__ == "__main__": - main() diff --git a/scripts/search_db.py b/scripts/search_db.py index 9ababa3..a82d439 100644 --- a/scripts/search_db.py +++ b/scripts/search_db.py @@ -1,261 +1,33 @@ -from pathlib import Path -import sqlite3 -import re +from __future__ import annotations + +import argparse import sys -import unicodedata -from collections import Counter +from pathlib import Path + from rich import print -DB_FILE = Path("data/zp_index.sqlite") +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) -TECHNICAL_TERMS = { - "rag", - "agent", - "graph", - "knowledge", - "chatbot", - "nlp", - "llm", - "lm", - "openwebui", - "docker", - "webhook", - "database", - "db", - "neo4j", - "python", - "search", - "retrieval", - "generation", - "embedding", - "vector", - "vectors", - "langchain", - "graphrag", - "qa", - "question", - "answer", - "cloud", - "api", -} +from scripts.common import DB_FILE +from scripts.search_utils import search_database -def normalize_text(text: str) -> str: - text = text.lower() - text = text.replace("_", " ") - text = text.replace("/", " ") - text = text.replace("-", " ") - - text = unicodedata.normalize("NFKD", text) - text = "".join(ch for ch in text if not unicodedata.combining(ch)) - - text = re.sub(r"[^a-z0-9]+", " ", text) - return text.strip() - - -def tokenize(text: str) -> list[str]: - text = normalize_text(text) - return [word for word in text.split() if len(word) >= 2] - - -def detect_search_mode(query_tokens: list[str]) -> str: - """ - person režim: - napríklad jan ptak, jan holp, daniel hladek - - topic režim: - napríklad rag agent, knowledge graph, nlp chatbot - """ - - if not query_tokens: - return "topic" - - has_technical_term = any(token in TECHNICAL_TERMS for token in query_tokens) - - if len(query_tokens) == 2 and not has_technical_term: - return "person" - - return "topic" - - -def score_tokens(query_tokens: list[str], field_tokens: list[str], weight: int) -> int: - counts = Counter(field_tokens) - score = 0 - - for token in query_tokens: - score += counts.get(token, 0) * weight - - return score - - -def get_tags(conn: sqlite3.Connection, chunk_id: str) -> list[str]: - rows = conn.execute( - "SELECT tag FROM chunk_tags WHERE chunk_id = ?", - (chunk_id,) - ).fetchall() - - return [row[0] for row in rows] - - -def get_categories(conn: sqlite3.Connection, chunk_id: str) -> list[str]: - rows = conn.execute( - "SELECT category FROM chunk_categories WHERE chunk_id = ?", - (chunk_id,) - ).fetchall() - - return [row[0] for row in rows] - - -def contains_all_tokens(query_tokens: list[str], field_tokens: list[str]) -> bool: - return all(token in field_tokens for token in query_tokens) - - -def person_match(query_tokens: list[str], item: dict) -> bool: - title_tokens = tokenize(item.get("title") or "") - path_tokens = tokenize(item.get("document_path") or "") - author_tokens = tokenize(item.get("author") or "") - text_tokens = tokenize(item.get("text") or "") - - if contains_all_tokens(query_tokens, title_tokens): - return True - - if contains_all_tokens(query_tokens, path_tokens): - return True - - if contains_all_tokens(query_tokens, author_tokens): - return True - - """ - Text berieme slabšie, ale necháme ho ako fallback. - Napríklad ak meno nie je v title, ale je v obsahu. - """ - if contains_all_tokens(query_tokens, text_tokens): - return True - - return False - - -def score_item(query: str, query_tokens: list[str], item: dict, mode: str) -> int: - title = item.get("title") or "" - path = item.get("document_path") or "" - author = item.get("author") or "" - text = item.get("text") or "" - tags = item.get("tags") or [] - categories = item.get("categories") or [] - - title_tokens = tokenize(title) - path_tokens = tokenize(path) - author_tokens = tokenize(author) - text_tokens = tokenize(text) - tag_tokens = tokenize(" ".join(tags)) - category_tokens = tokenize(" ".join(categories)) - - score = 0 - - if mode == "person": - score += score_tokens(query_tokens, title_tokens, 30) - score += score_tokens(query_tokens, path_tokens, 30) - score += score_tokens(query_tokens, author_tokens, 15) - score += score_tokens(query_tokens, text_tokens, 2) - - if contains_all_tokens(query_tokens, title_tokens): - score += 100 - - if contains_all_tokens(query_tokens, path_tokens): - score += 100 - - if contains_all_tokens(query_tokens, author_tokens): - score += 60 - - return score - - score += score_tokens(query_tokens, title_tokens, 12) - score += score_tokens(query_tokens, path_tokens, 12) - score += score_tokens(query_tokens, tag_tokens, 10) - score += score_tokens(query_tokens, category_tokens, 6) - score += score_tokens(query_tokens, author_tokens, 3) - score += score_tokens(query_tokens, text_tokens, 2) - - normalized_query = normalize_text(query) - normalized_title = normalize_text(title) - normalized_path = normalize_text(path) - - if normalized_query and normalized_query in normalized_title: - score += 30 - - if normalized_query and normalized_query in normalized_path: - score += 30 - - matched_title_tokens = sum(1 for token in query_tokens if token in title_tokens) - matched_path_tokens = sum(1 for token in query_tokens if token in path_tokens) - - if query_tokens and matched_title_tokens == len(query_tokens): - score += 25 - - if query_tokens and matched_path_tokens == len(query_tokens): - score += 25 - - return score - - -def main(): - if len(sys.argv) < 2: - print("[red]Použitie:[/red] python scripts/search_db.py \"rag agent\"") - raise SystemExit(1) - - if not DB_FILE.exists(): - raise SystemExit(f"Databáza neexistuje: {DB_FILE}") - - query = " ".join(sys.argv[1:]) - query_tokens = tokenize(query) - mode = detect_search_mode(query_tokens) - - conn = sqlite3.connect(DB_FILE) - - rows = conn.execute(""" - SELECT chunk_id, document_path, title, author, chunk_index, text, text_length - FROM chunks - """).fetchall() - - results = [] - - for row in rows: - chunk_id, document_path, title, author, chunk_index, text, text_length = row - - item = { - "chunk_id": chunk_id, - "document_path": document_path, - "title": title, - "author": author, - "chunk_index": chunk_index, - "text": text, - "text_length": text_length, - "tags": get_tags(conn, chunk_id), - "categories": get_categories(conn, chunk_id), - } - - if mode == "person" and not person_match(query_tokens, item): - continue - - score = score_item(query, query_tokens, item, mode) - - if score > 0: - item["score"] = score - results.append(item) - - results.sort(key=lambda item: item["score"], reverse=True) +def print_results(query: str, mode: str, results: list[dict]) -> None: print(f"[bold]Dopyt:[/bold] {query}") print(f"[bold]Režim:[/bold] {mode}") print(f"[bold]Počet výsledkov:[/bold] {len(results)}") print("\n[bold]Top výsledky:[/bold]\n") - for rank, item in enumerate(results[:10], start=1): + for rank, item in enumerate(results, start=1): print(f"[cyan]{rank}. Skóre: {item['score']}[/cyan]") print(f"[bold]Názov:[/bold] {item['title']}") print(f"[bold]Cesta:[/bold] {item['document_path']}") + print(f"[bold]URL:[/bold] {item['source_url']}") print(f"[bold]Chunk:[/bold] {item['chunk_index']}") print(f"[bold]Kategórie:[/bold] {item['categories']}") print(f"[bold]Tagy:[/bold] {item['tags']}") @@ -264,7 +36,34 @@ def main(): print((item["text"] or "")[:700]) print("-" * 80) - conn.close() + +def main() -> None: + parser = argparse.ArgumentParser( + description="Vyhľadávanie v SQLite indexe zpwiki." + ) + + parser.add_argument( + "query", + nargs="+", + help="Text, ktorý sa má vyhľadať.", + ) + + parser.add_argument( + "--limit", + type=int, + default=10, + help="Počet výsledkov.", + ) + + args = parser.parse_args() + query = " ".join(args.query) + + try: + mode, results = search_database(DB_FILE, query, args.limit) + except FileNotFoundError as error: + raise SystemExit(str(error)) from error + + print_results(query, mode, results) if __name__ == "__main__": diff --git a/scripts/search_utils.py b/scripts/search_utils.py new file mode 100644 index 0000000..6c4e600 --- /dev/null +++ b/scripts/search_utils.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import re +import sqlite3 +import unicodedata +from collections import Counter, defaultdict +from pathlib import Path +from typing import Any + + +TECHNICAL_TERMS = { + "rag", + "agent", + "graph", + "knowledge", + "chatbot", + "nlp", + "llm", + "lm", + "openwebui", + "docker", + "webhook", + "database", + "db", + "neo4j", + "python", + "search", + "retrieval", + "generation", + "embedding", + "vector", + "vectors", + "langchain", + "graphrag", + "qa", + "question", + "answer", + "cloud", + "api", +} + + +def normalize_text(text: str) -> str: + text = text.lower() + text = text.replace("_", " ") + text = text.replace("/", " ") + text = text.replace("-", " ") + + text = unicodedata.normalize("NFKD", text) + text = "".join(ch for ch in text if not unicodedata.combining(ch)) + + return re.sub(r"[^a-z0-9]+", " ", text).strip() + + +def tokenize(text: str) -> list[str]: + return [ + word + for word in normalize_text(text).split() + if len(word) >= 2 + ] + + +def detect_search_mode(tokens: list[str]) -> str: + """Jednoduchý odhad, či ide o meno osoby alebo odbornú tému.""" + if not tokens: + return "topic" + + has_technical_term = any(token in TECHNICAL_TERMS for token in tokens) + + if len(tokens) == 2 and not has_technical_term: + return "person" + + return "topic" + + +def contains_all(query_tokens: list[str], field_tokens: list[str]) -> bool: + return all(token in field_tokens for token in query_tokens) + + +def score_tokens( + query_tokens: list[str], + field_tokens: list[str], + weight: int, +) -> int: + counts = Counter(field_tokens) + + return sum( + counts.get(token, 0) * weight + for token in query_tokens + ) + + +def make_source_url(document_path: str) -> str: + clean_path = document_path.replace("pages/", "").replace("/README.md", "") + return f"https://zp.kemt.fei.tuke.sk/{clean_path}" + + +def load_labels( + conn: sqlite3.Connection, + table: str, + column: str, +) -> dict[str, list[str]]: + rows = conn.execute(f"SELECT chunk_id, {column} FROM {table}").fetchall() + labels: dict[str, list[str]] = defaultdict(list) + + for chunk_id, value in rows: + labels[chunk_id].append(value) + + return labels + + +def person_matches(query_tokens: list[str], item: dict[str, Any]) -> bool: + fields = [ + item.get("title") or "", + item.get("document_path") or "", + item.get("author") or "", + item.get("text") or "", + ] + + return any( + contains_all(query_tokens, tokenize(field)) + for field in fields + ) + + +def score_item( + query: str, + query_tokens: list[str], + item: dict[str, Any], + mode: str, +) -> int: + title_tokens = tokenize(item.get("title") or "") + path_tokens = tokenize(item.get("document_path") or "") + author_tokens = tokenize(item.get("author") or "") + text_tokens = tokenize(item.get("text") or "") + tag_tokens = tokenize(" ".join(item.get("tags") or [])) + category_tokens = tokenize(" ".join(item.get("categories") or [])) + + if mode == "person": + score = 0 + score += score_tokens(query_tokens, title_tokens, 30) + score += score_tokens(query_tokens, path_tokens, 30) + score += score_tokens(query_tokens, author_tokens, 15) + score += score_tokens(query_tokens, text_tokens, 2) + + if contains_all(query_tokens, title_tokens): + score += 100 + + if contains_all(query_tokens, path_tokens): + score += 100 + + if contains_all(query_tokens, author_tokens): + score += 60 + + return score + + score = 0 + score += score_tokens(query_tokens, title_tokens, 12) + score += score_tokens(query_tokens, path_tokens, 12) + score += score_tokens(query_tokens, tag_tokens, 10) + score += score_tokens(query_tokens, category_tokens, 6) + score += score_tokens(query_tokens, author_tokens, 3) + score += score_tokens(query_tokens, text_tokens, 2) + + normalized_query = normalize_text(query) + normalized_title = normalize_text(item.get("title") or "") + normalized_path = normalize_text(item.get("document_path") or "") + + if normalized_query and normalized_query in normalized_title: + score += 30 + + if normalized_query and normalized_query in normalized_path: + score += 30 + + if query_tokens and contains_all(query_tokens, title_tokens): + score += 25 + + if query_tokens and contains_all(query_tokens, path_tokens): + score += 25 + + return score + + +def search_database( + db_file: Path, + query: str, + limit: int = 10, +) -> tuple[str, list[dict[str, Any]]]: + if not db_file.exists(): + raise FileNotFoundError(f"Databáza neexistuje: {db_file}") + + query_tokens = tokenize(query) + mode = detect_search_mode(query_tokens) + + with sqlite3.connect(db_file) as conn: + conn.row_factory = sqlite3.Row + + tags_by_chunk = load_labels(conn, "chunk_tags", "tag") + categories_by_chunk = load_labels(conn, "chunk_categories", "category") + + rows = conn.execute( + """ + SELECT + chunk_id, + document_path, + title, + author, + chunk_index, + text, + text_length + FROM chunks + """ + ).fetchall() + + results = [] + + for row in rows: + item = dict(row) + chunk_id = item["chunk_id"] + + item["tags"] = tags_by_chunk.get(chunk_id, []) + item["categories"] = categories_by_chunk.get(chunk_id, []) + + if mode == "person" and not person_matches(query_tokens, item): + continue + + score = score_item(query, query_tokens, item, mode) + + if score <= 0: + continue + + item["score"] = score + item["source_url"] = make_source_url(item["document_path"]) + + results.append(item) + + results.sort(key=lambda item: item["score"], reverse=True) + + return mode, results[:limit]