Add sync and reindex endpoint

This commit is contained in:
Ján Pták 2026-06-04 17:19:18 +02:00
parent 10c45de1d7
commit b6f4857ba6
6 changed files with 379 additions and 74 deletions

View File

@ -5,6 +5,10 @@ WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
RUN apt-get update \
&& apt-get install -y --no-install-recommends git \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

View File

@ -1,14 +1,19 @@
from pathlib import Path
import sqlite3
import os
import re
import sqlite3
import subprocess
import sys
import time
import unicodedata
from collections import Counter
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
DB_FILE = Path("data/zp_index.sqlite")
ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", "../zpwiki"))
TECHNICAL_TERMS = {
@ -46,7 +51,7 @@ TECHNICAL_TERMS = {
app = FastAPI(
title="ZP Agent API",
description="API pre vyhľadávanie v repozitári záverečných prác zpwiki.",
version="0.1.0",
version="0.2.0",
)
@ -55,18 +60,11 @@ class SearchRequest(BaseModel):
limit: int = Field(default=10, ge=1, le=50)
class SearchResult(BaseModel):
score: int
chunk_id: str
document_path: str
source_url: str
title: str | None
author: str | None
chunk_index: int
categories: list[str]
tags: list[str]
text: str
text_length: int
class SyncRequest(BaseModel):
    # Request body of POST /sync. The single flag selects whether the zpwiki
    # checkout is updated with `git pull` before the index is rebuilt; it is
    # off by default, so a plain sync only re-reads what is already on disk.
    pull_git: bool = Field(
        description="Ak je true, pred reindexovaním sa vykoná git pull v repozitári zpwiki.",
        default=False,
    )
def normalize_text(text: str) -> str:
@ -268,18 +266,95 @@ def search_database(query: str, limit: int) -> tuple[str, list[dict]]:
return mode, results[:limit]
def run_command(command: list[str], cwd: Path | None = None) -> str:
result = subprocess.run(
command,
cwd=cwd,
text=True,
capture_output=True,
)
output = ""
if result.stdout:
output += result.stdout
if result.stderr:
output += result.stderr
if result.returncode != 0:
raise RuntimeError(output.strip())
return output.strip()
def get_index_counts() -> dict:
    """Return row counts of the index tables stored in ``DB_FILE``.

    Returns:
        A dict with the keys ``documents``, ``chunks``, ``tags`` and
        ``categories``. All values are 0 when the database file does not
        exist yet.

    Raises:
        sqlite3.Error: If the database cannot be queried (for example when
            one of the tables is missing).
    """
    if not DB_FILE.exists():
        return {
            "documents": 0,
            "chunks": 0,
            "tags": 0,
            "categories": 0,
        }

    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        return {
            "documents": cursor.execute("SELECT COUNT(*) FROM documents").fetchone()[0],
            "chunks": cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0],
            "tags": cursor.execute("SELECT COUNT(*) FROM chunk_tags").fetchone()[0],
            "categories": cursor.execute("SELECT COUNT(*) FROM chunk_categories").fetchone()[0],
        }
    finally:
        # Close the connection even when a query fails, so a broken or
        # half-built index does not leak a handle on every request.
        conn.close()
def rebuild_index(pull_git: bool = False) -> dict:
    """Rebuild the search index, optionally updating the zpwiki checkout first.

    Runs ``scripts/scan_zpwiki.py``, ``scripts/build_chunks.py`` and
    ``scripts/build_sqlite_index.py`` in that order with the current Python
    interpreter. The script paths are relative, so they are resolved against
    the current working directory of this process.

    Args:
        pull_git: When true, ``git pull`` is executed inside ``ZPWIKI_ROOT``
            before the scripts run.

    Returns:
        A dict with ``duration_seconds`` (rounded to two decimals),
        ``counts`` (result of ``get_index_counts``) and ``logs`` (captured
        output of every executed command, in execution order).

    Raises:
        RuntimeError: If ``ZPWIKI_ROOT`` is missing or is not a git
            repository while ``pull_git`` is set, or if any command fails.
    """
    # Monotonic clock: the elapsed time must not be distorted by wall-clock
    # adjustments that happen while the rebuild is running.
    start = time.monotonic()
    logs = []

    if pull_git:
        if not ZPWIKI_ROOT.exists():
            raise RuntimeError(f"ZPWIKI_ROOT neexistuje: {ZPWIKI_ROOT}")

        if not (ZPWIKI_ROOT / ".git").exists():
            raise RuntimeError(f"Nie je to git repozitár: {ZPWIKI_ROOT}")

        logs.append(run_command(["git", "pull"], cwd=ZPWIKI_ROOT))

    # Order matters: each stage is run only after the previous one succeeded.
    for script in (
        "scripts/scan_zpwiki.py",
        "scripts/build_chunks.py",
        "scripts/build_sqlite_index.py",
    ):
        logs.append(run_command([sys.executable, script]))

    counts = get_index_counts()
    duration = round(time.monotonic() - start, 2)

    return {
        "duration_seconds": duration,
        "counts": counts,
        "logs": logs,
    }
@app.get("/health")
def health():
    # Liveness/diagnostic endpoint: always reports "ok" and adds where the
    # index database and the zpwiki checkout are expected and whether those
    # paths currently exist. (Plain comment rather than a docstring so the
    # generated API description stays as it was.)
    payload = {"status": "ok"}
    payload["database_exists"] = DB_FILE.exists()
    payload["database_path"] = str(DB_FILE)
    payload["zpwiki_root"] = str(ZPWIKI_ROOT)
    payload["zpwiki_exists"] = ZPWIKI_ROOT.exists()
    return payload
@app.post("/search")
def search(request: SearchRequest):
mode, results = search_database(request.query, request.limit)
try:
mode, results = search_database(request.query, request.limit)
except FileNotFoundError as error:
raise HTTPException(status_code=500, detail=str(error)) from error
return {
"query": request.query,
@ -287,3 +362,18 @@ def search(request: SearchRequest):
"count": len(results),
"results": results,
}
@app.post("/sync")
def sync(request: SyncRequest):
    # Rebuilds the index (optionally after a git pull) and reports the
    # duration and resulting row counts. Every expected failure mode is
    # turned into an HTTP 500 that carries the underlying message:
    #   - RuntimeError: a command failed or the zpwiki checkout is unusable,
    #   - OSError: a program could not be started at all (e.g. git missing),
    #   - sqlite3.Error: the rebuilt index could not be queried for counts.
    try:
        result = rebuild_index(pull_git=request.pull_git)
    except (RuntimeError, OSError, sqlite3.Error) as error:
        raise HTTPException(status_code=500, detail=str(error)) from error

    return {
        "status": "ok",
        "pull_git": request.pull_git,
        "duration_seconds": result["duration_seconds"],
        "counts": result["counts"],
    }

View File

@ -4,6 +4,9 @@ services:
container_name: zp-agent-api
ports:
- "8000:8000"
environment:
- ZPWIKI_ROOT=/zpwiki
volumes:
- ./data:/app/data
- ../zpwiki:/zpwiki
restart: unless-stopped

107
scripts/rebuild_index.py Normal file
View File

@ -0,0 +1,107 @@
from pathlib import Path
import argparse
import os
import sqlite3
import subprocess
import sys
import time
from rich import print
ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", "../zpwiki"))
DB_FILE = Path("data/zp_index.sqlite")
def run_command(command: list[str], cwd: Path | None = None) -> None:
print(f"[cyan]Spúšťam:[/cyan] {' '.join(command)}")
result = subprocess.run(
command,
cwd=cwd,
text=True,
capture_output=True,
)
if result.stdout:
print(result.stdout.strip())
if result.stderr:
print(result.stderr.strip())
if result.returncode != 0:
raise RuntimeError(
f"Príkaz zlyhal: {' '.join(command)}"
)
def git_pull() -> None:
    """Update the zpwiki checkout at ``ZPWIKI_ROOT`` with ``git pull``.

    Raises:
        RuntimeError: If ``ZPWIKI_ROOT`` does not exist, has no ``.git``
            entry, or the pull itself fails.
    """
    repo = ZPWIKI_ROOT

    if not repo.exists():
        raise RuntimeError(f"ZPWIKI_ROOT neexistuje: {repo}")

    git_dir = repo / ".git"
    if not git_dir.exists():
        raise RuntimeError(f"Nie je to git repozitár: {repo}")

    run_command(["git", "pull"], cwd=repo)
def rebuild_index() -> None:
    """Run the three indexing stages in order with the current interpreter.

    A failing stage raises from ``run_command`` and the later stages are not
    started.
    """
    stages = (
        "scripts/scan_zpwiki.py",
        "scripts/build_chunks.py",
        "scripts/build_sqlite_index.py",
    )
    for stage in stages:
        run_command([sys.executable, stage])
def get_counts() -> dict:
    """Return row counts of the index tables stored in ``DB_FILE``.

    Returns:
        A dict with the keys ``documents``, ``chunks``, ``tags`` and
        ``categories``. All values are 0 when the database file does not
        exist.

    Raises:
        sqlite3.Error: If the database cannot be queried (for example when
            one of the tables is missing).
    """
    if not DB_FILE.exists():
        return {
            "documents": 0,
            "chunks": 0,
            "tags": 0,
            "categories": 0,
        }

    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        return {
            "documents": cursor.execute("SELECT COUNT(*) FROM documents").fetchone()[0],
            "chunks": cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0],
            "tags": cursor.execute("SELECT COUNT(*) FROM chunk_tags").fetchone()[0],
            "categories": cursor.execute("SELECT COUNT(*) FROM chunk_categories").fetchone()[0],
        }
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()
def main():
    """Command-line entry point: optionally pull zpwiki, rebuild, print a summary.

    Accepts a single ``--pull`` flag. After the rebuild it prints the elapsed
    time and the row counts read back from the index database.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--pull",
        action="store_true",
        help="Pred reindexovaním spustí git pull v zpwiki repozitári.",
    )
    args = parser.parse_args()

    # Monotonic clock so the reported duration cannot be skewed by
    # wall-clock adjustments during the run.
    start = time.monotonic()

    print(f"[green]ZPWIKI_ROOT:[/green] {ZPWIKI_ROOT}")

    if args.pull:
        git_pull()

    rebuild_index()

    counts = get_counts()
    duration = round(time.monotonic() - start, 2)

    print("[green]Reindex hotový.[/green]")
    print(f"Trvanie: {duration} s")
    print(f"Dokumentov: {counts['documents']}")
    print(f"Chunkov: {counts['chunks']}")
    print(f"Tag záznamov: {counts['tags']}")
    print(f"Kategória záznamov: {counts['categories']}")
if __name__ == "__main__":
main()

View File

@ -1,11 +1,12 @@
from pathlib import Path
from collections import Counter
import json
import os
import frontmatter
from rich import print
ZPWIKI_ROOT = Path("../zpwiki")
ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", "../zpwiki"))
PAGES_ROOT = ZPWIKI_ROOT / "pages"
OUTPUT_FILE = Path("data/documents.json")
@ -111,6 +112,7 @@ def main():
with OUTPUT_FILE.open("w", encoding="utf-8") as file:
json.dump(documents, file, ensure_ascii=False, indent=2)
print(f"[green]ZPWIKI_ROOT:[/green] {ZPWIKI_ROOT}")
print(f"[green]Našiel som README.md súborov:[/green] {len(markdown_files)}")
print(f"[green]Výstup uložený do:[/green] {OUTPUT_FILE}")

View File

@ -1,89 +1,188 @@
from pathlib import Path
import json
import re
import sys
from collections import Counter
import os
import frontmatter
from rich import print
CHUNKS_FILE = Path("data/chunks.json")
ZPWIKI_ROOT = Path(os.getenv("ZPWIKI_ROOT", "../zpwiki"))
PAGES_ROOT = ZPWIKI_ROOT / "pages"
OUTPUT_FILE = Path("data/chunks.json")
MAX_CHARS = 1200
OVERLAP_CHARS = 200
def tokenize(text: str) -> list[str]:
text = text.lower()
text = re.sub(r"[^a-záäčďéíĺľňóôŕšťúýž0-9]+", " ", text)
return [word for word in text.split() if len(word) >= 2]
def json_safe(value):
    """Convert ``value`` into something ``json.dump`` can serialise.

    ``None`` and str/int/float/bool values pass through unchanged, lists and
    dicts are converted recursively (dict keys become strings), and anything
    else is replaced by its ``str()`` form.
    """
    if value is None or isinstance(value, (str, int, float, bool)):
        return value

    if isinstance(value, dict):
        return {str(name): json_safe(item) for name, item in value.items()}

    if isinstance(value, list):
        return list(map(json_safe, value))

    return str(value)
def score_chunk(query_tokens: list[str], chunk: dict) -> int:
text = " ".join([
chunk.get("title") or "",
" ".join(chunk.get("tags") or []),
" ".join(chunk.get("categories") or []),
chunk.get("author") or "",
chunk.get("text") or "",
])
def normalize_list(value):
if value is None:
return []
tokens = tokenize(text)
token_counts = Counter(tokens)
if isinstance(value, list):
return [str(item).strip() for item in value if str(item).strip()]
score = 0
if isinstance(value, str):
return [item.strip() for item in value.split(",") if item.strip()]
for query_token in query_tokens:
score += token_counts.get(query_token, 0) * 3
return [str(value)]
if query_token in [tag.lower() for tag in chunk.get("tags", [])]:
score += 10
if query_token in [category.lower() for category in chunk.get("categories", [])]:
score += 6
def clean_markdown(text: str) -> str:
    """Normalise line endings and blank-line runs in a markdown string.

    CRLF pairs become LF, runs of three or more newlines are collapsed to
    exactly two, and leading/trailing whitespace is removed.
    """
    unix_newlines = text.replace("\r\n", "\n")
    return re.sub(r"\n{3,}", "\n\n", unix_newlines).strip()
title = (chunk.get("title") or "").lower()
if query_token in title:
score += 5
return score
def split_by_headings(text: str) -> list[str]:
    """Split markdown into sections that each start at a heading line.

    The text is cut (without consuming anything) in front of every line that
    begins with one to six ``#`` characters followed by whitespace. Text
    before the first heading forms its own section. Sections are stripped
    and empty ones are dropped.
    """
    sections = []
    for piece in re.split(r"(?=^#{1,6}\s+)", text, flags=re.MULTILINE):
        trimmed = piece.strip()
        if trimmed:
            sections.append(trimmed)
    return sections
def split_long_text(
    text: str,
    max_chars: int = MAX_CHARS,
    overlap: int = OVERLAP_CHARS,
) -> list[str]:
    """Cut ``text`` into overlapping windows of at most ``max_chars`` characters.

    Args:
        text: Text to split.
        max_chars: Maximum length of one window before stripping.
        overlap: Number of characters each window shares with the end of the
            previous one.

    Returns:
        ``[text]`` unchanged when it already fits into ``max_chars``;
        otherwise the stripped, non-empty windows in order.

    Raises:
        ValueError: If the text has to be split and ``overlap`` is not
            smaller than ``max_chars``.
    """
    if len(text) <= max_chars:
        return [text]

    # Each step advances the window start by (max_chars - overlap). Without
    # this guard a non-positive step would keep the loop on the same window
    # forever.
    if overlap >= max_chars:
        raise ValueError(
            f"overlap ({overlap}) musí byť menší ako max_chars ({max_chars})"
        )

    chunks = []
    start = 0

    while start < len(text):
        end = start + max_chars
        chunk = text[start:end].strip()

        if chunk:
            chunks.append(chunk)

        # The window already reached the end of the text; stepping back by
        # ``overlap`` would only produce a redundant tail window.
        if end >= len(text):
            break

        start = max(0, end - overlap)

    return chunks
def chunk_markdown(text: str) -> list[str]:
    """Turn a markdown document into a list of chunk strings.

    The text is cleaned, split in front of headings, and every section longer
    than ``MAX_CHARS`` is further cut by ``split_long_text``. Empty input
    (after cleaning) yields an empty list.
    """
    cleaned = clean_markdown(text)
    if not cleaned:
        return []

    pieces: list[str] = []
    for section in split_by_headings(cleaned):
        fits = len(section) <= MAX_CHARS
        pieces += [section] if fits else split_long_text(section)
    return pieces
def extract_document(file_path: Path) -> dict:
    """Load one markdown file and return its metadata and body as a plain dict.

    Front matter values are passed through ``json_safe``. Categories, tags
    and author are looked up at the top level first and in the ``taxonomy``
    mapping second; categories and tags are normalised with
    ``normalize_list``.

    Args:
        file_path: Path of the markdown file; it must be located under
            ``ZPWIKI_ROOT`` because the stored path is made relative to it.

    Returns:
        A dict with the keys ``path``, ``title``, ``categories``, ``tags``,
        ``published``, ``author``, ``content`` and ``metadata``.

    Raises:
        ValueError: If ``file_path`` is not inside ``ZPWIKI_ROOT``.
    """
    post = frontmatter.load(file_path)

    metadata = {key: json_safe(value) for key, value in post.metadata.items()}

    # ``taxonomy`` comes from hand-written front matter. If a page has a
    # scalar or a list there, treat it as absent instead of crashing the
    # whole indexing run on ``.get``.
    taxonomy = metadata.get("taxonomy")
    if not isinstance(taxonomy, dict):
        taxonomy = {}

    categories = normalize_list(
        metadata.get("category")
        or taxonomy.get("category")
    )

    tags = normalize_list(
        metadata.get("tag")
        or metadata.get("tags")
        or taxonomy.get("tag")
        or taxonomy.get("tags")
    )

    author = (
        metadata.get("author")
        or taxonomy.get("author")
    )

    relative_path = file_path.relative_to(ZPWIKI_ROOT)

    return {
        "path": str(relative_path),
        "title": metadata.get("title"),
        "categories": categories,
        "tags": tags,
        "published": metadata.get("published"),
        "author": author,
        "content": post.content.strip(),
        "metadata": metadata,
    }
def main():
if len(sys.argv) < 2:
print("[red]Použitie:[/red] python scripts/search_chunks.py \"rag agent\"")
raise SystemExit(1)
if not PAGES_ROOT.exists():
raise SystemExit(f"Neexistuje priečinok: {PAGES_ROOT}")
query = " ".join(sys.argv[1:])
query_tokens = tokenize(query)
markdown_files = sorted(PAGES_ROOT.glob("**/README.md"))
if not CHUNKS_FILE.exists():
raise SystemExit(f"Súbor neexistuje: {CHUNKS_FILE}")
all_chunks = []
document_count = 0
with CHUNKS_FILE.open("r", encoding="utf-8") as file:
chunks = json.load(file)
for file_path in markdown_files:
document = extract_document(file_path)
chunks = chunk_markdown(document["content"])
results = []
document_count += 1
for chunk in chunks:
score = score_chunk(query_tokens, chunk)
for index, chunk_text in enumerate(chunks):
all_chunks.append({
"chunk_id": f"{document['path']}::chunk-{index}",
"document_path": document["path"],
"title": document["title"],
"categories": document["categories"],
"tags": document["tags"],
"author": document["author"],
"published": document["published"],
"chunk_index": index,
"text": chunk_text,
"text_length": len(chunk_text),
})
if score > 0:
results.append((score, chunk))
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
results.sort(key=lambda item: item[0], reverse=True)
with OUTPUT_FILE.open("w", encoding="utf-8") as file:
json.dump(all_chunks, file, ensure_ascii=False, indent=2)
print(f"[bold]Dopyt:[/bold] {query}")
print(f"[bold]Počet výsledkov:[/bold] {len(results)}")
print(f"[green]ZPWIKI_ROOT:[/green] {ZPWIKI_ROOT}")
print(f"[green]Dokumentov:[/green] {document_count}")
print(f"[green]Chunkov:[/green] {len(all_chunks)}")
print(f"[green]Výstup uložený do:[/green] {OUTPUT_FILE}")
print("\n[bold]Top výsledky:[/bold]\n")
for rank, (score, chunk) in enumerate(results[:10], start=1):
print(f"[cyan]{rank}. Skóre: {score}[/cyan]")
print(f"[bold]Názov:[/bold] {chunk.get('title')}")
print(f"[bold]Cesta:[/bold] {chunk.get('document_path')}")
print(f"[bold]Kategórie:[/bold] {chunk.get('categories')}")
print(f"[bold]Tagy:[/bold] {chunk.get('tags')}")
print(f"[bold]Autor:[/bold] {chunk.get('author')}")
print("[bold]Text:[/bold]")
print((chunk.get("text") or "")[:700])
print("-" * 80)
if all_chunks:
print("\n[bold]Ukážka prvého chunku:[/bold]")
print(all_chunks[0])
if __name__ == "__main__":