This commit is contained in:
2026-03-27 03:58:57 +08:00
commit 86eba27a24
38 changed files with 4074 additions and 0 deletions

2
app/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# app package

2
app/api/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# api package

41
app/api/routes_config.py Normal file
View File

@@ -0,0 +1,41 @@
"""
app/api/routes_config.py
HTTP endpoints for reading and updating runtime configuration.
"""
from __future__ import annotations
import logging
from fastapi import APIRouter
from app.models import ConfigResponse, UpdateConfigRequest
from app.services import config_service, event_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/config", tags=["config"])
@router.get("", response_model=ConfigResponse)
def get_config():
return config_service.get_config()
@router.patch("", response_model=ConfigResponse)
def update_config(body: UpdateConfigRequest):
cfg = config_service.update_config(
default_wait_seconds=body.default_wait_seconds,
default_empty_response=body.default_empty_response,
agent_stale_after_seconds=body.agent_stale_after_seconds,
)
event_service.broadcast(
"config.updated",
{
"default_wait_seconds": cfg.default_wait_seconds,
"default_empty_response": cfg.default_empty_response,
"agent_stale_after_seconds": cfg.agent_stale_after_seconds,
},
)
return cfg

View File

@@ -0,0 +1,72 @@
"""
app/api/routes_instructions.py
HTTP endpoints for instruction CRUD.
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import Response
from app.models import (
CreateInstructionRequest,
InstructionCreateResponse,
InstructionListResponse,
UpdateInstructionRequest,
)
from app.services import event_service, instruction_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/instructions", tags=["instructions"])
@router.get("", response_model=InstructionListResponse)
def list_instructions(
status: str = Query(default="all", pattern="^(pending|consumed|all)$")
):
items = instruction_service.list_instructions(status_filter=status)
return InstructionListResponse(items=items)
@router.post("", response_model=InstructionCreateResponse, status_code=201)
def create_instruction(body: CreateInstructionRequest):
item = instruction_service.create_instruction(body.content)
event_service.broadcast(
"instruction.created",
{"item": item.model_dump(mode="json")},
)
return InstructionCreateResponse(item=item)
@router.patch("/{instruction_id}", response_model=InstructionCreateResponse)
def update_instruction(instruction_id: str, body: UpdateInstructionRequest):
try:
item = instruction_service.update_instruction(instruction_id, body.content)
except KeyError:
raise HTTPException(status_code=404, detail="Instruction not found")
except PermissionError as exc:
raise HTTPException(status_code=409, detail=str(exc))
event_service.broadcast(
"instruction.updated",
{"item": item.model_dump(mode="json")},
)
return InstructionCreateResponse(item=item)
@router.delete("/{instruction_id}", status_code=204)
def delete_instruction(instruction_id: str):
try:
instruction_service.delete_instruction(instruction_id)
except KeyError:
raise HTTPException(status_code=404, detail="Instruction not found")
except PermissionError as exc:
raise HTTPException(status_code=409, detail=str(exc))
event_service.broadcast("instruction.deleted", {"id": instruction_id})
return Response(status_code=204)

77
app/api/routes_status.py Normal file
View File

@@ -0,0 +1,77 @@
"""
app/api/routes_status.py
HTTP endpoints for server/agent status and SSE events.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from app.models import (
AgentInfo,
HealthResponse,
QueueCounts,
ServerInfo,
StatusResponse,
)
from app.services import config_service, event_service, instruction_service, status_service
logger = logging.getLogger(__name__)
router = APIRouter(tags=["status"])
@router.get("/healthz", response_model=HealthResponse)
def health():
return HealthResponse(status="ok", server_time=datetime.now(timezone.utc))
@router.get("/api/status", response_model=StatusResponse)
def get_status():
agent_row = status_service.get_latest_agent_activity()
connected = status_service.is_agent_connected()
agent_info = AgentInfo(
connected=connected,
last_seen_at=datetime.fromisoformat(agent_row["last_seen_at"]) if agent_row else None,
last_fetch_at=datetime.fromisoformat(agent_row["last_fetch_at"]) if agent_row else None,
agent_id=agent_row["agent_id"] if agent_row else None,
)
counts = instruction_service.get_queue_counts()
cfg = config_service.get_config()
return StatusResponse(
server=ServerInfo(
status="up",
started_at=status_service.server_started_at(),
),
agent=agent_info,
queue=QueueCounts(**counts),
settings=cfg,
)
@router.get("/api/events")
async def sse_events():
q = event_service.subscribe()
async def stream():
async for chunk in event_service.event_generator(q):
yield chunk
return StreamingResponse(
stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
},
)

62
app/config.py Normal file
View File

@@ -0,0 +1,62 @@
"""
app/config.py
Runtime configuration loaded from environment variables with sensible defaults.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
@dataclass
class Settings:
    """Process-level configuration with first-run defaults.

    The three queue-behaviour values (default_wait_seconds,
    default_empty_response, agent_stale_after_seconds) are seeded into the
    SQLite settings table and edited at runtime there; the rest are fixed
    for the lifetime of the process.
    """
    # Server
    host: str = "0.0.0.0"
    http_port: int = 8000
    # Database
    db_path: str = "data/local_mcp.sqlite3"
    # Logging
    log_level: str = "INFO"
    # MCP / queue behaviour (runtime-editable values are stored in DB; these are defaults for first run)
    default_wait_seconds: int = 10
    default_empty_response: str = ""
    agent_stale_after_seconds: int = 30
    # MCP server name
    mcp_server_name: str = "local-mcp"
    # MCP transport — stateless=True means no session IDs, survives server restarts.
    # Set MCP_STATELESS=false to use stateful sessions (needed for multi-turn MCP flows).
    mcp_stateless: bool = True
def _parse_bool(value: str, default: bool) -> bool:
if value.lower() in ("1", "true", "yes", "on"):
return True
if value.lower() in ("0", "false", "no", "off"):
return False
return default
def load_settings() -> Settings:
    """Build a Settings object from environment variables, using defaults when unset."""
    env = os.getenv
    return Settings(
        host=env("HOST", "0.0.0.0"),
        http_port=int(env("HTTP_PORT", "8000")),
        db_path=env("DB_PATH", "data/local_mcp.sqlite3"),
        log_level=env("LOG_LEVEL", "INFO"),
        default_wait_seconds=int(env("DEFAULT_WAIT_SECONDS", "10")),
        default_empty_response=env("DEFAULT_EMPTY_RESPONSE", ""),
        agent_stale_after_seconds=int(env("AGENT_STALE_AFTER_SECONDS", "30")),
        mcp_server_name=env("MCP_SERVER_NAME", "local-mcp"),
        mcp_stateless=_parse_bool(env("MCP_STATELESS", "true"), default=True),
    )
# Singleton imported and used throughout the app
settings: Settings = load_settings()

124
app/database.py Normal file
View File

@@ -0,0 +1,124 @@
"""
app/database.py
SQLite database initialisation, schema management, and low-level connection helpers.
"""
from __future__ import annotations
import logging
import os
import sqlite3
import threading
from contextlib import contextmanager
from typing import Generator
logger = logging.getLogger(__name__)
# Module-level lock for write serialisation (consume atomicity, settings writes, etc.)
_write_lock = threading.Lock()
_db_path: str = ""
def init_db(db_path: str) -> None:
    """Initialise the database: create directory, schema, and seed defaults.

    Stores *db_path* in a module global so later connections reuse it.
    """
    global _db_path
    _db_path = db_path
    # dirname() is "" for a bare filename; fall back to the current directory
    # (computed once instead of the original double dirname() call).
    os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
    with _connect() as conn:
        _create_schema(conn)
        _seed_defaults(conn)
    logger.info("Database initialised at %s", db_path)
# ---------------------------------------------------------------------------
# Connection helpers
# ---------------------------------------------------------------------------
@contextmanager
def get_conn() -> Generator[sqlite3.Connection, None, None]:
    """Yield a short-lived read-write connection.

    Commits when the body completes normally, rolls back on any exception,
    and always closes the connection afterwards.
    """
    connection = _connect()
    try:
        yield connection
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        connection.close()
@contextmanager
def get_write_conn() -> Generator[sqlite3.Connection, None, None]:
    """Yield a connection while holding the module-wide write lock.

    Serialises writers so compound read-modify-write operations stay atomic.
    """
    with _write_lock, get_conn() as conn:
        yield conn
def _connect() -> sqlite3.Connection:
    """Open a new connection with Row access, WAL journaling, and FK checks."""
    connection = sqlite3.connect(_db_path, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
_SCHEMA = """
CREATE TABLE IF NOT EXISTS instructions (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
consumed_at TEXT,
consumed_by_agent_id TEXT,
position INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_instructions_status_position
ON instructions (status, position);
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS agent_activity (
agent_id TEXT PRIMARY KEY,
last_seen_at TEXT NOT NULL,
last_fetch_at TEXT NOT NULL,
last_result_type TEXT NOT NULL
);
"""
_DEFAULT_SETTINGS = {
"default_wait_seconds": "10",
"default_empty_response": "",
"agent_stale_after_seconds": "30",
}
def _create_schema(conn: sqlite3.Connection) -> None:
    """Create all tables and indexes if they do not already exist."""
    conn.executescript(_SCHEMA)
    conn.commit()
    logger.debug("Schema ensured")


def _seed_defaults(conn: sqlite3.Connection) -> None:
    """Insert default settings rows, leaving any existing values untouched."""
    conn.executemany(
        "INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
        list(_DEFAULT_SETTINGS.items()),
    )
    conn.commit()
    logger.debug("Default settings seeded")

41
app/logging_setup.py Normal file
View File

@@ -0,0 +1,41 @@
"""
app/logging_setup.py
Centralised logging configuration for the application.
"""
from __future__ import annotations
import logging
import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from app.config import Settings
def configure_logging(settings: "Settings") -> None:
"""Configure root logger and suppress noisy third-party loggers."""
level = getattr(logging, settings.log_level.upper(), logging.INFO)
formatter = logging.Formatter(
fmt="%(asctime)s %(levelname)-8s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
root = logging.getLogger()
root.setLevel(level)
# Avoid duplicate handlers if called multiple times
if not root.handlers:
root.addHandler(handler)
else:
root.handlers[0] = handler
# Quieten noisy third-party loggers
for noisy in ("uvicorn.access", "uvicorn.error", "httpx", "httpcore"):
logging.getLogger(noisy).setLevel(logging.WARNING)
logging.getLogger("uvicorn").setLevel(logging.INFO)

201
app/mcp_server.py Normal file
View File

@@ -0,0 +1,201 @@
"""
app/mcp_server.py
FastMCP server definition with the get_user_request tool.
The Starlette app returned by mcp.streamable_http_app() is mounted into
the main FastAPI application at /mcp.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Optional
from mcp.server.fastmcp import FastMCP
from app.config import settings
from app.services import config_service, event_service, instruction_service, status_service
logger = logging.getLogger(__name__)
# The FastMCP server instance; name and transport mode come from env-driven
# settings (see app/config.py).
mcp = FastMCP(
    settings.mcp_server_name,
    streamable_http_path="/",
    stateless_http=settings.mcp_stateless,
)
# Build the ASGI app eagerly so that session_manager is created and can be
# started explicitly inside the FastAPI lifespan (see main.py).
mcp_asgi_app = mcp.streamable_http_app()
# Maximum wait the client is allowed to request (guards against runaway holds)
# Set very high — the wait is always interruptible by new instructions via the
# asyncio.Event wakeup, so there is no practical danger in long waits.
_MAX_WAIT_SECONDS = 86400  # 24 hours
# Per-agent generation counter — incremented on every new call.
# The wait loop only consumes an instruction when it holds the latest generation,
# preventing abandoned (timed-out) coroutines from silently consuming queue items.
# NOTE(review): grows by one entry per distinct agent_id and is never pruned —
# fine for a handful of agents; confirm acceptable if agent ids are unbounded.
_agent_generations: dict[str, int] = {}
@mcp.tool()
async def get_user_request(
    agent_id: str = "unknown",
    wait_seconds: Optional[int] = None,
    default_response_override: Optional[str] = None,
) -> dict:
    """
    Fetch the next pending user instruction from the queue.

    The server enforces a minimum wait time (configurable via the web UI).
    The agent may request a longer wait via `wait_seconds`, but cannot go
    below that minimum — this prevents busy-polling when the queue is empty.
    actual_wait = max(wait_seconds or 0, server_min_wait_seconds).

    Args:
        agent_id: An identifier for this agent instance (used to track connectivity).
        wait_seconds: Desired wait when queue is empty. Actual wait is
            max(this, server minimum). Omit to use server minimum only.
        default_response_override: Override the server-default empty response text
            for this single call.

    Returns:
        A dict with keys: status, result_type, instruction, response,
        remaining_pending, waited_seconds.
    """
    cfg = config_service.get_config()
    # default_wait_seconds is the server-enforced MINIMUM wait time.
    client_requested = wait_seconds if wait_seconds is not None else 0
    actual_wait = min(
        max(client_requested, cfg.default_wait_seconds),  # enforce floor
        _MAX_WAIT_SECONDS,  # cap at 24h regardless of what the client asks
    )
    # Register this call as the newest for this agent. Any older coroutines
    # still lingering (e.g. client timed-out and retried) will see a stale
    # generation and skip the consume step, leaving the instruction for us.
    my_gen = _agent_generations.get(agent_id, 0) + 1
    _agent_generations[agent_id] = my_gen

    def _i_am_active() -> bool:
        """True if no newer call has arrived for this agent since we started."""
        return _agent_generations.get(agent_id) == my_gen

    # --- Attempt immediate dequeue ---
    item = instruction_service.consume_next(agent_id=agent_id)
    if item is not None:
        counts = instruction_service.get_queue_counts()
        status_service.record_agent_activity(agent_id, "instruction")
        event_service.broadcast(
            "instruction.consumed",
            {"item": item.model_dump(mode="json"), "consumed_by_agent_id": agent_id},
        )
        event_service.broadcast("status.changed", {"queue": counts})
        logger.info(
            "get_user_request: instruction delivered id=%s agent=%s", item.id, agent_id
        )
        return {
            "status": "ok",
            "result_type": "instruction",
            "instruction": {
                "id": item.id,
                "content": item.content,
                # consume_next sets consumed_at; the guard covers a None just in case.
                "consumed_at": item.consumed_at.isoformat() if item.consumed_at else None,
            },
            "response": None,
            "remaining_pending": counts["pending_count"],
            "waited_seconds": 0,
        }
    # --- Wait loop (event-driven, not polling) ---
    wakeup = instruction_service.get_wakeup_event()
    loop = asyncio.get_event_loop()
    start = loop.time()
    while True:
        elapsed = loop.time() - start
        remaining = actual_wait - elapsed
        if remaining <= 0:
            break
        # If a newer call for this agent arrived, step aside without consuming.
        if not _i_am_active():
            logger.debug(
                "get_user_request: superseded by newer call agent=%s gen=%d", agent_id, my_gen
            )
            break
        # Clear the event BEFORE checking the queue so we never miss a
        # wake-up that arrives between the DB check and event.wait().
        if wakeup is not None:
            wakeup.clear()
        item = instruction_service.consume_next(agent_id=agent_id)
        if item is not None:
            counts = instruction_service.get_queue_counts()
            status_service.record_agent_activity(agent_id, "instruction")
            event_service.broadcast(
                "instruction.consumed",
                {"item": item.model_dump(mode="json"), "consumed_by_agent_id": agent_id},
            )
            event_service.broadcast("status.changed", {"queue": counts})
            waited = int(loop.time() - start)
            logger.info(
                "get_user_request: instruction delivered (after %ds wait) id=%s agent=%s gen=%d",
                waited, item.id, agent_id, my_gen,
            )
            return {
                "status": "ok",
                "result_type": "instruction",
                "instruction": {
                    "id": item.id,
                    "content": item.content,
                    "consumed_at": item.consumed_at.isoformat() if item.consumed_at else None,
                },
                "response": None,
                "remaining_pending": counts["pending_count"],
                "waited_seconds": waited,
            }
        # Sleep until woken by a new instruction or 1 s elapses (safety net)
        wait_for = min(remaining, 1.0)
        if wakeup is not None:
            try:
                await asyncio.wait_for(wakeup.wait(), timeout=wait_for)
            except asyncio.TimeoutError:
                pass
        else:
            await asyncio.sleep(wait_for)
    waited = int(loop.time() - start)
    # --- Nothing available after waiting (or superseded) ---
    if _i_am_active():
        # Only record/broadcast when we're the active caller
        status_service.record_agent_activity(agent_id, "empty")
        event_service.broadcast("status.changed", {})
    empty_response = (
        default_response_override
        if default_response_override is not None
        else cfg.default_empty_response
    )
    # An empty string means "no default response configured" → result is "empty".
    result_type = "default_response" if empty_response else "empty"
    if _i_am_active():
        logger.info(
            "get_user_request: empty result_type=%s waited=%ds agent=%s gen=%d",
            result_type, waited, agent_id, my_gen,
        )
    return {
        "status": "ok",
        "result_type": result_type,
        "instruction": None,
        "response": empty_response,
        "remaining_pending": 0,
        "waited_seconds": waited,
    }

146
app/models.py Normal file
View File

@@ -0,0 +1,146 @@
"""
app/models.py
Pydantic request/response models used by the HTTP API.
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Optional
from pydantic import BaseModel, field_validator
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
class InstructionStatus(str, Enum):
    """Lifecycle state of a queued instruction."""
    pending = "pending"
    consumed = "consumed"


class ResultType(str, Enum):
    """Outcome category of a get_user_request tool call."""
    instruction = "instruction"
    empty = "empty"
    default_response = "default_response"
# ---------------------------------------------------------------------------
# Instruction models
# ---------------------------------------------------------------------------
class InstructionItem(BaseModel):
    """Full representation of a queued instruction as stored in the DB."""
    id: str
    content: str
    status: InstructionStatus
    created_at: datetime
    updated_at: datetime
    # Populated only once an agent has consumed the instruction.
    consumed_at: Optional[datetime] = None
    consumed_by_agent_id: Optional[str] = None
    # Queue ordering key; lower positions are consumed first.
    position: int


class InstructionListResponse(BaseModel):
    """Envelope for GET /api/instructions."""
    items: list[InstructionItem]


class InstructionCreateResponse(BaseModel):
    """Envelope for endpoints that return a single instruction."""
    item: InstructionItem
class CreateInstructionRequest(BaseModel):
    """Body for creating an instruction; content is stripped and must be non-blank."""
    content: str

    @field_validator("content")
    @classmethod
    def content_not_empty(cls, v: str) -> str:
        """Reject blank content; store the stripped text."""
        stripped = v.strip()
        if not stripped:
            raise ValueError("content must not be blank")
        return stripped


class UpdateInstructionRequest(BaseModel):
    """Body for editing an instruction; content is stripped and must be non-blank."""
    content: str

    @field_validator("content")
    @classmethod
    def content_not_empty(cls, v: str) -> str:
        """Reject blank content; store the stripped text."""
        stripped = v.strip()
        if not stripped:
            raise ValueError("content must not be blank")
        return stripped
# ---------------------------------------------------------------------------
# Status models
# ---------------------------------------------------------------------------
class ServerInfo(BaseModel):
    """Server half of the /api/status payload."""
    status: str
    started_at: datetime


class AgentInfo(BaseModel):
    """Connectivity snapshot for the most recently active agent (if any)."""
    connected: bool
    last_seen_at: Optional[datetime] = None
    last_fetch_at: Optional[datetime] = None
    agent_id: Optional[str] = None


class QueueCounts(BaseModel):
    """Pending/consumed totals for the instruction queue."""
    pending_count: int
    consumed_count: int


class StatusResponse(BaseModel):
    """Aggregate payload returned by GET /api/status."""
    server: ServerInfo
    agent: AgentInfo
    queue: QueueCounts
    # Forward reference: ConfigResponse is declared later in this module.
    settings: "ConfigResponse"


# ---------------------------------------------------------------------------
# Config models
# ---------------------------------------------------------------------------
class ConfigResponse(BaseModel):
    """Current runtime-editable settings."""
    default_wait_seconds: int
    default_empty_response: str
    agent_stale_after_seconds: int


class UpdateConfigRequest(BaseModel):
    """Partial update for PATCH /api/config; None fields are left unchanged."""
    default_wait_seconds: Optional[int] = None
    default_empty_response: Optional[str] = None
    agent_stale_after_seconds: Optional[int] = None


# ---------------------------------------------------------------------------
# Health model
# ---------------------------------------------------------------------------
class HealthResponse(BaseModel):
    """Payload for GET /healthz."""
    status: str
    server_time: datetime
# ---------------------------------------------------------------------------
# MCP tool response models
# ---------------------------------------------------------------------------
class InstructionPayload(BaseModel):
    """Instruction data embedded in a get_user_request tool response."""
    id: str
    content: str
    # The tool reports consumed_at as None when the timestamp is missing
    # (see the conditional in app/mcp_server.py), so the field must be
    # optional or such payloads would fail validation.
    consumed_at: Optional[datetime] = None


class GetUserRequestResponse(BaseModel):
    """Shape of the dict returned by the get_user_request MCP tool."""
    status: str
    result_type: ResultType
    instruction: Optional[InstructionPayload] = None
    response: Optional[str] = None
    remaining_pending: int
    waited_seconds: int

2
app/services/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# services package

View File

@@ -0,0 +1,52 @@
"""
app/services/config_service.py
Read and write runtime settings stored in the SQLite settings table.
"""
from __future__ import annotations
import logging
from app.database import get_conn, get_write_conn
from app.models import ConfigResponse
logger = logging.getLogger(__name__)
_SETTING_KEYS = {"default_wait_seconds", "default_empty_response", "agent_stale_after_seconds"}
def get_config() -> ConfigResponse:
    """Read all settings rows and materialise them as a ConfigResponse."""
    with get_conn() as conn:
        rows = conn.execute("SELECT key, value FROM settings").fetchall()
    # Each row is a (key, value) pair, so dict() consumes them directly.
    stored = dict(rows)
    return ConfigResponse(
        default_wait_seconds=int(stored.get("default_wait_seconds", 10)),
        default_empty_response=stored.get("default_empty_response", ""),
        agent_stale_after_seconds=int(stored.get("agent_stale_after_seconds", 30)),
    )
def update_config(
    default_wait_seconds: int | None = None,
    default_empty_response: str | None = None,
    agent_stale_after_seconds: int | None = None,
) -> ConfigResponse:
    """Persist whichever settings were provided, then return the full config."""
    updates: dict[str, str] = {}
    # Integers are stored as strings; the empty-response text is stored as-is.
    if default_wait_seconds is not None:
        updates["default_wait_seconds"] = str(default_wait_seconds)
    if default_empty_response is not None:
        updates["default_empty_response"] = default_empty_response
    if agent_stale_after_seconds is not None:
        updates["agent_stale_after_seconds"] = str(agent_stale_after_seconds)
    if updates:
        with get_write_conn() as conn:
            conn.executemany(
                "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
                list(updates.items()),
            )
        logger.info("Config updated: %s", list(updates.keys()))
    return get_config()

View File

@@ -0,0 +1,75 @@
"""
app/services/event_service.py
Server-Sent Events (SSE) broadcaster.
Maintains a set of subscriber asyncio queues and fans out events to all of them.
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import AsyncGenerator
logger = logging.getLogger(__name__)
# All active SSE subscriber queues
_subscribers: set[asyncio.Queue] = set()


def subscribe() -> asyncio.Queue:
    """Register a new SSE subscriber and hand back its private queue."""
    subscriber: asyncio.Queue = asyncio.Queue(maxsize=64)
    _subscribers.add(subscriber)
    logger.debug("SSE subscriber added, total=%d", len(_subscribers))
    return subscriber
def unsubscribe(q: asyncio.Queue) -> None:
    """Remove a subscriber queue. Safe to call twice (discard is a no-op)."""
    _subscribers.discard(q)
    logger.debug("SSE subscriber removed, total=%d", len(_subscribers))
def broadcast(event_type: str, data: dict) -> None:
    """
    Fan out an event to all current subscribers.

    Safe to call from synchronous code: uses put_nowait and drops (and
    deregisters) any subscriber whose queue is full, so a slow consumer
    cannot block the broadcaster.

    NOTE(review): asyncio.Queue.put_nowait is not thread-safe; this assumes
    callers run on the event-loop thread — confirm for worker-thread callers.
    """
    payload = json.dumps(
        {
            "type": event_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data": data,
        }
    )
    dead: list[asyncio.Queue] = []
    for q in list(_subscribers):
        try:
            q.put_nowait(payload)
        except asyncio.QueueFull:
            logger.warning("SSE subscriber queue full, dropping event type=%s", event_type)
            dead.append(q)
    # Deregister saturated subscribers after the loop to avoid mutating
    # the set while iterating a snapshot of it.
    for q in dead:
        _subscribers.discard(q)
async def event_generator(q: asyncio.Queue) -> AsyncGenerator[str, None]:
    """
    Yield SSE-formatted strings from a subscriber queue.

    Emits a keep-alive comment every 15 seconds when no event arrives so
    proxies/browsers do not drop the idle connection. Always unsubscribes
    the queue when the generator is closed.
    """
    try:
        while True:
            try:
                payload = await asyncio.wait_for(q.get(), timeout=15.0)
                yield f"data: {payload}\n\n"
            except asyncio.TimeoutError:
                # Keep-alive ping so browsers don't close the connection
                yield ": ping\n\n"
    except asyncio.CancelledError:
        # Propagate cancellation (e.g. client disconnect) instead of
        # swallowing it — suppressing CancelledError breaks task teardown.
        raise
    finally:
        unsubscribe(q)

View File

@@ -0,0 +1,203 @@
"""
app/services/instruction_service.py
Business logic for managing the instruction queue.
All write operations that affect queue integrity use the write lock via get_write_conn().
"""
from __future__ import annotations
import asyncio
import logging
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Optional
from app.database import get_conn, get_write_conn
from app.models import InstructionItem, InstructionStatus
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Wakeup event lets get_user_request react instantly when a new instruction
# is enqueued instead of sleeping for a full second.
# ---------------------------------------------------------------------------
_wakeup_event: asyncio.Event | None = None
_event_loop: asyncio.AbstractEventLoop | None = None


async def init_wakeup() -> None:
    """Create the wakeup event. Must be called from async context (lifespan)."""
    global _wakeup_event, _event_loop
    _event_loop = asyncio.get_running_loop()
    _wakeup_event = asyncio.Event()
    logger.debug("Instruction wakeup event initialised")
def get_wakeup_event() -> asyncio.Event | None:
    """Return the shared wakeup event, or None before init_wakeup() has run."""
    return _wakeup_event
def _trigger_wakeup() -> None:
    """Thread-safe: schedule event.set() on the running event loop (no-op before init)."""
    if _wakeup_event is not None and _event_loop is not None:
        _event_loop.call_soon_threadsafe(_wakeup_event.set)
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _row_to_item(row: sqlite3.Row) -> InstructionItem:
    """Convert a DB row into an InstructionItem.

    Timestamps are stored as ISO strings; pydantic coerces them to datetime
    during model validation.
    """
    return InstructionItem(
        id=row["id"],
        content=row["content"],
        status=InstructionStatus(row["status"]),
        created_at=row["created_at"],
        updated_at=row["updated_at"],
        consumed_at=row["consumed_at"],
        consumed_by_agent_id=row["consumed_by_agent_id"],
        position=row["position"],
    )
# ---------------------------------------------------------------------------
# Reads
# ---------------------------------------------------------------------------
def list_instructions(status_filter: Optional[str] = None) -> list[InstructionItem]:
    """Return instructions ordered by queue position, optionally filtered by status."""
    # A falsy filter or the sentinel "all" means no WHERE clause.
    if status_filter and status_filter != "all":
        clause = " WHERE status = ?"
        params: tuple = (status_filter,)
    else:
        clause = ""
        params = ()
    query = "SELECT * FROM instructions" + clause + " ORDER BY position ASC"
    with get_conn() as conn:
        rows = conn.execute(query, params).fetchall()
    return [_row_to_item(r) for r in rows]
def get_instruction(instruction_id: str) -> Optional[InstructionItem]:
    """Fetch a single instruction by id, or None when it does not exist."""
    with get_conn() as conn:
        row = conn.execute(
            "SELECT * FROM instructions WHERE id = ?", (instruction_id,)
        ).fetchone()
    if row is None:
        return None
    return _row_to_item(row)
def get_queue_counts() -> dict[str, int]:
    """Return {'pending_count': int, 'consumed_count': int} for the queue."""
    with get_conn() as conn:
        pending = conn.execute(
            "SELECT COUNT(*) FROM instructions WHERE status = 'pending'"
        ).fetchone()[0]
        consumed = conn.execute(
            "SELECT COUNT(*) FROM instructions WHERE status = 'consumed'"
        ).fetchone()[0]
    return {"pending_count": pending, "consumed_count": consumed}
def _next_position(conn: sqlite3.Connection) -> int:
    """Next queue position: 1 + current MAX(position); 1 for an empty table."""
    row = conn.execute("SELECT MAX(position) FROM instructions").fetchone()
    # MAX() is NULL (→ None) when the table is empty; treat that as 0.
    return (row[0] or 0) + 1
# ---------------------------------------------------------------------------
# Writes
# ---------------------------------------------------------------------------
def create_instruction(content: str) -> InstructionItem:
    """Append a new pending instruction to the end of the queue.

    Returns the stored item and wakes any get_user_request waiters so they
    can consume it immediately instead of waiting for their poll interval.
    """
    instruction_id = str(uuid.uuid4())
    now = _now_iso()
    with get_write_conn() as conn:
        # Position assignment and insert happen under the write lock so two
        # concurrent creates cannot claim the same position.
        pos = _next_position(conn)
        conn.execute(
            """
            INSERT INTO instructions (id, content, status, created_at, updated_at, position)
            VALUES (?, ?, 'pending', ?, ?, ?)
            """,
            (instruction_id, content, now, now, pos),
        )
    logger.info("Instruction created id=%s pos=%d", instruction_id, pos)
    # Re-read after the transaction commits so the stored row is visible.
    item = get_instruction(instruction_id)
    assert item is not None
    _trigger_wakeup()  # wake up any waiting get_user_request calls immediately
    return item
def update_instruction(instruction_id: str, content: str) -> InstructionItem:
    """Update content of a pending instruction.

    Raises:
        KeyError: the id does not exist (mapped to 404 by the API layer).
        PermissionError: the instruction was already consumed (mapped to 409).
    """
    with get_write_conn() as conn:
        row = conn.execute(
            "SELECT status FROM instructions WHERE id = ?", (instruction_id,)
        ).fetchone()
        if row is None:
            raise KeyError(instruction_id)
        if row["status"] != "pending":
            raise PermissionError(f"Instruction {instruction_id} is already consumed")
        now = _now_iso()
        conn.execute(
            "UPDATE instructions SET content = ?, updated_at = ? WHERE id = ?",
            (content, now, instruction_id),
        )
    logger.info("Instruction updated id=%s", instruction_id)
    # Re-read after commit so the updated row is visible on a new connection.
    item = get_instruction(instruction_id)
    assert item is not None
    return item
def delete_instruction(instruction_id: str) -> None:
    """Delete a pending instruction.

    Raises:
        KeyError: the id does not exist (mapped to 404 by the API layer).
        PermissionError: the instruction was already consumed (mapped to 409).
    """
    with get_write_conn() as conn:
        row = conn.execute(
            "SELECT status FROM instructions WHERE id = ?", (instruction_id,)
        ).fetchone()
        if row is None:
            raise KeyError(instruction_id)
        if row["status"] != "pending":
            raise PermissionError(f"Instruction {instruction_id} is already consumed")
        conn.execute("DELETE FROM instructions WHERE id = ?", (instruction_id,))
    logger.info("Instruction deleted id=%s", instruction_id)
def consume_next(agent_id: str = "unknown") -> Optional[InstructionItem]:
    """
    Atomically claim and return the next pending instruction.

    Uses the write lock to prevent two concurrent callers from consuming the
    same item — the SELECT→UPDATE pair is race-free because all writers are
    serialised on the module lock. Returns None if the queue is empty.
    """
    with get_write_conn() as conn:
        row = conn.execute(
            """
            SELECT id FROM instructions
            WHERE status = 'pending'
            ORDER BY position ASC
            LIMIT 1
            """
        ).fetchone()
        if row is None:
            return None
        instruction_id = row["id"]
        now = _now_iso()
        conn.execute(
            """
            UPDATE instructions
            SET status = 'consumed', consumed_at = ?, consumed_by_agent_id = ?, updated_at = ?
            WHERE id = ?
            """,
            (now, agent_id, now, instruction_id),
        )
    logger.info("Instruction consumed id=%s agent=%s", instruction_id, agent_id)
    # Re-read after the transaction commits so the consumed row is visible.
    return get_instruction(instruction_id)

View File

@@ -0,0 +1,79 @@
"""
app/services/status_service.py
Tracks server startup time and agent activity.
"""
from __future__ import annotations
import logging
import sqlite3
from datetime import datetime, timezone
from typing import Optional
from app.database import get_conn, get_write_conn
logger = logging.getLogger(__name__)
# Captured at import time; reported as the server's started_at in /api/status.
_server_started_at: datetime = datetime.now(timezone.utc)
def server_started_at() -> datetime:
    """Return the (tz-aware UTC) moment this module was first imported."""
    return _server_started_at
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Agent activity
# ---------------------------------------------------------------------------
def record_agent_activity(agent_id: str, result_type: str) -> None:
    """Upsert agent activity record on every tool call.

    last_seen_at and last_fetch_at are both set to the same "now";
    result_type is the tool outcome (e.g. "instruction" or "empty").
    """
    now = _now_iso()
    with get_write_conn() as conn:
        conn.execute(
            """
            INSERT INTO agent_activity (agent_id, last_seen_at, last_fetch_at, last_result_type)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(agent_id) DO UPDATE SET
                last_seen_at = excluded.last_seen_at,
                last_fetch_at = excluded.last_fetch_at,
                last_result_type = excluded.last_result_type
            """,
            (agent_id, now, now, result_type),
        )
    logger.debug("Agent activity recorded agent=%s result=%s", agent_id, result_type)
def get_latest_agent_activity() -> Optional[sqlite3.Row]:
    """Return the most recently active agent's row, or None when none exist."""
    query = "SELECT * FROM agent_activity ORDER BY last_seen_at DESC LIMIT 1"
    with get_conn() as conn:
        newest = conn.execute(query).fetchone()
    return newest
def get_agent_stale_seconds() -> int:
    """Read agent_stale_after_seconds from the settings table (default 30)."""
    with get_conn() as conn:
        row = conn.execute(
            "SELECT value FROM settings WHERE key = 'agent_stale_after_seconds'"
        ).fetchone()
    if row is None:
        return 30
    return int(row["value"])
def is_agent_connected() -> bool:
    """True if the most recent agent activity is within the stale threshold."""
    row = get_latest_agent_activity()
    if row is None:
        return False
    last_seen = datetime.fromisoformat(row["last_seen_at"])
    # Stored timestamps should be tz-aware ISO strings; guard against a
    # naive value by assuming UTC so the subtraction cannot raise.
    if last_seen.tzinfo is None:
        last_seen = last_seen.replace(tzinfo=timezone.utc)
    age_seconds = (datetime.now(timezone.utc) - last_seen).total_seconds()
    return age_seconds <= get_agent_stale_seconds()