"""ShortLink API: a small FastAPI service for creating and resolving short links.

Endpoints:
    GET    /health            liveness probe
    POST   /api/shorten       create a short link
    GET    /api/stats         totals plus the 50 most-visited links
    DELETE /api/links/{code}  delete a link (requires the admin bearer token)
    GET    /s/{code}          record a visit and redirect to the original URL

Configuration is read from the environment: DATABASE_URL (required),
BASE_URL and ADMIN_TOKEN (optional).
"""
from contextlib import contextmanager
from datetime import datetime, timezone
import hmac
import logging
import os
import random  # noqa: F401  (no longer used here; kept so the module namespace is unchanged)
import secrets
import string
import threading

from fastapi import FastAPI, HTTPException, Request, Header
from fastapi.responses import RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import psycopg2
import psycopg2.pool

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

app = FastAPI(title="ShortLink API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

DATABASE_URL = os.environ["DATABASE_URL"]
BASE_URL = os.environ.get("BASE_URL", "http://localhost")
ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", "")

# PostgreSQL SQLSTATE reported when a UNIQUE constraint is violated.
_UNIQUE_VIOLATION = "23505"
# How many random codes to try before giving up when the caller did not
# supply a custom code.
_MAX_GENERATED_CODE_ATTEMPTS = 5

_pool = None
_pool_lock = threading.Lock()


def get_pool():
    """Return the process-wide connection pool, creating it on first use.

    A ``ThreadedConnectionPool`` is used because the endpoints below are plain
    ``def`` functions, which FastAPI executes in worker threads; psycopg2's
    ``SimpleConnectionPool`` is documented as not shareable across threads.
    The lock keeps two first requests from each building a pool.
    """
    global _pool
    if _pool is None:
        with _pool_lock:
            if _pool is None:
                _pool = psycopg2.pool.ThreadedConnectionPool(1, 10, DATABASE_URL)
    return _pool


@contextmanager
def _db_cursor():
    """Yield a cursor on a pooled connection inside one transaction.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back when it raises; the cursor is closed and the connection is
    handed back to the pool in every case.
    """
    pool = get_pool()
    conn = pool.getconn()
    try:
        # psycopg2: ``with conn`` commits/rolls back but does not close the
        # connection; ``with cursor`` closes the cursor.
        with conn, conn.cursor() as cur:
            yield cur
    finally:
        pool.putconn(conn)


def _utc_now_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same values as the deprecated ``datetime.utcnow()`` so the
    stored timestamps and ISO strings keep their existing (offset-less) form.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def get_client_ip(request: Request) -> str:
    """Return the caller's IP address.

    Uses the first entry of ``X-Forwarded-For`` when the header is present,
    otherwise the socket peer address. Returns an empty string when neither is
    available (``request.client`` may be ``None``).

    NOTE(review): ``X-Forwarded-For`` is client-controlled unless a trusted
    proxy overwrites it — confirm the deployment sits behind one.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return request.client.host if request.client else ""


def generate_code(length: int = 6) -> str:
    """Return a random code of ``length`` ASCII letters and digits.

    Drawn from ``secrets`` so that codes are not predictable from earlier ones.
    """
    chars = string.ascii_letters + string.digits
    return "".join(secrets.choice(chars) for _ in range(length))


def _is_valid_code(code: str) -> bool:
    """Return True when ``code`` is alphanumeric apart from '-' and '_'."""
    return code.replace("-", "").replace("_", "").isalnum()


def _is_admin(authorization: str | None) -> bool:
    """Return True when ``authorization`` is ``Bearer <ADMIN_TOKEN>``.

    Fails closed when no admin token is configured, and compares in constant
    time so the token cannot be probed through response timing.
    """
    if not ADMIN_TOKEN or not authorization:
        return False
    expected = f"Bearer {ADMIN_TOKEN}"
    # Compare bytes: compare_digest rejects non-ASCII ``str`` arguments.
    return hmac.compare_digest(authorization.encode("utf-8"), expected.encode("utf-8"))


def _try_insert_link(code: str, url: str, title: str | None, ip: str) -> bool:
    """Insert a link row; return False when ``code`` is already taken.

    The SELECT gives the common-case answer; the unique-violation handler
    covers a concurrent request inserting the same code between the SELECT and
    the INSERT.
    NOTE(review): assumes any unique violation on this INSERT comes from a
    UNIQUE constraint on ``links.code`` — confirm against the schema.
    """
    try:
        with _db_cursor() as cur:
            cur.execute("SELECT id FROM links WHERE code = %s", (code,))
            if cur.fetchone():
                return False
            cur.execute(
                "INSERT INTO links (code, original_url, title, created_at, created_by_ip) VALUES (%s, %s, %s, %s, %s)",
                (code, url, title, _utc_now_naive(), ip),
            )
    except psycopg2.IntegrityError as exc:
        if exc.pgcode == _UNIQUE_VIOLATION:
            return False
        raise
    return True


# ── Models ────────────────────────────────────────────────────────────────────

class URLCreate(BaseModel):
    """Request body for ``POST /api/shorten``."""

    url: str  # target URL; "https://" is prepended when no http(s) scheme is given
    custom_code: str | None = None  # caller-chosen short code; random when omitted/blank
    title: str | None = None  # optional label stored with the link


# ── Routes ────────────────────────────────────────────────────────────────────

@app.get("/health")
def health():
    """Liveness probe: report ``ok`` and the current UTC timestamp."""
    return {"status": "ok", "timestamp": _utc_now_naive().isoformat()}


@app.post("/api/shorten")
def shorten_url(data: URLCreate, request: Request):
    """Create a short link for ``data.url``.

    Returns the short URL, the code and the normalised original URL.

    Raises:
        HTTPException 400: the URL is empty or the custom code has invalid characters.
        HTTPException 409: the code is already in use.
        HTTPException 500: the database operation failed.
    """
    url = data.url.strip()
    if not url:
        raise HTTPException(status_code=400, detail="URL must not be empty")
    # Scheme matching is case-insensitive so "HTTP://x" is not double-prefixed.
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url

    custom_code = (data.custom_code or "").strip()
    if custom_code and not _is_valid_code(custom_code):
        raise HTTPException(status_code=400, detail="Code may only contain letters, digits, hyphens and underscores")

    ip = get_client_ip(request)
    # A custom code gets exactly one attempt; a generated code is re-rolled on
    # collision so the caller is not blamed for a code they never chose.
    attempts = 1 if custom_code else _MAX_GENERATED_CODE_ATTEMPTS
    try:
        for _ in range(attempts):
            code = custom_code or generate_code()
            if _try_insert_link(code, url, data.title, ip):
                logger.info("Created link %s -> %s", code, url)
                return {"short_url": f"{BASE_URL}/s/{code}", "code": code, "original_url": url}
    except Exception as exc:
        logger.exception("Error creating link: %s", exc)
        raise HTTPException(status_code=500, detail="Internal server error") from exc

    raise HTTPException(status_code=409, detail="That short code is already in use")


@app.get("/api/stats")
def get_stats():
    """Return global totals and the 50 links with the most visits."""
    with _db_cursor() as cur:
        cur.execute("""
            SELECT l.code, l.original_url, l.title, l.created_at,
                   COUNT(v.id) AS visit_count,
                   MAX(v.visited_at) AS last_visit
            FROM links l
            LEFT JOIN visits v ON l.id = v.link_id
            GROUP BY l.id
            ORDER BY visit_count DESC, l.created_at DESC
            LIMIT 50
        """)
        rows = cur.fetchall()
        cur.execute("SELECT COUNT(*) FROM links")
        total_links = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM visits")
        total_visits = cur.fetchone()[0]

    return {
        "total_links": total_links,
        "total_visits": total_visits,
        "links": [
            {
                "code": code,
                "original_url": original_url,
                "title": title,
                "created_at": created_at.isoformat(),
                "visit_count": visit_count,
                # last_visit is NULL for links that have never been visited.
                "last_visit": last_visit.isoformat() if last_visit else None,
                "short_url": f"{BASE_URL}/s/{code}",
            }
            for code, original_url, title, created_at, visit_count, last_visit in rows
        ],
    }


@app.delete("/api/links/{code}")
def delete_link(code: str, authorization: str | None = Header(None)):
    """Delete the link identified by ``code``.

    Raises:
        HTTPException 401: the ``Authorization`` header is missing or wrong,
            or no admin token is configured.
        HTTPException 404: no link has that code.
    """
    if not _is_admin(authorization):
        raise HTTPException(status_code=401, detail="Unauthorized")

    with _db_cursor() as cur:
        cur.execute("DELETE FROM links WHERE code = %s RETURNING id", (code,))
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Link not found")

    logger.info("Deleted link %s", code)
    return {"message": f"Link '{code}' deleted"}


@app.get("/s/{code}")
def redirect_link(code: str, request: Request):
    """Record a visit for ``code`` and redirect (302) to its original URL.

    Raises:
        HTTPException 404: no link has that code.
    """
    with _db_cursor() as cur:
        cur.execute("SELECT id, original_url FROM links WHERE code = %s", (code,))
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Short link not found")
        link_id, original_url = row
        cur.execute(
            "INSERT INTO visits (link_id, visited_at, ip_address, user_agent, referer) VALUES (%s, %s, %s, %s, %s)",
            (
                link_id,
                _utc_now_naive(),
                get_client_ip(request),
                # Header values are truncated to 500 characters before storage.
                request.headers.get("User-Agent", "")[:500],
                request.headers.get("Referer", "")[:500],
            ),
        )

    return RedirectResponse(url=original_url, status_code=302)