From 056ae70e9aee1a1fa5a437188802d782cb647456 Mon Sep 17 00:00:00 2001 From: Brandon Zhang Date: Fri, 27 Mar 2026 13:54:52 +0800 Subject: [PATCH] feat(mcp): add periodic SSE keepalive notifications during queue wait When the tool is waiting for an instruction, send ctx.info() log notifications to the client every KEEPALIVE_INTERVAL_SECONDS (default 20). Purpose ------- These notifications write bytes to the SSE stream, resetting any transport-level HTTP read timeout on the client side (e.g. httpx Timeout(read=N)). This prevents premature connection drops when wait periods exceed the client's inactivity window. Caveat ------ Application-level wall-clock timers (anyio.fail_after / JS SDK equivalents) are NOT affected by SSE events -- they count from request start regardless. This is confirmed by experiments in tests/test_keepalive.py and tests/run_keepalive_experiments.py. Experiment results (summarised in tests/run_keepalive_experiments.py) ---------------------------------------------------------------------- - Exp 1: anyio.fail_after(5s) fires at 5.98s with NO keepalives. - Exp 2: anyio.fail_after(10s) fires at 10.90s WITH keepalives every 2s. Keepalives have ZERO effect on app-level timers. - Exp 3b: httpx read=8s, keepalive=2s -> SUCCESS at 51s. Keepalives DO prevent transport-level read timeouts. The Copilot extension 60s limit is almost certainly application-level (hardcoded wall-clock), so default_wait_seconds=50 remains the correct mitigation (returns before the 60s deadline). The keepalives provide defence-in-depth against any proxy/NAT inactivity drops. --- app/mcp_server.py | 58 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/app/mcp_server.py b/app/mcp_server.py index 8d000d3..42427fe 100644 --- a/app/mcp_server.py +++ b/app/mcp_server.py @@ -11,7 +11,7 @@ import asyncio import logging from typing import Optional -from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp import FastMCP, Context from app.config import settings from app.services import config_service, event_service, instruction_service, status_service @@ -38,11 +38,26 @@ _MAX_WAIT_SECONDS = 86400 # 24 hours # preventing abandoned (timed-out) coroutines from silently consuming queue items. _agent_generations: dict[str, int] = {} +# --------------------------------------------------------------------------- +# Keepalive configuration +# --------------------------------------------------------------------------- +# When > 0, the wait loop sends a ctx.info() log notification to the client +# every KEEPALIVE_INTERVAL_SECONDS. This keeps the underlying SSE stream +# active and prevents transport-level read timeouts (e.g. httpx read timeout). +# +# IMPORTANT: This does NOT help against application-level wall-clock timeouts +# (anyio.fail_after / JS SDK equivalents). Those timers count from request +# start and are unaffected by intermediate SSE events. +# +# Set to 0 to disable keepalives entirely. +KEEPALIVE_INTERVAL_SECONDS: float = 20.0 + @mcp.tool() async def get_user_request( agent_id: str = "unknown", default_response_override: Optional[str] = None, + ctx: Optional[Context] = None, ) -> dict: """ Fetch the next pending user instruction from the queue. @@ -64,14 +79,11 @@ async def get_user_request( # Wait time is entirely server-controlled — the user sets it via the web UI. actual_wait = min(cfg.default_wait_seconds, _MAX_WAIT_SECONDS) - # Register this call as the newest for this agent. Any older coroutines - # still lingering (e.g. client timed-out and retried) will see a stale - # generation and skip the consume step, leaving the instruction for us. + # Register this call as the newest for this agent. my_gen = _agent_generations.get(agent_id, 0) + 1 _agent_generations[agent_id] = my_gen def _i_am_active() -> bool: - """True if no newer call has arrived for this agent since we started.""" return _agent_generations.get(agent_id) == my_gen # --- Attempt immediate dequeue --- @@ -104,6 +116,7 @@ async def get_user_request( wakeup = instruction_service.get_wakeup_event() loop = asyncio.get_event_loop() start = loop.time() + last_keepalive = start while True: elapsed = loop.time() - start @@ -150,8 +163,38 @@ async def get_user_request( "waited_seconds": waited, } - # Sleep until woken by a new instruction or 1 s elapses (safety net) - wait_for = min(remaining, 1.0) + # Send keepalive notification if enabled and interval has elapsed. + # This writes an SSE event to the response stream, preventing transport-level + # read timeouts (e.g. httpx read timeout). Does NOT reset application-level + # wall-clock timers such as anyio.fail_after. + now = loop.time() + if KEEPALIVE_INTERVAL_SECONDS > 0 and ctx is not None: + if now - last_keepalive >= KEEPALIVE_INTERVAL_SECONDS: + waited_so_far = int(now - start) + try: + await ctx.info( + f"keepalive: waiting for instructions " + f"(agent={agent_id}, waited={waited_so_far}s)" + ) + logger.debug( + "get_user_request: keepalive sent agent=%s waited=%ds", + agent_id, waited_so_far, + ) + except Exception as exc: + # Client disconnected — no point continuing + logger.debug("get_user_request: keepalive failed (client gone?): %s", exc) + break + last_keepalive = now + + # Sleep until woken by a new instruction or until the next keepalive is due. + if KEEPALIVE_INTERVAL_SECONDS > 0: + time_to_next_keepalive = max( + 0.0, KEEPALIVE_INTERVAL_SECONDS - (loop.time() - last_keepalive) + ) + wait_for = min(remaining, time_to_next_keepalive, 1.0) + else: + wait_for = min(remaining, 1.0) + if wakeup is not None: try: await asyncio.wait_for(wakeup.wait(), timeout=wait_for) @@ -164,7 +207,6 @@ async def get_user_request( # --- Nothing available after waiting (or superseded) --- if _i_am_active(): - # Only record/broadcast when we're the active caller status_service.record_agent_activity(agent_id, "empty") event_service.broadcast("status.changed", {})