Files
local-mcp/app/services/status_service.py
2026-03-27 03:58:57 +08:00

80 lines
2.5 KiB
Python

"""
app/services/status_service.py
Tracks server startup time and agent activity.
"""
from __future__ import annotations
import logging
import sqlite3
from datetime import datetime, timezone
from typing import Optional
from app.database import get_conn, get_write_conn
logger = logging.getLogger(__name__)
_server_started_at: datetime = datetime.now(timezone.utc)
def server_started_at() -> datetime:
return _server_started_at
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Agent activity
# ---------------------------------------------------------------------------
def record_agent_activity(agent_id: str, result_type: str) -> None:
"""Upsert agent activity record on every tool call."""
now = _now_iso()
with get_write_conn() as conn:
conn.execute(
"""
INSERT INTO agent_activity (agent_id, last_seen_at, last_fetch_at, last_result_type)
VALUES (?, ?, ?, ?)
ON CONFLICT(agent_id) DO UPDATE SET
last_seen_at = excluded.last_seen_at,
last_fetch_at = excluded.last_fetch_at,
last_result_type = excluded.last_result_type
""",
(agent_id, now, now, result_type),
)
logger.debug("Agent activity recorded agent=%s result=%s", agent_id, result_type)
def get_latest_agent_activity() -> Optional[sqlite3.Row]:
"""Return the most recently active agent row, or None."""
with get_conn() as conn:
return conn.execute(
"SELECT * FROM agent_activity ORDER BY last_seen_at DESC LIMIT 1"
).fetchone()
def get_agent_stale_seconds() -> int:
"""Read agent_stale_after_seconds from settings table."""
with get_conn() as conn:
row = conn.execute(
"SELECT value FROM settings WHERE key = 'agent_stale_after_seconds'"
).fetchone()
return int(row["value"]) if row else 30
def is_agent_connected() -> bool:
"""True if the most recent agent activity is within the stale threshold."""
row = get_latest_agent_activity()
if row is None:
return False
stale_seconds = get_agent_stale_seconds()
last_seen = datetime.fromisoformat(row["last_seen_at"])
now = datetime.now(timezone.utc)
if last_seen.tzinfo is None:
last_seen = last_seen.replace(tzinfo=timezone.utc)
delta = (now - last_seen).total_seconds()
return delta <= stale_seconds