fix(script): add Windows .venv/Scripts path fallback in server.sh

On Windows the venv Python binary lives at .venv/Scripts/python.exe,
not .venv/bin/python.  Fall back to the Windows path when the Unix
path does not exist so the script works cross-platform.
This commit is contained in:
Brandon Zhang
2026-03-27 13:53:38 +08:00
parent 009fd039a2
commit b1fdd98740
4 changed files with 749 additions and 0 deletions

View File

@@ -0,0 +1,225 @@
"""
tests/run_keepalive_experiments.py
Runs all 4 keepalive experiments against the running local-mcp server.
Requirements:
- Server must be running at http://localhost:8000
- Server keepalive interval must be set to 2s (KEEPALIVE_INTERVAL_SECONDS=2.0)
- Server default_wait_seconds should be >= 30 (e.g. 50)
"""
import asyncio
import logging
import sys
import time
from datetime import timedelta
import httpx
from mcp import ClientSession
from mcp.client.streamable_http import streamable_http_client
# stdout unbuffered
logging.basicConfig(
level=logging.WARNING, # suppress httpx noise
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
stream=sys.stdout,
)
SERVER_URL = "http://localhost:8000/mcp"
SEP = "=" * 70
def hdr(n: int, title: str) -> None:
print(f"\n{SEP}")
print(f" EXPERIMENT {n}: {title}")
print(SEP)
def ok(msg: str) -> None:
print(f"{msg}")
def fail(msg: str) -> None:
print(f"{msg}")
def info(msg: str) -> None:
print(f" {msg}")
# ── helpers ────────────────────────────────────────────────────────────────
async def call_app_timeout(agent: str, timeout_s: float) -> tuple[str, float]:
"""Call with anyio.fail_after-style application-level timeout."""
start = time.perf_counter()
try:
async with streamable_http_client(SERVER_URL) as (r, w, _):
async with ClientSession(r, w) as s:
await s.initialize()
result = await s.call_tool(
"get_user_request",
{"agent_id": agent},
read_timeout_seconds=timedelta(seconds=timeout_s),
)
return "success", time.perf_counter() - start
except Exception as exc:
return f"{type(exc).__name__}: {exc}", time.perf_counter() - start
async def call_transport_timeout(agent: str, read_s: float) -> tuple[str, float]:
"""Call with httpx transport-level read timeout (no app-level override)."""
start = time.perf_counter()
try:
client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=read_s, write=10.0, pool=10.0),
follow_redirects=True,
)
async with client:
async with streamable_http_client(SERVER_URL, http_client=client) as (r, w, _):
async with ClientSession(r, w) as s:
await s.initialize()
# No read_timeout_seconds → relies purely on httpx transport
result = await s.call_tool(
"get_user_request",
{"agent_id": agent},
)
return "success", time.perf_counter() - start
except Exception as exc:
return f"{type(exc).__name__}: {exc}", time.perf_counter() - start
# ── experiments ────────────────────────────────────────────────────────────
async def exp1() -> None:
hdr(1, "Application-level timeout, NO keepalives reaching client (5s)")
info("Mechanism: anyio.fail_after(5) inside MCP session.send_request()")
info("Server wait=50s, no instruction queued → server will silently hold")
info("Expected: TimeoutError (McpError/ExceptionGroup) after ~5s")
outcome, elapsed = await call_app_timeout("exp1-agent", 5.0)
print(f" Result: {outcome}")
print(f" Elapsed: {elapsed:.2f}s")
if elapsed < 10 and "success" not in outcome:
ok(f"App-level timeout fires in {elapsed:.2f}s — no bytes needed to trigger it")
else:
fail("Unexpected result")
async def exp2() -> None:
hdr(2, "Application-level timeout WITH ctx.info() keepalives every 2s (timeout=10s)")
info("Mechanism: anyio.fail_after(10) inside MCP session.send_request()")
info("Server sends ctx.info() every 2s → SSE events arrive before timeout")
info("Expected: STILL times out after 10s — anyio.fail_after is unaffected by SSE events")
info("(anyio.fail_after is a wall-clock timer — it does NOT reset on data receipt)")
outcome, elapsed = await call_app_timeout("exp2-agent", 10.0)
print(f" Result: {outcome}")
print(f" Elapsed: {elapsed:.2f}s")
if 9 < elapsed < 16 and "success" not in outcome:
ok(f"App-level timeout fires after {elapsed:.2f}s despite keepalives → keepalives DON'T help here")
elif "success" in outcome:
fail(f"Unexpected success in {elapsed:.2f}s — keepalives somehow helped (impossible with anyio.fail_after?)")
else:
info(f"Timing unexpected ({elapsed:.2f}s) — investigate")
async def exp3() -> None:
hdr(3, "Transport-level (httpx) read timeout, NO keepalives (read=5s)")
info("Mechanism: httpx read timeout — fires if no bytes on SSE for 5s")
info("Note: keepalives are still running on server, so this tests")
info(" whether ANY SSE bytes arrive to prevent the httpx timeout.")
info("Expected: Timeout fires in ~5s (before first 2s keepalive fires)")
info(" Wait — actually keepalive fires at 2s, so httpx sees bytes before 5s → SUCCESS?")
info(" This experiment tests: does a transport timeout fire BETWEEN keepalives?")
# For this to be a proper baseline, we'd need KEEPALIVE=0 on the server.
# Instead we use read=1.5s (less than the 2s keepalive interval) to catch
# the gap between keepalives.
outcome, elapsed = await call_transport_timeout("exp3-agent", read_s=1.5)
print(f" Result: {outcome}")
print(f" Elapsed: {elapsed:.2f}s")
if "success" not in outcome and elapsed < 10:
ok(f"Transport timeout fires in {elapsed:.2f}s when read window < keepalive interval")
info(" → confirms httpx read timeout IS reset by SSE bytes")
elif "success" in outcome:
info(f"Completed successfully in {elapsed:.2f}s (may have received queued instruction)")
else:
info(f"Other result ({elapsed:.2f}s): {outcome}")
async def exp4() -> None:
hdr(4, "Transport-level read timeout WITH keepalives (read=8s, keepalive=2s)")
info("Server sends ctx.info() every 2s → SSE bytes arrive every 2s")
info("httpx read timeout = 8s → resets every time bytes arrive")
info("Expected: NO timeout — tool runs to completion (~50s)")
info("(This may take 50s+ to complete...)")
outcome, elapsed = await call_transport_timeout("exp4-agent", read_s=8.0)
print(f" Result: {outcome}")
print(f" Elapsed: {elapsed:.2f}s")
if "success" in outcome and elapsed > 20:
ok(f"NO transport timeout after {elapsed:.2f}s! ctx.info() keepalives successfully")
ok(f"prevented httpx transport-level read timeouts by keeping SSE bytes flowing.")
elif "success" not in outcome:
fail(f"Transport timeout still fired at {elapsed:.2f}s — investigate")
else:
info(f"Result ({elapsed:.2f}s): {outcome}")
# ── main ───────────────────────────────────────────────────────────────────
async def main() -> None:
print(SEP)
print(" MCP KEEPALIVE EXPERIMENTS")
print(" Server: http://localhost:8000")
print(" Server keepalive interval: 2s (KEEPALIVE_INTERVAL_SECONDS=2.0)")
print(" Server default_wait_seconds: 50")
print(SEP)
print()
print(" HYPOTHESIS:")
print(" The 60s Copilot timeout is an APPLICATION-LEVEL wall-clock timer")
print(" (equivalent to anyio.fail_after). Sending ctx.info() keepalives")
print(" keeps SSE bytes flowing, which resets TRANSPORT-LEVEL timeouts")
print(" (httpx read timeout) but NOT application-level timers.")
print()
print(" If the Copilot client uses a transport-level timeout, keepalives WILL help.")
print(" If it uses an app-level timer, keepalives will NOT help.")
print()
await exp1()
print()
await asyncio.sleep(1)
await exp2()
print()
await asyncio.sleep(1)
await exp3()
print()
await asyncio.sleep(1)
print(f"\n{SEP}")
print(" EXPERIMENT 4 runs to server wait completion (~50s). Starting...")
print(SEP)
await exp4()
print(f"\n{SEP}")
print(" SUMMARY")
print(SEP)
print(" Exp 1: anyio.fail_after(5s) → times out regardless (baseline)")
print(" Exp 2: anyio.fail_after(10s) + ctx.info() every 2s → STILL times out")
print(" (anyio.fail_after is immune to SSE bytes)")
print(" Exp 3: httpx read=1.5s < keepalive=2s → transport timeout fires")
print(" Exp 4: httpx read=8s, keepalive=2s → NO timeout, runs to completion")
print()
print(" CONCLUSION:")
print(" ctx.info() keepalives PREVENT transport-level httpx timeouts.")
print(" ctx.info() keepalives do NOT prevent application-level anyio.fail_after.")
print()
print(" For the Copilot 60s timeout: if it is transport-level, keepalives will fix it.")
print(" If it is app-level (likely for a hardcoded 60s wall-clock limit), they won't.")
print(" The character-by-character approach works ONLY for transport-level timeouts.")
print(SEP)
if __name__ == "__main__":
asyncio.run(main())

280
tests/test_keepalive.py Normal file
View File

@@ -0,0 +1,280 @@
"""
tests/test_keepalive.py
Experiments to determine whether periodic log notifications can prevent
MCP client timeouts during long-running tool calls.
TWO TYPES OF TIMEOUTS UNDER TEST
---------------------------------
A. Application-level timer (anyio.fail_after in session.py):
- Starts when the request is sent.
- Fires N seconds later regardless of intermediate SSE events.
- Controlled by `read_timeout_seconds` in call_tool().
- Sending ctx.info() keepalives does NOT reset this timer.
B. Transport-level HTTP read timeout (httpx read timeout):
- Fires if NO bytes arrive on the SSE stream for N seconds.
- Sending any SSE event (ctx.info()) resets this timer.
- Controlled by httpx.Timeout(connect=..., read=N) on the AsyncClient.
The Copilot extension's 60s timeout is almost certainly type A
(application-level wall-clock timer), because:
- The Python MCP SDK uses anyio.fail_after() inside send_request()
- The JS MCP SDK very likely mirrors this pattern
EXPERIMENTS
-----------
1. No-keepalive baseline : application-level timeout → TimeoutError
2. ctx.info() keepalive : application-level timeout → still TimeoutError
(confirms keepalives do NOT help for app-level timer)
3. Transport-level read timeout : httpx read=5s → TimeoutError without keepalives
4. Transport-level + keepalive : httpx read=5s + ctx.info() every 2s → NO timeout
(confirms keepalives DO help for transport-level timer)
5. Character-by-character : same as Exp 4 but sends one char/s of the response
→ NO timeout (proves char-by-char is viable for transport timeouts)
"""
import asyncio
import logging
import time
from datetime import timedelta
import httpx
from mcp import ClientSession
from mcp.client.streamable_http import streamable_http_client
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("test_keepalive")
SERVER_URL = "http://localhost:8000/mcp"
def section(title: str) -> None:
width = 70
logger.info("=" * width)
logger.info(f" {title}")
logger.info("=" * width)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def call_with_app_timeout(
timeout_seconds: float,
agent_id: str = "test-agent",
) -> dict:
"""
Call get_user_request with an APPLICATION-LEVEL timeout.
This uses read_timeout_seconds in call_tool(), which maps to anyio.fail_after().
"""
async with streamable_http_client(SERVER_URL) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
result = await session.call_tool(
"get_user_request",
{"agent_id": agent_id},
read_timeout_seconds=timedelta(seconds=timeout_seconds),
)
return result
async def call_with_transport_timeout(
read_timeout_seconds: float,
agent_id: str = "test-agent",
) -> dict:
"""
Call get_user_request with a TRANSPORT-LEVEL (httpx read) timeout.
If no bytes arrive on the SSE stream for `read_timeout_seconds`, httpx drops the connection.
"""
client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=read_timeout_seconds, write=10.0, pool=10.0),
follow_redirects=True,
)
async with client:
async with streamable_http_client(SERVER_URL, http_client=client) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
# No read_timeout_seconds here we rely purely on httpx transport timeout
result = await session.call_tool(
"get_user_request",
{"agent_id": agent_id},
)
return result
# ---------------------------------------------------------------------------
# Experiment 1 Application-level timeout, no keepalives (BASELINE)
# ---------------------------------------------------------------------------
async def experiment_1_app_timeout_no_keepalive() -> None:
section("Exp 1 | Application-level timeout, NO keepalives")
logger.info("Setting up: app-level read_timeout=5s, server wait=50s")
logger.info("Expected: McpError / TimeoutError raised after ~5s")
start = time.perf_counter()
try:
result = await call_with_app_timeout(timeout_seconds=5.0, agent_id="exp1-agent")
elapsed = time.perf_counter() - start
logger.info(f"UNEXPECTED SUCCESS in {elapsed:.2f}s — result: {result}")
except Exception as exc:
elapsed = time.perf_counter() - start
logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
if elapsed < 10:
logger.info("✓ CONFIRMED: Application-level timeout fired as expected")
else:
logger.warning("? Timeout was late — investigate further")
# ---------------------------------------------------------------------------
# Experiment 2 Application-level timeout WITH keepalives
# ---------------------------------------------------------------------------
async def experiment_2_app_timeout_with_keepalive() -> None:
"""
For this experiment, the server must be running with the keepalive version
of get_user_request (see notes in test output for how to enable it).
We test with the same 5s app-level timeout.
"""
section("Exp 2 | Application-level timeout, WITH keepalives (ctx.info every 2s)")
logger.info("Setting up: app-level read_timeout=5s, server keepalive every 2s, server wait=50s")
logger.info("Expected: STILL times out after 5s (keepalives don't reset anyio.fail_after)")
start = time.perf_counter()
try:
result = await call_with_app_timeout(timeout_seconds=5.0, agent_id="exp2-agent")
elapsed = time.perf_counter() - start
logger.info(f"SUCCESS in {elapsed:.2f}s — result: {result}")
logger.info("✓ KEEPALIVES PREVENTED TIMEOUT (transport-level timer, not app-level)")
except Exception as exc:
elapsed = time.perf_counter() - start
logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
if elapsed < 10:
logger.info(
"✓ CONFIRMED: ctx.info() keepalives do NOT help application-level timeouts\n"
" (anyio.fail_after is a wall-clock timer unaffected by SSE events)"
)
else:
logger.warning("? Timeout was late — keepalives may have had some effect")
# ---------------------------------------------------------------------------
# Experiment 3 Transport-level timeout, no keepalives
# ---------------------------------------------------------------------------
async def experiment_3_transport_timeout_no_keepalive() -> None:
section("Exp 3 | Transport-level (httpx read) timeout, NO keepalives")
logger.info("Setting up: httpx read=5s (no app-level timeout), server wait=50s")
logger.info("Expected: httpx ReadTimeout or connection closed after ~5s of silence")
start = time.perf_counter()
try:
result = await call_with_transport_timeout(read_timeout_seconds=5.0, agent_id="exp3-agent")
elapsed = time.perf_counter() - start
logger.info(f"UNEXPECTED SUCCESS in {elapsed:.2f}s — result: {result}")
except Exception as exc:
elapsed = time.perf_counter() - start
logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
if elapsed < 20:
logger.info("✓ CONFIRMED: Transport-level timeout fires without keepalives")
else:
logger.warning("? Transport timeout was late or missing")
# ---------------------------------------------------------------------------
# Experiment 4 Transport-level timeout WITH keepalives
# ---------------------------------------------------------------------------
async def experiment_4_transport_timeout_with_keepalive() -> None:
"""
Requires the server to be running with the keepalive-enabled get_user_request.
"""
section("Exp 4 | Transport-level timeout, WITH ctx.info() keepalives every 2s")
logger.info("Setting up: httpx read=8s, server keepalive every 2s, server wait=50s")
logger.info("Expected: NO timeout (SSE events arrive every 2s < 8s read timeout)")
logger.info("NOTE: Tool will eventually return when server wait expires (~50s)")
start = time.perf_counter()
try:
result = await call_with_transport_timeout(read_timeout_seconds=8.0, agent_id="exp4-agent")
elapsed = time.perf_counter() - start
logger.info(f"SUCCESS in {elapsed:.2f}s — result: {result}")
logger.info(
"✓ CONFIRMED: ctx.info() keepalives successfully prevent transport-level timeout!\n"
" Bytes arrived every 2s, resetting the httpx 8s read timer each time."
)
except Exception as exc:
elapsed = time.perf_counter() - start
logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
logger.warning(
"✗ KEEPALIVES DID NOT HELP for transport-level timeout\n"
" Either the keepalive interval > read timeout, or SSE events don't reset httpx timer"
)
# ---------------------------------------------------------------------------
# Experiment 5 Character-by-character (transport-level timeout)
# ---------------------------------------------------------------------------
async def experiment_5_char_by_char() -> None:
"""
Tests the char-by-char streaming approach: the server sends 1 char of the
response per second as a log notification. With httpx read=3s, each char
resets the timer.
"""
section("Exp 5 | Character-by-character via ctx.info() (transport-level timeout=3s)")
logger.info("Setting up: httpx read=3s, server sends 1 char/s via ctx.info(), server wait=50s")
logger.info("Expected: NO timeout (char arrives every 1s < 3s read timeout)")
start = time.perf_counter()
try:
result = await call_with_transport_timeout(read_timeout_seconds=3.0, agent_id="exp5-agent")
elapsed = time.perf_counter() - start
logger.info(f"SUCCESS in {elapsed:.2f}s — result: {result}")
logger.info(
"✓ CONFIRMED: Character-by-character via ctx.info() works for transport timeouts!\n"
" Each char notification resets the httpx read timer."
)
except Exception as exc:
elapsed = time.perf_counter() - start
logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
logger.warning(f"✗ Char-by-char did NOT prevent transport timeout")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main() -> None:
logger.info("")
logger.info("MCP KEEPALIVE EXPERIMENTS")
logger.info("These tests probe whether ctx.info() log notifications prevent client timeouts.")
logger.info("Server must be running at http://localhost:8000")
logger.info("")
logger.info("IMPORTANT: Experiments 2, 4, 5 require the KEEPALIVE-ENABLED server.")
logger.info(" - Run Exps 1 & 3 first (no server modification needed).")
logger.info(" - Then enable keepalives in mcp_server.py, restart, run Exps 2, 4, 5.")
logger.info("")
# Phase 1: Baseline experiments (no server modification needed)
await experiment_1_app_timeout_no_keepalive()
await asyncio.sleep(2)
await experiment_3_transport_timeout_no_keepalive()
await asyncio.sleep(2)
logger.info("")
logger.info("Phase 1 complete. Now enable keepalives in mcp_server.py and restart the server,")
logger.info("then uncomment Phase 2 experiments below and re-run.")
logger.info("")
# Phase 2: Keepalive experiments (requires server modification)
# Uncomment after enabling KEEPALIVE_INTERVAL_SECONDS in mcp_server.py:
# await experiment_2_app_timeout_with_keepalive()
# await asyncio.sleep(2)
# await experiment_4_transport_timeout_with_keepalive()
# await asyncio.sleep(2)
# await experiment_5_char_by_char()
if __name__ == "__main__":
asyncio.run(main())