Files
local-mcp/app/mcp_server.py
Brandon Zhang 056ae70e9a feat(mcp): add periodic SSE keepalive notifications during queue wait
When the tool is waiting for an instruction, send ctx.info() log
notifications to the client every KEEPALIVE_INTERVAL_SECONDS (default 20).
Purpose
-------
These notifications write bytes to the SSE stream, resetting any
transport-level HTTP read timeout on the client side (e.g. httpx
Timeout(read=N)).  This prevents premature connection drops when
wait periods exceed the client's inactivity window.
Caveat
------
Application-level wall-clock timers (anyio.fail_after / JS SDK
equivalents) are NOT affected by SSE events -- they count from
request start regardless.  This is confirmed by experiments in
tests/test_keepalive.py and tests/run_keepalive_experiments.py.
Experiment results (summarised in tests/run_keepalive_experiments.py)
----------------------------------------------------------------------
- Exp 1: anyio.fail_after(5s) fires at 5.98s with NO keepalives.
- Exp 2: anyio.fail_after(10s) fires at 10.90s WITH keepalives every 2s.
         Keepalives have ZERO effect on app-level timers.
- Exp 3b: httpx read=8s, keepalive=2s -> SUCCESS at 51s.
          Keepalives DO prevent transport-level read timeouts.
The Copilot extension 60s limit is almost certainly application-level
(hardcoded wall-clock), so default_wait_seconds=50 remains the correct
mitigation (returns before the 60s deadline).  The keepalives provide
defence-in-depth against any proxy/NAT inactivity drops.
2026-03-27 13:54:52 +08:00

235 lines
8.7 KiB
Python

"""
app/mcp_server.py
FastMCP server definition with the get_user_request tool.
The Starlette app returned by mcp.streamable_http_app() is mounted into
the main FastAPI application at /mcp.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Optional
from mcp.server.fastmcp import FastMCP, Context
from app.config import settings
from app.services import config_service, event_service, instruction_service, status_service
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(__name__)

# The FastMCP server instance that tools in this module register on.
# Its name and stateless-HTTP flag are read from application settings.
mcp = FastMCP(
    settings.mcp_server_name,
    streamable_http_path="/",
    stateless_http=settings.mcp_stateless,
)

# Create the ASGI app at import time rather than lazily: doing so creates the
# session_manager, which the FastAPI lifespan then starts explicitly
# (see main.py).
mcp_asgi_app = mcp.streamable_http_app()
# Ceiling applied to the server-configured wait (guards against runaway holds).
# Deliberately generous: a waiting call is woken through the asyncio.Event
# wakeup as soon as a new instruction arrives, so a long wait carries no
# practical risk.
_MAX_WAIT_SECONDS = 24 * 60 * 60  # 24 hours

# agent_id -> generation number of that agent's most recent call.
# Each new call bumps the number; the wait loop consumes an instruction only
# while it still holds the latest generation, so an abandoned (timed-out)
# coroutine cannot silently swallow a queue item.
_agent_generations: dict[str, int] = dict()

# ---------------------------------------------------------------------------
# Keepalive configuration
# ---------------------------------------------------------------------------
# Interval, in seconds, between ctx.info() log notifications sent to the client
# while the wait loop is idle. The notifications keep the underlying SSE stream
# active, which prevents transport-level read timeouts (e.g. an httpx read
# timeout).
#
# IMPORTANT: keepalives do NOT help against application-level wall-clock
# timeouts (anyio.fail_after / JS SDK equivalents). Those timers run from
# request start and ignore intermediate SSE events.
#
# A value of 0 disables keepalives entirely.
KEEPALIVE_INTERVAL_SECONDS: float = 20.0
def _publish_consumption(item, agent_id: str):
    """Record and broadcast that *agent_id* consumed *item*.

    Returns the queue counts fetched here so the caller reports
    ``remaining_pending`` from the same snapshot that was broadcast.
    """
    counts = instruction_service.get_queue_counts()
    status_service.record_agent_activity(agent_id, "instruction")
    event_service.broadcast(
        "instruction.consumed",
        {"item": item.model_dump(mode="json"), "consumed_by_agent_id": agent_id},
    )
    event_service.broadcast("status.changed", {"queue": counts})
    return counts


def _instruction_result(item, counts, waited: int) -> dict:
    """Build the tool response describing a delivered instruction."""
    return {
        "status": "ok",
        "result_type": "instruction",
        "instruction": {
            "id": item.id,
            "content": item.content,
            "consumed_at": item.consumed_at.isoformat() if item.consumed_at else None,
        },
        "response": None,
        "remaining_pending": counts["pending_count"],
        "waited_seconds": waited,
    }


# NOTE(review): FastMCP presumably publishes the docstring below as the tool
# description shown to clients, so it is kept client-facing (``ctx`` is not
# listed there) -- confirm before expanding it.
@mcp.tool()
async def get_user_request(
    agent_id: str = "unknown",
    default_response_override: Optional[str] = None,
    ctx: Optional[Context] = None,
) -> dict:
    """
    Fetch the next pending user instruction from the queue.

    If no instruction is available the tool will wait up to the
    server-configured wait time before returning an empty / default response.

    Args:
        agent_id: An identifier for this agent instance (used to track connectivity).
        default_response_override: Override the server-default empty response text
            for this single call.

    Returns:
        A dict with keys: status, result_type, instruction, response,
        remaining_pending, waited_seconds.
    """
    # ctx, when provided, is used only to send keepalive notifications via
    # ctx.info(); with ctx=None the wait loop runs without keepalives.
    cfg = config_service.get_config()

    # Wait time is entirely server-controlled — the user sets it via the web UI.
    actual_wait = min(cfg.default_wait_seconds, _MAX_WAIT_SECONDS)

    # Register this call as the newest for this agent.
    my_gen = _agent_generations.get(agent_id, 0) + 1
    _agent_generations[agent_id] = my_gen

    def _i_am_active() -> bool:
        """Return True while no newer call for this agent has been registered."""
        return _agent_generations.get(agent_id) == my_gen

    # --- Attempt immediate dequeue ---
    item = instruction_service.consume_next(agent_id=agent_id)
    if item is not None:
        counts = _publish_consumption(item, agent_id)
        logger.info(
            "get_user_request: instruction delivered id=%s agent=%s", item.id, agent_id
        )
        return _instruction_result(item, counts, 0)

    # --- Wait loop (event-driven, not polling) ---
    wakeup = instruction_service.get_wakeup_event()
    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    start = loop.time()
    last_keepalive = start

    while True:
        remaining = actual_wait - (loop.time() - start)
        if remaining <= 0:
            break

        # If a newer call for this agent arrived, step aside without consuming.
        if not _i_am_active():
            logger.debug(
                "get_user_request: superseded by newer call agent=%s gen=%d", agent_id, my_gen
            )
            break

        # Clear the event BEFORE checking the queue so we never miss a
        # wake-up that arrives between the DB check and event.wait().
        if wakeup is not None:
            wakeup.clear()

        item = instruction_service.consume_next(agent_id=agent_id)
        if item is not None:
            counts = _publish_consumption(item, agent_id)
            waited = int(loop.time() - start)
            logger.info(
                "get_user_request: instruction delivered (after %ds wait) id=%s agent=%s gen=%d",
                waited, item.id, agent_id, my_gen,
            )
            return _instruction_result(item, counts, waited)

        # Keepalives can only be sent when there is a ctx to send them on.
        # Re-evaluated every iteration so a changed interval takes effect.
        keepalive_enabled = KEEPALIVE_INTERVAL_SECONDS > 0 and ctx is not None

        # Send keepalive notification if enabled and interval has elapsed.
        # This writes an SSE event to the response stream, preventing transport-level
        # read timeouts (e.g. httpx read timeout). Does NOT reset application-level
        # wall-clock timers such as anyio.fail_after.
        now = loop.time()
        if keepalive_enabled and now - last_keepalive >= KEEPALIVE_INTERVAL_SECONDS:
            waited_so_far = int(now - start)
            try:
                await ctx.info(
                    f"keepalive: waiting for instructions "
                    f"(agent={agent_id}, waited={waited_so_far}s)"
                )
            except Exception as exc:
                # Client disconnected — no point continuing
                logger.debug("get_user_request: keepalive failed (client gone?): %s", exc)
                break
            else:
                logger.debug(
                    "get_user_request: keepalive sent agent=%s waited=%ds",
                    agent_id, waited_so_far,
                )
            last_keepalive = now

        # Sleep until woken by a new instruction or until the next keepalive is due.
        # The keepalive deadline is only considered when keepalives can actually be
        # sent: with ctx=None, last_keepalive never advances, and factoring it in
        # would collapse the timeout to 0 after the first interval, turning this
        # loop into a busy spin against the queue.
        if keepalive_enabled:
            time_to_next_keepalive = max(
                0.0, KEEPALIVE_INTERVAL_SECONDS - (loop.time() - last_keepalive)
            )
            wait_for = min(remaining, time_to_next_keepalive, 1.0)
        else:
            wait_for = min(remaining, 1.0)

        if wakeup is not None:
            try:
                await asyncio.wait_for(wakeup.wait(), timeout=wait_for)
            except asyncio.TimeoutError:
                pass
        else:
            await asyncio.sleep(wait_for)

    waited = int(loop.time() - start)

    # --- Nothing available after waiting (or superseded) ---
    # No await happens below, so one activity check covers both uses.
    still_active = _i_am_active()
    if still_active:
        status_service.record_agent_activity(agent_id, "empty")
        event_service.broadcast("status.changed", {})

    empty_response = (
        default_response_override
        if default_response_override is not None
        else cfg.default_empty_response
    )
    result_type = "default_response" if empty_response else "empty"

    if still_active:
        logger.info(
            "get_user_request: empty result_type=%s waited=%ds agent=%s gen=%d",
            result_type, waited, agent_id, my_gen,
        )

    return {
        "status": "ok",
        "result_type": result_type,
        "instruction": None,
        "response": empty_response,
        # Reported as 0 without re-querying the queue.
        "remaining_pending": 0,
        "waited_seconds": waited,
    }