Files
local-mcp/tests/test_keepalive.py
Brandon Zhang b1fdd98740 fix(script): add Windows .venv/Scripts path fallback in server.sh
On Windows the venv Python binary lives at .venv/Scripts/python.exe,
not .venv/bin/python.  Fall back to the Windows path when the Unix
path does not exist so the script works cross-platform.
2026-03-27 13:53:38 +08:00

281 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
tests/test_keepalive.py
Experiments to determine whether periodic log notifications can prevent
MCP client timeouts during long-running tool calls.
TWO TYPES OF TIMEOUTS UNDER TEST
---------------------------------
A. Application-level timer (anyio.fail_after in session.py):
- Starts when the request is sent.
- Fires N seconds later regardless of intermediate SSE events.
- Controlled by `read_timeout_seconds` in call_tool().
- Sending ctx.info() keepalives does NOT reset this timer.
B. Transport-level HTTP read timeout (httpx read timeout):
- Fires if NO bytes arrive on the SSE stream for N seconds.
- Sending any SSE event (ctx.info()) resets this timer.
- Controlled by httpx.Timeout(connect=..., read=N) on the AsyncClient.
The Copilot extension's 60s timeout is almost certainly type A
(application-level wall-clock timer), because:
- The Python MCP SDK uses anyio.fail_after() inside send_request()
- The JS MCP SDK very likely mirrors this pattern
EXPERIMENTS
-----------
1. No-keepalive baseline : application-level timeout → TimeoutError
2. ctx.info() keepalive : application-level timeout → still TimeoutError
(confirms keepalives do NOT help for app-level timer)
3. Transport-level read timeout : httpx read=5s → TimeoutError without keepalives
4. Transport-level + keepalive : httpx read=8s + ctx.info() every 2s → NO timeout
(confirms keepalives DO help for transport-level timer)
5. Character-by-character : same as Exp 4 but sends one char/s of the response
→ NO timeout (proves char-by-char is viable for transport timeouts)
"""
import asyncio
import logging
import time
from datetime import timedelta

import httpx
from mcp import ClientSession
from mcp.client.streamable_http import streamable_http_client
from mcp.types import CallToolResult
# Emit timestamped, level-tagged records so each experiment's timeline can be
# read straight from the console output.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("test_keepalive")

# URL handed to streamable_http_client() by the helper coroutines below.
SERVER_URL = "http://localhost:8000/mcp"
def section(title: str) -> None:
    """Log *title* framed above and below by a 70-character ``=`` rule."""
    rule = "=" * 70
    for line in (rule, f" {title}", rule):
        logger.info(line)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def call_with_app_timeout(
    timeout_seconds: float,
    agent_id: str = "test-agent",
) -> CallToolResult:
    """
    Call get_user_request with an APPLICATION-LEVEL timeout.

    This uses read_timeout_seconds in call_tool(), which maps to
    anyio.fail_after().

    Args:
        timeout_seconds: Timeout, in seconds, passed to ``call_tool()`` as
            ``read_timeout_seconds``.
        agent_id: Value sent as the tool's ``agent_id`` argument.

    Returns:
        The object returned by ``session.call_tool()``.

    Raises:
        Nothing is caught here; any error raised by the SDK (timeout,
        connection failure, ...) propagates to the caller.
    """
    async with streamable_http_client(SERVER_URL) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "get_user_request",
                {"agent_id": agent_id},
                read_timeout_seconds=timedelta(seconds=timeout_seconds),
            )
            return result
async def call_with_transport_timeout(
    read_timeout_seconds: float,
    agent_id: str = "test-agent",
) -> CallToolResult:
    """
    Call get_user_request with a TRANSPORT-LEVEL (httpx read) timeout.

    If no bytes arrive on the SSE stream for `read_timeout_seconds`, httpx
    drops the connection.

    Args:
        read_timeout_seconds: Value used for the ``read`` component of the
            ``httpx.Timeout``; connect/write/pool are fixed at 10s.
        agent_id: Value sent as the tool's ``agent_id`` argument.

    Returns:
        The object returned by ``session.call_tool()``.

    Raises:
        Nothing is caught here; any error raised by httpx or the SDK
        propagates to the caller.
    """
    transport_timeout = httpx.Timeout(
        connect=10.0,
        read=read_timeout_seconds,
        write=10.0,
        pool=10.0,
    )
    # Create the client directly in the `async with` so it is always closed,
    # whatever happens while the session is being set up.
    async with httpx.AsyncClient(timeout=transport_timeout, follow_redirects=True) as client:
        async with streamable_http_client(SERVER_URL, http_client=client) as (read, write, _):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # No read_timeout_seconds here: we rely purely on the httpx
                # transport timeout.
                result = await session.call_tool(
                    "get_user_request",
                    {"agent_id": agent_id},
                )
                return result
# ---------------------------------------------------------------------------
# Experiment 1: Application-level timeout, no keepalives (BASELINE)
# ---------------------------------------------------------------------------
async def experiment_1_app_timeout_no_keepalive() -> None:
    """
    Baseline: call the tool with a 5s application-level timeout, no keepalives.

    The call is expected to fail roughly 5s in. A failure that arrives well
    before the timeout could have elapsed (for example because the server is
    not reachable) is reported as a setup problem instead of being counted as
    a confirmed timeout.
    """
    timeout_seconds = 5.0
    section("Exp 1 | Application-level timeout, NO keepalives")
    logger.info("Setting up: app-level read_timeout=5s, server wait=50s")
    logger.info("Expected: McpError / TimeoutError raised after ~5s")
    start = time.perf_counter()
    try:
        result = await call_with_app_timeout(timeout_seconds=timeout_seconds, agent_id="exp1-agent")
    except Exception as exc:  # broad catch: the experiment reports whichever exception type surfaces
        elapsed = time.perf_counter() - start
        logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
        if elapsed < timeout_seconds * 0.9:
            # Too early to be the timeout under test; do not report a false positive.
            logger.warning("? Failed before the timeout could fire — is the server running and reachable?")
        elif elapsed < 10:
            logger.info("✓ CONFIRMED: Application-level timeout fired as expected")
        else:
            logger.warning("? Timeout was late — investigate further")
    else:
        elapsed = time.perf_counter() - start
        logger.info(f"UNEXPECTED SUCCESS in {elapsed:.2f}s — result: {result}")
# ---------------------------------------------------------------------------
# Experiment 2: Application-level timeout WITH keepalives
# ---------------------------------------------------------------------------
async def experiment_2_app_timeout_with_keepalive() -> None:
    """
    Repeat the 5s application-level timeout test against a keepalive server.

    For this experiment, the server must be running with the keepalive version
    of get_user_request (see notes in test output for how to enable it).

    A failure that arrives well before the 5s timeout could have elapsed is
    reported as a setup problem instead of being counted as evidence about
    keepalives.
    """
    timeout_seconds = 5.0
    section("Exp 2 | Application-level timeout, WITH keepalives (ctx.info every 2s)")
    logger.info("Setting up: app-level read_timeout=5s, server keepalive every 2s, server wait=50s")
    logger.info("Expected: STILL times out after 5s (keepalives don't reset anyio.fail_after)")
    start = time.perf_counter()
    try:
        result = await call_with_app_timeout(timeout_seconds=timeout_seconds, agent_id="exp2-agent")
    except Exception as exc:  # broad catch: the experiment reports whichever exception type surfaces
        elapsed = time.perf_counter() - start
        logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
        if elapsed < timeout_seconds * 0.9:
            # Too early to be the timeout under test; do not draw a conclusion from it.
            logger.warning("? Failed before the timeout could fire — is the server running and reachable?")
        elif elapsed < 10:
            logger.info(
                "✓ CONFIRMED: ctx.info() keepalives do NOT help application-level timeouts\n"
                " (anyio.fail_after is a wall-clock timer unaffected by SSE events)"
            )
        else:
            logger.warning("? Timeout was late — keepalives may have had some effect")
    else:
        elapsed = time.perf_counter() - start
        logger.info(f"SUCCESS in {elapsed:.2f}s — result: {result}")
        logger.info("✓ KEEPALIVES PREVENTED TIMEOUT (transport-level timer, not app-level)")
# ---------------------------------------------------------------------------
# Experiment 3: Transport-level timeout, no keepalives
# ---------------------------------------------------------------------------
async def experiment_3_transport_timeout_no_keepalive() -> None:
    """
    Call the tool with a 5s httpx read timeout and no application-level timeout.

    The call is expected to fail after about 5s of silence on the stream. A
    failure that arrives well before 5s have elapsed cannot be the read
    timeout, so it is reported as a setup problem instead of a confirmation.
    """
    read_timeout_seconds = 5.0
    section("Exp 3 | Transport-level (httpx read) timeout, NO keepalives")
    logger.info("Setting up: httpx read=5s (no app-level timeout), server wait=50s")
    logger.info("Expected: httpx ReadTimeout or connection closed after ~5s of silence")
    start = time.perf_counter()
    try:
        result = await call_with_transport_timeout(
            read_timeout_seconds=read_timeout_seconds, agent_id="exp3-agent"
        )
    except Exception as exc:  # broad catch: the experiment reports whichever exception type surfaces
        elapsed = time.perf_counter() - start
        logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
        if elapsed < read_timeout_seconds * 0.9:
            # Too early to be the read timeout under test; do not report a false positive.
            logger.warning("? Failed before the read timeout could fire — is the server running and reachable?")
        elif elapsed < 20:
            logger.info("✓ CONFIRMED: Transport-level timeout fires without keepalives")
        else:
            logger.warning("? Transport timeout was late or missing")
    else:
        elapsed = time.perf_counter() - start
        logger.info(f"UNEXPECTED SUCCESS in {elapsed:.2f}s — result: {result}")
# ---------------------------------------------------------------------------
# Experiment 4: Transport-level timeout WITH keepalives
# ---------------------------------------------------------------------------
async def experiment_4_transport_timeout_with_keepalive() -> None:
    """
    Call the tool with an 8s httpx read timeout against a keepalive server.

    Requires the server to be running with the keepalive-enabled
    get_user_request.

    A failure that arrives before 8s of silence could even have occurred
    cannot be the read timeout, so it is reported as a setup problem instead
    of being blamed on the keepalives.
    """
    read_timeout_seconds = 8.0
    section("Exp 4 | Transport-level timeout, WITH ctx.info() keepalives every 2s")
    logger.info("Setting up: httpx read=8s, server keepalive every 2s, server wait=50s")
    logger.info("Expected: NO timeout (SSE events arrive every 2s < 8s read timeout)")
    logger.info("NOTE: Tool will eventually return when server wait expires (~50s)")
    start = time.perf_counter()
    try:
        result = await call_with_transport_timeout(
            read_timeout_seconds=read_timeout_seconds, agent_id="exp4-agent"
        )
    except Exception as exc:  # broad catch: the experiment reports whichever exception type surfaces
        elapsed = time.perf_counter() - start
        logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
        if elapsed < read_timeout_seconds * 0.9:
            # Too early to be the read timeout under test; do not blame the keepalives.
            logger.warning("? Failed before the read timeout could fire — is the server running and reachable?")
        else:
            logger.warning(
                "✗ KEEPALIVES DID NOT HELP for transport-level timeout\n"
                " Either the keepalive interval > read timeout, or SSE events don't reset httpx timer"
            )
    else:
        elapsed = time.perf_counter() - start
        logger.info(f"SUCCESS in {elapsed:.2f}s — result: {result}")
        logger.info(
            "✓ CONFIRMED: ctx.info() keepalives successfully prevent transport-level timeout!\n"
            " Bytes arrived every 2s, resetting the httpx 8s read timer each time."
        )
# ---------------------------------------------------------------------------
# Experiment 5: Character-by-character (transport-level timeout)
# ---------------------------------------------------------------------------
async def experiment_5_char_by_char() -> None:
    """
    Tests the char-by-char streaming approach: the server sends 1 char of the
    response per second as a log notification. With httpx read=3s, each char
    resets the timer.

    A failure that arrives before 3s of silence could even have occurred
    cannot be the read timeout, so it is reported as a setup problem instead
    of being blamed on the streaming approach.
    """
    read_timeout_seconds = 3.0
    section("Exp 5 | Character-by-character via ctx.info() (transport-level timeout=3s)")
    logger.info("Setting up: httpx read=3s, server sends 1 char/s via ctx.info(), server wait=50s")
    logger.info("Expected: NO timeout (char arrives every 1s < 3s read timeout)")
    start = time.perf_counter()
    try:
        result = await call_with_transport_timeout(
            read_timeout_seconds=read_timeout_seconds, agent_id="exp5-agent"
        )
    except Exception as exc:  # broad catch: the experiment reports whichever exception type surfaces
        elapsed = time.perf_counter() - start
        logger.info(f"Got exception after {elapsed:.2f}s: {type(exc).__name__}: {exc}")
        if elapsed < read_timeout_seconds * 0.9:
            # Too early to be the read timeout under test; do not blame the streaming approach.
            logger.warning("? Failed before the read timeout could fire — is the server running and reachable?")
        else:
            logger.warning("✗ Char-by-char did NOT prevent transport timeout")
    else:
        elapsed = time.perf_counter() - start
        logger.info(f"SUCCESS in {elapsed:.2f}s — result: {result}")
        logger.info(
            "✓ CONFIRMED: Character-by-character via ctx.info() works for transport timeouts!\n"
            " Each char notification resets the httpx read timer."
        )
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main() -> None:
    """Log the usage banner, run the Phase 1 experiments, then log next steps."""
    banner = (
        "",
        "MCP KEEPALIVE EXPERIMENTS",
        "These tests probe whether ctx.info() log notifications prevent client timeouts.",
        "Server must be running at http://localhost:8000",
        "",
        "IMPORTANT: Experiments 2, 4, 5 require the KEEPALIVE-ENABLED server.",
        " - Run Exps 1 & 3 first (no server modification needed).",
        " - Then enable keepalives in mcp_server.py, restart, run Exps 2, 4, 5.",
        "",
    )
    for line in banner:
        logger.info(line)

    # Phase 1: Baseline experiments (no server modification needed).
    # Each experiment is followed by a 2s pause before the next step.
    phase_1 = (
        experiment_1_app_timeout_no_keepalive,
        experiment_3_transport_timeout_no_keepalive,
    )
    for experiment in phase_1:
        await experiment()
        await asyncio.sleep(2)

    next_steps = (
        "",
        "Phase 1 complete. Now enable keepalives in mcp_server.py and restart the server,",
        "then uncomment Phase 2 experiments below and re-run.",
        "",
    )
    for line in next_steps:
        logger.info(line)

    # Phase 2: Keepalive experiments (requires server modification)
    # Uncomment after enabling KEEPALIVE_INTERVAL_SECONDS in mcp_server.py:
    # await experiment_2_app_timeout_with_keepalive()
    # await asyncio.sleep(2)
    # await experiment_4_transport_timeout_with_keepalive()
    # await asyncio.sleep(2)
    # await experiment_5_char_by_char()
if __name__ == "__main__":
    # Script entry point: drive the experiment coroutine on a fresh event loop.
    asyncio.run(main())