// Package mcp registers the MCP server and implements the get_user_request tool. package mcp import ( "context" "encoding/json" "fmt" "log/slog" "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" "github.com/local-mcp/local-mcp-go/internal/config" "github.com/local-mcp/local-mcp-go/internal/events" "github.com/local-mcp/local-mcp-go/internal/models" "github.com/local-mcp/local-mcp-go/internal/store" ) const ( // maxWaitSeconds is the absolute upper bound for a single tool call wait. maxWaitSeconds = 86400 // defaultWaitSeconds is the hardcoded wait time when no instruction is available. // Set to 50s to stay safely under the 60s client timeout while allowing // multiple keepalive progress updates. defaultWaitSeconds = 50 // keepaliveInterval controls how often a log notification is sent to the // client while waiting. Reduced to 5s (from 20s) for more frequent progress updates. // This keeps transport-level TCP/HTTP read timeouts from firing. // Note: it does NOT reset application-level wall-clock timers // (e.g. the Copilot 60 s limit), which are unaffected by SSE bytes. keepaliveInterval = 5 * time.Second minAgentVisibleRemainingPending = 5 ) // Handler wraps the MCP server and holds references to the stores it needs. type Handler struct { MCP *server.MCPServer instStore *store.InstructionStore settStore *store.SettingsStore agentStore *store.AgentStore broker *events.Broker } // New creates a Handler and registers the get_user_request tool. func New( instStore *store.InstructionStore, settStore *store.SettingsStore, agentStore *store.AgentStore, broker *events.Broker, ) *Handler { h := &Handler{ MCP: server.NewMCPServer("local-mcp", config.AppVersion), instStore: instStore, settStore: settStore, agentStore: agentStore, broker: broker, } h.MCP.AddTool( mcp.NewTool("get_user_request", mcp.WithDescription(`Fetch the next pending user instruction from the queue. If no instruction is available the tool will wait up to wait_seconds (or the server-configured default) before returning an empty / default response. Args: agent_id: An identifier for this agent instance (used to track connectivity). Returns: A dict with keys: status, result_type, instruction, response, remaining_pending, waited_seconds.`), mcp.WithString("agent_id", mcp.Description("Identifier for this agent instance"), mcp.DefaultString("unknown"), ), ), h.handleGetUserRequest, ) return h } func (h *Handler) handleGetUserRequest(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { agentID := req.GetString("agent_id", "unknown") // Wait time is hardcoded to stay safely under the 60s client timeout actualWait := defaultWaitSeconds if actualWait > maxWaitSeconds { actualWait = maxWaitSeconds } waitDur := time.Duration(actualWait) * time.Second // Register this call as the newest for this agent. myGen := h.instStore.Agents().NewGeneration(agentID) // Immediate dequeue attempt. item, err := h.instStore.ConsumeNext(agentID) if err != nil { return nil, fmt.Errorf("consume: %w", err) } if item != nil { return h.deliverInstruction(ctx, item, agentID, 0) } // --- Wait loop --- deadline := time.Now().Add(waitDur) wakeup := h.instStore.Wakeup() lastKeepalive := time.Now() for { remaining := time.Until(deadline) if remaining <= 0 { break } // Step aside if a newer call arrived for this agent. if !h.instStore.Agents().IsActive(agentID, myGen) { slog.Debug("get_user_request: superseded", "agent", agentID, "gen", myGen) break } // Check the queue. item, err = h.instStore.ConsumeNext(agentID) if err != nil { return nil, err } if item != nil { waited := int(time.Since(deadline.Add(-waitDur)).Seconds()) if waited < 0 { waited = 0 } return h.deliverInstruction(ctx, item, agentID, waited) } // Calculate next sleep: no longer than time-to-keepalive and no longer than remaining. toKeepalive := keepaliveInterval - time.Since(lastKeepalive) if toKeepalive < 0 { toKeepalive = 0 } sleep := remaining if toKeepalive < sleep { sleep = toKeepalive } if sleep > time.Second { sleep = time.Second // check activity/cancellation at least every second } // Wait for wakeup, context cancel, or sleep expiry. select { case <-ctx.Done(): // Client disconnected. slog.Debug("get_user_request: context cancelled", "agent", agentID) return emptyResult(0), nil case <-wakeup.Chan(): // Instruction may have arrived — loop back to check. case <-time.After(sleep): // Timeout slice expired. } // Send SSE keepalive if interval has elapsed. if time.Since(lastKeepalive) >= keepaliveInterval { waited := int(time.Since(deadline.Add(-waitDur)).Seconds()) if waited < 0 { waited = 0 } remaining := actualWait - waited if remaining < 0 { remaining = 0 } // Progress bar: filled dots proportional to elapsed time progressPct := (waited * 100) / actualWait if progressPct > 100 { progressPct = 100 } filled := progressPct / 10 bar := "" for i := 0; i < 10; i++ { if i < filled { bar += "●" } else { bar += "○" } } msg := fmt.Sprintf("⏳ Waiting for instructions... %s %ds / %ds (agent=%s, %ds remaining)", bar, waited, actualWait, agentID, remaining) if err := h.MCP.SendLogMessageToClient(ctx, mcp.LoggingMessageNotification{ Params: mcp.LoggingMessageNotificationParams{ Level: mcp.LoggingLevelInfo, Data: msg, }, }); err != nil { // Client gone — stop waiting. slog.Debug("get_user_request: keepalive failed, stopping", "agent", agentID, "err", err) break } slog.Debug("get_user_request: keepalive sent", "agent", agentID, "waited", waited, "progress", progressPct) lastKeepalive = time.Now() } } // Queue still empty (or superseded / cancelled) after waiting. waited := int(waitDur.Seconds() - time.Until(deadline).Seconds()) if waited < 0 { waited = 0 } if h.instStore.Agents().IsActive(agentID, myGen) { _ = h.agentStore.Record(agentID, "empty") h.broker.Broadcast("status.changed", map[string]any{}) } slog.Info("get_user_request: empty", "agent", agentID, "waited", waited, "gen", myGen) return emptyResult(waited), nil } func (h *Handler) deliverInstruction(ctx context.Context, item *models.Instruction, agentID string, waited int) (*mcp.CallToolResult, error) { counts, _ := h.instStore.Counts() _ = h.agentStore.Record(agentID, "instruction") // Broadcast consumed event + status update. h.broker.Broadcast("instruction.consumed", map[string]any{ "item": item, "consumed_by_agent_id": agentID, }) h.broker.Broadcast("status.changed", map[string]any{"queue": counts}) slog.Info("get_user_request: delivered", "id", item.ID, "agent", agentID, "waited", waited) result := map[string]any{ "status": "ok", "result_type": "instruction", "instruction": map[string]any{ "id": item.ID, "content": item.Content, "consumed_at": item.ConsumedAt, }, "response": nil, "remaining_pending": agentVisibleRemainingPending(counts.PendingCount), "waited_seconds": waited, } return mcp.NewToolResultText(jsonMarshalStr(result)), nil } func emptyResult(waited int) *mcp.CallToolResult { resp := config.DefaultEmptyResponse resultType := "empty" if resp != "" { resultType = "default_response" } result := map[string]any{ "status": "ok", "result_type": resultType, "instruction": nil, "response": resp, "remaining_pending": agentVisibleRemainingPending(0), "waited_seconds": waited, } return mcp.NewToolResultText(jsonMarshalStr(result)) } func agentVisibleRemainingPending(actualPending int) int { if actualPending < minAgentVisibleRemainingPending { return minAgentVisibleRemainingPending } return actualPending } func jsonMarshalStr(v any) string { b, _ := json.Marshal(v) return string(b) }