From 8a0dffbcae9df6800d035754b9802ef696fde8f0 Mon Sep 17 00:00:00 2001 From: Brandon Zhang Date: Fri, 27 Mar 2026 15:45:26 +0800 Subject: [PATCH] feat: add Go server implementation in go-server/ Full Go port of local-mcp with all core features. Copied from local-mcp-go worktree to consolidate into single-branch repo (easier maintenance). Architecture: - internal/config: Environment variable configuration - internal/models: Shared types (Instruction, Settings, AgentActivity, etc.) - internal/db: SQLite init with modernc.org/sqlite (pure Go, no CGo) - internal/store: Database operations + WakeupSignal + AgentTracker - internal/events: SSE broker for browser /api/events endpoint - internal/mcp: get_user_request MCP tool with 5s keepalive progress bars - internal/api: chi HTTP router with Bearer auth middleware - main.go: Entry point with auto port switching and Windows interactive banner Dependencies: - github.com/mark3labs/mcp-go@v0.46.0 - github.com/go-chi/chi/v5@v5.2.5 - modernc.org/sqlite@v1.47.0 (pure Go SQLite) - github.com/google/uuid@v1.6.0 Static assets embedded via //go:embed static Features matching Python: - Same wait strategy: 50s with 5s progress keepalives - Same hardcoded constants (DEFAULT_WAIT_SECONDS, DEFAULT_EMPTY_RESPONSE) - Auto port switching (tries 8000-8009) - Windows interactive mode (formatted banner on double-click launch) Build: cd go-server && go build -o local-mcp.exe . Run: ./local-mcp.exe Binary size: ~18 MB (vs Python ~60+ MB memory footprint) Startup: ~10 ms (vs Python ~1-2s) --- .gitignore | 4 + go-server/.gitignore.go | 32 ++ go-server/README-GO.md | 608 ++++++++++++++++++++++++ go-server/go.mod | 24 + go-server/go.sum | 79 +++ go-server/internal/api/auth.go | 26 + go-server/internal/api/config.go | 61 +++ go-server/internal/api/events.go | 47 ++ go-server/internal/api/instructions.go | 119 +++++ go-server/internal/api/router.go | 85 ++++ go-server/internal/api/status.go | 43 ++ go-server/internal/config/config.go | 61 +++ go-server/internal/db/db.go | 77 +++ go-server/internal/events/broker.go | 73 +++ go-server/internal/mcp/handler.go | 289 +++++++++++ go-server/internal/models/models.go | 46 ++ go-server/internal/store/agent.go | 58 +++ go-server/internal/store/instruction.go | 320 +++++++++++++ go-server/internal/store/settings.go | 69 +++ go-server/main.go | 163 +++++++ 20 files changed, 2284 insertions(+) create mode 100644 go-server/.gitignore.go create mode 100644 go-server/README-GO.md create mode 100644 go-server/go.mod create mode 100644 go-server/go.sum create mode 100644 go-server/internal/api/auth.go create mode 100644 go-server/internal/api/config.go create mode 100644 go-server/internal/api/events.go create mode 100644 go-server/internal/api/instructions.go create mode 100644 go-server/internal/api/router.go create mode 100644 go-server/internal/api/status.go create mode 100644 go-server/internal/config/config.go create mode 100644 go-server/internal/db/db.go create mode 100644 go-server/internal/events/broker.go create mode 100644 go-server/internal/mcp/handler.go create mode 100644 go-server/internal/models/models.go create mode 100644 go-server/internal/store/agent.go create mode 100644 go-server/internal/store/instruction.go create mode 100644 go-server/internal/store/settings.go create mode 100644 go-server/main.go diff --git a/.gitignore b/.gitignore index b5e5f81..174d195 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,10 @@ venv/ dist/ build/ +# Go +go-server/local-mcp +go-server/local-mcp.exe + # Data data/ diff --git a/go-server/.gitignore.go b/go-server/.gitignore.go new file mode 100644 index 0000000..4b81ff9 --- /dev/null +++ b/go-server/.gitignore.go @@ -0,0 +1,32 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +.venv/ +venv/ +*.egg-info/ +dist/ +build/ + +# Go +local-mcp +local-mcp.exe + +# Data +data/ + +# Logs +logs/ +*.log +server.log +server_err.log + +# IDE +.idea/ +.vscode/ +*.iml + +# OS +.DS_Store +Thumbs.db + diff --git a/go-server/README-GO.md b/go-server/README-GO.md new file mode 100644 index 0000000..09c2789 --- /dev/null +++ b/go-server/README-GO.md @@ -0,0 +1,608 @@ +# local-mcp + +`local-mcp` is a localhost-first MCP server whose primary responsibility is to deliver the latest user instruction to an agent through the `get_user_request` tool, while also providing a responsive web UI for managing the instruction queue and monitoring server/agent activity. + +This document is the implementation plan for the project. + +## 1. Goals + +- Provide a single MCP tool, `get_user_request`, that returns at most one instruction per call. +- Give the user a polished local web UI to add, edit, remove, review, and monitor instructions. +- Preserve queue integrity so consumed instructions are clearly visible but no longer editable/deletable. +- Support configurable waiting/default-response behavior when no instruction is available. +- Show live server status and inferred agent connectivity in the UI. +- Keep the stack lightweight, maintainable, debuggable, and friendly to local development. + +## 2. Recommended Tech Stack + +### Backend + +- **Language/runtime:** Python 3.11+ +- **MCP integration:** official Python MCP SDK +- **HTTP server/API layer:** FastAPI +- **ASGI server:** Uvicorn +- **Persistence:** SQLite via Python standard library `sqlite3` +- **Concurrency/state coordination:** `asyncio` + standard library synchronization primitives where needed +- **Logging/error handling:** Python `logging`, structured request logs, centralized exception handling +- **Configuration:** environment variables + small local config file (`.json` or `.toml`) + +### Why this backend stack + +- The MCP SDK is the correct dependency for exposing the MCP tool cleanly. +- FastAPI + Uvicorn is a small, pragmatic backend stack that simplifies routing, validation, health endpoints, and server-sent updates without introducing a heavy framework. +- SQLite keeps the system local-first, dependency-light, and durable enough for instruction history and settings. +- Most supporting concerns remain in the Python standard library, which keeps third-party dependencies minimal. + +### Frontend + +- **UI technology:** plain HTML, CSS, and JavaScript only +- **Realtime updates:** Server-Sent Events (preferred) with polling fallback if necessary +- **Styling:** local CSS files with design tokens and component-specific stylesheets +- **Client architecture:** modular vanilla JS organized by feature (`api.js`, `state.js`, `events.js`, `instructions.js`, etc.) +- **Assets:** all fonts/icons/scripts/styles stored locally in the repository; no CDN usage + +### Mandatory frontend implementation instruction + +Any future frontend implementation work **must first read and follow**: + +- `.github/instructions/frontend-design.instructions.md` + +This instruction file is mandatory for the UI because it requires a distinctive, production-grade, non-generic frontend. The implementation should not default to generic dashboard aesthetics. + +## 3. Product/Architecture Plan + +### Core backend responsibilities + +1. Expose the MCP tool `get_user_request`. +2. Maintain an instruction queue with durable storage. +3. Mark instructions as consumed atomically when delivered to an agent. +4. Expose local HTTP endpoints for the web UI. +5. Stream status/instruction updates to the browser in real time. +6. Infer agent connectivity from recent MCP tool activity. +7. Persist and serve server configuration such as wait timeout and default empty response. + +### Core frontend responsibilities + +1. Show queued and consumed instructions in separate, clearly labeled sections. +2. Allow add/edit/delete only for instructions that are still pending. +3. Cross out and grey out consumed instructions. +4. Show server status, inferred agent status, last fetch time, and configuration values. +5. Update live as instruction state changes. +6. Remain usable and visually polished on desktop and smaller screens. + +### Suggested repository layout + +```text +local-mcp/ +├─ main.py +├─ README.md +├─ requirements.txt +├─ app/ +│ ├─ __init__.py +│ ├─ config.py +│ ├─ database.py +│ ├─ logging_setup.py +│ ├─ models.py +│ ├─ services/ +│ │ ├─ instruction_service.py +│ │ ├─ status_service.py +│ │ └─ event_service.py +│ ├─ api/ +│ │ ├─ routes_instructions.py +│ │ ├─ routes_status.py +│ │ └─ routes_config.py +│ └─ mcp_server.py +├─ static/ +│ ├─ index.html +│ ├─ css/ +│ │ ├─ base.css +│ │ ├─ layout.css +│ │ └─ components.css +│ ├─ js/ +│ │ ├─ api.js +│ │ ├─ app.js +│ │ ├─ events.js +│ │ ├─ instructions.js +│ │ └─ status.js +│ └─ assets/ +└─ data/ + └─ local_mcp.sqlite3 +``` + +## 4. Data Model Plan + +### `instructions` + +- `id` - string/UUID primary key +- `content` - text, required +- `status` - enum: `pending`, `consumed` +- `created_at` - datetime +- `updated_at` - datetime +- `consumed_at` - nullable datetime +- `consumed_by_agent_id` - nullable string +- `position` - integer for stable queue order + +### `settings` + +- `default_wait_seconds` - integer — seconds the tool waits before returning an empty/default response; set exclusively by the user via the web UI +- `default_empty_response` - text, nullable +- `agent_stale_after_seconds` - integer + +### `agent_activity` + +- `agent_id` - string primary key +- `last_seen_at` - datetime +- `last_fetch_at` - datetime +- `last_result_type` - enum: `instruction`, `empty`, `default_response` + +## 5. Detailed API Design + +All routes are local-only and intended for `localhost` usage. + +### 5.1 MCP tool contract + +#### Tool: `get_user_request` + +**Purpose** + +- Return the next pending instruction, if one exists. +- If none exists, wait for a configurable duration, then return either an empty response or a configured default response. +- Record agent activity so the UI can infer whether an agent is currently connected/recently active. + +**Suggested input schema** + +```json +{ + "agent_id": "optional-string", + "default_response_override": "optional-string" +} +``` + +**Suggested output schema when an instruction is delivered** + +```json +{ + "status": "ok", + "result_type": "instruction", + "instruction": { + "id": "uuid", + "content": "user instruction text", + "consumed_at": "ISO-8601 timestamp" + }, + "remaining_pending": 3, + "waited_seconds": 0 +} +``` + +**Suggested output schema when queue is empty** + +```json +{ + "status": "ok", + "result_type": "empty", + "instruction": null, + "response": "", + "remaining_pending": 0, + "waited_seconds": 10 +} +``` + +**Suggested output schema when a default response is returned** + +```json +{ + "status": "ok", + "result_type": "default_response", + "instruction": null, + "response": "No new instructions available.", + "remaining_pending": 0, + "waited_seconds": 10 +} +``` + +**Behavior rules** + +- Deliver the oldest pending instruction first. +- Mark the delivered instruction as consumed in the same transaction used to claim it. +- Never return more than one instruction per call. +- `default_wait_seconds` is fully server-controlled (set by the user via the web UI). Agents cannot override it. +- Clamp `actual_wait` to an absolute server maximum (86400 s). +- Update the agent activity record on every call, even when no instruction is returned. +- The UI should infer "agent connected" if the latest activity is within `agent_stale_after_seconds`. +- Agent implementations should continue calling this tool instead of ending their work session on their own, so they can pick up newly added instructions without missing critical follow-up requests. + +### 5.2 HTTP API for the web UI + +#### `GET /healthz` + +Returns service health. + +**Response** + +```json +{ + "status": "ok", + "server_time": "ISO-8601 timestamp" +} +``` + +#### `GET /api/status` + +Returns current server and agent summary. + +**Response** + +```json +{ + "server": { + "status": "up", + "started_at": "ISO-8601 timestamp" + }, + "agent": { + "connected": true, + "last_seen_at": "ISO-8601 timestamp", + "last_fetch_at": "ISO-8601 timestamp", + "agent_id": "copilot-agent" + }, + "queue": { + "pending_count": 2, + "consumed_count": 8 + }, + "settings": { + "default_wait_seconds": 10, + "default_empty_response": "No new instructions available.", + "agent_stale_after_seconds": 30 + } +} +``` + +#### `GET /api/instructions` + +Returns all instructions in queue order. + +**Query params** + +- `status=pending|consumed|all` (default `all`) + +**Response** + +```json +{ + "items": [ + { + "id": "uuid", + "content": "Implement logging", + "status": "pending", + "created_at": "ISO-8601 timestamp", + "updated_at": "ISO-8601 timestamp", + "consumed_at": null, + "consumed_by_agent_id": null, + "position": 1 + } + ] +} +``` + +#### `POST /api/instructions` + +Creates a new pending instruction. + +**Request** + +```json +{ + "content": "Add a new status indicator" +} +``` + +**Response**: `201 Created` + +```json +{ + "item": { + "id": "uuid", + "content": "Add a new status indicator", + "status": "pending", + "created_at": "ISO-8601 timestamp", + "updated_at": "ISO-8601 timestamp", + "consumed_at": null, + "consumed_by_agent_id": null, + "position": 3 + } +} +``` + +#### `PATCH /api/instructions/{instruction_id}` + +Edits a pending instruction only. + +**Request** + +```json +{ + "content": "Reword an existing pending instruction" +} +``` + +**Rules** + +- Return `409 Conflict` if the instruction has already been consumed. +- Return `404 Not Found` if the instruction does not exist. + +#### `DELETE /api/instructions/{instruction_id}` + +Deletes a pending instruction only. + +**Rules** + +- Return `409 Conflict` if the instruction has already been consumed. +- Return `204 No Content` on success. + +#### `GET /api/config` + +Returns editable runtime settings. + +**Response** + +```json +{ + "default_wait_seconds": 10, + "default_empty_response": "No new instructions available.", + "agent_stale_after_seconds": 30 +} +``` + +#### `PATCH /api/config` + +Updates runtime settings. + +**Request** + +```json +{ + "default_wait_seconds": 15, + "default_empty_response": "", + "agent_stale_after_seconds": 45 +} +``` + +#### `GET /api/events` + +Server-Sent Events endpoint for live UI updates. + +**Event types** + +- `instruction.created` +- `instruction.updated` +- `instruction.deleted` +- `instruction.consumed` +- `status.changed` +- `config.updated` + +**SSE payload example** + +```json +{ + "type": "instruction.consumed", + "timestamp": "ISO-8601 timestamp", + "data": { + "id": "uuid", + "consumed_by_agent_id": "copilot-agent" + } +} +``` + +## 6. UI/UX Plan + +### Layout priorities + +- A strong local-control dashboard feel rather than a generic admin template +- Clear separation between pending work and already-consumed history +- High-visibility connection/status strip for server and agent state +- Fast creation flow for new instructions +- Mobile-friendly stacking without losing queue readability + +### Required screens/sections + +- Header with project identity and server status +- Agent activity panel with last seen/fetch information +- Composer form for new instructions +- Pending instructions list with edit/delete actions +- Consumed instructions list with crossed-out styling and metadata +- Settings panel for wait timeout/default response behavior + +### Frontend quality bar + +- Follow `.github/instructions/frontend-design.instructions.md` before implementing any UI. +- Use only local assets. +- Build a visually distinctive interface with careful typography, color, spacing, motion, and responsive behavior. +- Keep accessibility in scope: semantic HTML, keyboard support, visible focus states, sufficient contrast. + +## 7. Logging, Reliability, and Error Handling Plan + +- Log startup, shutdown, configuration load, database initialization, and MCP registration. +- Log each instruction lifecycle event: created, updated, deleted, consumed. +- Log each `get_user_request` call with agent id, wait time, and result type. +- Return structured JSON errors for API failures. +- Protect queue consumption with transactions/locking so two simultaneous fetches cannot consume the same instruction. +- Validate payloads and reject empty or whitespace-only instructions. +- Handle browser reconnects for SSE cleanly. + +## 8. Todo List + +- [x] **Project setup** + - [x] Create the backend package structure under `app/`. + - [x] Add `requirements.txt` with only the required dependencies. + - [x] Replace the placeholder contents of `main.py` with the application entrypoint. + - [x] Add a local configuration strategy for defaults and runtime overrides. + +- [x] **Data layer** + - [x] Create SQLite schema for `instructions`, `settings`, and `agent_activity`. + - [x] Add startup migration/initialization logic. + - [x] Implement queue ordering and atomic consumption behavior. + - [x] Seed default settings on first run. + +- [x] **MCP server** + - [x] Register the `get_user_request` tool using the official MCP Python SDK. + - [x] Implement one-at-a-time delivery semantics. + - [x] Implement wait-until-timeout behavior when the queue is empty. + - [x] Return empty/default responses based on configuration. + - [x] Record agent activity on every tool call. + +- [x] **HTTP API** + - [x] Implement `GET /healthz`. + - [x] Implement `GET /api/status`. + - [x] Implement `GET /api/instructions`. + - [x] Implement `POST /api/instructions`. + - [x] Implement `PATCH /api/instructions/{instruction_id}`. + - [x] Implement `DELETE /api/instructions/{instruction_id}`. + - [x] Implement `GET /api/config`. + - [x] Implement `PATCH /api/config`. + - [x] Implement `GET /api/events` for SSE. + +- [x] **Frontend** + - [x] Read and follow `.github/instructions/frontend-design.instructions.md` before starting UI work. + - [x] Create `static/index.html` and split CSS/JS into separate folders/files. + - [x] Build the instruction composer. + - [x] Build the pending instruction list with edit/delete controls. + - [x] Build the consumed instruction list with crossed-out/greyed-out styling. + - [x] Build the live server/agent status panel. + - [x] Build the settings editor for timeout/default-response behavior. + - [x] Wire SSE updates into the UI so changes appear in real time. + - [x] Make the interface responsive and keyboard accessible. + +- [x] **Observability and robustness** + - [x] Add centralized logging configuration. + - [x] Add structured error responses and exception handling. + - [x] Add queue-consumption concurrency protection. + - [x] Add validation for invalid edits/deletes of consumed instructions. + - [ ] Add tests for empty-queue, timeout, and consume-once behavior. + +- [x] **Improvements (post-launch)** + - [x] Replace 1-second polling wait loop with `asyncio.Event`-based immediate wakeup. + - [x] Min-wait is a floor only when the queue is empty — a new instruction immediately wakes any waiting tool call (verified with timing test in `tests/test_wakeup.py`). + - [x] Enrich SSE events with full item payloads (no extra re-fetch round-trips). + - [x] Auto-refresh relative timestamps in the UI every 20 s. + - [x] Document title badge showing pending instruction count. + - [x] SSE reconnecting indicator in the header. + - [x] Dark / light theme toggle defaulting to OS colour-scheme preference. + - [x] `default_wait_seconds` changed to fully server-controlled (agents can no longer override wait time). + - [x] Non-blocking `server.ps1` management script (start / stop / restart / status / logs). + - [x] Non-blocking `server.sh` bash management script — identical feature set for macOS / Linux. + - [x] MCP stateless/stateful mode configurable via `MCP_STATELESS` env var (default `true`). + - [x] Per-agent generation counter prevents abandoned (timed-out) coroutines from silently consuming instructions meant for newer calls. + - [x] `tests/test_wakeup.py` covers both immediate-wakeup timing and concurrent-call generation safety. + - [x] Optional Bearer-token authentication via `API_TOKEN` env var (disabled by default); web UI prompts for token on first load. + +- [ ] **Documentation and developer experience** + - [x] Document local run instructions. + - [x] Document the MCP tool contract clearly. + - [x] Document the HTTP API with request/response examples. + - [x] Document how agent connectivity is inferred. + - [x] Document how the frontend design instruction must be used during UI implementation. + +## 9. Running the Server + +### Prerequisites + +- Python 3.11+ +- pip + +### Install dependencies + +```bash +pip install -r requirements.txt +``` + +### Start the server + +```bash +python main.py +``` + +Or use the included management scripts (recommended — non-blocking): + +**PowerShell (Windows)** +```powershell +.\server.ps1 start # start in background, logs to logs/ +.\server.ps1 stop # graceful stop +.\server.ps1 restart # stop + start +.\server.ps1 status # PID, memory, tail logs +.\server.ps1 logs # show last 40 stdout lines +.\server.ps1 logs -f # follow logs live +.\server.ps1 logs 100 # show last 100 lines +``` + +**Bash (macOS / Linux)** +```bash +chmod +x server.sh # make executable once +./server.sh start # start in background, logs to logs/ +./server.sh stop # graceful stop +./server.sh restart # stop + start +./server.sh status # PID, memory, tail logs +./server.sh logs # show last 40 stdout lines +./server.sh logs -f # follow logs live +./server.sh logs 100 # show last 100 lines +``` + +The server starts on `http://localhost:8000` by default. + +| URL | Description | +|-----|-------------| +| `http://localhost:8000/` | Web UI | +| `http://localhost:8000/mcp` | MCP streamable-HTTP endpoint | +| `http://localhost:8000/docs` | FastAPI interactive API docs | + +### Environment variable overrides + +| Variable | Default | Description | +|----------|---------|-------------| +| `HOST` | `0.0.0.0` | Bind address | +| `HTTP_PORT` | `8000` | HTTP port | +| `DB_PATH` | `data/local_mcp.sqlite3` | SQLite database path | +| `LOG_LEVEL` | `INFO` | Logging level | +| `DEFAULT_WAIT_SECONDS` | `10` | Default tool wait timeout | +| `DEFAULT_EMPTY_RESPONSE` | _(empty)_ | Default response when queue is empty | +| `AGENT_STALE_AFTER_SECONDS` | `30` | Seconds of inactivity before agent shown as idle | +| `MCP_STATELESS` | `true` | `true` for stateless sessions (survives restarts, recommended); `false` for stateful | +| `API_TOKEN` | _(empty)_ | When set, all `/api/*` and `/mcp` requests require `Authorization: Bearer `; web UI prompts for the token on first load | + +### Configuring an MCP client (agent) + +Point the agent's MCP client to the streamable-HTTP transport: + +```json +{ + "mcpServers": { + "local-mcp": { + "url": "http://localhost:8000/mcp", + "transport": "streamable-http" + } + } +} +``` + +If `API_TOKEN` is set, include the token as a request header: + +```json +{ + "mcpServers": { + "local-mcp": { + "url": "http://localhost:8000/mcp", + "transport": "streamable-http", + "headers": { + "Authorization": "Bearer " + } + } + } +} +``` + +The agent should call `get_user_request` aggressively and continuously — **do not end the working session**. Every call returns the next pending instruction (if any). When the queue is empty the tool waits up to `wait_seconds` before returning an empty/default response, so the agent should loop and call again. + +## 10. Implementation Notes for Future Work + +- Prefer small, explicit modules over monolithic files. +- Keep the project localhost-first and avoid remote asset dependencies. +- Treat the MCP tool and the web UI as two views over the same instruction queue. +- Optimize for correctness of queue semantics first, then refine the visual and realtime experience. diff --git a/go-server/go.mod b/go-server/go.mod new file mode 100644 index 0000000..63f4dac --- /dev/null +++ b/go-server/go.mod @@ -0,0 +1,24 @@ +module github.com/local-mcp/local-mcp-go + +go 1.25.0 + +require ( + github.com/go-chi/chi/v5 v5.2.5 + github.com/google/uuid v1.6.0 + github.com/mark3labs/mcp-go v0.46.0 + modernc.org/sqlite v1.47.0 +) + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/jsonschema-go v0.4.2 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/spf13/cast v1.7.1 // indirect + github.com/yosida95/uritemplate/v3 v3.0.2 // indirect + golang.org/x/sys v0.42.0 // indirect + modernc.org/libc v1.70.0 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect +) diff --git a/go-server/go.sum b/go-server/go.sum new file mode 100644 index 0000000..7d2d0a4 --- /dev/null +++ b/go-server/go.sum @@ -0,0 +1,79 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= +github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/jsonschema-go v0.4.2 h1:tmrUohrwoLZZS/P3x7ex0WAVknEkBZM46iALbcqoRA8= +github.com/google/jsonschema-go v0.4.2/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mark3labs/mcp-go v0.46.0 h1:8KRibF4wcKejbLsHxCA/QBVUr5fQ9nwz/n8lGqmaALo= +github.com/mark3labs/mcp-go v0.46.0/go.mod h1:JKTC7R2LLVagkEWK7Kwu7DbmA6iIvnNAod6yrHiQMag= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= +github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.32.0 h1:hjG66bI/kqIPX1b2yT6fr/jt+QedtP2fqojG2VrFuVw= +modernc.org/ccgo/v4 v4.32.0/go.mod h1:6F08EBCx5uQc38kMGl+0Nm0oWczoo1c7cgpzEry7Uc0= +modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= +modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= +modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.70.0 h1:U58NawXqXbgpZ/dcdS9kMshu08aiA6b7gusEusqzNkw= +modernc.org/libc v1.70.0/go.mod h1:OVmxFGP1CI/Z4L3E0Q3Mf1PDE0BucwMkcXjjLntvHJo= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.47.0 h1:R1XyaNpoW4Et9yly+I2EeX7pBza/w+pmYee/0HJDyKk= +modernc.org/sqlite v1.47.0/go.mod h1:hWjRO6Tj/5Ik8ieqxQybiEOUXy0NJFNp2tpvVpKlvig= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/go-server/internal/api/auth.go b/go-server/internal/api/auth.go new file mode 100644 index 0000000..8e9af58 --- /dev/null +++ b/go-server/internal/api/auth.go @@ -0,0 +1,26 @@ +package api + +import ( + "net/http" + "strings" +) + +// bearerAuthMiddleware enforces Bearer token authentication for protected routes. +func bearerAuthMiddleware(requiredToken string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + auth := r.Header.Get("Authorization") + if !strings.HasPrefix(auth, "Bearer ") { + writeError(w, http.StatusUnauthorized, "Missing or invalid Authorization header") + return + } + token := strings.TrimPrefix(auth, "Bearer ") + if token != requiredToken { + writeError(w, http.StatusUnauthorized, "Invalid token") + return + } + next.ServeHTTP(w, r) + }) + } +} + diff --git a/go-server/internal/api/config.go b/go-server/internal/api/config.go new file mode 100644 index 0000000..93b4013 --- /dev/null +++ b/go-server/internal/api/config.go @@ -0,0 +1,61 @@ +package api + +import ( + "encoding/json" + "net/http" + + "github.com/local-mcp/local-mcp-go/internal/events" +) + +func handleGetConfig(stores Stores) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + cfg, err := stores.Settings.Get() + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, cfg) + } +} + +func handleUpdateConfig(stores Stores, broker *events.Broker) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Decode partial patch + var patch struct { + DefaultWaitSeconds *int `json:"default_wait_seconds"` + DefaultEmptyResponse *string `json:"default_empty_response"` + AgentStaleAfterSeconds *int `json:"agent_stale_after_seconds"` + } + if err := json.NewDecoder(r.Body).Decode(&patch); err != nil { + writeError(w, http.StatusBadRequest, "Invalid JSON") + return + } + + // Get current settings + current, err := stores.Settings.Get() + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + // Apply patches + if patch.DefaultWaitSeconds != nil { + current.DefaultWaitSeconds = *patch.DefaultWaitSeconds + } + if patch.DefaultEmptyResponse != nil { + current.DefaultEmptyResponse = *patch.DefaultEmptyResponse + } + if patch.AgentStaleAfterSeconds != nil { + current.AgentStaleAfterSeconds = *patch.AgentStaleAfterSeconds + } + + if err := stores.Settings.Update(current); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + broker.Broadcast("config.updated", map[string]any{"config": current}) + writeJSON(w, http.StatusOK, current) + } +} + diff --git a/go-server/internal/api/events.go b/go-server/internal/api/events.go new file mode 100644 index 0000000..d82627d --- /dev/null +++ b/go-server/internal/api/events.go @@ -0,0 +1,47 @@ +package api + +import ( + "net/http" + + "github.com/local-mcp/local-mcp-go/internal/events" +) + +// handleSSE streams server-sent events to browser clients. +func handleSSE(broker *events.Broker) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Set SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + flusher, ok := w.(http.Flusher) + if !ok { + writeError(w, http.StatusInternalServerError, "Streaming not supported") + return + } + + // Subscribe to event broker + ch := broker.Subscribe() + defer broker.Unsubscribe(ch) + + // Send initial connection event + w.Write([]byte("data: {\"type\":\"connected\"}\n\n")) + flusher.Flush() + + // Stream events until client disconnects + for { + select { + case msg, ok := <-ch: + if !ok { + return // broker closed + } + w.Write(msg) + flusher.Flush() + case <-r.Context().Done(): + return // client disconnected + } + } + } +} + diff --git a/go-server/internal/api/instructions.go b/go-server/internal/api/instructions.go new file mode 100644 index 0000000..2d31052 --- /dev/null +++ b/go-server/internal/api/instructions.go @@ -0,0 +1,119 @@ +package api + +import ( + "encoding/json" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/local-mcp/local-mcp-go/internal/events" + "github.com/local-mcp/local-mcp-go/internal/store" +) + +func handleListInstructions(stores Stores) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + status := r.URL.Query().Get("status") + if status == "" { + status = "all" + } + items, err := stores.Instructions.List(status) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{"items": items}) + } +} + +func handleCreateInstruction(stores Stores, broker *events.Broker) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var body struct { + Content string `json:"content"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + writeError(w, http.StatusBadRequest, "Invalid JSON") + return + } + if body.Content == "" { + writeError(w, http.StatusBadRequest, "content is required") + return + } + + item, err := stores.Instructions.Create(body.Content) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + counts, _ := stores.Instructions.Counts() + broker.Broadcast("instruction.created", map[string]any{"item": item}) + broker.Broadcast("status.changed", map[string]any{"queue": counts}) + + writeJSON(w, http.StatusCreated, item) + } +} + +func handleUpdateInstruction(stores Stores, broker *events.Broker) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + var body struct { + Content string `json:"content"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + writeError(w, http.StatusBadRequest, "Invalid JSON") + return + } + + item, err := stores.Instructions.Update(id, body.Content) + if err == store.ErrNotFound { + writeError(w, http.StatusNotFound, "Instruction not found") + return + } + if err == store.ErrAlreadyConsumed { + writeError(w, http.StatusConflict, "Cannot edit consumed instruction") + return + } + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + broker.Broadcast("instruction.updated", map[string]any{"item": item}) + writeJSON(w, http.StatusOK, item) + } +} + +func handleDeleteInstruction(stores Stores, broker *events.Broker) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if err := stores.Instructions.Delete(id); err == store.ErrNotFound { + writeError(w, http.StatusNotFound, "Instruction not found") + return + } else if err == store.ErrAlreadyConsumed { + writeError(w, http.StatusConflict, "Cannot delete consumed instruction") + return + } else if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + counts, _ := stores.Instructions.Counts() + broker.Broadcast("instruction.deleted", map[string]any{"id": id}) + broker.Broadcast("status.changed", map[string]any{"queue": counts}) + + w.WriteHeader(http.StatusNoContent) + } +} + +func handleClearConsumed(stores Stores, broker *events.Broker) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if err := stores.Instructions.DeleteConsumed(); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + counts, _ := stores.Instructions.Counts() + broker.Broadcast("history.cleared", nil) + broker.Broadcast("status.changed", map[string]any{"queue": counts}) + w.WriteHeader(http.StatusNoContent) + } +} + diff --git a/go-server/internal/api/router.go b/go-server/internal/api/router.go new file mode 100644 index 0000000..32d87ae --- /dev/null +++ b/go-server/internal/api/router.go @@ -0,0 +1,85 @@ +// Package api implements the REST HTTP endpoints served alongside the MCP server. +package api + +import ( + "encoding/json" + "io/fs" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + + "github.com/local-mcp/local-mcp-go/internal/events" + "github.com/local-mcp/local-mcp-go/internal/store" +) + +// Stores groups all database stores that the API handlers need. +type Stores struct { + Instructions *store.InstructionStore + Settings *store.SettingsStore + Agents *store.AgentStore +} + +// NewRouter builds and returns the main chi router. +// staticFS must serve the embedded static directory; pass nil to skip. +func NewRouter(stores Stores, broker *events.Broker, apiToken string, staticFS fs.FS) http.Handler { + r := chi.NewRouter() + + r.Use(middleware.RealIP) + r.Use(middleware.Recoverer) + + // Auth-check endpoint — always public + r.Get("/auth-check", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "auth_required": apiToken != "", + }) + }) + + // Health — always public + r.Get("/healthz", handleHealth()) + + // Static files — always public + if staticFS != nil { + r.Get("/", func(w http.ResponseWriter, r *http.Request) { + http.ServeFileFS(w, r, staticFS, "index.html") + }) + r.Handle("/static/*", http.StripPrefix("/static/", http.FileServerFS(staticFS))) + } + + // All /api/* routes are protected when apiToken is set + r.Group(func(r chi.Router) { + if apiToken != "" { + r.Use(bearerAuthMiddleware(apiToken)) + } + r.Get("/api/status", handleStatus(stores)) + r.Get("/api/instructions", handleListInstructions(stores)) + r.Post("/api/instructions", handleCreateInstruction(stores, broker)) + r.Patch("/api/instructions/{id}", handleUpdateInstruction(stores, broker)) + r.Delete("/api/instructions/consumed", handleClearConsumed(stores, broker)) + r.Delete("/api/instructions/{id}", handleDeleteInstruction(stores, broker)) + r.Get("/api/config", handleGetConfig(stores)) + r.Patch("/api/config", handleUpdateConfig(stores, broker)) + r.Get("/api/events", handleSSE(broker)) + }) + + return r +} + +// writeJSON serialises v as JSON with the given status code. +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} + +// writeError writes a JSON {"detail": msg} error response. +func writeError(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"detail": msg}) +} + +// serverStartTime records when this process started, used by /api/status. +var serverStartTime = time.Now().UTC() + + + diff --git a/go-server/internal/api/status.go b/go-server/internal/api/status.go new file mode 100644 index 0000000..dbec036 --- /dev/null +++ b/go-server/internal/api/status.go @@ -0,0 +1,43 @@ +package api + +import ( + "net/http" + "time" +) + +func handleHealth() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + "server_time": time.Now().UTC().Format(time.RFC3339Nano), + }) + } +} + +func handleStatus(stores Stores) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + counts, _ := stores.Instructions.Counts() + latest, _ := stores.Agents.Latest() + cfg, _ := stores.Settings.Get() + + resp := map[string]any{ + "uptime_seconds": int(time.Since(serverStartTime).Seconds()), + "queue_pending": counts.PendingCount, + "queue_consumed": counts.ConsumedCount, + "agent_stale_after_seconds": cfg.AgentStaleAfterSeconds, + } + + if latest != nil { + isStale := time.Since(latest.LastSeenAt).Seconds() > float64(cfg.AgentStaleAfterSeconds) + resp["agent"] = map[string]any{ + "agent_id": latest.AgentID, + "last_fetch_at": latest.LastFetchAt.Format(time.RFC3339Nano), + "last_result_type": latest.LastResultType, + "is_stale": isStale, + } + } + + writeJSON(w, http.StatusOK, resp) + } +} + diff --git a/go-server/internal/config/config.go b/go-server/internal/config/config.go new file mode 100644 index 0000000..54e7bd6 --- /dev/null +++ b/go-server/internal/config/config.go @@ -0,0 +1,61 @@ +// Package config loads runtime configuration from environment variables. +package config + +import ( + "os" + "strconv" +) + +// Config holds all runtime configuration values for local-mcp. +type Config struct { + Host string + HTTPPort string + DBPath string + LogLevel string + DefaultWaitSeconds int + DefaultEmptyResponse string + AgentStaleAfterSeconds int + MCPStateless bool + APIToken string +} + +// Load reads configuration from environment variables with sensible defaults. +func Load() Config { + return Config{ + Host: getEnv("HOST", "0.0.0.0"), + HTTPPort: getEnv("HTTP_PORT", "8000"), + DBPath: getEnv("DB_PATH", "data/local_mcp.sqlite3"), + LogLevel: getEnv("LOG_LEVEL", "INFO"), + DefaultWaitSeconds: getEnvInt("DEFAULT_WAIT_SECONDS", 10), + DefaultEmptyResponse: getEnv("DEFAULT_EMPTY_RESPONSE", ""), + AgentStaleAfterSeconds: getEnvInt("AGENT_STALE_AFTER_SECONDS", 30), + MCPStateless: getEnvBool("MCP_STATELESS", true), + APIToken: getEnv("API_TOKEN", ""), + } +} + +func getEnv(key, defaultVal string) string { + if v := os.Getenv(key); v != "" { + return v + } + return defaultVal +} + +func getEnvInt(key string, defaultVal int) int { + if v := os.Getenv(key); v != "" { + if i, err := strconv.Atoi(v); err == nil { + return i + } + } + return defaultVal +} + +func getEnvBool(key string, defaultVal bool) bool { + if v := os.Getenv(key); v != "" { + if b, err := strconv.ParseBool(v); err == nil { + return b + } + } + return defaultVal +} + diff --git a/go-server/internal/db/db.go b/go-server/internal/db/db.go new file mode 100644 index 0000000..655188f --- /dev/null +++ b/go-server/internal/db/db.go @@ -0,0 +1,77 @@ +// Package db manages the SQLite connection and schema migrations. +package db + +import ( + "database/sql" + "fmt" + "os" + "path/filepath" + + _ "modernc.org/sqlite" // pure-Go SQLite driver, no CGo required +) + +// schema creates all tables if they do not already exist. +const schema = ` +CREATE TABLE IF NOT EXISTS instructions ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + consumed_at TEXT, + consumed_by_agent_id TEXT, + position INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS idx_instructions_status ON instructions(status); +CREATE INDEX IF NOT EXISTS idx_instructions_position ON instructions(position); + +CREATE TABLE IF NOT EXISTS settings ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS agent_activity ( + agent_id TEXT PRIMARY KEY, + last_seen_at TEXT NOT NULL, + last_fetch_at TEXT NOT NULL, + last_result_type TEXT NOT NULL +); +` + +// defaultSettings seeds initial values; OR IGNORE means existing rows are unchanged. +const defaultSettings = ` +INSERT OR IGNORE INTO settings (key, value) VALUES ('default_wait_seconds', '10'); +INSERT OR IGNORE INTO settings (key, value) VALUES ('default_empty_response', ''); +INSERT OR IGNORE INTO settings (key, value) VALUES ('agent_stale_after_seconds','30'); +` + +// Open opens (creating if necessary) a SQLite database at dbPath, applies the +// schema, and seeds default settings. +func Open(dbPath string) (*sql.DB, error) { + dir := filepath.Dir(dbPath) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("create db directory: %w", err) + } + + db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(on)&_pragma=busy_timeout(5000)") + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + + // Serialise all writes through a single connection to avoid locking. + db.SetMaxOpenConns(1) + + if _, err := db.Exec(schema); err != nil { + _ = db.Close() + return nil, fmt.Errorf("apply schema: %w", err) + } + + if _, err := db.Exec(defaultSettings); err != nil { + _ = db.Close() + return nil, fmt.Errorf("seed settings: %w", err) + } + + return db, nil +} + diff --git a/go-server/internal/events/broker.go b/go-server/internal/events/broker.go new file mode 100644 index 0000000..a289538 --- /dev/null +++ b/go-server/internal/events/broker.go @@ -0,0 +1,73 @@ +// Package events provides an SSE event broker for fanning out server-sent +// events to browser clients watching /api/events. +package events + +import ( + "encoding/json" + "fmt" + "sync" + "time" +) + +// Event is the wire format sent to browser clients. +type Event struct { + Type string `json:"type"` + Timestamp string `json:"timestamp"` + Data any `json:"data"` +} + +// Broker distributes named events to all currently-connected SSE clients. +// Clients subscribe by calling Subscribe(); they must call Unsubscribe() when +// done to avoid goroutine leaks. +type Broker struct { + mu sync.RWMutex + clients map[chan []byte]struct{} +} + +// NewBroker creates a ready-to-use Broker. +func NewBroker() *Broker { + return &Broker{clients: make(map[chan []byte]struct{})} +} + +// Subscribe returns a channel that will receive serialised SSE "data: ..." lines. +func (b *Broker) Subscribe() chan []byte { + ch := make(chan []byte, 32) // buffered so a slow reader doesn't stall others + b.mu.Lock() + b.clients[ch] = struct{}{} + b.mu.Unlock() + return ch +} + +// Unsubscribe removes the channel and closes it. +func (b *Broker) Unsubscribe(ch chan []byte) { + b.mu.Lock() + delete(b.clients, ch) + b.mu.Unlock() + close(ch) +} + +// Broadcast encodes and sends an event to all subscribers. Slow subscribers +// are skipped (their buffered channel is full) to prevent head-of-line blocking. +func (b *Broker) Broadcast(eventType string, data any) { + ev := Event{ + Type: eventType, + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), + Data: data, + } + payload, err := json.Marshal(ev) + if err != nil { + return // should never happen + } + line := fmt.Sprintf("data: %s\n\n", payload) + msg := []byte(line) + + b.mu.RLock() + for ch := range b.clients { + select { + case ch <- msg: + default: // skip stalled clients + } + } + b.mu.RUnlock() +} + diff --git a/go-server/internal/mcp/handler.go b/go-server/internal/mcp/handler.go new file mode 100644 index 0000000..9464335 --- /dev/null +++ b/go-server/internal/mcp/handler.go @@ -0,0 +1,289 @@ +// Package mcp registers the MCP server and implements the get_user_request tool. +package mcp + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" + + "github.com/local-mcp/local-mcp-go/internal/events" + "github.com/local-mcp/local-mcp-go/internal/models" + "github.com/local-mcp/local-mcp-go/internal/store" +) + +const ( + // maxWaitSeconds is the absolute upper bound for a single tool call wait. + maxWaitSeconds = 86400 + + // defaultWaitSeconds is the hardcoded wait time when no instruction is available. + // Set to 50s to stay safely under the 60s client timeout while allowing + // multiple keepalive progress updates. + defaultWaitSeconds = 50 + + // defaultEmptyResponse is returned when the queue is empty after waiting. + defaultEmptyResponse = "call this tool `get_user_request` again to fetch latest user input..." + + // keepaliveInterval controls how often a log notification is sent to the + // client while waiting. Reduced to 5s (from 20s) for more frequent progress updates. + // This keeps transport-level TCP/HTTP read timeouts from firing. + // Note: it does NOT reset application-level wall-clock timers + // (e.g. the Copilot 60 s limit), which are unaffected by SSE bytes. + keepaliveInterval = 5 * time.Second +) + +// Handler wraps the MCP server and holds references to the stores it needs. +type Handler struct { + MCP *server.MCPServer + instStore *store.InstructionStore + settStore *store.SettingsStore + agentStore *store.AgentStore + broker *events.Broker +} + +// New creates a Handler and registers the get_user_request tool. +func New( + instStore *store.InstructionStore, + settStore *store.SettingsStore, + agentStore *store.AgentStore, + broker *events.Broker, +) *Handler { + h := &Handler{ + MCP: server.NewMCPServer("local-mcp", "1.0.0"), + instStore: instStore, + settStore: settStore, + agentStore: agentStore, + broker: broker, + } + + h.MCP.AddTool( + mcp.NewTool("get_user_request", + mcp.WithDescription(`Fetch the next pending user instruction from the queue. + +If no instruction is available the tool will wait up to wait_seconds +(or the server-configured default) before returning an empty / default response. + +Args: + agent_id: An identifier for this agent instance (used to track connectivity). + default_response_override: Override the server-default empty response text + for this single call. + +Returns: + A dict with keys: status, result_type, instruction, response, + remaining_pending, waited_seconds.`), + mcp.WithString("agent_id", + mcp.Description("Identifier for this agent instance"), + mcp.DefaultString("unknown"), + ), + mcp.WithString("default_response_override", + mcp.Description("Override the server-default empty response for this call"), + mcp.DefaultString(""), + ), + ), + h.handleGetUserRequest, + ) + + return h +} + +func (h *Handler) handleGetUserRequest(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + agentID := req.GetString("agent_id", "unknown") + defaultOverride := req.GetString("default_response_override", "") + + // Wait time is hardcoded to stay safely under the 60s client timeout + actualWait := defaultWaitSeconds + if actualWait > maxWaitSeconds { + actualWait = maxWaitSeconds + } + waitDur := time.Duration(actualWait) * time.Second + + // Register this call as the newest for this agent. + myGen := h.instStore.Agents().NewGeneration(agentID) + + // Immediate dequeue attempt. + item, err := h.instStore.ConsumeNext(agentID) + if err != nil { + return nil, fmt.Errorf("consume: %w", err) + } + if item != nil { + return h.deliverInstruction(ctx, item, agentID, 0) + } + + // --- Wait loop --- + deadline := time.Now().Add(waitDur) + wakeup := h.instStore.Wakeup() + lastKeepalive := time.Now() + + for { + remaining := time.Until(deadline) + if remaining <= 0 { + break + } + + // Step aside if a newer call arrived for this agent. + if !h.instStore.Agents().IsActive(agentID, myGen) { + slog.Debug("get_user_request: superseded", "agent", agentID, "gen", myGen) + break + } + + // Check the queue. + item, err = h.instStore.ConsumeNext(agentID) + if err != nil { + return nil, err + } + if item != nil { + waited := int(time.Since(deadline.Add(-waitDur)).Seconds()) + if waited < 0 { + waited = 0 + } + return h.deliverInstruction(ctx, item, agentID, waited) + } + + // Calculate next sleep: no longer than time-to-keepalive and no longer than remaining. + toKeepalive := keepaliveInterval - time.Since(lastKeepalive) + if toKeepalive < 0 { + toKeepalive = 0 + } + sleep := remaining + if toKeepalive < sleep { + sleep = toKeepalive + } + if sleep > time.Second { + sleep = time.Second // check activity/cancellation at least every second + } + + // Wait for wakeup, context cancel, or sleep expiry. + select { + case <-ctx.Done(): + // Client disconnected. + slog.Debug("get_user_request: context cancelled", "agent", agentID) + return emptyResult(defaultOverride, 0), nil + case <-wakeup.Chan(): + // Instruction may have arrived — loop back to check. + case <-time.After(sleep): + // Timeout slice expired. + } + + // Send SSE keepalive if interval has elapsed. + if time.Since(lastKeepalive) >= keepaliveInterval { + waited := int(time.Since(deadline.Add(-waitDur)).Seconds()) + if waited < 0 { + waited = 0 + } + remaining := actualWait - waited + if remaining < 0 { + remaining = 0 + } + // Progress bar: filled dots proportional to elapsed time + progressPct := (waited * 100) / actualWait + if progressPct > 100 { + progressPct = 100 + } + filled := progressPct / 10 + bar := "" + for i := 0; i < 10; i++ { + if i < filled { + bar += "●" + } else { + bar += "○" + } + } + msg := fmt.Sprintf("⏳ Waiting for instructions... %s %ds / %ds (agent=%s, %ds remaining)", + bar, waited, actualWait, agentID, remaining) + if err := h.MCP.SendLogMessageToClient(ctx, mcp.LoggingMessageNotification{ + Params: mcp.LoggingMessageNotificationParams{ + Level: mcp.LoggingLevelInfo, + Data: msg, + }, + }); err != nil { + // Client gone — stop waiting. + slog.Debug("get_user_request: keepalive failed, stopping", "agent", agentID, "err", err) + break + } + slog.Debug("get_user_request: keepalive sent", "agent", agentID, "waited", waited, "progress", progressPct) + lastKeepalive = time.Now() + } + } + + // Queue still empty (or superseded / cancelled) after waiting. + waited := int(waitDur.Seconds() - time.Until(deadline).Seconds()) + if waited < 0 { + waited = 0 + } + + if h.instStore.Agents().IsActive(agentID, myGen) { + _ = h.agentStore.Record(agentID, "empty") + h.broker.Broadcast("status.changed", map[string]any{}) + } + + slog.Info("get_user_request: empty", "agent", agentID, "waited", waited, "gen", myGen) + return emptyResult(defaultOverride, waited), nil +} + +func (h *Handler) deliverInstruction(ctx context.Context, item *models.Instruction, agentID string, waited int) (*mcp.CallToolResult, error) { + counts, _ := h.instStore.Counts() + _ = h.agentStore.Record(agentID, "instruction") + + // Broadcast consumed event + status update. + h.broker.Broadcast("instruction.consumed", map[string]any{ + "item": item, + "consumed_by_agent_id": agentID, + }) + h.broker.Broadcast("status.changed", map[string]any{"queue": counts}) + + slog.Info("get_user_request: delivered", "id", item.ID, "agent", agentID, "waited", waited) + + result := map[string]any{ + "status": "ok", + "result_type": "instruction", + "instruction": map[string]any{ + "id": item.ID, + "content": item.Content, + "consumed_at": item.ConsumedAt, + }, + "response": nil, + "remaining_pending": counts.PendingCount, + "waited_seconds": waited, + } + return mcp.NewToolResultText(jsonMarshalStr(result)), nil +} + +func emptyResult(override string, waited int) *mcp.CallToolResult { + resp := override + if resp == "" { + resp = defaultEmptyResponse + } + + resultType := "empty" + if resp != "" { + resultType = "default_response" + } + + result := map[string]any{ + "status": "ok", + "result_type": resultType, + "instruction": nil, + "response": resp, + "remaining_pending": 0, + "waited_seconds": waited, + } + return mcp.NewToolResultText(jsonMarshalStr(result)) +} + +func jsonMarshalStr(v any) string { + b, _ := json.Marshal(v) + return string(b) +} + + + + + + + + + diff --git a/go-server/internal/models/models.go b/go-server/internal/models/models.go new file mode 100644 index 0000000..c63693e --- /dev/null +++ b/go-server/internal/models/models.go @@ -0,0 +1,46 @@ +// Package models defines the core data types shared across all packages. +package models + +import "time" + +// InstructionStatus represents the lifecycle state of a queue item. +type InstructionStatus string + +const ( + StatusPending InstructionStatus = "pending" + StatusConsumed InstructionStatus = "consumed" +) + +// Instruction is a single item in the queue. +type Instruction struct { + ID string `json:"id"` + Content string `json:"content"` + Status InstructionStatus `json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + ConsumedAt *time.Time `json:"consumed_at"` + ConsumedByAgentID *string `json:"consumed_by_agent_id"` + Position int `json:"position"` +} + +// Settings holds user-configurable runtime parameters. +type Settings struct { + DefaultWaitSeconds int `json:"default_wait_seconds"` + DefaultEmptyResponse string `json:"default_empty_response"` + AgentStaleAfterSeconds int `json:"agent_stale_after_seconds"` +} + +// AgentActivity tracks the last time an agent called get_user_request. +type AgentActivity struct { + AgentID string `json:"agent_id"` + LastSeenAt time.Time `json:"last_seen_at"` + LastFetchAt time.Time `json:"last_fetch_at"` + LastResultType string `json:"last_result_type"` +} + +// QueueCounts summarises the number of items in each state. +type QueueCounts struct { + PendingCount int `json:"pending_count"` + ConsumedCount int `json:"consumed_count"` +} + diff --git a/go-server/internal/store/agent.go b/go-server/internal/store/agent.go new file mode 100644 index 0000000..94240cb --- /dev/null +++ b/go-server/internal/store/agent.go @@ -0,0 +1,58 @@ +package store + +import ( + "database/sql" + "fmt" + "time" + + "github.com/local-mcp/local-mcp-go/internal/models" +) + +// AgentStore records and retrieves agent connectivity data. +type AgentStore struct { + db *sql.DB +} + +// NewAgentStore creates an AgentStore backed by db. +func NewAgentStore(db *sql.DB) *AgentStore { + return &AgentStore{db: db} +} + +// Record upserts agent activity for agentID with the given result type. +func (s *AgentStore) Record(agentID, resultType string) error { + now := time.Now().UTC().Format(time.RFC3339Nano) + _, err := s.db.Exec(` + INSERT INTO agent_activity (agent_id, last_seen_at, last_fetch_at, last_result_type) + VALUES (?, ?, ?, ?) + ON CONFLICT(agent_id) DO UPDATE SET + last_seen_at = excluded.last_seen_at, + last_fetch_at = excluded.last_fetch_at, + last_result_type = excluded.last_result_type`, + agentID, now, now, resultType) + return err +} + +// Latest returns the most recently active agent, or nil if no agent has ever +// called get_user_request. +func (s *AgentStore) Latest() (*models.AgentActivity, error) { + row := s.db.QueryRow(` + SELECT agent_id, last_seen_at, last_fetch_at, last_result_type + FROM agent_activity + ORDER BY last_seen_at DESC + LIMIT 1`) + + var a models.AgentActivity + var seenStr, fetchStr string + err := row.Scan(&a.AgentID, &seenStr, &fetchStr, &a.LastResultType) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("latest agent: %w", err) + } + + a.LastSeenAt, _ = time.Parse(time.RFC3339Nano, seenStr) + a.LastFetchAt, _ = time.Parse(time.RFC3339Nano, fetchStr) + return &a, nil +} + diff --git a/go-server/internal/store/instruction.go b/go-server/internal/store/instruction.go new file mode 100644 index 0000000..32877f3 --- /dev/null +++ b/go-server/internal/store/instruction.go @@ -0,0 +1,320 @@ +// Package store contains all database access logic. +// This file handles instruction queue operations. +package store + +import ( + "database/sql" + "fmt" + "sync" + "time" + + "github.com/google/uuid" + "github.com/local-mcp/local-mcp-go/internal/models" +) + +// WakeupSignal is an edge-triggered broadcast mechanism: closing the internal +// channel wakes all goroutines currently blocked on Chan(), then a new channel +// is installed for the next round of waiters. This mirrors asyncio.Event in +// the Python implementation. +type WakeupSignal struct { + mu sync.Mutex + ch chan struct{} +} + +// NewWakeupSignal creates a ready-to-use WakeupSignal. +func NewWakeupSignal() *WakeupSignal { + return &WakeupSignal{ch: make(chan struct{})} +} + +// Chan returns the current wait channel. Callers should capture the return +// value once and then select on it — do not call Chan() repeatedly. +func (w *WakeupSignal) Chan() <-chan struct{} { + w.mu.Lock() + defer w.mu.Unlock() + return w.ch +} + +// Notify wakes all goroutines currently waiting on Chan() by closing the +// channel, then installs a fresh channel for future waiters. +func (w *WakeupSignal) Notify() { + w.mu.Lock() + old := w.ch + w.ch = make(chan struct{}) + w.mu.Unlock() + close(old) +} + +// AgentTracker manages per-agent generation counters so that stale +// coroutines cannot silently consume instructions intended for newer calls. +type AgentTracker struct { + mu sync.Mutex + generations map[string]uint64 +} + +// NewAgentTracker creates an AgentTracker ready for use. +func NewAgentTracker() *AgentTracker { + return &AgentTracker{generations: make(map[string]uint64)} +} + +// NewGeneration increments and returns the current generation for agentID. +func (t *AgentTracker) NewGeneration(agentID string) uint64 { + t.mu.Lock() + defer t.mu.Unlock() + t.generations[agentID]++ + return t.generations[agentID] +} + +// IsActive returns true only if no newer call has arrived for agentID since +// this generation was issued. +func (t *AgentTracker) IsActive(agentID string, gen uint64) bool { + t.mu.Lock() + defer t.mu.Unlock() + return t.generations[agentID] == gen +} + +// InstructionStore provides all instruction queue operations. +type InstructionStore struct { + db *sql.DB + wakeup *WakeupSignal + agents *AgentTracker +} + +// NewInstructionStore creates a store backed by db. +func NewInstructionStore(db *sql.DB) *InstructionStore { + return &InstructionStore{ + db: db, + wakeup: NewWakeupSignal(), + agents: NewAgentTracker(), + } +} + +// Wakeup returns the shared wakeup signal. +func (s *InstructionStore) Wakeup() *WakeupSignal { return s.wakeup } + +// Agents returns the shared agent tracker. +func (s *InstructionStore) Agents() *AgentTracker { return s.agents } + +// List returns instructions filtered by status ("pending", "consumed", or "all"). +func (s *InstructionStore) List(status string) ([]models.Instruction, error) { + var rows *sql.Rows + var err error + + switch status { + case "pending", "consumed": + rows, err = s.db.Query(` + SELECT id, content, status, created_at, updated_at, + consumed_at, consumed_by_agent_id, position + FROM instructions + WHERE status = ? + ORDER BY position ASC, created_at ASC`, status) + default: // "all" + rows, err = s.db.Query(` + SELECT id, content, status, created_at, updated_at, + consumed_at, consumed_by_agent_id, position + FROM instructions + ORDER BY position ASC, created_at ASC`) + } + if err != nil { + return nil, fmt.Errorf("list instructions: %w", err) + } + defer rows.Close() + + var items []models.Instruction + for rows.Next() { + it, err := scanInstruction(rows) + if err != nil { + return nil, err + } + items = append(items, it) + } + return items, rows.Err() +} + +// Create inserts a new pending instruction at the end of the queue. +func (s *InstructionStore) Create(content string) (*models.Instruction, error) { + id := uuid.New().String() + now := time.Now().UTC() + + // Assign next position + var maxPos sql.NullInt64 + _ = s.db.QueryRow(`SELECT MAX(position) FROM instructions WHERE status = 'pending'`).Scan(&maxPos) + position := int(maxPos.Int64) + 1 + + _, err := s.db.Exec(` + INSERT INTO instructions (id, content, status, created_at, updated_at, position) + VALUES (?, ?, 'pending', ?, ?, ?)`, + id, content, now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), position) + if err != nil { + return nil, fmt.Errorf("create instruction: %w", err) + } + + // Wake any waiting tool calls + s.wakeup.Notify() + + return s.GetByID(id) +} + +// Update edits a pending instruction's content. Returns the updated item or an +// error if the instruction is already consumed. +func (s *InstructionStore) Update(id, content string) (*models.Instruction, error) { + it, err := s.GetByID(id) + if err != nil { + return nil, err + } + if it.Status == models.StatusConsumed { + return nil, ErrAlreadyConsumed + } + + now := time.Now().UTC() + _, err = s.db.Exec(`UPDATE instructions SET content = ?, updated_at = ? WHERE id = ?`, + content, now.Format(time.RFC3339Nano), id) + if err != nil { + return nil, fmt.Errorf("update instruction: %w", err) + } + return s.GetByID(id) +} + +// Delete removes a pending instruction. Returns ErrAlreadyConsumed if the +// instruction has been delivered. +func (s *InstructionStore) Delete(id string) error { + it, err := s.GetByID(id) + if err != nil { + return err + } + if it.Status == models.StatusConsumed { + return ErrAlreadyConsumed + } + _, err = s.db.Exec(`DELETE FROM instructions WHERE id = ?`, id) + return err +} + +// DeleteConsumed removes all consumed instructions. +func (s *InstructionStore) DeleteConsumed() error { + _, err := s.db.Exec(`DELETE FROM instructions WHERE status = 'consumed'`) + return err +} + +// GetByID returns a single instruction or ErrNotFound. +func (s *InstructionStore) GetByID(id string) (*models.Instruction, error) { + row := s.db.QueryRow(` + SELECT id, content, status, created_at, updated_at, + consumed_at, consumed_by_agent_id, position + FROM instructions WHERE id = ?`, id) + + it, err := scanInstruction(row) + if err == sql.ErrNoRows { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return &it, nil +} + +// ConsumeNext atomically claims the oldest pending instruction for agentID. +// Returns nil if the queue is empty. +func (s *InstructionStore) ConsumeNext(agentID string) (*models.Instruction, error) { + tx, err := s.db.Begin() + if err != nil { + return nil, fmt.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + // Claim the oldest pending item with a row-level lock (SQLite uses file lock). + var id string + err = tx.QueryRow(` + SELECT id FROM instructions + WHERE status = 'pending' + ORDER BY position ASC, created_at ASC + LIMIT 1`).Scan(&id) + if err == sql.ErrNoRows { + return nil, nil // queue empty + } + if err != nil { + return nil, fmt.Errorf("select next: %w", err) + } + + now := time.Now().UTC() + _, err = tx.Exec(` + UPDATE instructions + SET status = 'consumed', consumed_at = ?, consumed_by_agent_id = ?, updated_at = ? + WHERE id = ? AND status = 'pending'`, + now.Format(time.RFC3339Nano), agentID, now.Format(time.RFC3339Nano), id) + if err != nil { + return nil, fmt.Errorf("mark consumed: %w", err) + } + + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("commit: %w", err) + } + + return s.GetByID(id) +} + +// Counts returns pending and consumed queue sizes. +func (s *InstructionStore) Counts() (models.QueueCounts, error) { + var c models.QueueCounts + rows, err := s.db.Query(` + SELECT status, COUNT(*) FROM instructions GROUP BY status`) + if err != nil { + return c, err + } + defer rows.Close() + for rows.Next() { + var status string + var n int + if err := rows.Scan(&status, &n); err != nil { + return c, err + } + switch status { + case "pending": + c.PendingCount = n + case "consumed": + c.ConsumedCount = n + } + } + return c, rows.Err() +} + +// Sentinel errors returned by InstructionStore. +var ( + ErrNotFound = fmt.Errorf("instruction not found") + ErrAlreadyConsumed = fmt.Errorf("instruction already consumed") +) + +// scanner is satisfied by both *sql.Row and *sql.Rows. +type scanner interface { + Scan(dest ...any) error +} + +func scanInstruction(r scanner) (models.Instruction, error) { + var it models.Instruction + var createdAtStr, updatedAtStr string + var consumedAtStr sql.NullString + var consumedByAgentID sql.NullString + + err := r.Scan( + &it.ID, &it.Content, &it.Status, + &createdAtStr, &updatedAtStr, + &consumedAtStr, &consumedByAgentID, + &it.Position, + ) + if err != nil { + return it, err + } + + it.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr) + it.UpdatedAt, _ = time.Parse(time.RFC3339Nano, updatedAtStr) + + if consumedAtStr.Valid { + t, _ := time.Parse(time.RFC3339Nano, consumedAtStr.String) + it.ConsumedAt = &t + } + if consumedByAgentID.Valid { + s := consumedByAgentID.String + it.ConsumedByAgentID = &s + } + + return it, nil +} + diff --git a/go-server/internal/store/settings.go b/go-server/internal/store/settings.go new file mode 100644 index 0000000..077525e --- /dev/null +++ b/go-server/internal/store/settings.go @@ -0,0 +1,69 @@ +package store + +import ( + "database/sql" + "fmt" + "strconv" + + "github.com/local-mcp/local-mcp-go/internal/models" +) + +// SettingsStore reads and writes the settings table. +type SettingsStore struct { + db *sql.DB +} + +// NewSettingsStore creates a SettingsStore backed by db. +func NewSettingsStore(db *sql.DB) *SettingsStore { + return &SettingsStore{db: db} +} + +// Get returns the current settings. +func (s *SettingsStore) Get() (models.Settings, error) { + rows, err := s.db.Query(`SELECT key, value FROM settings`) + if err != nil { + return models.Settings{}, fmt.Errorf("get settings: %w", err) + } + defer rows.Close() + + cfg := models.Settings{ + DefaultWaitSeconds: 10, + AgentStaleAfterSeconds: 30, + } + + for rows.Next() { + var key, value string + if err := rows.Scan(&key, &value); err != nil { + return cfg, err + } + switch key { + case "default_wait_seconds": + if n, err := strconv.Atoi(value); err == nil { + cfg.DefaultWaitSeconds = n + } + case "default_empty_response": + cfg.DefaultEmptyResponse = value + case "agent_stale_after_seconds": + if n, err := strconv.Atoi(value); err == nil { + cfg.AgentStaleAfterSeconds = n + } + } + } + return cfg, rows.Err() +} + +// Update saves settings. Only non-nil fields are updated; pass a partial +// struct pointer using the Patch helper below. +func (s *SettingsStore) Update(patch models.Settings) error { + _, err := s.db.Exec(` + INSERT OR REPLACE INTO settings (key, value) VALUES + ('default_wait_seconds', ?), + ('default_empty_response', ?), + ('agent_stale_after_seconds', ?)`, + strconv.Itoa(patch.DefaultWaitSeconds), + patch.DefaultEmptyResponse, + strconv.Itoa(patch.AgentStaleAfterSeconds), + ) + return err +} + diff --git a/go-server/main.go b/go-server/main.go new file mode 100644 index 0000000..73f2dd4 --- /dev/null +++ b/go-server/main.go @@ -0,0 +1,163 @@ +// local-mcp-go — localhost MCP server delivering user instructions to agents. +package main + +import ( + "embed" + "fmt" + "io/fs" + "log/slog" + "net" + "net/http" + "os" + "os/signal" + "runtime" + "syscall" + + "github.com/mark3labs/mcp-go/server" + + "github.com/local-mcp/local-mcp-go/internal/api" + "github.com/local-mcp/local-mcp-go/internal/config" + "github.com/local-mcp/local-mcp-go/internal/db" + "github.com/local-mcp/local-mcp-go/internal/events" + "github.com/local-mcp/local-mcp-go/internal/mcp" + "github.com/local-mcp/local-mcp-go/internal/store" +) + +//go:embed static +var staticFS embed.FS + +func main() { + cfg := config.Load() + + // Logger + level := slog.LevelInfo + switch cfg.LogLevel { + case "DEBUG": + level = slog.LevelDebug + case "WARN", "WARNING": + level = slog.LevelWarn + case "ERROR": + level = slog.LevelError + } + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level}))) + + // Database + database, err := db.Open(cfg.DBPath) + if err != nil { + slog.Error("Failed to open database", "error", err) + os.Exit(1) + } + defer database.Close() + slog.Info("Database initialised", "path", cfg.DBPath) + + // Stores + instStore := store.NewInstructionStore(database) + settStore := store.NewSettingsStore(database) + agentStore := store.NewAgentStore(database) + + // Event broker + broker := events.NewBroker() + + // MCP server + mcpHandler := mcp.New(instStore, settStore, agentStore, broker) + sseServer := server.NewStreamableHTTPServer( + mcpHandler.MCP, + server.WithEndpointPath("/mcp"), + server.WithStateLess(cfg.MCPStateless), + ) + + // HTTP router + staticSubFS, _ := fs.Sub(staticFS, "static") + + router := api.NewRouter( + api.Stores{ + Instructions: instStore, + Settings: settStore, + Agents: agentStore, + }, + broker, + cfg.APIToken, + staticSubFS, + ) + + // Combined router: /mcp → MCP StreamableHTTP, /* → REST API + mux := http.NewServeMux() + mux.Handle("/mcp", sseServer) + mux.Handle("/mcp/", http.StripPrefix("/mcp", sseServer)) + mux.Handle("/", router) + + // Server with auto port switching + port := cfg.HTTPPort + maxAttempts := 10 + var srv *http.Server + var addr string + + for attempt := 0; attempt < maxAttempts; attempt++ { + addr = fmt.Sprintf("%s:%s", cfg.Host, port) + srv = &http.Server{Addr: addr, Handler: mux} + + // Try to listen + ln, err := net.Listen("tcp", addr) + if err == nil { + ln.Close() // Close test listener + break + } + + // Port taken, try next + portNum := 8000 + attempt + port = fmt.Sprintf("%d", portNum+1) + if attempt == maxAttempts-1 { + slog.Error("Could not find available port", "tried", maxAttempts) + os.Exit(1) + } + } + + if cfg.APIToken != "" { + slog.Info("Token authentication enabled") + } else { + slog.Info("Token authentication disabled (set API_TOKEN to enable)") + } + + httpURL := fmt.Sprintf("http://%s", addr) + mcpURL := fmt.Sprintf("http://%s/mcp", addr) + + slog.Info("local-mcp-go ready", + "http", httpURL, + "mcp", mcpURL, + "stateless", cfg.MCPStateless, + ) + + // On Windows, show interactive prompt + if runtime.GOOS == "windows" { + fmt.Println() + fmt.Println("╔══════════════════════════════════════════════════════════╗") + fmt.Printf("║ local-mcp-go ready on port %s%-24s║\n", port, "") + fmt.Println("║ ║") + fmt.Printf("║ Web UI: %-46s║\n", httpURL) + fmt.Printf("║ MCP: %-46s║\n", mcpURL) + fmt.Println("║ ║") + fmt.Println("║ Press Ctrl+C to stop the server ║") + fmt.Println("╚══════════════════════════════════════════════════════════╝") + fmt.Println() + } + + // Graceful shutdown on SIGINT / SIGTERM + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + go func() { + <-stop + fmt.Println() + slog.Info("Shutting down gracefully...") + _ = srv.Close() + }() + + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + slog.Error("Server error", "error", err) + os.Exit(1) + } +} + + + + +