Files
local-mcp/app/services/instruction_service.py
cabbage 256a445e2f feat: add Clear History button to delete all consumed instructions
- Backend: instruction_service.clear_consumed() bulk-deletes consumed rows
- Backend: DELETE /api/instructions/consumed route (preserves pending)
- Frontend: Clear button in consumed panel header (hidden when empty)
- Frontend: SSE handler for history.cleared event - instant UI update
- Frontend: api.clearConsumed() fetch wrapper
2026-03-27 04:16:24 +08:00

217 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
app/services/instruction_service.py
Business logic for managing the instruction queue.
All write operations that affect queue integrity use the write lock via get_write_conn().
"""
from __future__ import annotations
import asyncio
import logging
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Optional
from app.database import get_conn, get_write_conn
from app.models import InstructionItem, InstructionStatus
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Wakeup event lets get_user_request react instantly when a new instruction
# is enqueued instead of sleeping for a full second.
# ---------------------------------------------------------------------------
_wakeup_event: asyncio.Event | None = None
_event_loop: asyncio.AbstractEventLoop | None = None
async def init_wakeup() -> None:
"""Create the wakeup event. Must be called from async context (lifespan)."""
global _wakeup_event, _event_loop
_wakeup_event = asyncio.Event()
_event_loop = asyncio.get_running_loop()
logger.debug("Instruction wakeup event initialised")
def get_wakeup_event() -> asyncio.Event | None:
return _wakeup_event
def _trigger_wakeup() -> None:
"""Thread-safe: schedule event.set() on the running event loop."""
if _wakeup_event is None or _event_loop is None:
return
_event_loop.call_soon_threadsafe(_wakeup_event.set)
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _row_to_item(row: sqlite3.Row) -> InstructionItem:
return InstructionItem(
id=row["id"],
content=row["content"],
status=InstructionStatus(row["status"]),
created_at=row["created_at"],
updated_at=row["updated_at"],
consumed_at=row["consumed_at"],
consumed_by_agent_id=row["consumed_by_agent_id"],
position=row["position"],
)
# ---------------------------------------------------------------------------
# Reads
# ---------------------------------------------------------------------------
def list_instructions(status_filter: Optional[str] = None) -> list[InstructionItem]:
query = "SELECT * FROM instructions"
params: tuple = ()
if status_filter and status_filter != "all":
query += " WHERE status = ?"
params = (status_filter,)
query += " ORDER BY position ASC"
with get_conn() as conn:
rows = conn.execute(query, params).fetchall()
return [_row_to_item(r) for r in rows]
def get_instruction(instruction_id: str) -> Optional[InstructionItem]:
with get_conn() as conn:
row = conn.execute(
"SELECT * FROM instructions WHERE id = ?", (instruction_id,)
).fetchone()
return _row_to_item(row) if row else None
def get_queue_counts() -> dict[str, int]:
with get_conn() as conn:
pending = conn.execute(
"SELECT COUNT(*) FROM instructions WHERE status = 'pending'"
).fetchone()[0]
consumed = conn.execute(
"SELECT COUNT(*) FROM instructions WHERE status = 'consumed'"
).fetchone()[0]
return {"pending_count": pending, "consumed_count": consumed}
def _next_position(conn: sqlite3.Connection) -> int:
row = conn.execute("SELECT MAX(position) FROM instructions").fetchone()
return (row[0] or 0) + 1
# ---------------------------------------------------------------------------
# Writes
# ---------------------------------------------------------------------------
def create_instruction(content: str) -> InstructionItem:
instruction_id = str(uuid.uuid4())
now = _now_iso()
with get_write_conn() as conn:
pos = _next_position(conn)
conn.execute(
"""
INSERT INTO instructions (id, content, status, created_at, updated_at, position)
VALUES (?, ?, 'pending', ?, ?, ?)
""",
(instruction_id, content, now, now, pos),
)
logger.info("Instruction created id=%s pos=%d", instruction_id, pos)
item = get_instruction(instruction_id)
assert item is not None
_trigger_wakeup() # wake up any waiting get_user_request calls immediately
return item
def update_instruction(instruction_id: str, content: str) -> InstructionItem:
"""Update content of a pending instruction. Raises ValueError if consumed or not found."""
with get_write_conn() as conn:
row = conn.execute(
"SELECT status FROM instructions WHERE id = ?", (instruction_id,)
).fetchone()
if row is None:
raise KeyError(instruction_id)
if row["status"] != "pending":
raise PermissionError(f"Instruction {instruction_id} is already consumed")
now = _now_iso()
conn.execute(
"UPDATE instructions SET content = ?, updated_at = ? WHERE id = ?",
(content, now, instruction_id),
)
logger.info("Instruction updated id=%s", instruction_id)
item = get_instruction(instruction_id)
assert item is not None
return item
def delete_instruction(instruction_id: str) -> None:
"""Delete a pending instruction. Raises ValueError if consumed or not found."""
with get_write_conn() as conn:
row = conn.execute(
"SELECT status FROM instructions WHERE id = ?", (instruction_id,)
).fetchone()
if row is None:
raise KeyError(instruction_id)
if row["status"] != "pending":
raise PermissionError(f"Instruction {instruction_id} is already consumed")
conn.execute("DELETE FROM instructions WHERE id = ?", (instruction_id,))
logger.info("Instruction deleted id=%s", instruction_id)
def clear_consumed() -> int:
"""Delete all consumed instructions. Returns the number of rows deleted."""
with get_write_conn() as conn:
row = conn.execute(
"SELECT COUNT(*) FROM instructions WHERE status = 'consumed'"
).fetchone()
count = row[0] if row else 0
conn.execute("DELETE FROM instructions WHERE status = 'consumed'")
logger.info("Consumed instructions cleared count=%d", count)
return count
def consume_next(agent_id: str = "unknown") -> Optional[InstructionItem]:
"""
Atomically claim and return the next pending instruction.
Uses the write lock to prevent two concurrent callers from consuming the same item.
Returns None if the queue is empty.
"""
with get_write_conn() as conn:
row = conn.execute(
"""
SELECT id FROM instructions
WHERE status = 'pending'
ORDER BY position ASC
LIMIT 1
"""
).fetchone()
if row is None:
return None
instruction_id = row["id"]
now = _now_iso()
conn.execute(
"""
UPDATE instructions
SET status = 'consumed', consumed_at = ?, consumed_by_agent_id = ?, updated_at = ?
WHERE id = ?
""",
(now, agent_id, now, instruction_id),
)
logger.info("Instruction consumed id=%s agent=%s", instruction_id, agent_id)
return get_instruction(instruction_id)