// Package events provides an SSE event broker for fanning out server-sent // events to browser clients watching /api/events. package events import ( "encoding/json" "fmt" "sync" "time" ) // Event is the wire format sent to browser clients. type Event struct { Type string `json:"type"` Timestamp string `json:"timestamp"` Data any `json:"data"` } // Broker distributes named events to all currently-connected SSE clients. // Clients subscribe by calling Subscribe(); they must call Unsubscribe() when // done to avoid goroutine leaks. type Broker struct { mu sync.RWMutex clients map[chan []byte]struct{} } // NewBroker creates a ready-to-use Broker. func NewBroker() *Broker { return &Broker{clients: make(map[chan []byte]struct{})} } // Subscribe returns a channel that will receive serialised SSE "data: ..." lines. func (b *Broker) Subscribe() chan []byte { ch := make(chan []byte, 32) // buffered so a slow reader doesn't stall others b.mu.Lock() b.clients[ch] = struct{}{} b.mu.Unlock() return ch } // Unsubscribe removes the channel and closes it. func (b *Broker) Unsubscribe(ch chan []byte) { b.mu.Lock() delete(b.clients, ch) b.mu.Unlock() close(ch) } // Broadcast encodes and sends an event to all subscribers. Slow subscribers // are skipped (their buffered channel is full) to prevent head-of-line blocking. func (b *Broker) Broadcast(eventType string, data any) { ev := Event{ Type: eventType, Timestamp: time.Now().UTC().Format(time.RFC3339Nano), Data: data, } payload, err := json.Marshal(ev) if err != nil { return // should never happen } line := fmt.Sprintf("data: %s\n\n", payload) msg := []byte(line) b.mu.RLock() for ch := range b.clients { select { case ch <- msg: default: // skip stalled clients } } b.mu.RUnlock() }