"""Minimal task-list REST API built on Flask and PostgreSQL.

Endpoints:
    GET    /api/tasks            list all tasks, newest first
    POST   /api/tasks            create a task from a JSON body {"title": ...}
    PUT    /api/tasks/<task_id>  toggle the ``completed`` flag of a task
    DELETE /api/tasks/<task_id>  delete a task
"""

import os
import time
from contextlib import contextmanager

import psycopg2
from psycopg2.extras import RealDictCursor
from flask import Flask, request, jsonify

app = Flask(__name__)

# Connection settings are read from the environment once, at import time.
# NOTE(review): the fallback password is a hard-coded default; make sure
# DB_PASSWORD is set explicitly wherever this runs outside local development.
DB_CONFIG = {
    "host": os.environ.get("DB_HOST", "db"),
    "database": os.environ.get("DB_NAME", "taskapp"),
    "user": os.environ.get("DB_USER", "taskapp"),
    "password": os.environ.get("DB_PASSWORD", "taskapp123"),
}

# Mirrors the VARCHAR(255) limit of tasks.title so oversized titles are
# rejected with a 400 instead of surfacing as a database error.
MAX_TITLE_LENGTH = 255


def get_db():
    """Open and return a new PostgreSQL connection whose cursors yield dict rows."""
    return psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor)


@contextmanager
def _db_cursor():
    """Yield a cursor on a fresh connection; commit, roll back and close reliably.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back when it raises. The cursor and the connection are closed in
    both cases, so a failing query can no longer leak a connection.
    """
    conn = get_db()
    try:
        # ``with conn`` ends the transaction (commit/rollback) but does not
        # close the connection, hence the explicit close() in ``finally``.
        with conn, conn.cursor() as cur:
            yield cur
    finally:
        conn.close()


def init_db(max_attempts=30, delay=2):
    """Wait for PostgreSQL and create the tasks table if it doesn't exist.

    Args:
        max_attempts: How many connection attempts to make before giving up.
        delay: Seconds to sleep between failed attempts.

    Raises:
        RuntimeError: If the database is still unreachable after
            ``max_attempts`` attempts; the last connection error is chained
            as the cause.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            with _db_cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS tasks (
                        id SERIAL PRIMARY KEY,
                        title VARCHAR(255) NOT NULL,
                        completed BOOLEAN NOT NULL DEFAULT FALSE,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
        except psycopg2.OperationalError as exc:
            last_error = exc
            print(f"Waiting for database... (attempt {attempt}/{max_attempts})")
            # No point sleeping after the final attempt; fail right away.
            if attempt < max_attempts:
                time.sleep(delay)
        else:
            print("Database initialized.")
            return
    raise RuntimeError(
        f"Could not connect to the database after {max_attempts} attempts."
    ) from last_error


@app.route("/api/tasks", methods=["GET"])
def get_tasks():
    """Return every task as a JSON array, ordered by creation time descending."""
    with _db_cursor() as cur:
        cur.execute("SELECT * FROM tasks ORDER BY created_at DESC")
        tasks = cur.fetchall()
    return jsonify(tasks)


@app.route("/api/tasks", methods=["POST"])
def create_task():
    """Create a task from a JSON body of the form ``{"title": "<text>"}``.

    Returns the inserted row with status 201, or a JSON error with status 400
    when the body is not a JSON object, the title is missing, blank or not a
    string, or the title exceeds the column limit.
    """
    # silent=True turns an unparsable or non-JSON body into None instead of
    # an exception, so all client mistakes get the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    raw_title = data.get("title", "")
    title = raw_title.strip() if isinstance(raw_title, str) else ""
    if not title:
        return jsonify({"error": "Title is required"}), 400
    if len(title) > MAX_TITLE_LENGTH:
        return jsonify(
            {"error": f"Title must be at most {MAX_TITLE_LENGTH} characters"}
        ), 400

    with _db_cursor() as cur:
        cur.execute(
            "INSERT INTO tasks (title) VALUES (%s) RETURNING *",
            (title,),
        )
        task = cur.fetchone()
    return jsonify(task), 201


# The rule must declare the <task_id> variable: without it Flask calls the
# view with no arguments and the request fails with a TypeError.
@app.route("/api/tasks/<int:task_id>", methods=["PUT"])
def update_task(task_id):
    """Toggle the ``completed`` flag of the task with the given id.

    Returns the updated row, or a 404 JSON error when no such task exists.
    """
    with _db_cursor() as cur:
        cur.execute(
            "UPDATE tasks SET completed = NOT completed WHERE id = %s RETURNING *",
            (task_id,),
        )
        task = cur.fetchone()
    if task is None:
        return jsonify({"error": "Task not found"}), 404
    return jsonify(task)


@app.route("/api/tasks/<int:task_id>", methods=["DELETE"])
def delete_task(task_id):
    """Delete the task with the given id.

    Returns ``{"result": "ok"}``, or a 404 JSON error when no such task exists.
    """
    with _db_cursor() as cur:
        cur.execute("DELETE FROM tasks WHERE id = %s RETURNING id", (task_id,))
        deleted = cur.fetchone()
    if deleted is None:
        return jsonify({"error": "Task not found"}), 404
    return jsonify({"result": "ok"})


if __name__ == "__main__":
    init_db()
    app.run(host="0.0.0.0", port=5000)