""" app/services/instruction_service.py Business logic for managing the instruction queue. All write operations that affect queue integrity use the write lock via get_write_conn(). """ from __future__ import annotations import asyncio import logging import sqlite3 import uuid from datetime import datetime, timezone from typing import Optional from app.database import get_conn, get_write_conn from app.models import InstructionItem, InstructionStatus logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Wakeup event – lets get_user_request react instantly when a new instruction # is enqueued instead of sleeping for a full second. # --------------------------------------------------------------------------- _wakeup_event: asyncio.Event | None = None _event_loop: asyncio.AbstractEventLoop | None = None async def init_wakeup() -> None: """Create the wakeup event. Must be called from async context (lifespan).""" global _wakeup_event, _event_loop _wakeup_event = asyncio.Event() _event_loop = asyncio.get_running_loop() logger.debug("Instruction wakeup event initialised") def get_wakeup_event() -> asyncio.Event | None: return _wakeup_event def _trigger_wakeup() -> None: """Thread-safe: schedule event.set() on the running event loop.""" if _wakeup_event is None or _event_loop is None: return _event_loop.call_soon_threadsafe(_wakeup_event.set) def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _row_to_item(row: sqlite3.Row) -> InstructionItem: return InstructionItem( id=row["id"], content=row["content"], status=InstructionStatus(row["status"]), created_at=row["created_at"], updated_at=row["updated_at"], consumed_at=row["consumed_at"], consumed_by_agent_id=row["consumed_by_agent_id"], position=row["position"], ) # --------------------------------------------------------------------------- # Reads # --------------------------------------------------------------------------- def list_instructions(status_filter: Optional[str] = None) -> list[InstructionItem]: query = "SELECT * FROM instructions" params: tuple = () if status_filter and status_filter != "all": query += " WHERE status = ?" params = (status_filter,) query += " ORDER BY position ASC" with get_conn() as conn: rows = conn.execute(query, params).fetchall() return [_row_to_item(r) for r in rows] def get_instruction(instruction_id: str) -> Optional[InstructionItem]: with get_conn() as conn: row = conn.execute( "SELECT * FROM instructions WHERE id = ?", (instruction_id,) ).fetchone() return _row_to_item(row) if row else None def get_queue_counts() -> dict[str, int]: with get_conn() as conn: pending = conn.execute( "SELECT COUNT(*) FROM instructions WHERE status = 'pending'" ).fetchone()[0] consumed = conn.execute( "SELECT COUNT(*) FROM instructions WHERE status = 'consumed'" ).fetchone()[0] return {"pending_count": pending, "consumed_count": consumed} def _next_position(conn: sqlite3.Connection) -> int: row = conn.execute("SELECT MAX(position) FROM instructions").fetchone() return (row[0] or 0) + 1 # --------------------------------------------------------------------------- # Writes # --------------------------------------------------------------------------- def create_instruction(content: str) -> InstructionItem: instruction_id = str(uuid.uuid4()) now = _now_iso() with get_write_conn() as conn: pos = _next_position(conn) conn.execute( """ INSERT INTO instructions (id, content, status, created_at, updated_at, position) VALUES (?, ?, 'pending', ?, ?, ?) """, (instruction_id, content, now, now, pos), ) logger.info("Instruction created id=%s pos=%d", instruction_id, pos) item = get_instruction(instruction_id) assert item is not None _trigger_wakeup() # wake up any waiting get_user_request calls immediately return item def update_instruction(instruction_id: str, content: str) -> InstructionItem: """Update content of a pending instruction. Raises ValueError if consumed or not found.""" with get_write_conn() as conn: row = conn.execute( "SELECT status FROM instructions WHERE id = ?", (instruction_id,) ).fetchone() if row is None: raise KeyError(instruction_id) if row["status"] != "pending": raise PermissionError(f"Instruction {instruction_id} is already consumed") now = _now_iso() conn.execute( "UPDATE instructions SET content = ?, updated_at = ? WHERE id = ?", (content, now, instruction_id), ) logger.info("Instruction updated id=%s", instruction_id) item = get_instruction(instruction_id) assert item is not None return item def delete_instruction(instruction_id: str) -> None: """Delete a pending instruction. Raises ValueError if consumed or not found.""" with get_write_conn() as conn: row = conn.execute( "SELECT status FROM instructions WHERE id = ?", (instruction_id,) ).fetchone() if row is None: raise KeyError(instruction_id) if row["status"] != "pending": raise PermissionError(f"Instruction {instruction_id} is already consumed") conn.execute("DELETE FROM instructions WHERE id = ?", (instruction_id,)) logger.info("Instruction deleted id=%s", instruction_id) def clear_consumed() -> int: """Delete all consumed instructions. Returns the number of rows deleted.""" with get_write_conn() as conn: row = conn.execute( "SELECT COUNT(*) FROM instructions WHERE status = 'consumed'" ).fetchone() count = row[0] if row else 0 conn.execute("DELETE FROM instructions WHERE status = 'consumed'") logger.info("Consumed instructions cleared count=%d", count) return count def consume_next(agent_id: str = "unknown") -> Optional[InstructionItem]: """ Atomically claim and return the next pending instruction. Uses the write lock to prevent two concurrent callers from consuming the same item. Returns None if the queue is empty. """ with get_write_conn() as conn: row = conn.execute( """ SELECT id FROM instructions WHERE status = 'pending' ORDER BY position ASC LIMIT 1 """ ).fetchone() if row is None: return None instruction_id = row["id"] now = _now_iso() conn.execute( """ UPDATE instructions SET status = 'consumed', consumed_at = ?, consumed_by_agent_id = ?, updated_at = ? WHERE id = ? """, (now, agent_id, now, instruction_id), ) logger.info("Instruction consumed id=%s agent=%s", instruction_id, agent_id) return get_instruction(instruction_id)