// Package store contains all database access logic. // This file handles instruction queue operations. package store import ( "database/sql" "fmt" "sync" "time" "github.com/google/uuid" "github.com/local-mcp/local-mcp-go/internal/models" ) // WakeupSignal is an edge-triggered broadcast mechanism: closing the internal // channel wakes all goroutines currently blocked on Chan(), then a new channel // is installed for the next round of waiters. This mirrors asyncio.Event in // the Python implementation. type WakeupSignal struct { mu sync.Mutex ch chan struct{} } // NewWakeupSignal creates a ready-to-use WakeupSignal. func NewWakeupSignal() *WakeupSignal { return &WakeupSignal{ch: make(chan struct{})} } // Chan returns the current wait channel. Callers should capture the return // value once and then select on it — do not call Chan() repeatedly. func (w *WakeupSignal) Chan() <-chan struct{} { w.mu.Lock() defer w.mu.Unlock() return w.ch } // Notify wakes all goroutines currently waiting on Chan() by closing the // channel, then installs a fresh channel for future waiters. func (w *WakeupSignal) Notify() { w.mu.Lock() old := w.ch w.ch = make(chan struct{}) w.mu.Unlock() close(old) } // AgentTracker manages per-agent generation counters so that stale // coroutines cannot silently consume instructions intended for newer calls. type AgentTracker struct { mu sync.Mutex generations map[string]uint64 } // NewAgentTracker creates an AgentTracker ready for use. func NewAgentTracker() *AgentTracker { return &AgentTracker{generations: make(map[string]uint64)} } // NewGeneration increments and returns the current generation for agentID. func (t *AgentTracker) NewGeneration(agentID string) uint64 { t.mu.Lock() defer t.mu.Unlock() t.generations[agentID]++ return t.generations[agentID] } // IsActive returns true only if no newer call has arrived for agentID since // this generation was issued. func (t *AgentTracker) IsActive(agentID string, gen uint64) bool { t.mu.Lock() defer t.mu.Unlock() return t.generations[agentID] == gen } // InstructionStore provides all instruction queue operations. type InstructionStore struct { db *sql.DB wakeup *WakeupSignal agents *AgentTracker } // NewInstructionStore creates a store backed by db. func NewInstructionStore(db *sql.DB) *InstructionStore { return &InstructionStore{ db: db, wakeup: NewWakeupSignal(), agents: NewAgentTracker(), } } // Wakeup returns the shared wakeup signal. func (s *InstructionStore) Wakeup() *WakeupSignal { return s.wakeup } // Agents returns the shared agent tracker. func (s *InstructionStore) Agents() *AgentTracker { return s.agents } // List returns instructions filtered by status ("pending", "consumed", or "all"). func (s *InstructionStore) List(status string) ([]models.Instruction, error) { var rows *sql.Rows var err error switch status { case "pending", "consumed": rows, err = s.db.Query(` SELECT id, content, status, created_at, updated_at, consumed_at, consumed_by_agent_id, position FROM instructions WHERE status = ? ORDER BY position ASC, created_at ASC`, status) default: // "all" rows, err = s.db.Query(` SELECT id, content, status, created_at, updated_at, consumed_at, consumed_by_agent_id, position FROM instructions ORDER BY position ASC, created_at ASC`) } if err != nil { return nil, fmt.Errorf("list instructions: %w", err) } defer rows.Close() var items []models.Instruction for rows.Next() { it, err := scanInstruction(rows) if err != nil { return nil, err } items = append(items, it) } return items, rows.Err() } // Create inserts a new pending instruction at the end of the queue. func (s *InstructionStore) Create(content string) (*models.Instruction, error) { id := uuid.New().String() now := time.Now().UTC() // Assign next position var maxPos sql.NullInt64 _ = s.db.QueryRow(`SELECT MAX(position) FROM instructions WHERE status = 'pending'`).Scan(&maxPos) position := int(maxPos.Int64) + 1 _, err := s.db.Exec(` INSERT INTO instructions (id, content, status, created_at, updated_at, position) VALUES (?, ?, 'pending', ?, ?, ?)`, id, content, now.Format(time.RFC3339Nano), now.Format(time.RFC3339Nano), position) if err != nil { return nil, fmt.Errorf("create instruction: %w", err) } // Wake any waiting tool calls s.wakeup.Notify() return s.GetByID(id) } // Update edits a pending instruction's content. Returns the updated item or an // error if the instruction is already consumed. func (s *InstructionStore) Update(id, content string) (*models.Instruction, error) { it, err := s.GetByID(id) if err != nil { return nil, err } if it.Status == models.StatusConsumed { return nil, ErrAlreadyConsumed } now := time.Now().UTC() _, err = s.db.Exec(`UPDATE instructions SET content = ?, updated_at = ? WHERE id = ?`, content, now.Format(time.RFC3339Nano), id) if err != nil { return nil, fmt.Errorf("update instruction: %w", err) } return s.GetByID(id) } // Delete removes a pending instruction. Returns ErrAlreadyConsumed if the // instruction has been delivered. func (s *InstructionStore) Delete(id string) error { it, err := s.GetByID(id) if err != nil { return err } if it.Status == models.StatusConsumed { return ErrAlreadyConsumed } _, err = s.db.Exec(`DELETE FROM instructions WHERE id = ?`, id) return err } // DeleteConsumed removes all consumed instructions. func (s *InstructionStore) DeleteConsumed() error { _, err := s.db.Exec(`DELETE FROM instructions WHERE status = 'consumed'`) return err } // GetByID returns a single instruction or ErrNotFound. func (s *InstructionStore) GetByID(id string) (*models.Instruction, error) { row := s.db.QueryRow(` SELECT id, content, status, created_at, updated_at, consumed_at, consumed_by_agent_id, position FROM instructions WHERE id = ?`, id) it, err := scanInstruction(row) if err == sql.ErrNoRows { return nil, ErrNotFound } if err != nil { return nil, err } return &it, nil } // ConsumeNext atomically claims the oldest pending instruction for agentID. // Returns nil if the queue is empty. func (s *InstructionStore) ConsumeNext(agentID string) (*models.Instruction, error) { tx, err := s.db.Begin() if err != nil { return nil, fmt.Errorf("begin transaction: %w", err) } defer func() { _ = tx.Rollback() }() // Claim the oldest pending item with a row-level lock (SQLite uses file lock). var id string err = tx.QueryRow(` SELECT id FROM instructions WHERE status = 'pending' ORDER BY position ASC, created_at ASC LIMIT 1`).Scan(&id) if err == sql.ErrNoRows { return nil, nil // queue empty } if err != nil { return nil, fmt.Errorf("select next: %w", err) } now := time.Now().UTC() _, err = tx.Exec(` UPDATE instructions SET status = 'consumed', consumed_at = ?, consumed_by_agent_id = ?, updated_at = ? WHERE id = ? AND status = 'pending'`, now.Format(time.RFC3339Nano), agentID, now.Format(time.RFC3339Nano), id) if err != nil { return nil, fmt.Errorf("mark consumed: %w", err) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("commit: %w", err) } return s.GetByID(id) } // Counts returns pending and consumed queue sizes. func (s *InstructionStore) Counts() (models.QueueCounts, error) { var c models.QueueCounts rows, err := s.db.Query(` SELECT status, COUNT(*) FROM instructions GROUP BY status`) if err != nil { return c, err } defer rows.Close() for rows.Next() { var status string var n int if err := rows.Scan(&status, &n); err != nil { return c, err } switch status { case "pending": c.PendingCount = n case "consumed": c.ConsumedCount = n } } return c, rows.Err() } // Sentinel errors returned by InstructionStore. var ( ErrNotFound = fmt.Errorf("instruction not found") ErrAlreadyConsumed = fmt.Errorf("instruction already consumed") ) // scanner is satisfied by both *sql.Row and *sql.Rows. type scanner interface { Scan(dest ...any) error } func scanInstruction(r scanner) (models.Instruction, error) { var it models.Instruction var createdAtStr, updatedAtStr string var consumedAtStr sql.NullString var consumedByAgentID sql.NullString err := r.Scan( &it.ID, &it.Content, &it.Status, &createdAtStr, &updatedAtStr, &consumedAtStr, &consumedByAgentID, &it.Position, ) if err != nil { return it, err } it.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr) it.UpdatedAt, _ = time.Parse(time.RFC3339Nano, updatedAtStr) if consumedAtStr.Valid { t, _ := time.Parse(time.RFC3339Nano, consumedAtStr.String) it.ConsumedAt = &t } if consumedByAgentID.Valid { s := consumedByAgentID.String it.ConsumedByAgentID = &s } return it, nil }