Full Go port of local-mcp with all core features. Copied from local-mcp-go worktree to consolidate into single-branch repo (easier maintenance). Architecture: - internal/config: Environment variable configuration - internal/models: Shared types (Instruction, Settings, AgentActivity, etc.) - internal/db: SQLite init with modernc.org/sqlite (pure Go, no CGo) - internal/store: Database operations + WakeupSignal + AgentTracker - internal/events: SSE broker for browser /api/events endpoint - internal/mcp: get_user_request MCP tool with 5s keepalive progress bars - internal/api: chi HTTP router with Bearer auth middleware - main.go: Entry point with auto port switching and Windows interactive banner Dependencies: - github.com/mark3labs/mcp-go@v0.46.0 - github.com/go-chi/chi/v5@v5.2.5 - modernc.org/sqlite@v1.47.0 (pure Go SQLite) - github.com/google/uuid@v1.6.0 Static assets embedded via //go:embed static Features matching Python: - Same wait strategy: 50s with 5s progress keepalives - Same hardcoded constants (DEFAULT_WAIT_SECONDS, DEFAULT_EMPTY_RESPONSE) - Auto port switching (tries 8000-8009) - Windows interactive mode (formatted banner on double-click launch) Build: cd go-server && go build -o local-mcp.exe . Run: ./local-mcp.exe Binary size: ~18 MB (vs Python ~60+ MB memory footprint) Startup: ~10 ms (vs Python ~1-2s)
321 lines
8.6 KiB
Go
321 lines
8.6 KiB
Go
// Package store contains all database access logic.
|
|
// This file handles instruction queue operations.
|
|
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/local-mcp/local-mcp-go/internal/models"
|
|
)
|
|
|
|
// WakeupSignal is an edge-triggered broadcast mechanism: closing the internal
|
|
// channel wakes all goroutines currently blocked on Chan(), then a new channel
|
|
// is installed for the next round of waiters. This mirrors asyncio.Event in
|
|
// the Python implementation.
|
|
type WakeupSignal struct {
|
|
mu sync.Mutex
|
|
ch chan struct{}
|
|
}
|
|
|
|
// NewWakeupSignal creates a ready-to-use WakeupSignal.
|
|
func NewWakeupSignal() *WakeupSignal {
|
|
return &WakeupSignal{ch: make(chan struct{})}
|
|
}
|
|
|
|
// Chan returns the current wait channel. Callers should capture the return
|
|
// value once and then select on it — do not call Chan() repeatedly.
|
|
func (w *WakeupSignal) Chan() <-chan struct{} {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.ch
|
|
}
|
|
|
|
// Notify wakes all goroutines currently waiting on Chan() by closing the
|
|
// channel, then installs a fresh channel for future waiters.
|
|
func (w *WakeupSignal) Notify() {
|
|
w.mu.Lock()
|
|
old := w.ch
|
|
w.ch = make(chan struct{})
|
|
w.mu.Unlock()
|
|
close(old)
|
|
}
|
|
|
|
// AgentTracker manages per-agent generation counters so that stale
|
|
// coroutines cannot silently consume instructions intended for newer calls.
|
|
type AgentTracker struct {
|
|
mu sync.Mutex
|
|
generations map[string]uint64
|
|
}
|
|
|
|
// NewAgentTracker creates an AgentTracker ready for use.
|
|
func NewAgentTracker() *AgentTracker {
|
|
return &AgentTracker{generations: make(map[string]uint64)}
|
|
}
|
|
|
|
// NewGeneration increments and returns the current generation for agentID.
|
|
func (t *AgentTracker) NewGeneration(agentID string) uint64 {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
t.generations[agentID]++
|
|
return t.generations[agentID]
|
|
}
|
|
|
|
// IsActive returns true only if no newer call has arrived for agentID since
|
|
// this generation was issued.
|
|
func (t *AgentTracker) IsActive(agentID string, gen uint64) bool {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
return t.generations[agentID] == gen
|
|
}
|
|
|
|
// InstructionStore provides all instruction queue operations.
|
|
type InstructionStore struct {
|
|
db *sql.DB
|
|
wakeup *WakeupSignal
|
|
agents *AgentTracker
|
|
}
|
|
|
|
// NewInstructionStore creates a store backed by db.
|
|
func NewInstructionStore(db *sql.DB) *InstructionStore {
|
|
return &InstructionStore{
|
|
db: db,
|
|
wakeup: NewWakeupSignal(),
|
|
agents: NewAgentTracker(),
|
|
}
|
|
}
|
|
|
|
// Wakeup returns the shared wakeup signal.
|
|
func (s *InstructionStore) Wakeup() *WakeupSignal { return s.wakeup }
|
|
|
|
// Agents returns the shared agent tracker.
|
|
func (s *InstructionStore) Agents() *AgentTracker { return s.agents }
|
|
|
|
// List returns instructions filtered by status ("pending", "consumed", or "all").
|
|
func (s *InstructionStore) List(status string) ([]models.Instruction, error) {
|
|
var rows *sql.Rows
|
|
var err error
|
|
|
|
switch status {
|
|
case "pending", "consumed":
|
|
rows, err = s.db.Query(`
|
|
SELECT id, content, status, created_at, updated_at,
|
|
consumed_at, consumed_by_agent_id, position
|
|
FROM instructions
|
|
WHERE status = ?
|
|
ORDER BY position ASC, created_at ASC`, status)
|
|
default: // "all"
|
|
rows, err = s.db.Query(`
|
|
SELECT id, content, status, created_at, updated_at,
|
|
consumed_at, consumed_by_agent_id, position
|
|
FROM instructions
|
|
ORDER BY position ASC, created_at ASC`)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list instructions: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var items []models.Instruction
|
|
for rows.Next() {
|
|
it, err := scanInstruction(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, it)
|
|
}
|
|
return items, rows.Err()
|
|
}
|
|
|
|
// Create inserts a new pending instruction at the end of the queue.
|
|
func (s *InstructionStore) Create(content string) (*models.Instruction, error) {
|
|
id := uuid.New().String()
|
|
now := time.Now().UTC()
|
|
|
|
// Assign next position
|
|
var maxPos sql.NullInt64
|
|
_ = s.db.QueryRow(`SELECT MAX(position) FROM instructions WHERE status = 'pending'`).Scan(&maxPos)
|
|
position := int(maxPos.Int64) + 1
|
|
|
|
_, err := s.db.Exec(`
|
|
INSERT INTO instructions (id, content, status, created_at, updated_at, position)
|
|
VALUES (?, ?, 'pending', ?, ?, ?)`,
|
|
id, content, now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), position)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create instruction: %w", err)
|
|
}
|
|
|
|
// Wake any waiting tool calls
|
|
s.wakeup.Notify()
|
|
|
|
return s.GetByID(id)
|
|
}
|
|
|
|
// Update edits a pending instruction's content. Returns the updated item or an
|
|
// error if the instruction is already consumed.
|
|
func (s *InstructionStore) Update(id, content string) (*models.Instruction, error) {
|
|
it, err := s.GetByID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if it.Status == models.StatusConsumed {
|
|
return nil, ErrAlreadyConsumed
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
_, err = s.db.Exec(`UPDATE instructions SET content = ?, updated_at = ? WHERE id = ?`,
|
|
content, now.Format(time.RFC3339Nano), id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("update instruction: %w", err)
|
|
}
|
|
return s.GetByID(id)
|
|
}
|
|
|
|
// Delete removes a pending instruction. Returns ErrAlreadyConsumed if the
|
|
// instruction has been delivered.
|
|
func (s *InstructionStore) Delete(id string) error {
|
|
it, err := s.GetByID(id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if it.Status == models.StatusConsumed {
|
|
return ErrAlreadyConsumed
|
|
}
|
|
_, err = s.db.Exec(`DELETE FROM instructions WHERE id = ?`, id)
|
|
return err
|
|
}
|
|
|
|
// DeleteConsumed removes all consumed instructions.
|
|
func (s *InstructionStore) DeleteConsumed() error {
|
|
_, err := s.db.Exec(`DELETE FROM instructions WHERE status = 'consumed'`)
|
|
return err
|
|
}
|
|
|
|
// GetByID returns a single instruction or ErrNotFound.
|
|
func (s *InstructionStore) GetByID(id string) (*models.Instruction, error) {
|
|
row := s.db.QueryRow(`
|
|
SELECT id, content, status, created_at, updated_at,
|
|
consumed_at, consumed_by_agent_id, position
|
|
FROM instructions WHERE id = ?`, id)
|
|
|
|
it, err := scanInstruction(row)
|
|
if err == sql.ErrNoRows {
|
|
return nil, ErrNotFound
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &it, nil
|
|
}
|
|
|
|
// ConsumeNext atomically claims the oldest pending instruction for agentID.
|
|
// Returns nil if the queue is empty.
|
|
func (s *InstructionStore) ConsumeNext(agentID string) (*models.Instruction, error) {
|
|
tx, err := s.db.Begin()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("begin transaction: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback() }()
|
|
|
|
// Claim the oldest pending item with a row-level lock (SQLite uses file lock).
|
|
var id string
|
|
err = tx.QueryRow(`
|
|
SELECT id FROM instructions
|
|
WHERE status = 'pending'
|
|
ORDER BY position ASC, created_at ASC
|
|
LIMIT 1`).Scan(&id)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil // queue empty
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("select next: %w", err)
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
_, err = tx.Exec(`
|
|
UPDATE instructions
|
|
SET status = 'consumed', consumed_at = ?, consumed_by_agent_id = ?, updated_at = ?
|
|
WHERE id = ? AND status = 'pending'`,
|
|
now.Format(time.RFC3339Nano), agentID, now.Format(time.RFC3339Nano), id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("mark consumed: %w", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, fmt.Errorf("commit: %w", err)
|
|
}
|
|
|
|
return s.GetByID(id)
|
|
}
|
|
|
|
// Counts returns pending and consumed queue sizes.
|
|
func (s *InstructionStore) Counts() (models.QueueCounts, error) {
|
|
var c models.QueueCounts
|
|
rows, err := s.db.Query(`
|
|
SELECT status, COUNT(*) FROM instructions GROUP BY status`)
|
|
if err != nil {
|
|
return c, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var status string
|
|
var n int
|
|
if err := rows.Scan(&status, &n); err != nil {
|
|
return c, err
|
|
}
|
|
switch status {
|
|
case "pending":
|
|
c.PendingCount = n
|
|
case "consumed":
|
|
c.ConsumedCount = n
|
|
}
|
|
}
|
|
return c, rows.Err()
|
|
}
|
|
|
|
// Sentinel errors returned by InstructionStore.
|
|
var (
|
|
ErrNotFound = fmt.Errorf("instruction not found")
|
|
ErrAlreadyConsumed = fmt.Errorf("instruction already consumed")
|
|
)
|
|
|
|
// scanner is satisfied by both *sql.Row and *sql.Rows.
|
|
type scanner interface {
|
|
Scan(dest ...any) error
|
|
}
|
|
|
|
func scanInstruction(r scanner) (models.Instruction, error) {
|
|
var it models.Instruction
|
|
var createdAtStr, updatedAtStr string
|
|
var consumedAtStr sql.NullString
|
|
var consumedByAgentID sql.NullString
|
|
|
|
err := r.Scan(
|
|
&it.ID, &it.Content, &it.Status,
|
|
&createdAtStr, &updatedAtStr,
|
|
&consumedAtStr, &consumedByAgentID,
|
|
&it.Position,
|
|
)
|
|
if err != nil {
|
|
return it, err
|
|
}
|
|
|
|
it.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
|
|
it.UpdatedAt, _ = time.Parse(time.RFC3339Nano, updatedAtStr)
|
|
|
|
if consumedAtStr.Valid {
|
|
t, _ := time.Parse(time.RFC3339Nano, consumedAtStr.String)
|
|
it.ConsumedAt = &t
|
|
}
|
|
if consumedByAgentID.Valid {
|
|
s := consumedByAgentID.String
|
|
it.ConsumedByAgentID = &s
|
|
}
|
|
|
|
return it, nil
|
|
}
|
|
|