191 lines
6.1 KiB
Python
191 lines
6.1 KiB
Python
import hmac
import logging
import os
import random
import string
from datetime import datetime

import psycopg2
import psycopg2.pool
from fastapi import FastAPI, HTTPException, Request, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
|
|
|
|
# ── Logging ───────────────────────────────────────────────────────────────────

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# ── Application ───────────────────────────────────────────────────────────────

app = FastAPI(title="ShortLink API", version="1.0.0")

# CORS is fully open: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Configuration (read once from the environment at import time) ─────────────

# Required: a missing DATABASE_URL raises KeyError while the module loads.
DATABASE_URL = os.environ["DATABASE_URL"]
# Prefix used when building the public short URLs returned by the API.
BASE_URL = os.getenv("BASE_URL", "http://localhost")
# Bearer token expected by the delete endpoint; empty string when unset.
ADMIN_TOKEN = os.getenv("ADMIN_TOKEN", "")

# Connection pool singleton, created lazily by get_pool().
_pool = None
|
|
|
|
|
|
def get_pool():
    """Return the shared PostgreSQL connection pool, creating it on first use.

    The pool holds between 1 and 10 connections to ``DATABASE_URL``.

    A ``ThreadedConnectionPool`` is used because the route handlers in this
    module are plain ``def`` functions, which FastAPI runs on worker threads.
    The pool is therefore used from several threads at once, and psycopg2
    documents ``SimpleConnectionPool`` as not shareable across threads. Both
    classes expose the same ``getconn()`` / ``putconn()`` interface.

    Returns:
        The module-wide pool. Every ``getconn()`` must be paired with a
        ``putconn()``.
    """
    global _pool
    if _pool is None:
        # NOTE(review): first-use creation is not guarded by a lock, so two
        # simultaneous first requests could each build a pool and one would be
        # discarded. Create the pool at startup if that matters.
        _pool = psycopg2.pool.ThreadedConnectionPool(1, 10, DATABASE_URL)
    return _pool
|
|
|
|
|
|
def get_client_ip(request: Request) -> str:
    """Best-effort client address for logging and attribution.

    Resolution order:

    1. The first non-empty entry of the ``X-Forwarded-For`` header.
    2. The host of the directly connected peer (``request.client``).
    3. The literal ``"unknown"`` when neither is available.

    Args:
        request: The incoming request.

    Returns:
        The resolved address string. It is never empty.

    Note:
        ``X-Forwarded-For`` is supplied by the client unless a trusted proxy
        overwrites it, so the result must not be used for access control.
    """
    forwarded = request.headers.get("X-Forwarded-For") or ""
    # Skip blank entries (e.g. ", 1.2.3.4" or a whitespace-only header) so an
    # empty string is never returned as an address.
    for hop in forwarded.split(","):
        hop = hop.strip()
        if hop:
            return hop

    # request.client can be None (no peer information available); avoid the
    # AttributeError that `.host` would raise in that case.
    peer = request.client
    if peer is not None and peer.host:
        return peer.host
    return "unknown"
|
|
|
|
|
|
def generate_code(length: int = 6) -> str:
    """Build a random short code of ``length`` characters.

    Characters are drawn independently, with replacement, from the ASCII
    letters (both cases) and the digits 0-9, using the ``random`` module's
    shared generator.

    Args:
        length: Number of characters to produce (default 6).

    Returns:
        The generated code.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
|
|
|
|
|
|
# ── Models ────────────────────────────────────────────────────────────────────
|
|
|
|
class URLCreate(BaseModel):
    """Request body accepted by ``POST /api/shorten``."""

    # Destination address. The shorten handler strips surrounding whitespace
    # and prepends "https://" when no http(s) scheme is present.
    url: str

    # Optional caller-chosen short code; when absent or blank the handler
    # generates one.
    custom_code: str | None = None

    # Optional human-readable label stored alongside the link.
    title: str | None = None
|
|
|
|
|
|
# ── Routes ────────────────────────────────────────────────────────────────────
|
|
|
|
@app.get("/health")
def health():
    """Liveness probe.

    Returns:
        A dict with ``status`` set to ``"ok"`` and ``timestamp`` set to the
        current UTC time as a naive ISO-8601 string.
    """
    now_utc = datetime.utcnow()
    return dict(status="ok", timestamp=now_utc.isoformat())
|
|
|
|
|
|
@app.post("/api/shorten")
def shorten_url(data: URLCreate, request: Request):
    """Create a short link for ``data.url`` and return its public address.

    The URL is stripped and given an ``https://`` prefix when it has no
    http(s) scheme. If ``data.custom_code`` is supplied (non-blank) it is used
    as the short code; otherwise a random code is generated, and re-generated
    a few times if it happens to collide with an existing link.

    Args:
        data: Request body (target URL, optional custom code, optional title).
        request: Incoming request; used to record the creator's address.

    Returns:
        A dict with ``short_url``, ``code`` and ``original_url``.

    Raises:
        HTTPException: 400 if the URL is empty or the custom code contains
            characters other than letters, digits, ``-`` and ``_``; 409 if the
            code is already taken (or no free generated code was found); 500
            on any unexpected failure.
    """
    url = data.url.strip()
    if not url:
        # Without this check an empty body value would be stored as "https://".
        raise HTTPException(status_code=400, detail="URL must not be empty")
    if not url.startswith(("http://", "https://")):
        url = "https://" + url

    custom_code = (data.custom_code or "").strip()
    # Generated codes are alphanumeric by construction, so only a
    # caller-supplied code needs validating.
    if custom_code and not custom_code.replace("-", "").replace("_", "").isalnum():
        raise HTTPException(status_code=400, detail="Code may only contain letters, digits, hyphens and underscores")

    # A caller-chosen code gets a single try; a generated one is re-rolled on
    # collision so the caller never sees a conflict over a code they did not pick.
    attempts = 1 if custom_code else 5

    pool = get_pool()
    conn = pool.getconn()
    try:
        ip = get_client_ip(request)
        with conn.cursor() as cur:
            for _ in range(attempts):
                code = custom_code or generate_code()

                cur.execute("SELECT id FROM links WHERE code = %s", (code,))
                if cur.fetchone():
                    continue

                try:
                    cur.execute(
                        "INSERT INTO links (code, original_url, title, created_at, created_by_ip) VALUES (%s, %s, %s, %s, %s)",
                        (code, url, data.title, datetime.utcnow(), ip),
                    )
                    conn.commit()
                except psycopg2.IntegrityError as exc:
                    # Another request may have inserted the same code between
                    # the SELECT and the INSERT. SQLSTATE 23505 is
                    # unique_violation; treat it as a collision, not a crash.
                    conn.rollback()
                    if exc.pgcode != "23505":
                        raise
                    continue

                logger.info("Created link %s -> %s", code, url)
                return {"short_url": f"{BASE_URL}/s/{code}", "code": code, "original_url": url}

        raise HTTPException(status_code=409, detail="That short code is already in use")
    except HTTPException:
        raise
    except Exception as exc:
        conn.rollback()
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error creating link: %s", exc)
        raise HTTPException(status_code=500, detail="Internal server error") from exc
    finally:
        pool.putconn(conn)
|
|
|
|
|
|
@app.get("/api/stats")
def get_stats():
    """Report overall totals together with the most-visited links.

    Returns:
        A dict with:

        * ``total_links``: row count of ``links``;
        * ``total_visits``: row count of ``visits``;
        * ``links``: up to 50 links ordered by visit count (descending), then
          creation time (descending). Each entry carries its code, original
          URL, title, creation time, visit count, last-visit time (``None``
          if never visited) and public short URL.
    """

    def as_payload(row):
        # Column order follows the SELECT list of the query below.
        code, original_url, title, created_at, visit_count, last_visit = row
        return {
            "code": code,
            "original_url": original_url,
            "title": title,
            "created_at": created_at.isoformat(),
            "visit_count": visit_count,
            "last_visit": last_visit.isoformat() if last_visit else None,
            "short_url": f"{BASE_URL}/s/{code}",
        }

    pool = get_pool()
    conn = pool.getconn()
    try:
        cur = conn.cursor()

        cur.execute("""
            SELECT l.code, l.original_url, l.title, l.created_at,
                   COUNT(v.id) AS visit_count,
                   MAX(v.visited_at) AS last_visit
            FROM links l
            LEFT JOIN visits v ON l.id = v.link_id
            GROUP BY l.id
            ORDER BY visit_count DESC, l.created_at DESC
            LIMIT 50
        """)
        top_rows = cur.fetchall()

        cur.execute("SELECT COUNT(*) FROM links")
        (total_links,) = cur.fetchone()

        cur.execute("SELECT COUNT(*) FROM visits")
        (total_visits,) = cur.fetchone()

        cur.close()

        return {
            "total_links": total_links,
            "total_visits": total_visits,
            "links": [as_payload(row) for row in top_rows],
        }
    finally:
        pool.putconn(conn)
|
|
|
|
|
|
@app.delete("/api/links/{code}")
def delete_link(code: str, authorization: str = Header(None)):
    """Delete the link identified by ``code`` (admin only).

    The caller must send ``Authorization: Bearer <ADMIN_TOKEN>``. When
    ``ADMIN_TOKEN`` is not configured (empty), every request is rejected
    rather than treating an empty token as a valid credential.

    Args:
        code: Short code of the link to remove.
        authorization: Value of the ``Authorization`` request header.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 401 if the credential is missing or wrong, or no admin
            token is configured; 404 if no link has that code.
    """
    expected = f"Bearer {ADMIN_TOKEN}".encode("utf-8")
    supplied = (authorization or "").encode("utf-8")
    # compare_digest runs in constant time so the response latency does not
    # leak how much of the token matched. Bytes are compared because the str
    # form of compare_digest rejects non-ASCII input.
    if not ADMIN_TOKEN or not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")

    pool = get_pool()
    conn = pool.getconn()
    try:
        # The context manager closes the cursor on every path, including 404.
        with conn.cursor() as cur:
            cur.execute("DELETE FROM links WHERE code = %s RETURNING id", (code,))
            deleted = cur.fetchone()

        if not deleted:
            raise HTTPException(status_code=404, detail="Link not found")

        conn.commit()
        logger.info("Deleted link %s", code)
        return {"message": f"Link '{code}' deleted"}
    finally:
        pool.putconn(conn)
|
|
|
|
|
|
@app.get("/s/{code}")
def redirect_link(code: str, request: Request):
    """Resolve a short code, record the visit, and redirect to the target.

    A row is written to ``visits`` with the link id, the current UTC time, the
    client address, and the ``User-Agent`` and ``Referer`` headers (each cut
    to 500 characters; empty string when absent). The insert is committed
    before the redirect is returned.

    Args:
        code: Short code taken from the URL path.
        request: Incoming request; source of the client address and headers.

    Returns:
        A 302 ``RedirectResponse`` pointing at the stored original URL.

    Raises:
        HTTPException: 404 if no link exists for ``code``.
    """
    pool = get_pool()
    conn = pool.getconn()
    try:
        cur = conn.cursor()

        cur.execute("SELECT id, original_url FROM links WHERE code = %s", (code,))
        match = cur.fetchone()
        if not match:
            raise HTTPException(status_code=404, detail="Short link not found")
        link_id, target_url = match

        visited_at = datetime.utcnow()
        visitor_ip = get_client_ip(request)
        user_agent = request.headers.get("User-Agent", "")[:500]
        referer = request.headers.get("Referer", "")[:500]

        cur.execute(
            "INSERT INTO visits (link_id, visited_at, ip_address, user_agent, referer) VALUES (%s, %s, %s, %s, %s)",
            (link_id, visited_at, visitor_ip, user_agent, referer),
        )
        conn.commit()
        cur.close()

        return RedirectResponse(url=target_url, status_code=302)
    finally:
        pool.putconn(conn)
|