diff --git a/app/mcp_server.py b/app/mcp_server.py index 8d000d3..42427fe 100644 --- a/app/mcp_server.py +++ b/app/mcp_server.py @@ -11,7 +11,7 @@ import asyncio import logging from typing import Optional -from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp import FastMCP, Context from app.config import settings from app.services import config_service, event_service, instruction_service, status_service @@ -38,11 +38,26 @@ _MAX_WAIT_SECONDS = 86400 # 24 hours # preventing abandoned (timed-out) coroutines from silently consuming queue items. _agent_generations: dict[str, int] = {} +# --------------------------------------------------------------------------- +# Keepalive configuration +# --------------------------------------------------------------------------- +# When > 0, the wait loop sends a ctx.info() log notification to the client +# every KEEPALIVE_INTERVAL_SECONDS. This keeps the underlying SSE stream +# active and prevents transport-level read timeouts (e.g. httpx read timeout). +# +# IMPORTANT: This does NOT help against application-level wall-clock timeouts +# (anyio.fail_after / JS SDK equivalents). Those timers count from request +# start and are unaffected by intermediate SSE events. +# +# Set to 0 to disable keepalives entirely. +KEEPALIVE_INTERVAL_SECONDS: float = 20.0 + @mcp.tool() async def get_user_request( agent_id: str = "unknown", default_response_override: Optional[str] = None, + ctx: Optional[Context] = None, ) -> dict: """ Fetch the next pending user instruction from the queue. @@ -64,14 +79,11 @@ async def get_user_request( # Wait time is entirely server-controlled — the user sets it via the web UI. actual_wait = min(cfg.default_wait_seconds, _MAX_WAIT_SECONDS) - # Register this call as the newest for this agent. Any older coroutines - # still lingering (e.g. client timed-out and retried) will see a stale - # generation and skip the consume step, leaving the instruction for us. 
+ # Register this call as the newest for this agent. my_gen = _agent_generations.get(agent_id, 0) + 1 _agent_generations[agent_id] = my_gen def _i_am_active() -> bool: - """True if no newer call has arrived for this agent since we started.""" return _agent_generations.get(agent_id) == my_gen # --- Attempt immediate dequeue --- @@ -104,6 +116,7 @@ async def get_user_request( wakeup = instruction_service.get_wakeup_event() loop = asyncio.get_event_loop() start = loop.time() + last_keepalive = start while True: elapsed = loop.time() - start @@ -150,8 +163,38 @@ async def get_user_request( "waited_seconds": waited, } - # Sleep until woken by a new instruction or 1 s elapses (safety net) - wait_for = min(remaining, 1.0) + # Send keepalive notification if enabled and interval has elapsed. + # This writes an SSE event to the response stream, preventing transport-level + # read timeouts (e.g. httpx read timeout). Does NOT reset application-level + # wall-clock timers such as anyio.fail_after. + now = loop.time() + if KEEPALIVE_INTERVAL_SECONDS > 0 and ctx is not None: + if now - last_keepalive >= KEEPALIVE_INTERVAL_SECONDS: + waited_so_far = int(now - start) + try: + await ctx.info( + f"keepalive: waiting for instructions " + f"(agent={agent_id}, waited={waited_so_far}s)" + ) + logger.debug( + "get_user_request: keepalive sent agent=%s waited=%ds", + agent_id, waited_so_far, + ) + except Exception as exc: + # Client disconnected — no point continuing + logger.debug("get_user_request: keepalive failed (client gone?): %s", exc) + break + last_keepalive = now + + # Sleep until woken by a new instruction or until the next keepalive is due. 
+ if KEEPALIVE_INTERVAL_SECONDS > 0 and ctx is not None: # pace sleeps by keepalives only when one can be sent; without ctx, last_keepalive never advances and wait_for would collapse to 0 (busy loop) + time_to_next_keepalive = max( + 0.0, KEEPALIVE_INTERVAL_SECONDS - (loop.time() - last_keepalive) + ) + wait_for = min(remaining, time_to_next_keepalive, 1.0) + else: + wait_for = min(remaining, 1.0) + if wakeup is not None: try: await asyncio.wait_for(wakeup.wait(), timeout=wait_for) @@ -164,7 +207,6 @@ async def get_user_request( # --- Nothing available after waiting (or superseded) --- if _i_am_active(): - # Only record/broadcast when we're the active caller status_service.record_agent_activity(agent_id, "empty") event_service.broadcast("status.changed", {})