Files
local-mcp/tests/run_keepalive_experiments.py
Brandon Zhang b1fdd98740 fix(script): add Windows .venv/Scripts path fallback in server.sh
On Windows the venv Python binary lives at .venv/Scripts/python.exe,
not .venv/bin/python.  Fall back to the Windows path when the Unix
path does not exist so the script works cross-platform.
2026-03-27 13:53:38 +08:00

226 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
tests/run_keepalive_experiments.py
Runs all 4 keepalive experiments against the running local-mcp server.
Requirements:
- Server must be running at http://localhost:8000
- Server keepalive interval must be set to 2s (KEEPALIVE_INTERVAL_SECONDS=2.0)
- Server default_wait_seconds should be >= 30 (e.g. 50)
"""
import asyncio
import logging
import sys
import time
from datetime import timedelta
import httpx
from mcp import ClientSession
from mcp.client.streamable_http import streamable_http_client
# Route log records to stdout (the same stream the print() calls write to).
logging.basicConfig(
level=logging.WARNING, # only WARNING and above are emitted; suppresses httpx noise
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
stream=sys.stdout,
)
# MCP endpoint that both call helpers in this file connect to.
SERVER_URL = "http://localhost:8000/mcp"
# Separator rule (70 "=" characters) printed around banners.
SEP = "=" * 70
def hdr(n: int, title: str) -> None:
    """Print the banner that introduces experiment *n*.

    The banner is a blank line, a separator rule, the experiment heading
    built from *n* and *title*, and a closing separator rule.
    """
    heading = f" EXPERIMENT {n}: {title}"
    # One print call with a newline separator emits the same three lines.
    print(f"\n{SEP}", heading, SEP, sep="\n")
def ok(msg: str) -> None:
    """Print *msg* to stdout as a passing experiment outcome."""
    line = f"{msg}"
    print(line)
def fail(msg: str) -> None:
    """Print *msg* to stdout as a failing experiment outcome."""
    line = f"{msg}"
    print(line)
def info(msg: str) -> None:
    """Print *msg* to stdout as an informational line, prefixed by one space."""
    line = f" {msg}"
    print(line)
# ── helpers ────────────────────────────────────────────────────────────────
async def call_app_timeout(agent: str, timeout_s: float) -> tuple[str, float]:
    """Call ``get_user_request`` with an application-level read timeout.

    Opens a new streamable-HTTP MCP session against ``SERVER_URL``,
    initializes it, and invokes the tool with
    ``read_timeout_seconds=timeout_s`` (an ``anyio.fail_after``-style
    timeout, per the experiment descriptions in this file).

    Args:
        agent: Value sent as the tool's ``agent_id`` argument.
        timeout_s: Read timeout for the tool call, in seconds.

    Returns:
        ``(outcome, elapsed)``. ``outcome`` is ``"success"`` when the tool
        call returned, otherwise ``"<ExceptionType>: <message>"``.
        ``elapsed`` is the number of seconds, per ``time.perf_counter()``,
        since just before the connection was opened.
    """
    start = time.perf_counter()
    try:
        async with streamable_http_client(SERVER_URL) as (r, w, _):
            async with ClientSession(r, w) as s:
                await s.initialize()
                # The tool's payload is irrelevant here; only whether and
                # when the call returns matters.
                await s.call_tool(
                    "get_user_request",
                    {"agent_id": agent},
                    read_timeout_seconds=timedelta(seconds=timeout_s),
                )
                return "success", time.perf_counter() - start
    except Exception as exc:
        # Deliberately broad: every failure mode is an experiment result and
        # is reported to the caller as text rather than raised.
        return f"{type(exc).__name__}: {exc}", time.perf_counter() - start
async def call_transport_timeout(agent: str, read_s: float) -> tuple[str, float]:
    """Call ``get_user_request`` with only an httpx transport read timeout.

    Builds an ``httpx.AsyncClient`` whose read timeout is *read_s* (connect,
    write and pool timeouts are 10s), hands it to the streamable-HTTP MCP
    transport for ``SERVER_URL``, and invokes the tool without any
    ``read_timeout_seconds`` override.

    Args:
        agent: Value sent as the tool's ``agent_id`` argument.
        read_s: httpx read timeout, in seconds.

    Returns:
        ``(outcome, elapsed)``. ``outcome`` is ``"success"`` when the tool
        call returned, otherwise ``"<ExceptionType>: <message>"``.
        ``elapsed`` is the number of seconds, per ``time.perf_counter()``,
        since just before the HTTP client was created.
    """
    start = time.perf_counter()
    try:
        client = httpx.AsyncClient(
            timeout=httpx.Timeout(connect=10.0, read=read_s, write=10.0, pool=10.0),
            follow_redirects=True,
        )
        async with client:
            async with streamable_http_client(SERVER_URL, http_client=client) as (r, w, _):
                async with ClientSession(r, w) as s:
                    await s.initialize()
                    # No read_timeout_seconds → relies purely on httpx transport.
                    # The tool's payload is irrelevant; only whether and when
                    # the call returns matters.
                    await s.call_tool(
                        "get_user_request",
                        {"agent_id": agent},
                    )
                    return "success", time.perf_counter() - start
    except Exception as exc:
        # Deliberately broad: every failure mode is an experiment result and
        # is reported to the caller as text rather than raised.
        return f"{type(exc).__name__}: {exc}", time.perf_counter() - start
# ── experiments ────────────────────────────────────────────────────────────
async def exp1() -> None:
    """Experiment 1: application-level timeout of 5s.

    Calls the tool through ``call_app_timeout`` with a 5 second limit,
    prints the outcome and elapsed time, and reports ``ok`` only when the
    call failed between 4 and 10 seconds after it started. Anything else,
    including a success or an immediate failure, is reported with ``fail``.
    """
    hdr(1, "Application-level timeout, NO keepalives reaching client (5s)")
    info("Mechanism: anyio.fail_after(5) inside MCP session.send_request()")
    info("Server wait=50s, no instruction queued → server will silently hold")
    info("Expected: TimeoutError (McpError/ExceptionGroup) after ~5s")
    outcome, elapsed = await call_app_timeout("exp1-agent", 5.0)
    print(f" Result: {outcome}")
    print(f" Elapsed: {elapsed:.2f}s")
    # Compare exactly: a substring test would misread an exception whose
    # message happens to contain "success".
    failed = outcome != "success"
    # A failure well before the 5s limit cannot be the timeout itself (for
    # example the server is unreachable), so it must not count as a pass.
    if failed and 4 < elapsed < 10:
        ok(f"App-level timeout fires in {elapsed:.2f}s — no bytes needed to trigger it")
    else:
        fail("Unexpected result")
async def exp2() -> None:
    """Experiment 2: application-level timeout of 10s while keepalives flow.

    Calls the tool through ``call_app_timeout`` with a 10 second limit and
    prints the outcome and elapsed time. Reports ``ok`` when the call failed
    between 9 and 16 seconds in, ``fail`` when it succeeded, and an
    informational note for any other timing.
    """
    hdr(2, "Application-level timeout WITH ctx.info() keepalives every 2s (timeout=10s)")
    info("Mechanism: anyio.fail_after(10) inside MCP session.send_request()")
    info("Server sends ctx.info() every 2s → SSE events arrive before timeout")
    info("Expected: STILL times out after 10s — anyio.fail_after is unaffected by SSE events")
    info("(anyio.fail_after is a wall-clock timer — it does NOT reset on data receipt)")
    outcome, elapsed = await call_app_timeout("exp2-agent", 10.0)
    print(f" Result: {outcome}")
    print(f" Elapsed: {elapsed:.2f}s")
    # Compare exactly: a substring test would misread an exception whose
    # message happens to contain "success".
    succeeded = outcome == "success"
    if not succeeded and 9 < elapsed < 16:
        ok(f"App-level timeout fires after {elapsed:.2f}s despite keepalives → keepalives DON'T help here")
    elif succeeded:
        fail(f"Unexpected success in {elapsed:.2f}s — keepalives somehow helped (impossible with anyio.fail_after?)")
    else:
        info(f"Timing unexpected ({elapsed:.2f}s) — investigate")
async def exp3() -> None:
    """Experiment 3: transport-level read timeout shorter than the keepalive gap.

    Calls the tool through ``call_transport_timeout`` with ``read_s=1.5``
    and prints the outcome and elapsed time. Reports ``ok`` when the call
    failed in under 10 seconds; a success or a slower failure is only noted
    with ``info``.
    """
    hdr(3, "Transport-level (httpx) read timeout, NO keepalives (read=5s)")
    info("Mechanism: httpx read timeout — fires if no bytes on SSE for 5s")
    info("Note: keepalives are still running on server, so this tests")
    info(" whether ANY SSE bytes arrive to prevent the httpx timeout.")
    info("Expected: Timeout fires in ~5s (before first 2s keepalive fires)")
    info(" Wait — actually keepalive fires at 2s, so httpx sees bytes before 5s → SUCCESS?")
    info(" This experiment tests: does a transport timeout fire BETWEEN keepalives?")
    # For this to be a proper baseline, we'd need KEEPALIVE=0 on the server.
    # Instead we use read=1.5s (less than the 2s keepalive interval) to catch
    # the gap between keepalives.
    outcome, elapsed = await call_transport_timeout("exp3-agent", read_s=1.5)
    print(f" Result: {outcome}")
    print(f" Elapsed: {elapsed:.2f}s")
    # Compare exactly: a substring test would misread an exception whose
    # message happens to contain "success".
    succeeded = outcome == "success"
    if not succeeded and elapsed < 10:
        ok(f"Transport timeout fires in {elapsed:.2f}s when read window < keepalive interval")
        info(" → confirms httpx read timeout IS reset by SSE bytes")
    elif succeeded:
        info(f"Completed successfully in {elapsed:.2f}s (may have received queued instruction)")
    else:
        info(f"Other result ({elapsed:.2f}s): {outcome}")
async def exp4() -> None:
    """Experiment 4: transport-level read timeout longer than the keepalive gap.

    Calls the tool through ``call_transport_timeout`` with ``read_s=8.0``
    and prints the outcome and elapsed time. Reports ``ok`` when the call
    succeeded after more than 20 seconds, ``fail`` when it did not succeed,
    and an informational note when it succeeded sooner than that.
    """
    hdr(4, "Transport-level read timeout WITH keepalives (read=8s, keepalive=2s)")
    info("Server sends ctx.info() every 2s → SSE bytes arrive every 2s")
    info("httpx read timeout = 8s → resets every time bytes arrive")
    info("Expected: NO timeout — tool runs to completion (~50s)")
    info("(This may take 50s+ to complete...)")
    outcome, elapsed = await call_transport_timeout("exp4-agent", read_s=8.0)
    print(f" Result: {outcome}")
    print(f" Elapsed: {elapsed:.2f}s")
    # Compare exactly: a substring test would misread an exception whose
    # message happens to contain "success" and report a false pass here.
    succeeded = outcome == "success"
    if succeeded and elapsed > 20:
        ok(f"NO transport timeout after {elapsed:.2f}s! ctx.info() keepalives successfully")
        ok("prevented httpx transport-level read timeouts by keeping SSE bytes flowing.")
    elif not succeeded:
        fail(f"Transport timeout still fired at {elapsed:.2f}s — investigate")
    else:
        info(f"Result ({elapsed:.2f}s): {outcome}")
# ── main ───────────────────────────────────────────────────────────────────
async def main() -> None:
    """Run experiments 1-4 in order, framed by an intro and a fixed summary.

    Prints the setup and hypothesis, runs experiments 1 to 3 with a blank
    line and a one second pause after each, announces and runs experiment 4,
    then prints the summary and conclusion text.
    """
    intro_lines = (
        SEP,
        " MCP KEEPALIVE EXPERIMENTS",
        " Server: http://localhost:8000",
        " Server keepalive interval: 2s (KEEPALIVE_INTERVAL_SECONDS=2.0)",
        " Server default_wait_seconds: 50",
        SEP,
        "",
        " HYPOTHESIS:",
        " The 60s Copilot timeout is an APPLICATION-LEVEL wall-clock timer",
        " (equivalent to anyio.fail_after). Sending ctx.info() keepalives",
        " keeps SSE bytes flowing, which resets TRANSPORT-LEVEL timeouts",
        " (httpx read timeout) but NOT application-level timers.",
        "",
        " If the Copilot client uses a transport-level timeout, keepalives WILL help.",
        " If it uses an app-level timer, keepalives will NOT help.",
        "",
    )
    for line in intro_lines:
        print(line)

    # The first three experiments share the same run / blank line / pause steps.
    for experiment in (exp1, exp2, exp3):
        await experiment()
        print()
        await asyncio.sleep(1)

    exp4_notice = (
        f"\n{SEP}",
        " EXPERIMENT 4 runs to server wait completion (~50s). Starting...",
        SEP,
    )
    for line in exp4_notice:
        print(line)
    await exp4()

    summary_lines = (
        f"\n{SEP}",
        " SUMMARY",
        SEP,
        " Exp 1: anyio.fail_after(5s) → times out regardless (baseline)",
        " Exp 2: anyio.fail_after(10s) + ctx.info() every 2s → STILL times out",
        " (anyio.fail_after is immune to SSE bytes)",
        " Exp 3: httpx read=1.5s < keepalive=2s → transport timeout fires",
        " Exp 4: httpx read=8s, keepalive=2s → NO timeout, runs to completion",
        "",
        " CONCLUSION:",
        " ctx.info() keepalives PREVENT transport-level httpx timeouts.",
        " ctx.info() keepalives do NOT prevent application-level anyio.fail_after.",
        "",
        " For the Copilot 60s timeout: if it is transport-level, keepalives will fix it.",
        " If it is app-level (likely for a hardcoded 60s wall-clock limit), they won't.",
        " The character-by-character approach works ONLY for transport-level timeouts.",
        SEP,
    )
    for line in summary_lines:
        print(line)


# Script entry point: run the full experiment sequence on a new event loop.
if __name__ == "__main__":
    asyncio.run(main())