from __future__ import annotations import json import sqlite3 import sys from pathlib import Path from rich import print PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from scripts.common import CHUNKS_FILE, DB_FILE, DOCUMENTS_FILE, read_json def create_tables(conn: sqlite3.Connection) -> None: cursor = conn.cursor() cursor.executescript( """ PRAGMA foreign_keys = ON; DROP TABLE IF EXISTS chunk_tags; DROP TABLE IF EXISTS chunk_categories; DROP TABLE IF EXISTS chunks; DROP TABLE IF EXISTS documents; CREATE TABLE documents ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT UNIQUE NOT NULL, title TEXT, author TEXT, published INTEGER, content_length INTEGER, metadata_json TEXT ); CREATE TABLE chunks ( id INTEGER PRIMARY KEY AUTOINCREMENT, chunk_id TEXT UNIQUE NOT NULL, document_path TEXT NOT NULL, title TEXT, author TEXT, chunk_index INTEGER, text TEXT NOT NULL, text_length INTEGER, FOREIGN KEY(document_path) REFERENCES documents(path) ); CREATE TABLE chunk_tags ( chunk_id TEXT NOT NULL, tag TEXT NOT NULL, UNIQUE(chunk_id, tag), FOREIGN KEY(chunk_id) REFERENCES chunks(chunk_id) ); CREATE TABLE chunk_categories ( chunk_id TEXT NOT NULL, category TEXT NOT NULL, UNIQUE(chunk_id, category), FOREIGN KEY(chunk_id) REFERENCES chunks(chunk_id) ); CREATE INDEX idx_documents_path ON documents(path); CREATE INDEX idx_chunks_document_path ON chunks(document_path); CREATE INDEX idx_chunks_title ON chunks(title); CREATE INDEX idx_chunk_tags_tag ON chunk_tags(tag); CREATE INDEX idx_chunk_categories_category ON chunk_categories(category); """ ) conn.commit() def insert_documents(conn: sqlite3.Connection, documents: list[dict]) -> None: rows = [ ( doc.get("path"), doc.get("title"), doc.get("author"), 1 if doc.get("published") else 0, doc.get("content_length"), json.dumps(doc.get("metadata") or {}, ensure_ascii=False), ) for doc in documents ] conn.executemany( """ INSERT INTO documents ( path, title, author, published, content_length, metadata_json ) VALUES (?, ?, ?, ?, ?, ?) """, rows, ) conn.commit() def insert_chunks(conn: sqlite3.Connection, chunks: list[dict]) -> None: chunk_rows = [] tag_rows = [] category_rows = [] for chunk in chunks: chunk_id = chunk.get("chunk_id") chunk_rows.append( ( chunk_id, chunk.get("document_path"), chunk.get("title"), chunk.get("author"), chunk.get("chunk_index"), chunk.get("text"), chunk.get("text_length"), ) ) for tag in chunk.get("tags") or []: tag_rows.append((chunk_id, tag)) for category in chunk.get("categories") or []: category_rows.append((chunk_id, category)) conn.executemany( """ INSERT INTO chunks ( chunk_id, document_path, title, author, chunk_index, text, text_length ) VALUES (?, ?, ?, ?, ?, ?, ?) """, chunk_rows, ) conn.executemany( """ INSERT OR IGNORE INTO chunk_tags (chunk_id, tag) VALUES (?, ?) """, tag_rows, ) conn.executemany( """ INSERT OR IGNORE INTO chunk_categories (chunk_id, category) VALUES (?, ?) """, category_rows, ) conn.commit() def get_counts(conn: sqlite3.Connection) -> dict[str, int]: cursor = conn.cursor() return { "documents": cursor.execute("SELECT COUNT(*) FROM documents").fetchone()[0], "chunks": cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0], "tags": cursor.execute("SELECT COUNT(*) FROM chunk_tags").fetchone()[0], "categories": cursor.execute("SELECT COUNT(*) FROM chunk_categories").fetchone()[0], } def build_database() -> dict[str, int]: documents = read_json(DOCUMENTS_FILE) chunks = read_json(CHUNKS_FILE) DB_FILE.parent.mkdir(parents=True, exist_ok=True) with sqlite3.connect(DB_FILE) as conn: conn.execute("PRAGMA foreign_keys = ON") create_tables(conn) insert_documents(conn, documents) insert_chunks(conn, chunks) counts = get_counts(conn) print(f"[green]SQLite index vytvorený:[/green] {DB_FILE}") print(f"Dokumentov: {counts['documents']}") print(f"Chunkov: {counts['chunks']}") print(f"Tag záznamov: {counts['tags']}") print(f"Kategória záznamov: {counts['categories']}") return counts def main() -> None: build_database() if __name__ == "__main__": main()