- Backend: instruction_service.clear_consumed() bulk-deletes consumed rows - Backend: DELETE /api/instructions/consumed route (preserves pending) - Frontend: Clear button in consumed panel header (hidden when empty) - Frontend: SSE handler for history.cleared event - instant UI update - Frontend: api.clearConsumed() fetch wrapper
217 lines
7.1 KiB
Python
217 lines
7.1 KiB
Python
"""
|
||
app/services/instruction_service.py
|
||
Business logic for managing the instruction queue.
|
||
All write operations that affect queue integrity use the write lock via get_write_conn().
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
import sqlite3
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from typing import Optional
|
||
|
||
from app.database import get_conn, get_write_conn
|
||
from app.models import InstructionItem, InstructionStatus
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Wakeup event – lets get_user_request react instantly when a new instruction
|
||
# is enqueued instead of sleeping for a full second.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_wakeup_event: asyncio.Event | None = None
|
||
_event_loop: asyncio.AbstractEventLoop | None = None
|
||
|
||
|
||
async def init_wakeup() -> None:
|
||
"""Create the wakeup event. Must be called from async context (lifespan)."""
|
||
global _wakeup_event, _event_loop
|
||
_wakeup_event = asyncio.Event()
|
||
_event_loop = asyncio.get_running_loop()
|
||
logger.debug("Instruction wakeup event initialised")
|
||
|
||
|
||
def get_wakeup_event() -> asyncio.Event | None:
|
||
return _wakeup_event
|
||
|
||
|
||
def _trigger_wakeup() -> None:
|
||
"""Thread-safe: schedule event.set() on the running event loop."""
|
||
if _wakeup_event is None or _event_loop is None:
|
||
return
|
||
_event_loop.call_soon_threadsafe(_wakeup_event.set)
|
||
|
||
|
||
def _now_iso() -> str:
|
||
return datetime.now(timezone.utc).isoformat()
|
||
|
||
|
||
def _row_to_item(row: sqlite3.Row) -> InstructionItem:
|
||
return InstructionItem(
|
||
id=row["id"],
|
||
content=row["content"],
|
||
status=InstructionStatus(row["status"]),
|
||
created_at=row["created_at"],
|
||
updated_at=row["updated_at"],
|
||
consumed_at=row["consumed_at"],
|
||
consumed_by_agent_id=row["consumed_by_agent_id"],
|
||
position=row["position"],
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Reads
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def list_instructions(status_filter: Optional[str] = None) -> list[InstructionItem]:
|
||
query = "SELECT * FROM instructions"
|
||
params: tuple = ()
|
||
if status_filter and status_filter != "all":
|
||
query += " WHERE status = ?"
|
||
params = (status_filter,)
|
||
query += " ORDER BY position ASC"
|
||
|
||
with get_conn() as conn:
|
||
rows = conn.execute(query, params).fetchall()
|
||
return [_row_to_item(r) for r in rows]
|
||
|
||
|
||
def get_instruction(instruction_id: str) -> Optional[InstructionItem]:
|
||
with get_conn() as conn:
|
||
row = conn.execute(
|
||
"SELECT * FROM instructions WHERE id = ?", (instruction_id,)
|
||
).fetchone()
|
||
return _row_to_item(row) if row else None
|
||
|
||
|
||
def get_queue_counts() -> dict[str, int]:
|
||
with get_conn() as conn:
|
||
pending = conn.execute(
|
||
"SELECT COUNT(*) FROM instructions WHERE status = 'pending'"
|
||
).fetchone()[0]
|
||
consumed = conn.execute(
|
||
"SELECT COUNT(*) FROM instructions WHERE status = 'consumed'"
|
||
).fetchone()[0]
|
||
return {"pending_count": pending, "consumed_count": consumed}
|
||
|
||
|
||
def _next_position(conn: sqlite3.Connection) -> int:
|
||
row = conn.execute("SELECT MAX(position) FROM instructions").fetchone()
|
||
return (row[0] or 0) + 1
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Writes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def create_instruction(content: str) -> InstructionItem:
|
||
instruction_id = str(uuid.uuid4())
|
||
now = _now_iso()
|
||
|
||
with get_write_conn() as conn:
|
||
pos = _next_position(conn)
|
||
conn.execute(
|
||
"""
|
||
INSERT INTO instructions (id, content, status, created_at, updated_at, position)
|
||
VALUES (?, ?, 'pending', ?, ?, ?)
|
||
""",
|
||
(instruction_id, content, now, now, pos),
|
||
)
|
||
|
||
logger.info("Instruction created id=%s pos=%d", instruction_id, pos)
|
||
item = get_instruction(instruction_id)
|
||
assert item is not None
|
||
_trigger_wakeup() # wake up any waiting get_user_request calls immediately
|
||
return item
|
||
|
||
|
||
def update_instruction(instruction_id: str, content: str) -> InstructionItem:
|
||
"""Update content of a pending instruction. Raises ValueError if consumed or not found."""
|
||
with get_write_conn() as conn:
|
||
row = conn.execute(
|
||
"SELECT status FROM instructions WHERE id = ?", (instruction_id,)
|
||
).fetchone()
|
||
if row is None:
|
||
raise KeyError(instruction_id)
|
||
if row["status"] != "pending":
|
||
raise PermissionError(f"Instruction {instruction_id} is already consumed")
|
||
|
||
now = _now_iso()
|
||
conn.execute(
|
||
"UPDATE instructions SET content = ?, updated_at = ? WHERE id = ?",
|
||
(content, now, instruction_id),
|
||
)
|
||
|
||
logger.info("Instruction updated id=%s", instruction_id)
|
||
item = get_instruction(instruction_id)
|
||
assert item is not None
|
||
return item
|
||
|
||
|
||
def delete_instruction(instruction_id: str) -> None:
|
||
"""Delete a pending instruction. Raises ValueError if consumed or not found."""
|
||
with get_write_conn() as conn:
|
||
row = conn.execute(
|
||
"SELECT status FROM instructions WHERE id = ?", (instruction_id,)
|
||
).fetchone()
|
||
if row is None:
|
||
raise KeyError(instruction_id)
|
||
if row["status"] != "pending":
|
||
raise PermissionError(f"Instruction {instruction_id} is already consumed")
|
||
|
||
conn.execute("DELETE FROM instructions WHERE id = ?", (instruction_id,))
|
||
|
||
logger.info("Instruction deleted id=%s", instruction_id)
|
||
|
||
|
||
def clear_consumed() -> int:
|
||
"""Delete all consumed instructions. Returns the number of rows deleted."""
|
||
with get_write_conn() as conn:
|
||
row = conn.execute(
|
||
"SELECT COUNT(*) FROM instructions WHERE status = 'consumed'"
|
||
).fetchone()
|
||
count = row[0] if row else 0
|
||
conn.execute("DELETE FROM instructions WHERE status = 'consumed'")
|
||
|
||
logger.info("Consumed instructions cleared count=%d", count)
|
||
return count
|
||
|
||
|
||
def consume_next(agent_id: str = "unknown") -> Optional[InstructionItem]:
|
||
"""
|
||
Atomically claim and return the next pending instruction.
|
||
Uses the write lock to prevent two concurrent callers from consuming the same item.
|
||
Returns None if the queue is empty.
|
||
"""
|
||
with get_write_conn() as conn:
|
||
row = conn.execute(
|
||
"""
|
||
SELECT id FROM instructions
|
||
WHERE status = 'pending'
|
||
ORDER BY position ASC
|
||
LIMIT 1
|
||
"""
|
||
).fetchone()
|
||
|
||
if row is None:
|
||
return None
|
||
|
||
instruction_id = row["id"]
|
||
now = _now_iso()
|
||
conn.execute(
|
||
"""
|
||
UPDATE instructions
|
||
SET status = 'consumed', consumed_at = ?, consumed_by_agent_id = ?, updated_at = ?
|
||
WHERE id = ?
|
||
""",
|
||
(now, agent_id, now, instruction_id),
|
||
)
|
||
|
||
logger.info("Instruction consumed id=%s agent=%s", instruction_id, agent_id)
|
||
return get_instruction(instruction_id)
|
||
|