EXPERIMENT (NOT FOR PRODUCTION YET)
Changes:
- KEEPALIVE_INTERVAL_SECONDS reduced from 20s to 5s
- Keepalive messages now show progress bar with dots: ●●●●○○○○○○
- Show elapsed time, total wait, and remaining seconds
- Example: ⏳ Waiting for instructions... ●●●●○○○○○○ 20s / 50s (agent=copilot, 30s remaining)
Goal: Test if more frequent progress updates provide better UX and prevent
perceived freezing during the 50s wait. No functional change - the
60s client timeout limit remains the binding constraint.
247 lines
9.5 KiB
Python
"""
app/mcp_server.py
FastMCP server definition with the get_user_request tool.
The Starlette app returned by mcp.streamable_http_app() is mounted into
the main FastAPI application at /mcp.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from mcp.server.fastmcp import FastMCP, Context
|
|
|
|
from app.config import settings
|
|
from app.services import config_service, event_service, instruction_service, status_service
|
|
|
|
logger = logging.getLogger(__name__)

# The FastMCP server instance. Its name and stateless-HTTP mode are read from
# the application settings; the streamable-HTTP endpoint is served at the root
# path of this sub-application.
mcp = FastMCP(
    settings.mcp_server_name,
    stateless_http=settings.mcp_stateless,
    streamable_http_path="/",
)

# Create the ASGI app at import time rather than lazily: doing so creates
# session_manager, which is then started explicitly inside the FastAPI
# lifespan (see main.py).
mcp_asgi_app = mcp.streamable_http_app()
|
|
|
|
# Hard upper bound (in seconds) on how long a single call may be held open.
# Deliberately very high: the wait can always be interrupted by a new
# instruction through the asyncio.Event wakeup, so a long hold is not a
# practical danger.
_MAX_WAIT_SECONDS = 24 * 60 * 60  # 24 hours

# How long (in seconds) to wait when no instruction is queued.
# 50s stays safely below the 60s client timeout while still leaving room for
# several keepalive progress updates.
DEFAULT_WAIT_SECONDS = 50

# Response text returned when the queue is still empty after waiting.
DEFAULT_EMPTY_RESPONSE = "call this tool `get_user_request` again to fetch latest user input..."
|
|
|
|
# Generation counter keyed by agent id; every new call bumps its agent's entry.
# A waiting call may only consume an instruction while it still holds the
# latest generation, so an abandoned (timed-out) coroutine cannot silently
# take an item off the queue.
_agent_generations: dict[str, int] = {}
|
|
|
|
# ---------------------------------------------------------------------------
# Keepalive configuration
# ---------------------------------------------------------------------------
# A positive value makes the wait loop emit a ctx.info() log notification to
# the client once per KEEPALIVE_INTERVAL_SECONDS. The notification keeps the
# underlying SSE stream active, which prevents transport-level read timeouts
# (e.g. the httpx read timeout).
#
# IMPORTANT: keepalives do NOT help against application-level wall-clock
# timeouts (anyio.fail_after / JS SDK equivalents). Those timers run from the
# start of the request and ignore intermediate SSE events.
#
# A value of 0 disables keepalives entirely.
# EXPERIMENT: lowered from 20.0 to 5.0 for more frequent progress updates.
KEEPALIVE_INTERVAL_SECONDS: float = 5.0
|
|
|
|
|
|
def _build_instruction_result(item, agent_id: str, waited: int) -> dict:
    """Record that *item* was delivered to *agent_id* and build the tool reply.

    Records the agent activity, broadcasts the ``instruction.consumed`` and
    ``status.changed`` events, and returns the response dict handed back to
    the caller of ``get_user_request``.
    """
    counts = instruction_service.get_queue_counts()
    status_service.record_agent_activity(agent_id, "instruction")
    event_service.broadcast(
        "instruction.consumed",
        {"item": item.model_dump(mode="json"), "consumed_by_agent_id": agent_id},
    )
    event_service.broadcast("status.changed", {"queue": counts})
    return {
        "status": "ok",
        "result_type": "instruction",
        "instruction": {
            "id": item.id,
            "content": item.content,
            "consumed_at": item.consumed_at.isoformat() if item.consumed_at else None,
        },
        "response": None,
        "remaining_pending": counts["pending_count"],
        "waited_seconds": waited,
    }


def _format_keepalive(agent_id: str, waited: int, total_wait: int) -> tuple[str, int]:
    """Return the keepalive progress message and the percentage it displays."""
    remaining_sec = max(0, total_wait - waited)
    progress_pct = min(100, int((waited / total_wait) * 100))
    # Ten-dot bar: one filled dot per full 10% of the wait that has elapsed.
    filled = progress_pct // 10
    bar = "●" * filled + "○" * (10 - filled)
    message = (
        f"⏳ Waiting for instructions... {bar} "
        f"{waited}s / {total_wait}s (agent={agent_id}, {remaining_sec}s remaining)"
    )
    return message, progress_pct


@mcp.tool()
async def get_user_request(
    agent_id: str = "unknown",
    default_response_override: Optional[str] = None,
    ctx: Optional[Context] = None,
) -> dict:
    """
    Fetch the next pending user instruction from the queue.

    If no instruction is available the tool waits up to DEFAULT_WAIT_SECONDS
    (capped by _MAX_WAIT_SECONDS) before returning an empty / default response.
    The wait ends early when an instruction arrives, when a newer call for the
    same agent supersedes this one, or when sending a keepalive fails.

    Args:
        agent_id: An identifier for this agent instance (used to track connectivity).
        default_response_override: Override the server-default empty response text
            for this single call.
        ctx: Request context used to send keepalive notifications via
            ``ctx.info()``; presumably injected by FastMCP (confirm against the
            SDK). When it is None no keepalives are sent.

    Returns:
        A dict with keys: status, result_type, instruction, response,
        remaining_pending, waited_seconds.
    """
    # Wait time is hardcoded to stay safely under the 60s client timeout
    actual_wait = min(DEFAULT_WAIT_SECONDS, _MAX_WAIT_SECONDS)

    # Register this call as the newest for this agent.
    my_gen = _agent_generations.get(agent_id, 0) + 1
    _agent_generations[agent_id] = my_gen

    def _i_am_active() -> bool:
        return _agent_generations.get(agent_id) == my_gen

    # --- Attempt immediate dequeue ---
    item = instruction_service.consume_next(agent_id=agent_id)
    if item is not None:
        result = _build_instruction_result(item, agent_id, 0)
        logger.info(
            "get_user_request: instruction delivered id=%s agent=%s", item.id, agent_id
        )
        return result

    # --- Wait loop (event-driven, with a 1s safety-net re-check) ---
    wakeup = instruction_service.get_wakeup_event()
    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    start = loop.time()
    last_keepalive = start

    # Keepalives can only be sent through a context. Without one,
    # last_keepalive never advances, so the keepalive deadline must not feed
    # into the sleep duration: it would drop to 0 after the first interval
    # and turn this loop into a busy spin.
    keepalive_enabled = KEEPALIVE_INTERVAL_SECONDS > 0 and ctx is not None

    while True:
        remaining = actual_wait - (loop.time() - start)
        if remaining <= 0:
            break

        # If a newer call for this agent arrived, step aside without consuming.
        if not _i_am_active():
            logger.debug(
                "get_user_request: superseded by newer call agent=%s gen=%d", agent_id, my_gen
            )
            break

        # Clear the event BEFORE checking the queue so we never miss a
        # wake-up that arrives between the DB check and event.wait().
        if wakeup is not None:
            wakeup.clear()

        item = instruction_service.consume_next(agent_id=agent_id)
        if item is not None:
            waited = int(loop.time() - start)
            result = _build_instruction_result(item, agent_id, waited)
            logger.info(
                "get_user_request: instruction delivered (after %ds wait) id=%s agent=%s gen=%d",
                waited, item.id, agent_id, my_gen,
            )
            return result

        # Send keepalive notification if enabled and interval has elapsed.
        # This writes an SSE event to the response stream, preventing transport-level
        # read timeouts (e.g. httpx read timeout). Does NOT reset application-level
        # wall-clock timers such as anyio.fail_after.
        now = loop.time()
        if keepalive_enabled and now - last_keepalive >= KEEPALIVE_INTERVAL_SECONDS:
            waited_so_far = int(now - start)
            message, progress_pct = _format_keepalive(agent_id, waited_so_far, actual_wait)
            try:
                await ctx.info(message)
            except Exception as exc:
                # Deliberate best-effort: any send failure is treated as the
                # client having disconnected, so there is no point continuing.
                logger.debug("get_user_request: keepalive failed (client gone?): %s", exc)
                break
            logger.debug(
                "get_user_request: keepalive sent agent=%s waited=%ds progress=%d%%",
                agent_id, waited_so_far, progress_pct,
            )
            last_keepalive = now

        # Sleep until woken by a new instruction, until the next keepalive is
        # due, or for at most one second (safety-net re-check of the queue).
        wait_for = min(remaining, 1.0)
        if keepalive_enabled:
            time_to_next_keepalive = max(
                0.0, KEEPALIVE_INTERVAL_SECONDS - (loop.time() - last_keepalive)
            )
            wait_for = min(wait_for, time_to_next_keepalive)

        if wakeup is not None:
            try:
                await asyncio.wait_for(wakeup.wait(), timeout=wait_for)
            except asyncio.TimeoutError:
                pass
        else:
            await asyncio.sleep(wait_for)

    waited = int(loop.time() - start)

    # --- Nothing available after waiting (or superseded) ---
    # Only the newest call for this agent records activity and logs; a
    # superseded call just returns the empty/default payload.
    still_active = _i_am_active()
    if still_active:
        status_service.record_agent_activity(agent_id, "empty")
        event_service.broadcast("status.changed", {})

    empty_response = (
        default_response_override
        if default_response_override is not None
        else DEFAULT_EMPTY_RESPONSE
    )
    result_type = "default_response" if empty_response else "empty"

    if still_active:
        logger.info(
            "get_user_request: empty result_type=%s waited=%ds agent=%s gen=%d",
            result_type, waited, agent_id, my_gen,
        )

    return {
        "status": "ok",
        "result_type": result_type,
        "instruction": None,
        "response": empty_response,
        "remaining_pending": 0,
        "waited_seconds": waited,
    }
|
|
|