"""
app/mcp_server.py

FastMCP server definition with the get_user_request tool.

The Starlette app returned by mcp.streamable_http_app() is mounted into the
main FastAPI application at /mcp.
"""
from __future__ import annotations

import asyncio
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP, Context

from app.config import settings
from app.services import config_service, event_service, instruction_service, status_service

logger = logging.getLogger(__name__)

# The FastMCP server instance; tools in this module register themselves on it
# via the @mcp.tool() decorator. Name and stateless mode come from settings.
mcp = FastMCP(
    settings.mcp_server_name,
    streamable_http_path="/",
    stateless_http=settings.mcp_stateless,
)

# Build the ASGI app eagerly so that session_manager is created and can be
# started explicitly inside the FastAPI lifespan (see main.py).
mcp_asgi_app = mcp.streamable_http_app()

# Upper bound on how long get_user_request may hold a call open: the
# server-configured default_wait_seconds is clamped to this value (guards
# against runaway holds).
# Set very high — the wait loop is woken by new instructions via the
# asyncio.Event wakeup, so there is no practical danger in long waits.
_MAX_WAIT_SECONDS = 86400  # 24 hours

# Per-agent generation counter — incremented on every new call.
# The wait loop only consumes an instruction when it holds the latest generation,
# preventing abandoned (timed-out) coroutines from silently consuming queue items.
# NOTE(review): nothing in this module removes entries, so the dict holds one
# entry per distinct agent_id seen for the lifetime of the process.
_agent_generations: dict[str, int] = {}

# ---------------------------------------------------------------------------
# Keepalive configuration
# ---------------------------------------------------------------------------
# When > 0 (and a Context is available to the tool), the wait loop sends a
# ctx.info() log notification to the client every KEEPALIVE_INTERVAL_SECONDS.
# This keeps the underlying SSE stream active and prevents transport-level
# read timeouts (e.g. httpx read timeout).
#
# IMPORTANT: This does NOT help against application-level wall-clock timeouts
# (anyio.fail_after / JS SDK equivalents). Those timers count from request
# start and are unaffected by intermediate SSE events.
#
# Set to 0 to disable keepalives entirely.
KEEPALIVE_INTERVAL_SECONDS: float = 20.0


def _deliver_instruction(item, agent_id: str, waited_seconds: int) -> dict:
    """Record that *item* was handed to *agent_id* and build the tool result.

    Reads the current queue counts, records the agent activity, broadcasts the
    ``instruction.consumed`` and ``status.changed`` events, and returns the
    response dict that ``get_user_request`` hands back to the client.

    Args:
        item: The consumed instruction as returned by
            ``instruction_service.consume_next``; it must expose ``id``,
            ``content``, ``consumed_at`` and ``model_dump``.
        agent_id: Identifier of the agent that consumed the instruction.
        waited_seconds: Whole seconds the call waited before delivery.

    Returns:
        The ``result_type == "instruction"`` response dict.
    """
    counts = instruction_service.get_queue_counts()
    status_service.record_agent_activity(agent_id, "instruction")
    event_service.broadcast(
        "instruction.consumed",
        {"item": item.model_dump(mode="json"), "consumed_by_agent_id": agent_id},
    )
    event_service.broadcast("status.changed", {"queue": counts})
    consumed_at = item.consumed_at.isoformat() if item.consumed_at else None
    return {
        "status": "ok",
        "result_type": "instruction",
        "instruction": {
            "id": item.id,
            "content": item.content,
            "consumed_at": consumed_at,
        },
        "response": None,
        "remaining_pending": counts["pending_count"],
        "waited_seconds": waited_seconds,
    }


@mcp.tool()
async def get_user_request(
    agent_id: str = "unknown",
    default_response_override: Optional[str] = None,
    ctx: Optional[Context] = None,
) -> dict:
    """
    Fetch the next pending user instruction from the queue.

    If no instruction is available the tool will wait up to the
    server-configured wait time before returning an empty / default response.

    Args:
        agent_id: An identifier for this agent instance (used to track connectivity).
        default_response_override: Override the server-default empty response text
            for this single call.

    Returns:
        A dict with keys: status, result_type, instruction, response,
        remaining_pending, waited_seconds.
    """
    # NOTE(review): `ctx` is presumably injected by FastMCP; it is only used
    # to send keepalive notifications and may be None (e.g. direct calls).
    cfg = config_service.get_config()

    # Wait time is entirely server-controlled — the user sets it via the web UI.
    actual_wait = min(cfg.default_wait_seconds, _MAX_WAIT_SECONDS)

    # Register this call as the newest for this agent.
    my_gen = _agent_generations.get(agent_id, 0) + 1
    _agent_generations[agent_id] = my_gen

    def _i_am_active() -> bool:
        return _agent_generations.get(agent_id) == my_gen

    # --- Attempt immediate dequeue ---
    item = instruction_service.consume_next(agent_id=agent_id)
    if item is not None:
        response = _deliver_instruction(item, agent_id, 0)
        logger.info(
            "get_user_request: instruction delivered id=%s agent=%s", item.id, agent_id
        )
        return response

    # --- Wait loop (event-driven, with a 1 s cap per sleep) ---
    wakeup = instruction_service.get_wakeup_event()
    # We are inside a coroutine, so the running loop is the one to use.
    loop = asyncio.get_running_loop()
    start = loop.time()
    last_keepalive = start

    while True:
        elapsed = loop.time() - start
        remaining = actual_wait - elapsed
        if remaining <= 0:
            break

        # If a newer call for this agent arrived, step aside without consuming.
        if not _i_am_active():
            logger.debug(
                "get_user_request: superseded by newer call agent=%s gen=%d",
                agent_id,
                my_gen,
            )
            break

        # Clear the event BEFORE checking the queue so we never miss a
        # wake-up that arrives between the queue check and event.wait().
        if wakeup is not None:
            wakeup.clear()

        item = instruction_service.consume_next(agent_id=agent_id)
        if item is not None:
            waited = int(loop.time() - start)
            response = _deliver_instruction(item, agent_id, waited)
            logger.info(
                "get_user_request: instruction delivered (after %ds wait) id=%s agent=%s gen=%d",
                waited,
                item.id,
                agent_id,
                my_gen,
            )
            return response

        # Keepalives can only be sent when they are enabled AND a Context is
        # available. Evaluated per iteration so a runtime change of the module
        # constant is honoured.
        keepalive_enabled = KEEPALIVE_INTERVAL_SECONDS > 0 and ctx is not None

        # Send keepalive notification if enabled and interval has elapsed.
        # This writes an SSE event to the response stream, preventing transport-level
        # read timeouts (e.g. httpx read timeout). Does NOT reset application-level
        # wall-clock timers such as anyio.fail_after.
        now = loop.time()
        if keepalive_enabled and now - last_keepalive >= KEEPALIVE_INTERVAL_SECONDS:
            waited_so_far = int(now - start)
            try:
                await ctx.info(
                    f"keepalive: waiting for instructions "
                    f"(agent={agent_id}, waited={waited_so_far}s)"
                )
                logger.debug(
                    "get_user_request: keepalive sent agent=%s waited=%ds",
                    agent_id,
                    waited_so_far,
                )
            except Exception as exc:
                # Deliberate best-effort: any failure to notify is treated as
                # "client disconnected" — no point continuing to wait.
                logger.debug("get_user_request: keepalive failed (client gone?): %s", exc)
                break
            last_keepalive = now

        # Sleep until woken by a new instruction, capped at 1 s so supersession
        # and the overall deadline are re-checked regularly.
        wait_for = min(remaining, 1.0)
        if keepalive_enabled:
            # Only pace the sleep by the keepalive schedule when keepalives are
            # actually being sent. Without a Context, last_keepalive never
            # advances, so the computed delay would collapse to 0 once the
            # interval has elapsed and the loop would busy-spin on the queue.
            time_to_next_keepalive = max(
                0.0, KEEPALIVE_INTERVAL_SECONDS - (loop.time() - last_keepalive)
            )
            wait_for = min(wait_for, time_to_next_keepalive)

        if wakeup is not None:
            try:
                await asyncio.wait_for(wakeup.wait(), timeout=wait_for)
            except asyncio.TimeoutError:
                pass
        else:
            await asyncio.sleep(wait_for)

    waited = int(loop.time() - start)

    # --- Nothing available after waiting (or superseded) ---
    # No await happens below, so a single activity check covers both the
    # status update and the log line.
    still_active = _i_am_active()
    if still_active:
        status_service.record_agent_activity(agent_id, "empty")
        event_service.broadcast("status.changed", {})

    empty_response = (
        default_response_override
        if default_response_override is not None
        else cfg.default_empty_response
    )
    result_type = "default_response" if empty_response else "empty"

    if still_active:
        logger.info(
            "get_user_request: empty result_type=%s waited=%ds agent=%s gen=%d",
            result_type,
            waited,
            agent_id,
            my_gen,
        )

    return {
        "status": "ok",
        "result_type": result_type,
        "instruction": None,
        "response": empty_response,
        "remaining_pending": 0,
        "waited_seconds": waited,
    }