From 65b29bcf0384a075fb70fe41db3327d7f462c6c6 Mon Sep 17 00:00:00 2001 From: Brandon Zhang Date: Fri, 27 Mar 2026 17:55:28 +0800 Subject: [PATCH] Improve SSE status and event auth handling --- go-server/internal/api/auth.go | 7 ++++ go-server/internal/api/events.go | 6 ++++ go-server/internal/api/status.go | 62 ++++++++++++++++++++++++-------- static/js/events.js | 10 ++++-- static/js/state.js | 1 + static/js/status.js | 13 ++++--- 6 files changed, 77 insertions(+), 22 deletions(-) diff --git a/go-server/internal/api/auth.go b/go-server/internal/api/auth.go index 8e9af58..0f00f57 100644 --- a/go-server/internal/api/auth.go +++ b/go-server/internal/api/auth.go @@ -9,6 +9,13 @@ import ( func bearerAuthMiddleware(requiredToken string) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/api/events" { + if r.URL.Query().Get("access_token") == requiredToken { + next.ServeHTTP(w, r) + return + } + } + auth := r.Header.Get("Authorization") if !strings.HasPrefix(auth, "Bearer ") { writeError(w, http.StatusUnauthorized, "Missing or invalid Authorization header") diff --git a/go-server/internal/api/events.go b/go-server/internal/api/events.go index d82627d..bb79e2a 100644 --- a/go-server/internal/api/events.go +++ b/go-server/internal/api/events.go @@ -2,6 +2,7 @@ package api import ( "net/http" + "time" "github.com/local-mcp/local-mcp-go/internal/events" ) @@ -24,6 +25,8 @@ func handleSSE(broker *events.Broker) http.HandlerFunc { // Subscribe to event broker ch := broker.Subscribe() defer broker.Unsubscribe(ch) + heartbeat := time.NewTicker(15 * time.Second) + defer heartbeat.Stop() // Send initial connection event w.Write([]byte("data: {\"type\":\"connected\"}\n\n")) @@ -38,6 +41,9 @@ func handleSSE(broker *events.Broker) http.HandlerFunc { } w.Write(msg) flusher.Flush() + case <-heartbeat.C: + w.Write([]byte(": keepalive\n\n")) + flusher.Flush() case <-r.Context().Done(): return // client disconnected } diff --git a/go-server/internal/api/status.go b/go-server/internal/api/status.go index dbec036..5b38ab7 100644 --- a/go-server/internal/api/status.go +++ b/go-server/internal/api/status.go @@ -3,6 +3,8 @@ package api import ( "net/http" "time" + + "github.com/local-mcp/local-mcp-go/internal/models" ) func handleHealth() http.HandlerFunc { @@ -16,28 +18,58 @@ func handleHealth() http.HandlerFunc { func handleStatus(stores Stores) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - counts, _ := stores.Instructions.Counts() - latest, _ := stores.Agents.Latest() - cfg, _ := stores.Settings.Get() + counts, err := stores.Instructions.Counts() + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } - resp := map[string]any{ - "uptime_seconds": int(time.Since(serverStartTime).Seconds()), - "queue_pending": counts.PendingCount, - "queue_consumed": counts.ConsumedCount, - "agent_stale_after_seconds": cfg.AgentStaleAfterSeconds, + cfg, err := stores.Settings.Get() + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + latest, err := stores.Agents.Latest() + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + agent := map[string]any{ + "connected": false, + "last_seen_at": nil, + "last_fetch_at": nil, + "agent_id": nil, } if latest != nil { - isStale := time.Since(latest.LastSeenAt).Seconds() > float64(cfg.AgentStaleAfterSeconds) - resp["agent"] = map[string]any{ - "agent_id": latest.AgentID, - "last_fetch_at": latest.LastFetchAt.Format(time.RFC3339Nano), - "last_result_type": latest.LastResultType, - "is_stale": isStale, + connected := time.Since(latest.LastSeenAt).Seconds() <= float64(cfg.AgentStaleAfterSeconds) + agent = map[string]any{ + "connected": connected, + "last_seen_at": latest.LastSeenAt.Format(time.RFC3339Nano), + "last_fetch_at": latest.LastFetchAt.Format(time.RFC3339Nano), + "agent_id": latest.AgentID, } } + resp := map[string]any{ + "server": map[string]any{ + "status": "up", + "started_at": serverStartTime.Format(time.RFC3339Nano), + }, + "agent": agent, + "queue": map[string]any{ + "pending_count": counts.PendingCount, + "consumed_count": counts.ConsumedCount, + }, + "settings": models.Settings{ + DefaultWaitSeconds: cfg.DefaultWaitSeconds, + DefaultEmptyResponse: cfg.DefaultEmptyResponse, + AgentStaleAfterSeconds: cfg.AgentStaleAfterSeconds, + }, + } + writeJSON(w, http.StatusOK, resp) } } - diff --git a/static/js/events.js b/static/js/events.js index 499aaea..aef0805 100644 --- a/static/js/events.js +++ b/static/js/events.js @@ -6,7 +6,7 @@ */ import { state } from './state.js'; -import { api } from './api.js'; +import { api, getStoredToken } from './api.js'; let _es = null; let _reconnectTimer = null; @@ -19,7 +19,12 @@ export function connectSSE() { } function _connect() { - _es = new EventSource('/api/events'); + const token = getStoredToken(); + const url = token + ? `/api/events?access_token=${encodeURIComponent(token)}` + : '/api/events'; + + _es = new EventSource(url); _es.onopen = () => { console.debug('[SSE] connected'); @@ -43,7 +48,6 @@ function _connect() { _es.onerror = () => { console.warn('[SSE] connection lost – reconnecting in', RECONNECT_DELAY_MS, 'ms'); - state.set('serverOnline', false); _reconnecting = true; state.set('sseReconnecting', true); _es.close(); diff --git a/static/js/state.js b/static/js/state.js index deb66de..a6a4fef 100644 --- a/static/js/state.js +++ b/static/js/state.js @@ -9,6 +9,7 @@ const _state = { status: null, // StatusResponse | null config: null, // ConfigResponse | null serverOnline: false, + sseReconnecting: false, }; const _listeners = {}; // key -> Set diff --git a/static/js/status.js b/static/js/status.js index c07bc5c..9584837 100644 --- a/static/js/status.js +++ b/static/js/status.js @@ -34,8 +34,10 @@ function updateHeaderLeds(serverOnline, status) { if (!serverLed || !agentLed) return; - // Don't overwrite reconnecting state – events.js sets that - if (serverOnline && !state.get('sseReconnecting')) { + if (state.get('sseReconnecting')) { + serverLed.className = 'led led--amber led--pulse'; + serverLed.querySelector('.led__label').textContent = 'Reconnecting…'; + } else if (serverOnline) { serverLed.className = 'led led--green led--pulse'; serverLed.querySelector('.led__label').textContent = 'Server Online'; } else if (!serverOnline) { @@ -58,8 +60,11 @@ function renderStatusPanel(status) { const el = document.getElementById('status-panel-body'); if (!el || !status) return; - const agent = status.agent; - const queue = status.queue; + const agent = status.agent ?? {}; + const queue = { + pending_count: status.queue?.pending_count ?? 0, + consumed_count: status.queue?.consumed_count ?? 0, + }; el.innerHTML = `