""" app/database.py SQLite database initialisation, schema management, and low-level connection helpers. """ from __future__ import annotations import logging import os import sqlite3 import threading from contextlib import contextmanager from typing import Generator from app.config import DEFAULT_EMPTY_RESPONSE logger = logging.getLogger(__name__) # Module-level lock for write serialisation (consume atomicity, settings writes, etc.) _write_lock = threading.Lock() _db_path: str = "" def init_db(db_path: str) -> None: """Initialise the database: create directory, schema, and seed defaults.""" global _db_path _db_path = db_path os.makedirs(os.path.dirname(db_path) if os.path.dirname(db_path) else ".", exist_ok=True) with _connect() as conn: _create_schema(conn) _seed_defaults(conn) logger.info("Database initialised at %s", db_path) # --------------------------------------------------------------------------- # Connection helpers # --------------------------------------------------------------------------- @contextmanager def get_conn() -> Generator[sqlite3.Connection, None, None]: """Yield a short-lived read-write connection. Commits on success, rolls back on error.""" conn = _connect() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() @contextmanager def get_write_conn() -> Generator[sqlite3.Connection, None, None]: """Yield a connection protected by the module write lock (for atomic operations).""" with _write_lock: with get_conn() as conn: yield conn def _connect() -> sqlite3.Connection: conn = sqlite3.connect(_db_path, check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- _SCHEMA = """ CREATE TABLE IF NOT EXISTS instructions ( id TEXT PRIMARY KEY, content TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', created_at TEXT NOT NULL, updated_at TEXT NOT NULL, consumed_at TEXT, consumed_by_agent_id TEXT, position INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_instructions_status_position ON instructions (status, position); CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS agent_activity ( agent_id TEXT PRIMARY KEY, last_seen_at TEXT NOT NULL, last_fetch_at TEXT NOT NULL, last_result_type TEXT NOT NULL ); """ _DEFAULT_SETTINGS = { "default_wait_seconds": "10", "default_empty_response": DEFAULT_EMPTY_RESPONSE, "agent_stale_after_seconds": "30", } def _create_schema(conn: sqlite3.Connection) -> None: conn.executescript(_SCHEMA) conn.commit() logger.debug("Schema ensured") def _seed_defaults(conn: sqlite3.Connection) -> None: for key, value in _DEFAULT_SETTINGS.items(): conn.execute( "INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)", (key, value), ) conn.execute( "UPDATE settings SET value = ? WHERE key = 'default_empty_response' AND value = ''", (_DEFAULT_SETTINGS["default_empty_response"],), ) conn.commit() logger.debug("Default settings seeded")