Files
local-mcp/app/services/event_service.py
2026-03-27 03:58:57 +08:00

76 lines
2.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
app/services/event_service.py
Server-Sent Events (SSE) broadcaster.
Maintains a set of subscriber asyncio queues and fans out events to all of them.
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import AsyncGenerator
logger = logging.getLogger(__name__)
# All active SSE subscriber queues
_subscribers: set[asyncio.Queue] = set()
def subscribe() -> asyncio.Queue:
"""Register a new SSE subscriber and return its queue."""
q: asyncio.Queue = asyncio.Queue(maxsize=64)
_subscribers.add(q)
logger.debug("SSE subscriber added, total=%d", len(_subscribers))
return q
def unsubscribe(q: asyncio.Queue) -> None:
"""Remove a subscriber queue."""
_subscribers.discard(q)
logger.debug("SSE subscriber removed, total=%d", len(_subscribers))
def broadcast(event_type: str, data: dict) -> None:
"""
Fan out an event to all current subscribers.
Safe to call from synchronous code uses put_nowait and discards slow consumers.
"""
payload = json.dumps(
{
"type": event_type,
"timestamp": datetime.now(timezone.utc).isoformat(),
"data": data,
}
)
dead: list[asyncio.Queue] = []
for q in list(_subscribers):
try:
q.put_nowait(payload)
except asyncio.QueueFull:
logger.warning("SSE subscriber queue full, dropping event type=%s", event_type)
dead.append(q)
for q in dead:
_subscribers.discard(q)
async def event_generator(q: asyncio.Queue) -> AsyncGenerator[str, None]:
"""
Async generator that yields SSE-formatted strings from a subscriber queue.
Yields a keep-alive comment every 15 seconds when idle.
"""
try:
while True:
try:
payload = await asyncio.wait_for(q.get(), timeout=15.0)
yield f"data: {payload}\n\n"
except asyncio.TimeoutError:
# Keep-alive ping so browsers don't close the connection
yield ": ping\n\n"
except asyncio.CancelledError:
pass
finally:
unsubscribe(q)