76 lines
2.1 KiB
Python
76 lines
2.1 KiB
Python
"""
|
||
app/services/event_service.py
|
||
Server-Sent Events (SSE) broadcaster.
|
||
Maintains a set of subscriber asyncio queues and fans out events to all of them.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
from typing import AsyncGenerator
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# All active SSE subscriber queues
|
||
_subscribers: set[asyncio.Queue] = set()
|
||
|
||
|
||
def subscribe() -> asyncio.Queue:
|
||
"""Register a new SSE subscriber and return its queue."""
|
||
q: asyncio.Queue = asyncio.Queue(maxsize=64)
|
||
_subscribers.add(q)
|
||
logger.debug("SSE subscriber added, total=%d", len(_subscribers))
|
||
return q
|
||
|
||
|
||
def unsubscribe(q: asyncio.Queue) -> None:
|
||
"""Remove a subscriber queue."""
|
||
_subscribers.discard(q)
|
||
logger.debug("SSE subscriber removed, total=%d", len(_subscribers))
|
||
|
||
|
||
def broadcast(event_type: str, data: dict) -> None:
|
||
"""
|
||
Fan out an event to all current subscribers.
|
||
Safe to call from synchronous code – uses put_nowait and discards slow consumers.
|
||
"""
|
||
payload = json.dumps(
|
||
{
|
||
"type": event_type,
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"data": data,
|
||
}
|
||
)
|
||
dead: list[asyncio.Queue] = []
|
||
for q in list(_subscribers):
|
||
try:
|
||
q.put_nowait(payload)
|
||
except asyncio.QueueFull:
|
||
logger.warning("SSE subscriber queue full, dropping event type=%s", event_type)
|
||
dead.append(q)
|
||
for q in dead:
|
||
_subscribers.discard(q)
|
||
|
||
|
||
async def event_generator(q: asyncio.Queue) -> AsyncGenerator[str, None]:
|
||
"""
|
||
Async generator that yields SSE-formatted strings from a subscriber queue.
|
||
Yields a keep-alive comment every 15 seconds when idle.
|
||
"""
|
||
try:
|
||
while True:
|
||
try:
|
||
payload = await asyncio.wait_for(q.get(), timeout=15.0)
|
||
yield f"data: {payload}\n\n"
|
||
except asyncio.TimeoutError:
|
||
# Keep-alive ping so browsers don't close the connection
|
||
yield ": ping\n\n"
|
||
except asyncio.CancelledError:
|
||
pass
|
||
finally:
|
||
unsubscribe(q)
|
||
|