feat(mcp): add periodic SSE keepalive notifications during queue wait
When the tool is waiting for an instruction, send ctx.info() log
notifications to the client every KEEPALIVE_INTERVAL_SECONDS (default 20).
Purpose
-------
These notifications write bytes to the SSE stream, resetting any
transport-level HTTP read timeout on the client side (e.g. httpx
Timeout(read=N)). This prevents premature connection drops when
wait periods exceed the client's inactivity window.
Caveat
------
Application-level wall-clock timers (anyio.fail_after / JS SDK
equivalents) are NOT affected by SSE events -- they count from
request start regardless. This is confirmed by experiments in
tests/test_keepalive.py and tests/run_keepalive_experiments.py.
Experiment results (summarised in tests/run_keepalive_experiments.py)
----------------------------------------------------------------------
- Exp 1: anyio.fail_after(5s) fires at 5.98s with NO keepalives.
- Exp 2: anyio.fail_after(10s) fires at 10.90s WITH keepalives every 2s.
Keepalives have ZERO effect on app-level timers.
- Exp 3b: httpx read=8s, keepalive=2s -> SUCCESS at 51s.
Keepalives DO prevent transport-level read timeouts.
The Copilot extension 60s limit is almost certainly application-level
(hardcoded wall-clock), so default_wait_seconds=50 remains the correct
mitigation (returns before the 60s deadline). The keepalives provide
defence-in-depth against any proxy/NAT inactivity drops.
This commit is contained in:
@@ -11,7 +11,7 @@ import asyncio
|
|||||||
import logging
|
import logging
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from mcp.server.fastmcp import FastMCP
|
from mcp.server.fastmcp import FastMCP, Context
|
||||||
|
|
||||||
from app.config import settings
|
from app.config import settings
|
||||||
from app.services import config_service, event_service, instruction_service, status_service
|
from app.services import config_service, event_service, instruction_service, status_service
|
||||||
@@ -38,11 +38,26 @@ _MAX_WAIT_SECONDS = 86400 # 24 hours
|
|||||||
# preventing abandoned (timed-out) coroutines from silently consuming queue items.
|
# preventing abandoned (timed-out) coroutines from silently consuming queue items.
|
||||||
_agent_generations: dict[str, int] = {}
|
_agent_generations: dict[str, int] = {}
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Keepalive configuration
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# When > 0, the wait loop sends a ctx.info() log notification to the client
|
||||||
|
# every KEEPALIVE_INTERVAL_SECONDS. This keeps the underlying SSE stream
|
||||||
|
# active and prevents transport-level read timeouts (e.g. httpx read timeout).
|
||||||
|
#
|
||||||
|
# IMPORTANT: This does NOT help against application-level wall-clock timeouts
|
||||||
|
# (anyio.fail_after / JS SDK equivalents). Those timers count from request
|
||||||
|
# start and are unaffected by intermediate SSE events.
|
||||||
|
#
|
||||||
|
# Set to 0 to disable keepalives entirely.
|
||||||
|
KEEPALIVE_INTERVAL_SECONDS: float = 20.0
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
async def get_user_request(
|
async def get_user_request(
|
||||||
agent_id: str = "unknown",
|
agent_id: str = "unknown",
|
||||||
default_response_override: Optional[str] = None,
|
default_response_override: Optional[str] = None,
|
||||||
|
ctx: Optional[Context] = None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""
|
"""
|
||||||
Fetch the next pending user instruction from the queue.
|
Fetch the next pending user instruction from the queue.
|
||||||
@@ -64,14 +79,11 @@ async def get_user_request(
|
|||||||
# Wait time is entirely server-controlled — the user sets it via the web UI.
|
# Wait time is entirely server-controlled — the user sets it via the web UI.
|
||||||
actual_wait = min(cfg.default_wait_seconds, _MAX_WAIT_SECONDS)
|
actual_wait = min(cfg.default_wait_seconds, _MAX_WAIT_SECONDS)
|
||||||
|
|
||||||
# Register this call as the newest for this agent. Any older coroutines
|
# Register this call as the newest for this agent.
|
||||||
# still lingering (e.g. client timed-out and retried) will see a stale
|
|
||||||
# generation and skip the consume step, leaving the instruction for us.
|
|
||||||
my_gen = _agent_generations.get(agent_id, 0) + 1
|
my_gen = _agent_generations.get(agent_id, 0) + 1
|
||||||
_agent_generations[agent_id] = my_gen
|
_agent_generations[agent_id] = my_gen
|
||||||
|
|
||||||
def _i_am_active() -> bool:
|
def _i_am_active() -> bool:
|
||||||
"""True if no newer call has arrived for this agent since we started."""
|
|
||||||
return _agent_generations.get(agent_id) == my_gen
|
return _agent_generations.get(agent_id) == my_gen
|
||||||
|
|
||||||
# --- Attempt immediate dequeue ---
|
# --- Attempt immediate dequeue ---
|
||||||
@@ -104,6 +116,7 @@ async def get_user_request(
|
|||||||
wakeup = instruction_service.get_wakeup_event()
|
wakeup = instruction_service.get_wakeup_event()
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
start = loop.time()
|
start = loop.time()
|
||||||
|
last_keepalive = start
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
elapsed = loop.time() - start
|
elapsed = loop.time() - start
|
||||||
@@ -150,8 +163,38 @@ async def get_user_request(
|
|||||||
"waited_seconds": waited,
|
"waited_seconds": waited,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Sleep until woken by a new instruction or 1 s elapses (safety net)
|
# Send keepalive notification if enabled and interval has elapsed.
|
||||||
wait_for = min(remaining, 1.0)
|
# This writes an SSE event to the response stream, preventing transport-level
|
||||||
|
# read timeouts (e.g. httpx read timeout). Does NOT reset application-level
|
||||||
|
# wall-clock timers such as anyio.fail_after.
|
||||||
|
now = loop.time()
|
||||||
|
if KEEPALIVE_INTERVAL_SECONDS > 0 and ctx is not None:
|
||||||
|
if now - last_keepalive >= KEEPALIVE_INTERVAL_SECONDS:
|
||||||
|
waited_so_far = int(now - start)
|
||||||
|
try:
|
||||||
|
await ctx.info(
|
||||||
|
f"keepalive: waiting for instructions "
|
||||||
|
f"(agent={agent_id}, waited={waited_so_far}s)"
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
"get_user_request: keepalive sent agent=%s waited=%ds",
|
||||||
|
agent_id, waited_so_far,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
# Client disconnected — no point continuing
|
||||||
|
logger.debug("get_user_request: keepalive failed (client gone?): %s", exc)
|
||||||
|
break
|
||||||
|
last_keepalive = now
|
||||||
|
|
||||||
|
# Sleep until woken by a new instruction or until the next keepalive is due.
|
||||||
|
if KEEPALIVE_INTERVAL_SECONDS > 0:
|
||||||
|
time_to_next_keepalive = max(
|
||||||
|
0.0, KEEPALIVE_INTERVAL_SECONDS - (loop.time() - last_keepalive)
|
||||||
|
)
|
||||||
|
wait_for = min(remaining, time_to_next_keepalive, 1.0)
|
||||||
|
else:
|
||||||
|
wait_for = min(remaining, 1.0)
|
||||||
|
|
||||||
if wakeup is not None:
|
if wakeup is not None:
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(wakeup.wait(), timeout=wait_for)
|
await asyncio.wait_for(wakeup.wait(), timeout=wait_for)
|
||||||
@@ -164,7 +207,6 @@ async def get_user_request(
|
|||||||
|
|
||||||
# --- Nothing available after waiting (or superseded) ---
|
# --- Nothing available after waiting (or superseded) ---
|
||||||
if _i_am_active():
|
if _i_am_active():
|
||||||
# Only record/broadcast when we're the active caller
|
|
||||||
status_service.record_agent_activity(agent_id, "empty")
|
status_service.record_agent_activity(agent_id, "empty")
|
||||||
event_service.broadcast("status.changed", {})
|
event_service.broadcast("status.changed", {})
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user