Parser a Fast API

This commit is contained in:
Ján Pták 2026-06-03 21:04:03 +02:00
commit fe79c9c2ed
11 changed files with 21410 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.venv/
__pycache__/
*.pyc
data/*.sqlite
data/*.db

0
app/__init__.py Normal file
View File

289
app/main.py Normal file
View File

@ -0,0 +1,289 @@
from pathlib import Path
import sqlite3
import re
import unicodedata
from collections import Counter
from fastapi import FastAPI
from pydantic import BaseModel, Field
DB_FILE = Path("data/zp_index.sqlite")
TECHNICAL_TERMS = {
"rag",
"agent",
"graph",
"knowledge",
"chatbot",
"nlp",
"llm",
"lm",
"openwebui",
"docker",
"webhook",
"database",
"db",
"neo4j",
"python",
"search",
"retrieval",
"generation",
"embedding",
"vector",
"vectors",
"langchain",
"graphrag",
"qa",
"question",
"answer",
"cloud",
"api",
}
app = FastAPI(
title="ZP Agent API",
description="API pre vyhľadávanie v repozitári záverečných prác zpwiki.",
version="0.1.0",
)
class SearchRequest(BaseModel):
query: str = Field(..., min_length=1)
limit: int = Field(default=10, ge=1, le=50)
class SearchResult(BaseModel):
score: int
chunk_id: str
document_path: str
source_url: str
title: str | None
author: str | None
chunk_index: int
categories: list[str]
tags: list[str]
text: str
text_length: int
def normalize_text(text: str) -> str:
text = text.lower()
text = text.replace("_", " ")
text = text.replace("/", " ")
text = text.replace("-", " ")
text = unicodedata.normalize("NFKD", text)
text = "".join(ch for ch in text if not unicodedata.combining(ch))
text = re.sub(r"[^a-z0-9]+", " ", text)
return text.strip()
def tokenize(text: str) -> list[str]:
text = normalize_text(text)
return [word for word in text.split() if len(word) >= 2]
def detect_search_mode(query_tokens: list[str]) -> str:
if not query_tokens:
return "topic"
has_technical_term = any(token in TECHNICAL_TERMS for token in query_tokens)
if len(query_tokens) == 2 and not has_technical_term:
return "person"
return "topic"
def score_tokens(query_tokens: list[str], field_tokens: list[str], weight: int) -> int:
counts = Counter(field_tokens)
score = 0
for token in query_tokens:
score += counts.get(token, 0) * weight
return score
def contains_all_tokens(query_tokens: list[str], field_tokens: list[str]) -> bool:
return all(token in field_tokens for token in query_tokens)
def get_tags(conn: sqlite3.Connection, chunk_id: str) -> list[str]:
rows = conn.execute(
"SELECT tag FROM chunk_tags WHERE chunk_id = ?",
(chunk_id,),
).fetchall()
return [row[0] for row in rows]
def get_categories(conn: sqlite3.Connection, chunk_id: str) -> list[str]:
rows = conn.execute(
"SELECT category FROM chunk_categories WHERE chunk_id = ?",
(chunk_id,),
).fetchall()
return [row[0] for row in rows]
def person_match(query_tokens: list[str], item: dict) -> bool:
title_tokens = tokenize(item.get("title") or "")
path_tokens = tokenize(item.get("document_path") or "")
author_tokens = tokenize(item.get("author") or "")
text_tokens = tokenize(item.get("text") or "")
if contains_all_tokens(query_tokens, title_tokens):
return True
if contains_all_tokens(query_tokens, path_tokens):
return True
if contains_all_tokens(query_tokens, author_tokens):
return True
if contains_all_tokens(query_tokens, text_tokens):
return True
return False
def score_item(query: str, query_tokens: list[str], item: dict, mode: str) -> int:
title = item.get("title") or ""
path = item.get("document_path") or ""
author = item.get("author") or ""
text = item.get("text") or ""
tags = item.get("tags") or []
categories = item.get("categories") or []
title_tokens = tokenize(title)
path_tokens = tokenize(path)
author_tokens = tokenize(author)
text_tokens = tokenize(text)
tag_tokens = tokenize(" ".join(tags))
category_tokens = tokenize(" ".join(categories))
score = 0
if mode == "person":
score += score_tokens(query_tokens, title_tokens, 30)
score += score_tokens(query_tokens, path_tokens, 30)
score += score_tokens(query_tokens, author_tokens, 15)
score += score_tokens(query_tokens, text_tokens, 2)
if contains_all_tokens(query_tokens, title_tokens):
score += 100
if contains_all_tokens(query_tokens, path_tokens):
score += 100
if contains_all_tokens(query_tokens, author_tokens):
score += 60
return score
score += score_tokens(query_tokens, title_tokens, 12)
score += score_tokens(query_tokens, path_tokens, 12)
score += score_tokens(query_tokens, tag_tokens, 10)
score += score_tokens(query_tokens, category_tokens, 6)
score += score_tokens(query_tokens, author_tokens, 3)
score += score_tokens(query_tokens, text_tokens, 2)
normalized_query = normalize_text(query)
normalized_title = normalize_text(title)
normalized_path = normalize_text(path)
if normalized_query and normalized_query in normalized_title:
score += 30
if normalized_query and normalized_query in normalized_path:
score += 30
matched_title_tokens = sum(1 for token in query_tokens if token in title_tokens)
matched_path_tokens = sum(1 for token in query_tokens if token in path_tokens)
if query_tokens and matched_title_tokens == len(query_tokens):
score += 25
if query_tokens and matched_path_tokens == len(query_tokens):
score += 25
return score
def make_source_url(document_path: str) -> str:
clean_path = document_path.replace("pages/", "").replace("/README.md", "")
return f"https://zp.kemt.fei.tuke.sk/{clean_path}"
def search_database(query: str, limit: int) -> tuple[str, list[dict]]:
if not DB_FILE.exists():
raise FileNotFoundError(f"Databáza neexistuje: {DB_FILE}")
query_tokens = tokenize(query)
mode = detect_search_mode(query_tokens)
conn = sqlite3.connect(DB_FILE)
rows = conn.execute("""
SELECT chunk_id, document_path, title, author, chunk_index, text, text_length
FROM chunks
""").fetchall()
results = []
for row in rows:
chunk_id, document_path, title, author, chunk_index, text, text_length = row
item = {
"chunk_id": chunk_id,
"document_path": document_path,
"title": title,
"author": author,
"chunk_index": chunk_index,
"text": text,
"text_length": text_length,
"tags": get_tags(conn, chunk_id),
"categories": get_categories(conn, chunk_id),
}
if mode == "person" and not person_match(query_tokens, item):
continue
score = score_item(query, query_tokens, item, mode)
if score > 0:
item["score"] = score
item["source_url"] = make_source_url(document_path)
results.append(item)
conn.close()
results.sort(key=lambda item: item["score"], reverse=True)
return mode, results[:limit]
@app.get("/health")
def health():
return {
"status": "ok",
"database_exists": DB_FILE.exists(),
"database_path": str(DB_FILE),
}
@app.post("/search")
def search(request: SearchRequest):
mode, results = search_database(request.query, request.limit)
return {
"query": request.query,
"mode": mode,
"count": len(results),
"results": results,
}

15761
data/chunks.json Normal file

File diff suppressed because it is too large Load Diff

4482
data/documents.json Normal file

File diff suppressed because it is too large Load Diff

23
requirements.txt Normal file
View File

@ -0,0 +1,23 @@
annotated-doc==0.0.4
annotated-types==0.7.0
anyio==4.13.0
click==8.4.1
exceptiongroup==1.3.1
fastapi==0.136.3
gitdb==4.0.12
GitPython==3.1.50
h11==0.16.0
idna==3.18
markdown-it-py==4.2.0
mdurl==0.1.2
pydantic==2.13.4
pydantic_core==2.46.4
Pygments==2.20.0
python-frontmatter==1.3.0
PyYAML==6.0.3
rich==15.0.0
smmap==5.0.3
starlette==1.2.1
typing-inspection==0.4.2
typing_extensions==4.15.0
uvicorn==0.48.0

183
scripts/build_chunks.py Normal file
View File

@ -0,0 +1,183 @@
from pathlib import Path
import json
import re
import frontmatter
from rich import print
ZPWIKI_ROOT = Path("../zpwiki")
PAGES_ROOT = ZPWIKI_ROOT / "pages"
OUTPUT_FILE = Path("data/chunks.json")
MAX_CHARS = 1200
OVERLAP_CHARS = 200
def json_safe(value):
if value is None:
return None
if isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, list):
return [json_safe(item) for item in value]
if isinstance(value, dict):
return {str(key): json_safe(val) for key, val in value.items()}
return str(value)
def normalize_list(value):
if value is None:
return []
if isinstance(value, list):
return [str(item).strip() for item in value if str(item).strip()]
if isinstance(value, str):
return [item.strip() for item in value.split(",") if item.strip()]
return [str(value)]
def clean_markdown(text: str) -> str:
text = text.replace("\r\n", "\n")
text = re.sub(r"\n{3,}", "\n\n", text)
text = text.strip()
return text
def split_by_headings(text: str) -> list[str]:
parts = re.split(r"(?m)(?=^#{1,6}\s+)", text)
return [part.strip() for part in parts if part.strip()]
def split_long_text(text: str, max_chars: int = MAX_CHARS, overlap: int = OVERLAP_CHARS) -> list[str]:
if len(text) <= max_chars:
return [text]
chunks = []
start = 0
while start < len(text):
end = start + max_chars
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
if end >= len(text):
break
start = max(0, end - overlap)
return chunks
def chunk_markdown(text: str) -> list[str]:
text = clean_markdown(text)
if not text:
return []
heading_parts = split_by_headings(text)
chunks = []
for part in heading_parts:
if len(part) <= MAX_CHARS:
chunks.append(part)
else:
chunks.extend(split_long_text(part))
return chunks
def extract_document(file_path: Path) -> dict:
post = frontmatter.load(file_path)
metadata = {
key: json_safe(value)
for key, value in post.metadata.items()
}
taxonomy = metadata.get("taxonomy") or {}
categories = normalize_list(
metadata.get("category")
or taxonomy.get("category")
)
tags = normalize_list(
metadata.get("tag")
or metadata.get("tags")
or taxonomy.get("tag")
or taxonomy.get("tags")
)
author = (
metadata.get("author")
or taxonomy.get("author")
)
relative_path = file_path.relative_to(ZPWIKI_ROOT)
return {
"path": str(relative_path),
"title": metadata.get("title"),
"categories": categories,
"tags": tags,
"published": metadata.get("published"),
"author": author,
"content": post.content.strip(),
"metadata": metadata,
}
def main():
if not PAGES_ROOT.exists():
raise SystemExit(f"Neexistuje priečinok: {PAGES_ROOT}")
markdown_files = sorted(PAGES_ROOT.glob("**/README.md"))
all_chunks = []
document_count = 0
for file_path in markdown_files:
document = extract_document(file_path)
chunks = chunk_markdown(document["content"])
document_count += 1
for index, chunk_text in enumerate(chunks):
all_chunks.append({
"chunk_id": f"{document['path']}::chunk-{index}",
"document_path": document["path"],
"title": document["title"],
"categories": document["categories"],
"tags": document["tags"],
"author": document["author"],
"published": document["published"],
"chunk_index": index,
"text": chunk_text,
"text_length": len(chunk_text),
})
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
with OUTPUT_FILE.open("w", encoding="utf-8") as file:
json.dump(all_chunks, file, ensure_ascii=False, indent=2)
print(f"[green]Dokumentov:[/green] {document_count}")
print(f"[green]Chunkov:[/green] {len(all_chunks)}")
print(f"[green]Výstup uložený do:[/green] {OUTPUT_FILE}")
if all_chunks:
print("\n[bold]Ukážka prvého chunku:[/bold]")
print(all_chunks[0])
if __name__ == "__main__":
main()

View File

@ -0,0 +1,167 @@
from pathlib import Path
import json
import sqlite3
from rich import print
DOCUMENTS_FILE = Path("data/documents.json")
CHUNKS_FILE = Path("data/chunks.json")
DB_FILE = Path("data/zp_index.sqlite")
def create_tables(conn: sqlite3.Connection):
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS chunk_tags")
cursor.execute("DROP TABLE IF EXISTS chunk_categories")
cursor.execute("DROP TABLE IF EXISTS chunks")
cursor.execute("DROP TABLE IF EXISTS documents")
cursor.execute("""
CREATE TABLE documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT UNIQUE NOT NULL,
title TEXT,
author TEXT,
published INTEGER,
content_length INTEGER,
metadata_json TEXT
)
""")
cursor.execute("""
CREATE TABLE chunks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chunk_id TEXT UNIQUE NOT NULL,
document_path TEXT NOT NULL,
title TEXT,
author TEXT,
chunk_index INTEGER,
text TEXT NOT NULL,
text_length INTEGER,
FOREIGN KEY(document_path) REFERENCES documents(path)
)
""")
cursor.execute("""
CREATE TABLE chunk_tags (
chunk_id TEXT NOT NULL,
tag TEXT NOT NULL
)
""")
cursor.execute("""
CREATE TABLE chunk_categories (
chunk_id TEXT NOT NULL,
category TEXT NOT NULL
)
""")
cursor.execute("CREATE INDEX idx_documents_path ON documents(path)")
cursor.execute("CREATE INDEX idx_chunks_document_path ON chunks(document_path)")
cursor.execute("CREATE INDEX idx_chunks_title ON chunks(title)")
cursor.execute("CREATE INDEX idx_chunk_tags_tag ON chunk_tags(tag)")
cursor.execute("CREATE INDEX idx_chunk_categories_category ON chunk_categories(category)")
conn.commit()
def load_json(path: Path):
if not path.exists():
raise SystemExit(f"Súbor neexistuje: {path}")
with path.open("r", encoding="utf-8") as file:
return json.load(file)
def insert_documents(conn: sqlite3.Connection, documents: list[dict]):
cursor = conn.cursor()
for doc in documents:
cursor.execute("""
INSERT INTO documents (
path, title, author, published, content_length, metadata_json
)
VALUES (?, ?, ?, ?, ?, ?)
""", (
doc.get("path"),
doc.get("title"),
doc.get("author"),
1 if doc.get("published") else 0,
doc.get("content_length"),
json.dumps(doc.get("metadata") or {}, ensure_ascii=False),
))
conn.commit()
def insert_chunks(conn: sqlite3.Connection, chunks: list[dict]):
cursor = conn.cursor()
for chunk in chunks:
cursor.execute("""
INSERT INTO chunks (
chunk_id, document_path, title, author, chunk_index, text, text_length
)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
chunk.get("chunk_id"),
chunk.get("document_path"),
chunk.get("title"),
chunk.get("author"),
chunk.get("chunk_index"),
chunk.get("text"),
chunk.get("text_length"),
))
for tag in chunk.get("tags") or []:
cursor.execute("""
INSERT INTO chunk_tags (chunk_id, tag)
VALUES (?, ?)
""", (
chunk.get("chunk_id"),
tag,
))
for category in chunk.get("categories") or []:
cursor.execute("""
INSERT INTO chunk_categories (chunk_id, category)
VALUES (?, ?)
""", (
chunk.get("chunk_id"),
category,
))
conn.commit()
def main():
documents = load_json(DOCUMENTS_FILE)
chunks = load_json(CHUNKS_FILE)
DB_FILE.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_FILE)
create_tables(conn)
insert_documents(conn, documents)
insert_chunks(conn, chunks)
cursor = conn.cursor()
document_count = cursor.execute("SELECT COUNT(*) FROM documents").fetchone()[0]
chunk_count = cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
tag_count = cursor.execute("SELECT COUNT(*) FROM chunk_tags").fetchone()[0]
category_count = cursor.execute("SELECT COUNT(*) FROM chunk_categories").fetchone()[0]
conn.close()
print(f"[green]SQLite index vytvorený:[/green] {DB_FILE}")
print(f"Dokumentov: {document_count}")
print(f"Chunkov: {chunk_count}")
print(f"Tag záznamov: {tag_count}")
print(f"Kategória záznamov: {category_count}")
if __name__ == "__main__":
main()

139
scripts/scan_zpwiki.py Normal file
View File

@ -0,0 +1,139 @@
from pathlib import Path
from collections import Counter
import json
import frontmatter
from rich import print
ZPWIKI_ROOT = Path("../zpwiki")
PAGES_ROOT = ZPWIKI_ROOT / "pages"
OUTPUT_FILE = Path("data/documents.json")
def json_safe(value):
if value is None:
return None
if isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, list):
return [json_safe(item) for item in value]
if isinstance(value, dict):
return {str(key): json_safe(val) for key, val in value.items()}
return str(value)
def normalize_list(value):
if value is None:
return []
if isinstance(value, list):
return [str(item).strip() for item in value if str(item).strip()]
if isinstance(value, str):
return [item.strip() for item in value.split(",") if item.strip()]
return [str(value)]
def main():
if not PAGES_ROOT.exists():
raise SystemExit(f"Neexistuje priečinok: {PAGES_ROOT}")
markdown_files = sorted(PAGES_ROOT.glob("**/README.md"))
documents = []
metadata_keys = Counter()
categories_counter = Counter()
tags_counter = Counter()
authors_counter = Counter()
for file_path in markdown_files:
post = frontmatter.load(file_path)
metadata = {
key: json_safe(value)
for key, value in post.metadata.items()
}
taxonomy = metadata.get("taxonomy") or {}
content = post.content.strip()
for key in metadata.keys():
metadata_keys[key] += 1
categories = normalize_list(
metadata.get("category")
or taxonomy.get("category")
)
tags = normalize_list(
metadata.get("tag")
or metadata.get("tags")
or taxonomy.get("tag")
or taxonomy.get("tags")
)
author = (
metadata.get("author")
or taxonomy.get("author")
)
for category in categories:
categories_counter[category] += 1
for tag in tags:
tags_counter[tag] += 1
if author:
authors_counter[str(author)] += 1
relative_path = file_path.relative_to(ZPWIKI_ROOT)
documents.append({
"path": str(relative_path),
"title": metadata.get("title"),
"categories": categories,
"tags": tags,
"published": metadata.get("published"),
"author": author,
"taxonomy": taxonomy,
"metadata": metadata,
"content_preview": content[:500],
"content_length": len(content),
})
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
with OUTPUT_FILE.open("w", encoding="utf-8") as file:
json.dump(documents, file, ensure_ascii=False, indent=2)
print(f"[green]Našiel som README.md súborov:[/green] {len(markdown_files)}")
print(f"[green]Výstup uložený do:[/green] {OUTPUT_FILE}")
print("\n[bold]Najčastejšie metadata kľúče:[/bold]")
for key, count in metadata_keys.most_common(30):
print(f"{key}: {count}")
print("\n[bold]Najčastejšie kategórie:[/bold]")
for key, count in categories_counter.most_common(30):
print(f"{key}: {count}")
print("\n[bold]Najčastejšie tagy:[/bold]")
for key, count in tags_counter.most_common(40):
print(f"{key}: {count}")
print("\n[bold]Najčastejší autori:[/bold]")
for key, count in authors_counter.most_common(20):
print(f"{key}: {count}")
print("\n[bold]Ukážka prvého dokumentu:[/bold]")
if documents:
print(documents[0])
if __name__ == "__main__":
main()

90
scripts/search_chunks.py Normal file
View File

@ -0,0 +1,90 @@
from pathlib import Path
import json
import re
import sys
from collections import Counter
from rich import print
CHUNKS_FILE = Path("data/chunks.json")
def tokenize(text: str) -> list[str]:
text = text.lower()
text = re.sub(r"[^a-záäčďéíĺľňóôŕšťúýž0-9]+", " ", text)
return [word for word in text.split() if len(word) >= 2]
def score_chunk(query_tokens: list[str], chunk: dict) -> int:
text = " ".join([
chunk.get("title") or "",
" ".join(chunk.get("tags") or []),
" ".join(chunk.get("categories") or []),
chunk.get("author") or "",
chunk.get("text") or "",
])
tokens = tokenize(text)
token_counts = Counter(tokens)
score = 0
for query_token in query_tokens:
score += token_counts.get(query_token, 0) * 3
if query_token in [tag.lower() for tag in chunk.get("tags", [])]:
score += 10
if query_token in [category.lower() for category in chunk.get("categories", [])]:
score += 6
title = (chunk.get("title") or "").lower()
if query_token in title:
score += 5
return score
def main():
if len(sys.argv) < 2:
print("[red]Použitie:[/red] python scripts/search_chunks.py \"rag agent\"")
raise SystemExit(1)
query = " ".join(sys.argv[1:])
query_tokens = tokenize(query)
if not CHUNKS_FILE.exists():
raise SystemExit(f"Súbor neexistuje: {CHUNKS_FILE}")
with CHUNKS_FILE.open("r", encoding="utf-8") as file:
chunks = json.load(file)
results = []
for chunk in chunks:
score = score_chunk(query_tokens, chunk)
if score > 0:
results.append((score, chunk))
results.sort(key=lambda item: item[0], reverse=True)
print(f"[bold]Dopyt:[/bold] {query}")
print(f"[bold]Počet výsledkov:[/bold] {len(results)}")
print("\n[bold]Top výsledky:[/bold]\n")
for rank, (score, chunk) in enumerate(results[:10], start=1):
print(f"[cyan]{rank}. Skóre: {score}[/cyan]")
print(f"[bold]Názov:[/bold] {chunk.get('title')}")
print(f"[bold]Cesta:[/bold] {chunk.get('document_path')}")
print(f"[bold]Kategórie:[/bold] {chunk.get('categories')}")
print(f"[bold]Tagy:[/bold] {chunk.get('tags')}")
print(f"[bold]Autor:[/bold] {chunk.get('author')}")
print("[bold]Text:[/bold]")
print((chunk.get("text") or "")[:700])
print("-" * 80)
if __name__ == "__main__":
main()

271
scripts/search_db.py Normal file
View File

@ -0,0 +1,271 @@
from pathlib import Path
import sqlite3
import re
import sys
import unicodedata
from collections import Counter
from rich import print
DB_FILE = Path("data/zp_index.sqlite")
TECHNICAL_TERMS = {
"rag",
"agent",
"graph",
"knowledge",
"chatbot",
"nlp",
"llm",
"lm",
"openwebui",
"docker",
"webhook",
"database",
"db",
"neo4j",
"python",
"search",
"retrieval",
"generation",
"embedding",
"vector",
"vectors",
"langchain",
"graphrag",
"qa",
"question",
"answer",
"cloud",
"api",
}
def normalize_text(text: str) -> str:
text = text.lower()
text = text.replace("_", " ")
text = text.replace("/", " ")
text = text.replace("-", " ")
text = unicodedata.normalize("NFKD", text)
text = "".join(ch for ch in text if not unicodedata.combining(ch))
text = re.sub(r"[^a-z0-9]+", " ", text)
return text.strip()
def tokenize(text: str) -> list[str]:
text = normalize_text(text)
return [word for word in text.split() if len(word) >= 2]
def detect_search_mode(query_tokens: list[str]) -> str:
"""
person režim:
napríklad jan ptak, jan holp, daniel hladek
topic režim:
napríklad rag agent, knowledge graph, nlp chatbot
"""
if not query_tokens:
return "topic"
has_technical_term = any(token in TECHNICAL_TERMS for token in query_tokens)
if len(query_tokens) == 2 and not has_technical_term:
return "person"
return "topic"
def score_tokens(query_tokens: list[str], field_tokens: list[str], weight: int) -> int:
counts = Counter(field_tokens)
score = 0
for token in query_tokens:
score += counts.get(token, 0) * weight
return score
def get_tags(conn: sqlite3.Connection, chunk_id: str) -> list[str]:
rows = conn.execute(
"SELECT tag FROM chunk_tags WHERE chunk_id = ?",
(chunk_id,)
).fetchall()
return [row[0] for row in rows]
def get_categories(conn: sqlite3.Connection, chunk_id: str) -> list[str]:
rows = conn.execute(
"SELECT category FROM chunk_categories WHERE chunk_id = ?",
(chunk_id,)
).fetchall()
return [row[0] for row in rows]
def contains_all_tokens(query_tokens: list[str], field_tokens: list[str]) -> bool:
return all(token in field_tokens for token in query_tokens)
def person_match(query_tokens: list[str], item: dict) -> bool:
title_tokens = tokenize(item.get("title") or "")
path_tokens = tokenize(item.get("document_path") or "")
author_tokens = tokenize(item.get("author") or "")
text_tokens = tokenize(item.get("text") or "")
if contains_all_tokens(query_tokens, title_tokens):
return True
if contains_all_tokens(query_tokens, path_tokens):
return True
if contains_all_tokens(query_tokens, author_tokens):
return True
"""
Text berieme slabšie, ale necháme ho ako fallback.
Napríklad ak meno nie je v title, ale je v obsahu.
"""
if contains_all_tokens(query_tokens, text_tokens):
return True
return False
def score_item(query: str, query_tokens: list[str], item: dict, mode: str) -> int:
title = item.get("title") or ""
path = item.get("document_path") or ""
author = item.get("author") or ""
text = item.get("text") or ""
tags = item.get("tags") or []
categories = item.get("categories") or []
title_tokens = tokenize(title)
path_tokens = tokenize(path)
author_tokens = tokenize(author)
text_tokens = tokenize(text)
tag_tokens = tokenize(" ".join(tags))
category_tokens = tokenize(" ".join(categories))
score = 0
if mode == "person":
score += score_tokens(query_tokens, title_tokens, 30)
score += score_tokens(query_tokens, path_tokens, 30)
score += score_tokens(query_tokens, author_tokens, 15)
score += score_tokens(query_tokens, text_tokens, 2)
if contains_all_tokens(query_tokens, title_tokens):
score += 100
if contains_all_tokens(query_tokens, path_tokens):
score += 100
if contains_all_tokens(query_tokens, author_tokens):
score += 60
return score
score += score_tokens(query_tokens, title_tokens, 12)
score += score_tokens(query_tokens, path_tokens, 12)
score += score_tokens(query_tokens, tag_tokens, 10)
score += score_tokens(query_tokens, category_tokens, 6)
score += score_tokens(query_tokens, author_tokens, 3)
score += score_tokens(query_tokens, text_tokens, 2)
normalized_query = normalize_text(query)
normalized_title = normalize_text(title)
normalized_path = normalize_text(path)
if normalized_query and normalized_query in normalized_title:
score += 30
if normalized_query and normalized_query in normalized_path:
score += 30
matched_title_tokens = sum(1 for token in query_tokens if token in title_tokens)
matched_path_tokens = sum(1 for token in query_tokens if token in path_tokens)
if query_tokens and matched_title_tokens == len(query_tokens):
score += 25
if query_tokens and matched_path_tokens == len(query_tokens):
score += 25
return score
def main():
if len(sys.argv) < 2:
print("[red]Použitie:[/red] python scripts/search_db.py \"rag agent\"")
raise SystemExit(1)
if not DB_FILE.exists():
raise SystemExit(f"Databáza neexistuje: {DB_FILE}")
query = " ".join(sys.argv[1:])
query_tokens = tokenize(query)
mode = detect_search_mode(query_tokens)
conn = sqlite3.connect(DB_FILE)
rows = conn.execute("""
SELECT chunk_id, document_path, title, author, chunk_index, text, text_length
FROM chunks
""").fetchall()
results = []
for row in rows:
chunk_id, document_path, title, author, chunk_index, text, text_length = row
item = {
"chunk_id": chunk_id,
"document_path": document_path,
"title": title,
"author": author,
"chunk_index": chunk_index,
"text": text,
"text_length": text_length,
"tags": get_tags(conn, chunk_id),
"categories": get_categories(conn, chunk_id),
}
if mode == "person" and not person_match(query_tokens, item):
continue
score = score_item(query, query_tokens, item, mode)
if score > 0:
item["score"] = score
results.append(item)
results.sort(key=lambda item: item["score"], reverse=True)
print(f"[bold]Dopyt:[/bold] {query}")
print(f"[bold]Režim:[/bold] {mode}")
print(f"[bold]Počet výsledkov:[/bold] {len(results)}")
print("\n[bold]Top výsledky:[/bold]\n")
for rank, item in enumerate(results[:10], start=1):
print(f"[cyan]{rank}. Skóre: {item['score']}[/cyan]")
print(f"[bold]Názov:[/bold] {item['title']}")
print(f"[bold]Cesta:[/bold] {item['document_path']}")
print(f"[bold]Chunk:[/bold] {item['chunk_index']}")
print(f"[bold]Kategórie:[/bold] {item['categories']}")
print(f"[bold]Tagy:[/bold] {item['tags']}")
print(f"[bold]Autor:[/bold] {item['author']}")
print("[bold]Text:[/bold]")
print((item["text"] or "")[:700])
print("-" * 80)
conn.close()
if __name__ == "__main__":
main()