zkt26/sk1/main.py

191 lines
6.1 KiB
Python

import hmac
import logging
import os
import random
import string
import threading
from datetime import datetime

import psycopg2
import psycopg2.pool
from fastapi import FastAPI, HTTPException, Request, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
# Process-wide logging: timestamp, level, message at INFO and above.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

app = FastAPI(title="ShortLink API", version="1.0.0")

# CORS is fully open here: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# DATABASE_URL is mandatory; a missing variable raises KeyError at import time.
DATABASE_URL = os.environ["DATABASE_URL"]
# Prefix used when building the public short URLs returned by the API.
BASE_URL = os.getenv("BASE_URL", "http://localhost")
# Bearer token expected by the delete endpoint; empty string when not configured.
ADMIN_TOKEN = os.getenv("ADMIN_TOKEN", "")
# Lazily created, process-wide connection pool; creation is serialized by _pool_lock.
_pool = None
_pool_lock = threading.Lock()


def get_pool():
    """Return the shared PostgreSQL connection pool, creating it on first use.

    The route handlers in this module are plain ``def`` functions, which FastAPI
    executes in worker threads, so the pool is shared between threads. A
    ``ThreadedConnectionPool`` is therefore used instead of ``SimpleConnectionPool``,
    which psycopg2 documents as not shareable across threads.

    Returns:
        The pool, holding between 1 and 10 connections to ``DATABASE_URL``.
    """
    global _pool
    if _pool is None:
        with _pool_lock:
            # Re-check under the lock: another thread may have built the pool
            # while this one was waiting, and a second pool would leak connections.
            if _pool is None:
                _pool = psycopg2.pool.ThreadedConnectionPool(1, 10, DATABASE_URL)
    return _pool
def get_client_ip(request: Request) -> str:
    """Return the best-known client address for *request*.

    The first entry of ``X-Forwarded-For`` is used when it is present and non-empty;
    otherwise the address of the directly connected peer is used. ``request.client``
    may be ``None``, in which case ``"unknown"`` is returned instead of raising
    ``AttributeError``.

    NOTE(review): ``X-Forwarded-For`` is client-controlled unless a trusted proxy
    overwrites it, so this value should not be relied on for access control.
    NOTE(review): callers store this value in the database; if the target column is
    typed ``inet`` the ``"unknown"`` fallback will be rejected — confirm the schema.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        first_hop = forwarded.split(",", 1)[0].strip()
        # A header such as ", 10.0.0.1" has an empty first entry; fall through to the peer.
        if first_hop:
            return first_hop
    peer = request.client
    return peer.host if peer is not None else "unknown"
def generate_code(length: int = 6) -> str:
    """Return a random short code made of *length* ASCII letters and digits.

    Characters are drawn with replacement from the ``random`` module's generator,
    so the result is not cryptographically unpredictable.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
# ── Models ────────────────────────────────────────────────────────────────────
class URLCreate(BaseModel):
    # Request body accepted by POST /api/shorten.

    # Destination address; the handler prepends "https://" when no http(s) scheme is given.
    url: str
    # Caller-chosen short code; a random one is generated when this is omitted or blank.
    custom_code: str | None = None
    # Optional label stored with the link and returned by the stats endpoint.
    title: str | None = None
# ── Routes ────────────────────────────────────────────────────────────────────
# Liveness probe: reports a fixed "ok" status together with the current
# UTC time as a naive ISO-8601 string. Does not touch the database.
@app.get("/health")
def health():
    now = datetime.utcnow()
    return {
        "status": "ok",
        "timestamp": now.isoformat(),
    }
# How many random codes are tried before giving up on an auto-generated link.
_MAX_CODE_ATTEMPTS = 5
# PostgreSQL SQLSTATE for unique_violation.
_UNIQUE_VIOLATION = "23505"


def _insert_link(conn, code, url, title, ip):
    """Store one link row and commit; return False when *code* is already taken.

    The SELECT gives an early answer for the common case. The IntegrityError handler
    covers a concurrent request inserting the same code between the SELECT and the
    INSERT (this only triggers if ``links.code`` carries a unique constraint).
    Any other database error propagates to the caller.
    """
    with conn.cursor() as cur:
        cur.execute("SELECT id FROM links WHERE code = %s", (code,))
        if cur.fetchone():
            return False
        try:
            cur.execute(
                "INSERT INTO links (code, original_url, title, created_at, created_by_ip) VALUES (%s, %s, %s, %s, %s)",
                (code, url, title, datetime.utcnow(), ip),
            )
        except psycopg2.IntegrityError as exc:
            # The failed statement aborts the transaction; reset it before the
            # connection is used again.
            conn.rollback()
            if exc.pgcode != _UNIQUE_VIOLATION:
                raise
            return False
    conn.commit()
    return True


@app.post("/api/shorten")
def shorten_url(data: URLCreate, request: Request):
    """Create a short link for the submitted URL.

    The URL is trimmed and given an ``https://`` prefix when it has no http(s) scheme
    (the scheme check is case-insensitive). A custom code may contain letters, digits,
    hyphens and underscores; without one, a random code is generated and regenerated
    on collision.

    Returns:
        A dict with ``short_url``, ``code`` and ``original_url``.

    Raises:
        HTTPException: 400 for an empty URL or an invalid custom code, 409 when the
            custom code is already in use, 503 when no free random code was found,
            500 on any unexpected error.
    """
    url = data.url.strip()
    if not url:
        raise HTTPException(status_code=400, detail="URL must not be empty")
    # Compare case-insensitively so "HTTP://example.com" is not double-prefixed.
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url

    custom_code = (data.custom_code or "").strip()
    if custom_code and not custom_code.replace("-", "").replace("_", "").isalnum():
        raise HTTPException(status_code=400, detail="Code may only contain letters, digits, hyphens and underscores")

    pool = get_pool()
    conn = pool.getconn()
    try:
        ip = get_client_ip(request)
        # A caller-chosen code gets exactly one try; random codes are retried so a
        # chance collision is not reported to a caller who never picked a code.
        attempts = 1 if custom_code else _MAX_CODE_ATTEMPTS
        for _ in range(attempts):
            code = custom_code or generate_code()
            if _insert_link(conn, code, url, data.title, ip):
                logger.info("Created link %s -> %s", code, url)
                return {"short_url": f"{BASE_URL}/s/{code}", "code": code, "original_url": url}
    except Exception as exc:
        conn.rollback()
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception("Error creating link: %s", exc)
        raise HTTPException(status_code=500, detail="Internal server error") from exc
    finally:
        pool.putconn(conn)

    if custom_code:
        raise HTTPException(status_code=409, detail="That short code is already in use")
    raise HTTPException(status_code=503, detail="Could not allocate a unique short code, please retry")
# Usage overview: overall link and visit totals plus the 50 links with the
# most visits (ties broken by newest first), each with its visit count,
# last visit time and public short URL.
@app.get("/api/stats")
def get_stats():
    pool = get_pool()
    conn = pool.getconn()
    try:
        cur = conn.cursor()

        cur.execute("""
SELECT l.code, l.original_url, l.title, l.created_at,
COUNT(v.id) AS visit_count,
MAX(v.visited_at) AS last_visit
FROM links l
LEFT JOIN visits v ON l.id = v.link_id
GROUP BY l.id
ORDER BY visit_count DESC, l.created_at DESC
LIMIT 50
""")
        top_rows = cur.fetchall()

        cur.execute("SELECT COUNT(*) FROM links")
        (total_links,) = cur.fetchone()

        cur.execute("SELECT COUNT(*) FROM visits")
        (total_visits,) = cur.fetchone()

        cur.close()

        links = []
        for code, original_url, title, created_at, visit_count, last_visit in top_rows:
            # Links that were never visited have no last-visit timestamp.
            if last_visit:
                last_visit_iso = last_visit.isoformat()
            else:
                last_visit_iso = None
            links.append(
                {
                    "code": code,
                    "original_url": original_url,
                    "title": title,
                    "created_at": created_at.isoformat(),
                    "visit_count": visit_count,
                    "last_visit": last_visit_iso,
                    "short_url": f"{BASE_URL}/s/{code}",
                }
            )

        return {
            "total_links": total_links,
            "total_visits": total_visits,
            "links": links,
        }
    finally:
        # Always hand the connection back, even when a query fails.
        pool.putconn(conn)
@app.delete("/api/links/{code}")
def delete_link(code: str, authorization: str = Header(None)):
    """Delete the link identified by *code*; requires the admin bearer token.

    The request must carry ``Authorization: Bearer <ADMIN_TOKEN>``. When no admin
    token is configured every request is rejected, so an unset ``ADMIN_TOKEN`` can
    never be matched by a bare ``Bearer `` header.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 401 when the token is missing, wrong or not configured;
            404 when no link has this code.
    """
    expected = f"Bearer {ADMIN_TOKEN}".encode("utf-8")
    supplied = (authorization or "").encode("utf-8")
    # compare_digest runs in constant time, so response timing does not reveal how
    # much of the token matched. Bytes are compared because the str form of
    # compare_digest rejects non-ASCII input.
    if not ADMIN_TOKEN or not hmac.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=401,
            detail="Unauthorized",
            headers={"WWW-Authenticate": "Bearer"},
        )

    pool = get_pool()
    conn = pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM links WHERE code = %s RETURNING id", (code,))
            deleted = cur.fetchone() is not None
        if not deleted:
            # Nothing changed; end the transaction before returning the connection.
            conn.rollback()
            raise HTTPException(status_code=404, detail="Link not found")
        conn.commit()
        logger.info("Deleted link %s", code)
        return {"message": f"Link '{code}' deleted"}
    finally:
        pool.putconn(conn)
# Public redirect: looks the code up, records the visit (time, client IP,
# User-Agent and Referer, the last two cut to 500 characters) and answers
# with a 302 to the stored URL. Unknown codes get a 404.
@app.get("/s/{code}")
def redirect_link(code: str, request: Request):
    pool = get_pool()
    conn = pool.getconn()
    try:
        cur = conn.cursor()
        cur.execute("SELECT id, original_url FROM links WHERE code = %s", (code,))
        link = cur.fetchone()
        if link is None:
            raise HTTPException(status_code=404, detail="Short link not found")

        link_id, target = link
        visit = (
            link_id,
            datetime.utcnow(),
            get_client_ip(request),
            request.headers.get("User-Agent", "")[:500],
            request.headers.get("Referer", "")[:500],
        )
        cur.execute(
            "INSERT INTO visits (link_id, visited_at, ip_address, user_agent, referer) VALUES (%s, %s, %s, %s, %s)",
            visit,
        )
        conn.commit()
        cur.close()
        return RedirectResponse(url=target, status_code=302)
    finally:
        # Always hand the connection back, including on the 404 path.
        pool.putconn(conn)