ZKT26/SK1/backend/main.py
2026-05-12 15:19:22 +02:00

170 lines
6.0 KiB
Python

from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import asyncpg
import redis.asyncio as aioredis
import uuid
import hashlib
import os
import json
from datetime import datetime, timedelta
app = FastAPI(title="PasteVault API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
DATABASE_URL = os.getenv("DATABASE_URL")
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
SECRET_KEY = os.getenv("SECRET_KEY", "changeme")
db_pool = None
redis_client = None
@app.on_event("startup")
async def startup():
global db_pool, redis_client
db_pool = await asyncpg.create_pool(DATABASE_URL, ssl="require")
redis_client = await aioredis.from_url(REDIS_URL, decode_responses=True)
async with db_pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS pastes (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
title VARCHAR(255) NOT NULL DEFAULT 'Untitled',
content TEXT NOT NULL,
language VARCHAR(50) DEFAULT 'plaintext',
password_hash VARCHAR(255),
expires_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
views INTEGER DEFAULT 0
)
""")
@app.on_event("shutdown")
async def shutdown():
await db_pool.close()
await redis_client.aclose()
class PasteCreate(BaseModel):
title: str = "Untitled"
content: str
language: str = "plaintext"
password: Optional[str] = None
expiry: Optional[str] = None
def hash_password(password: str) -> str:
return hashlib.sha256((password + SECRET_KEY).encode()).hexdigest()
def get_expiry(expiry_str: Optional[str]) -> Optional[datetime]:
mapping = {"1h": timedelta(hours=1), "1d": timedelta(days=1),
"7d": timedelta(days=7), "30d": timedelta(days=30)}
if not expiry_str or expiry_str not in mapping:
return None
return datetime.utcnow() + mapping[expiry_str]
@app.get("/api/health")
async def health():
return {"status": "ok", "service": "PasteVault"}
@app.post("/api/pastes")
async def create_paste(paste: PasteCreate):
if not paste.content.strip():
raise HTTPException(status_code=400, detail="Content cannot be empty")
expires_at = get_expiry(paste.expiry)
password_hash = hash_password(paste.password) if paste.password else None
async with db_pool.acquire() as conn:
row = await conn.fetchrow("""
INSERT INTO pastes (title, content, language, password_hash, expires_at)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, title, language, expires_at, created_at
""", paste.title, paste.content, paste.language, password_hash, expires_at)
return {
"id": str(row["id"]),
"title": row["title"],
"language": row["language"],
"expires_at": row["expires_at"].isoformat() if row["expires_at"] else None,
"created_at": row["created_at"].isoformat(),
"has_password": password_hash is not None,
}
@app.get("/api/pastes/{paste_id}")
async def get_paste(paste_id: str, password: Optional[str] = Query(default=None)):
try:
pid = uuid.UUID(paste_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid paste ID")
cached = await redis_client.get(f"paste:{paste_id}")
if cached:
paste = json.loads(cached)
else:
async with db_pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM pastes WHERE id = $1", pid)
if not row:
raise HTTPException(status_code=404, detail="Paste not found")
paste = dict(row)
paste["id"] = str(paste["id"])
paste["expires_at"] = paste["expires_at"].isoformat() if paste["expires_at"] else None
paste["created_at"] = paste["created_at"].isoformat()
await redis_client.setex(f"paste:{paste_id}", 30, json.dumps(paste, default=str))
if paste["expires_at"]:
if datetime.fromisoformat(paste["expires_at"]) < datetime.utcnow():
async with db_pool.acquire() as conn:
await conn.execute("DELETE FROM pastes WHERE id = $1", pid)
await redis_client.delete(f"paste:{paste_id}")
raise HTTPException(status_code=404, detail="Paste has expired")
if paste["password_hash"]:
if not password:
return {"id": paste["id"], "title": paste["title"], "requires_password": True}
if hash_password(password) != paste["password_hash"]:
raise HTTPException(status_code=403, detail="Wrong password")
async with db_pool.acquire() as conn:
await conn.execute("UPDATE pastes SET views = views + 1 WHERE id = $1", pid)
await redis_client.delete(f"paste:{paste_id}")
return {
"id": paste["id"],
"title": paste["title"],
"content": paste["content"],
"language": paste["language"],
"expires_at": paste["expires_at"],
"created_at": paste["created_at"],
"views": paste["views"] + 1,
"has_password": paste["password_hash"] is not None,
"requires_password": False,
}
@app.delete("/api/pastes/{paste_id}")
async def delete_paste(paste_id: str, password: Optional[str] = Query(default=None)):
try:
pid = uuid.UUID(paste_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid paste ID")
async with db_pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM pastes WHERE id = $1", pid)
if not row:
raise HTTPException(status_code=404, detail="Paste not found")
if row["password_hash"] and (not password or hash_password(password) != row["password_hash"]):
raise HTTPException(status_code=403, detail="Wrong password")
async with db_pool.acquire() as conn:
await conn.execute("DELETE FROM pastes WHERE id = $1", pid)
await redis_client.delete(f"paste:{paste_id}")
return {"message": "Paste deleted successfully"}