package telegram import ( "context" "database/sql" "encoding/json" "errors" "fmt" "log" "mime" "os" "path/filepath" "strconv" "strings" "sync" "time" "codex-telegram-bot/internal/codexapp" "codex-telegram-bot/internal/store" ) const ( telegramDownloadLimit = 20 * 1024 * 1024 resumeThreadPageSize = 8 commandSummaryLimit = 120 ) type Bot struct { tg *Client store *store.Store codex *codexapp.Client logger *log.Logger uploadDir string defaultModel string defaultSandbox string pollTimeout time.Duration mu sync.Mutex outputs map[string]*outputState diffs map[string]string } type outputState struct { chatID int64 assistant strings.Builder sentAny bool } type codexThreadItemView struct { Type string `json:"type"` ID string `json:"id"` Command string `json:"command"` CWD string `json:"cwd"` Status string `json:"status"` Tool string `json:"tool"` Server string `json:"server"` Namespace string `json:"namespace"` Query string `json:"query"` Path string `json:"path"` SavedPath string `json:"savedPath"` Text string `json:"text"` AggregatedOutput *string `json:"aggregatedOutput"` ExitCode *int `json:"exitCode"` DurationMs *int64 `json:"durationMs"` Success *bool `json:"success"` Arguments json.RawMessage `json:"arguments"` Changes []struct { Path string `json:"path"` } `json:"changes"` } func NewBot(tg *Client, st *store.Store, codex *codexapp.Client, uploadDir, defaultModel, defaultSandbox string, pollTimeout time.Duration, logger *log.Logger) *Bot { if logger == nil { logger = log.Default() } return &Bot{ tg: tg, store: st, codex: codex, logger: logger, uploadDir: uploadDir, defaultModel: defaultModel, defaultSandbox: defaultSandbox, pollTimeout: pollTimeout, outputs: make(map[string]*outputState), diffs: make(map[string]string), } } func (b *Bot) Run(ctx context.Context) error { if err := b.store.ClearActiveTurns(ctx); err != nil { return err } go b.handleCodexEvents(ctx) offset := 0 for ctx.Err() == nil { updates, err := b.tg.GetUpdates(ctx, offset, int(b.pollTimeout.Seconds())) if err != nil { if ctx.Err() != nil { break } b.logger.Printf("getUpdates: %v", err) time.Sleep(2 * time.Second) continue } for _, update := range updates { if update.UpdateID >= offset { offset = update.UpdateID + 1 } if err := b.handleUpdate(ctx, update); err != nil { b.logger.Printf("handle update %d: %v", update.UpdateID, err) } } } return ctx.Err() } func (b *Bot) handleUpdate(ctx context.Context, update Update) error { switch { case update.Message != nil: return b.handleMessage(ctx, update.Message) case update.CallbackQuery != nil: return b.handleCallback(ctx, update.CallbackQuery) default: return nil } } func (b *Bot) handleMessage(ctx context.Context, message *Message) error { if message.PinnedMessage != nil { return nil } if message.Chat.Type != "private" { if message.From != nil { _ = b.store.Audit(ctx, message.From.ID, "reject_non_private", message.Chat.Type) } _, err := b.tg.SendMessage(ctx, message.Chat.ID, "This bot only supports one-to-one chats.", SendMessageOptions{}) return err } if message.From == nil { return nil } allowed, err := b.store.IsAllowed(ctx, message.From.ID) if err != nil { return err } if !allowed { _ = b.store.Audit(ctx, message.From.ID, "reject_not_allowed", message.From.Username) _, err := b.tg.SendMessage(ctx, message.Chat.ID, "Access denied.", SendMessageOptions{}) return err } session, err := b.store.GetOrCreateSession(ctx, message.From.ID, b.defaultModel, b.defaultSandbox) if err != nil { return err } if handled, err := b.handleCommand(ctx, message, session); handled || err != nil { return err } return b.continueThread(ctx, message, session) } func (b *Bot) handleCommand(ctx context.Context, message *Message, session store.Session) (bool, error) { command, args, ok := parseCommand(message.Text) if !ok { return false, nil } userID := message.From.ID chatID := message.Chat.ID switch command { case "start", "help": return true, b.sendHelp(ctx, chatID) case "new": _, _, err := b.createNewThread(ctx, userID, chatID, session) return true, err case "thread", "threads": return true, b.sendThreads(ctx, userID, chatID) case "resume": return true, b.resumeThread(ctx, userID, chatID, args) case "rename": return true, b.renameThread(ctx, userID, chatID, session, args) case "fork": return true, b.forkThread(ctx, userID, chatID, session) case "archive": return true, b.archiveThread(ctx, userID, chatID, session, args) case "status": return true, b.sendStatus(ctx, userID, chatID, session) case "cancel": return true, b.cancelTurn(ctx, userID, chatID, session) case "workspaces": return true, b.sendWorkspaces(ctx, chatID) case "workspace": return true, b.handleWorkspaceCommand(ctx, userID, chatID, session, args) case "model": return true, b.handleModelCommand(ctx, userID, chatID, session, args) case "sandbox": return true, b.handleSandboxCommand(ctx, userID, chatID, session, args) case "diff": return true, b.sendDiff(ctx, chatID, session) default: _, err := b.tg.SendMessage(ctx, chatID, "Unknown command. Use /help.", SendMessageOptions{}) return true, err } } func (b *Bot) sendHelp(ctx context.Context, chatID int64) error { text := strings.Join([]string{ "Codex Telegram Bot", "", "/new - start a new Codex thread", "/thread or /threads - list recent threads", "/resume - choose a recent thread", "/resume ID - resume a thread", "/rename TITLE or /rename ID TITLE - rename a thread", "/fork - fork the active thread", "/archive [ID] - archive a thread", "/status - show active settings", "/cancel - interrupt the active turn", "/workspaces - list workspaces", "/workspace [ID] - select workspace", "/model - choose model and reasoning effort", "/sandbox [read-only|workspace-write|danger-full-access] - show or set sandbox", "/diff - show the latest streamed diff", "", "Plain text continues the active thread. Images are staged as local Codex image inputs; other files are staged and sent as paths.", }, "\n") return b.sendLong(ctx, chatID, text) } func (b *Bot) sendThreads(ctx context.Context, userID, chatID int64) error { return b.sendResumeChoices(ctx, userID, chatID, 0, 0) } func (b *Bot) resumeThread(ctx context.Context, userID, chatID int64, args []string) error { if len(args) == 0 { return b.sendResumeChoices(ctx, userID, chatID, 0, 0) } if len(args) != 1 { _, err := b.tg.SendMessage(ctx, chatID, "Use /resume to choose a thread, or /resume ID to resume directly.", SendMessageOptions{}) return err } id, err := strconv.ParseInt(args[0], 10, 64) if err != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "Thread ID must be a number.", SendMessageOptions{}) return sendErr } return b.resumeThreadByID(ctx, userID, chatID, id, 0) } func (b *Bot) sendResumeChoices(ctx context.Context, userID, chatID int64, page int, messageID int) error { if page < 0 { page = 0 } threads, err := b.store.ListThreadsPage(ctx, userID, false, resumeThreadPageSize+1, page*resumeThreadPageSize) if err != nil { return err } if len(threads) == 0 && page > 0 { return b.sendResumeChoices(ctx, userID, chatID, page-1, messageID) } if len(threads) == 0 { text := "No threads yet. Use /new." if messageID != 0 { _, err := b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{}) return err } _, err := b.tg.SendMessage(ctx, chatID, text, SendMessageOptions{}) return err } threads = b.syncThreadTitles(ctx, threads) hasNext := len(threads) > resumeThreadPageSize if hasNext { threads = threads[:resumeThreadPageSize] } text := resumeThreadListText(threads, page) markup := resumeThreadMarkup(threads, page, hasNext) if messageID != 0 { _, err := b.tg.EditMessageText(ctx, chatID, messageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML", ReplyMarkup: markup}) return err } _, err = b.tg.SendMessage(ctx, chatID, EscapeHTML(text), SendMessageOptions{ParseMode: "HTML", ReplyMarkup: markup}) return err } func (b *Bot) resumeThreadByID(ctx context.Context, userID, chatID int64, id int64, messageID int) error { thread, err := b.store.GetThreadByID(ctx, userID, id) if err != nil { if errors.Is(err, sql.ErrNoRows) { text := "Thread not found." if messageID != 0 { _, editErr := b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{}) return editErr } _, sendErr := b.tg.SendMessage(ctx, chatID, text, SendMessageOptions{}) return sendErr } return err } resumed, err := b.codex.ResumeThread(ctx, thread.CodexThreadID) if err != nil { return b.sendError(ctx, chatID, "Could not resume Codex thread", err) } thread, err = b.applyCodexThreadTitle(ctx, thread, resumed) if err != nil { return err } if err := b.store.SetActiveThread(ctx, userID, thread.ID); err != nil { return err } text := fmt.Sprintf("Active thread ID %d: %s", thread.ID, threadDisplayTitle(thread)) if messageID != 0 { _, err = b.tg.EditMessageText(ctx, chatID, messageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML"}) return err } _, err = b.tg.SendMessage(ctx, chatID, EscapeHTML(text), SendMessageOptions{ParseMode: "HTML"}) return err } func (b *Bot) renameThread(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { if len(args) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Use /rename TITLE for the active thread, or /rename THREAD_ID TITLE.", SendMessageOptions{}) return err } var thread store.Thread titleArgs := args var err error if len(args) > 1 { if id, parseErr := strconv.ParseInt(args[0], 10, 64); parseErr == nil { thread, err = b.store.GetThreadByID(ctx, userID, id) if err != nil { if errors.Is(err, sql.ErrNoRows) { _, sendErr := b.tg.SendMessage(ctx, chatID, "Thread not found.", SendMessageOptions{}) return sendErr } return err } titleArgs = args[1:] } } if thread.ID == 0 { thread, err = b.activeThread(ctx, userID, session) } if err != nil { return b.sendNoActiveThread(ctx, chatID, err) } title := normalizeThreadTitle(strings.Join(titleArgs, " ")) if title == "" { _, err := b.tg.SendMessage(ctx, chatID, "Thread title cannot be empty.", SendMessageOptions{}) return err } if err := b.codex.SetThreadName(ctx, thread.CodexThreadID, title); err != nil { return b.sendError(ctx, chatID, "Could not rename Codex thread", err) } if codexThread, readErr := b.codex.ReadThread(ctx, thread.CodexThreadID); readErr == nil { thread, err = b.applyCodexThreadTitle(ctx, thread, codexThread) if err != nil { return err } title = thread.Title } else { b.logger.Printf("read renamed thread %s: %v", thread.CodexThreadID, readErr) if err := b.store.SyncThreadTitle(ctx, userID, thread.ID, title); err != nil { return err } } _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("Renamed thread ID %d: %s", thread.ID, title), SendMessageOptions{}) return err } func (b *Bot) forkThread(ctx context.Context, userID, chatID int64, session store.Session) error { thread, err := b.activeThread(ctx, userID, session) if err != nil { return b.sendNoActiveThread(ctx, chatID, err) } forked, err := b.codex.ForkThread(ctx, thread.CodexThreadID) if err != nil { return b.sendError(ctx, chatID, "Could not fork Codex thread", err) } title := codexThreadTitle(forked, "fork of ID "+strconv.FormatInt(thread.ID, 10)) local, err := b.store.CreateThread(ctx, userID, forked.ID, thread.WorkspaceID, title) if err != nil { return err } if err := b.store.SetActiveThread(ctx, userID, local.ID); err != nil { return err } _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("Forked active thread to #%d.", local.ID), SendMessageOptions{}) return err } func (b *Bot) archiveThread(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { var thread store.Thread var err error if len(args) > 0 { id, parseErr := strconv.ParseInt(args[0], 10, 64) if parseErr != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "Thread ID must be a number.", SendMessageOptions{}) return sendErr } thread, err = b.store.GetThreadByID(ctx, userID, id) } else { thread, err = b.activeThread(ctx, userID, session) } if err != nil { return b.sendNoActiveThread(ctx, chatID, err) } if err := b.codex.ArchiveThread(ctx, thread.CodexThreadID); err != nil { return b.sendError(ctx, chatID, "Could not archive Codex thread", err) } if err := b.store.ArchiveThread(ctx, userID, thread.ID); err != nil { return err } _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("Archived thread #%d.", thread.ID), SendMessageOptions{}) return err } func (b *Bot) sendStatus(ctx context.Context, userID, chatID int64, session store.Session) error { workspace := "(none)" if session.ActiveWorkspaceID != 0 { if ws, err := b.store.GetWorkspaceByID(ctx, session.ActiveWorkspaceID); err == nil { workspace = fmt.Sprintf("%s (%s)", ws.Label, ws.Path) } } thread := "(none)" if session.ActiveThreadID != 0 { thread = fmt.Sprintf("ID %d", session.ActiveThreadID) if active, err := b.store.GetThreadByID(ctx, userID, session.ActiveThreadID); err == nil { if synced, syncErr := b.syncThreadTitle(ctx, active); syncErr == nil { active = synced } else { b.logger.Printf("sync status thread title %s: %v", active.CodexThreadID, syncErr) } thread = fmt.Sprintf("ID %d: %s", active.ID, threadDisplayTitle(active)) } } model := session.Model if model == "" { model = "(Codex default)" } turn := session.ActiveTurnID if turn == "" { turn = "(none)" } effort := session.ReasoningEffort if effort == "" { effort = "(model default)" } text := fmt.Sprintf("Workspace: %s\nThread: %s\nModel: %s\nReasoning effort: %s\nSandbox: %s\nActive turn: %s", workspace, thread, model, effort, session.Sandbox, turn) _ = userID return b.sendLong(ctx, chatID, text) } func (b *Bot) cancelTurn(ctx context.Context, userID, chatID int64, session store.Session) error { if session.ActiveTurnID == "" { _, err := b.tg.SendMessage(ctx, chatID, "No active turn to cancel.", SendMessageOptions{}) return err } thread, err := b.activeThread(ctx, userID, session) if err != nil { return b.sendNoActiveThread(ctx, chatID, err) } if err := b.codex.InterruptTurn(ctx, thread.CodexThreadID, session.ActiveTurnID); err != nil { return b.sendError(ctx, chatID, "Could not interrupt turn", err) } _, err = b.tg.SendMessage(ctx, chatID, "Cancellation requested.", SendMessageOptions{}) return err } func (b *Bot) sendWorkspaces(ctx context.Context, chatID int64) error { workspaces, err := b.store.ListWorkspaces(ctx) if err != nil { return err } if len(workspaces) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "No workspaces configured. Ask an admin to run scripts/add-workspace.", SendMessageOptions{}) return err } var lines []string lines = append(lines, "Workspaces:") for _, ws := range workspaces { marker := "" if ws.IsDefault { marker = " default" } lines = append(lines, fmt.Sprintf("#%d %s%s\n%s", ws.ID, ws.Label, marker, ws.Path)) } _, err = b.tg.SendMessage(ctx, chatID, EscapeHTML(strings.Join(lines, "\n")), SendMessageOptions{ ParseMode: "HTML", ReplyMarkup: workspaceMarkup(workspaces), }) return err } func (b *Bot) handleWorkspaceCommand(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { if len(args) == 0 { return b.sendWorkspaces(ctx, chatID) } id, err := strconv.ParseInt(args[0], 10, 64) if err != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "Workspace ID must be a number.", SendMessageOptions{}) return sendErr } return b.setWorkspace(ctx, userID, chatID, session, id) } func (b *Bot) setWorkspace(ctx context.Context, userID, chatID int64, session store.Session, workspaceID int64) error { workspace, err := b.store.GetWorkspaceByID(ctx, workspaceID) if err != nil { if errors.Is(err, sql.ErrNoRows) { _, sendErr := b.tg.SendMessage(ctx, chatID, "Workspace not found.", SendMessageOptions{}) return sendErr } return err } if err := b.store.SetSessionWorkspace(ctx, userID, workspace.ID); err != nil { return err } _ = session _, err = b.tg.SendMessage(ctx, chatID, "Workspace set to "+workspace.Label+".", SendMessageOptions{}) return err } func (b *Bot) handleModelCommand(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { _ = userID if len(args) > 0 { _, err := b.tg.SendMessage(ctx, chatID, "Use /model and choose from the buttons.", SendMessageOptions{}) return err } return b.sendModelChoices(ctx, chatID, session) } func (b *Bot) sendModelChoices(ctx context.Context, chatID int64, session store.Session) error { models, err := b.codex.ListModels(ctx) if err != nil { return b.sendError(ctx, chatID, "Could not list Codex models", err) } if len(models) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Codex returned no available models.", SendMessageOptions{}) return err } model := sessionModelLabel(models, session.Model) text := settingsStatusText(model, session.ReasoningEffort) + "\nChoose a model:" message, err := b.tg.SendMessage(ctx, chatID, EscapeHTML(text), SendMessageOptions{ ParseMode: "HTML", ReplyMarkup: modelMarkup(models), }) if err != nil { return err } b.rememberSettingsMessage(ctx, session.TelegramUserID, chatID, message.MessageID) return nil } func (b *Bot) handleModelCallback(ctx context.Context, callback *CallbackQuery, modelID string) error { models, err := b.codex.ListModels(ctx) if err != nil { _ = b.tg.AnswerCallbackQuery(ctx, callback.ID, "Could not list models.") return b.sendError(ctx, callback.Message.Chat.ID, "Could not list Codex models", err) } var selected codexapp.Model found := false for _, model := range models { if model.ID == modelID { selected = model found = true break } } if !found { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Model is no longer available.") } if err := b.store.SetSessionModel(ctx, callback.From.ID, selected.ID); err != nil { return err } if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Model selected."); err != nil { return err } label := modelLabel(selected) if len(selected.SupportedReasoningEfforts) == 0 { text := settingsStatusText(label, "") + "\nNo reasoning effort choices are advertised for this model." message, err := b.tg.EditMessageText(ctx, callback.Message.Chat.ID, callback.Message.MessageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML"}) if err == nil { b.rememberSettingsMessage(ctx, callback.From.ID, callback.Message.Chat.ID, message.MessageID) } return err } text := settingsStatusText(label, "") + "\nChoose reasoning effort:" message, err := b.tg.EditMessageText(ctx, callback.Message.Chat.ID, callback.Message.MessageID, EscapeHTML(text), EditMessageTextOptions{ ParseMode: "HTML", ReplyMarkup: effortMarkup(selected), }) if err == nil { b.rememberSettingsMessage(ctx, callback.From.ID, callback.Message.Chat.ID, message.MessageID) } return err } func (b *Bot) handleEffortCallback(ctx context.Context, callback *CallbackQuery, effort string) error { session, err := b.store.GetOrCreateSession(ctx, callback.From.ID, b.defaultModel, b.defaultSandbox) if err != nil { return err } if session.Model == "" { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Choose a model first.") } models, err := b.codex.ListModels(ctx) if err != nil { _ = b.tg.AnswerCallbackQuery(ctx, callback.ID, "Could not validate effort.") return b.sendError(ctx, callback.Message.Chat.ID, "Could not list Codex models", err) } var selected codexapp.Model found := false for _, model := range models { if model.ID == session.Model { selected = model found = true break } } if !found { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Selected model is no longer available.") } if !modelSupportsEffort(selected, effort) { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Effort is no longer available for this model.") } if err := b.store.SetSessionReasoningEffort(ctx, callback.From.ID, effort); err != nil { return err } if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Reasoning effort selected."); err != nil { return err } text := settingsStatusText(modelLabel(selected), effort) message, err := b.tg.EditMessageText(ctx, callback.Message.Chat.ID, callback.Message.MessageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML"}) if err == nil { b.rememberSettingsMessage(ctx, callback.From.ID, callback.Message.Chat.ID, message.MessageID) } return err } func (b *Bot) handleSandboxCommand(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { if len(args) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Current sandbox: "+session.Sandbox, SendMessageOptions{}) return err } sandbox, err := codexapp.NormalizeSandbox(args[0]) if err != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "Use one of: read-only, workspace-write, danger-full-access.", SendMessageOptions{}) return sendErr } if err := b.store.SetSessionSandbox(ctx, userID, sandbox); err != nil { return err } _, err = b.tg.SendMessage(ctx, chatID, "Sandbox set to "+sandbox+".", SendMessageOptions{}) return err } func (b *Bot) sendDiff(ctx context.Context, chatID int64, session store.Session) error { if session.ActiveThreadID == 0 { _, err := b.tg.SendMessage(ctx, chatID, "No active thread.", SendMessageOptions{}) return err } thread, err := b.store.GetThreadByID(ctx, chatID, session.ActiveThreadID) if err != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "No active thread.", SendMessageOptions{}) return sendErr } b.mu.Lock() diff := b.diffs[thread.CodexThreadID] b.mu.Unlock() if diff == "" { _, err := b.tg.SendMessage(ctx, chatID, "No diff has been streamed for this thread.", SendMessageOptions{}) return err } return b.sendLong(ctx, chatID, diff) } func (b *Bot) continueThread(ctx context.Context, message *Message, session store.Session) error { userID := message.From.ID chatID := message.Chat.ID input, err := b.messageInput(ctx, userID, message) if err != nil { return b.sendError(ctx, chatID, "Could not stage Telegram input", err) } if len(input) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Send text, an image, or a document.", SendMessageOptions{}) return err } thread, workspace, err := b.ensureThread(ctx, userID, chatID, session) if err != nil { return err } if session.ActiveTurnID != "" { if err := b.codex.SteerTurn(ctx, thread.CodexThreadID, session.ActiveTurnID, input); err != nil { return b.sendError(ctx, chatID, "Could not append to active turn", err) } _, err := b.tg.SendMessage(ctx, chatID, "Added to the running turn.", SendMessageOptions{}) return err } b.registerOutput(thread.CodexThreadID, chatID) turn, err := b.codex.StartTurn(ctx, thread.CodexThreadID, workspace.Path, session.Model, session.ReasoningEffort, session.Sandbox, input) if err != nil { b.clearOutput(thread.CodexThreadID) return b.sendError(ctx, chatID, "Codex turn failed", err) } if err := b.store.SetActiveTurn(ctx, userID, turn.ID); err != nil { return err } _ = b.store.TouchThread(ctx, thread.CodexThreadID) return nil } func codexThreadTitle(thread codexapp.Thread, fallback string) string { if title := normalizeThreadTitle(thread.Name); title != "" { return title } if title := normalizeThreadTitle(thread.Preview); title != "" { return title } return normalizeThreadTitle(fallback) } func (b *Bot) applyCodexThreadTitle(ctx context.Context, thread store.Thread, codexThread codexapp.Thread) (store.Thread, error) { title := codexThreadTitle(codexThread, "") if title == thread.Title { return thread, nil } if err := b.store.SyncThreadTitle(ctx, thread.TelegramUserID, thread.ID, title); err != nil { return thread, err } thread.Title = title return thread, nil } func (b *Bot) syncThreadTitle(ctx context.Context, thread store.Thread) (store.Thread, error) { if thread.CodexThreadID == "" { return thread, nil } codexThread, err := b.codex.ReadThread(ctx, thread.CodexThreadID) if err != nil { return thread, err } return b.applyCodexThreadTitle(ctx, thread, codexThread) } func (b *Bot) syncThreadTitles(ctx context.Context, threads []store.Thread) []store.Thread { for i := range threads { synced, err := b.syncThreadTitle(ctx, threads[i]) if err != nil { b.logger.Printf("sync thread title %s: %v", threads[i].CodexThreadID, err) continue } threads[i] = synced } return threads } func (b *Bot) createNewThread(ctx context.Context, userID, chatID int64, session store.Session) (store.Thread, store.Workspace, error) { workspace, err := b.resolveWorkspace(ctx, userID, session) if err != nil { return store.Thread{}, store.Workspace{}, b.sendWorkspaceMissing(ctx, chatID) } codexThread, err := b.codex.StartThread(ctx, workspace.Path, session.Model, session.Sandbox) if err != nil { return store.Thread{}, store.Workspace{}, b.sendError(ctx, chatID, "Could not start Codex thread", err) } title := codexThreadTitle(codexThread, workspace.Label) thread, err := b.store.CreateThread(ctx, userID, codexThread.ID, workspace.ID, title) if err != nil { return store.Thread{}, store.Workspace{}, err } if err := b.store.SetActiveThread(ctx, userID, thread.ID); err != nil { return store.Thread{}, store.Workspace{}, err } _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("New thread #%d in %s.", thread.ID, workspace.Label), SendMessageOptions{}) return thread, workspace, err } func (b *Bot) ensureThread(ctx context.Context, userID, chatID int64, session store.Session) (store.Thread, store.Workspace, error) { if session.ActiveThreadID != 0 { thread, err := b.store.GetThreadByID(ctx, userID, session.ActiveThreadID) if err == nil && !thread.Archived { if synced, syncErr := b.syncThreadTitle(ctx, thread); syncErr == nil { thread = synced } else { b.logger.Printf("sync active thread title %s: %v", thread.CodexThreadID, syncErr) } workspace, err := b.store.GetWorkspaceByID(ctx, thread.WorkspaceID) return thread, workspace, err } } return b.createNewThread(ctx, userID, chatID, session) } func (b *Bot) activeThread(ctx context.Context, userID int64, session store.Session) (store.Thread, error) { if session.ActiveThreadID == 0 { return store.Thread{}, sql.ErrNoRows } return b.store.GetThreadByID(ctx, userID, session.ActiveThreadID) } func (b *Bot) resolveWorkspace(ctx context.Context, userID int64, session store.Session) (store.Workspace, error) { if session.ActiveWorkspaceID != 0 { return b.store.GetWorkspaceByID(ctx, session.ActiveWorkspaceID) } workspace, err := b.store.DefaultWorkspace(ctx) if err != nil { return store.Workspace{}, err } if err := b.store.SetSessionWorkspace(ctx, userID, workspace.ID); err != nil { return store.Workspace{}, err } return workspace, nil } func (b *Bot) messageInput(ctx context.Context, userID int64, message *Message) ([]codexapp.InputItem, error) { var input []codexapp.InputItem text := strings.TrimSpace(message.Text) if text == "" { text = strings.TrimSpace(message.Caption) } if text != "" { input = append(input, codexapp.InputItem{Type: "text", Text: text}) } if len(message.Photo) > 0 { photo := largestPhoto(message.Photo) path, err := b.stageFile(ctx, userID, photo.FileID, "photo.jpg", "image/jpeg", photo.FileSize) if err != nil { return nil, err } input = append(input, codexapp.InputItem{Type: "localImage", Path: path}) } if message.Document != nil { path, err := b.stageFile(ctx, userID, message.Document.FileID, message.Document.FileName, message.Document.MimeType, message.Document.FileSize) if err != nil { return nil, err } if isImage(message.Document.MimeType, message.Document.FileName) { input = append(input, codexapp.InputItem{Type: "localImage", Path: path}) } else { input = append(input, codexapp.InputItem{Type: "text", Text: "User uploaded a file staged at: " + path}) } } return input, nil } func (b *Bot) stageFile(ctx context.Context, userID int64, fileID, filename, mimeType string, size int64) (string, error) { if size > telegramDownloadLimit { return "", fmt.Errorf("file exceeds Telegram cloud download limit of 20 MB") } file, err := b.tg.GetFile(ctx, fileID) if err != nil { return "", err } if file.FileSize > telegramDownloadLimit { return "", fmt.Errorf("file exceeds Telegram cloud download limit of 20 MB") } if file.FilePath == "" { return "", errors.New("telegram did not return a file path") } data, err := b.tg.DownloadFilePath(ctx, file.FilePath) if err != nil { return "", err } if len(data) > telegramDownloadLimit { return "", fmt.Errorf("file exceeds Telegram cloud download limit of 20 MB") } if filename == "" { filename = filepath.Base(file.FilePath) } if filename == "." || filename == string(filepath.Separator) || filename == "" { filename = fileID if ext := extensionForMime(mimeType); ext != "" { filename += ext } } if err := os.MkdirAll(b.uploadDir, 0o755); err != nil { return "", err } name := fmt.Sprintf("%d_%d_%s", userID, time.Now().UnixNano(), safeFilename(filename)) path := filepath.Join(b.uploadDir, name) if err := os.WriteFile(path, data, 0o644); err != nil { return "", err } return path, nil } func (b *Bot) handleCallback(ctx context.Context, callback *CallbackQuery) error { if callback.Message == nil { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Unsupported callback.") } if callback.Message.Chat.Type != "private" { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Use a private chat.") } allowed, err := b.store.IsAllowed(ctx, callback.From.ID) if err != nil { return err } if !allowed { _ = b.store.Audit(ctx, callback.From.ID, "reject_callback_not_allowed", callback.From.Username) return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Access denied.") } session, err := b.store.GetOrCreateSession(ctx, callback.From.ID, b.defaultModel, b.defaultSandbox) if err != nil { return err } if workspaceID, ok := ParseWorkspaceCallbackData(callback.Data); ok { if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, ""); err != nil { return err } return b.setWorkspace(ctx, callback.From.ID, callback.Message.Chat.ID, session, workspaceID) } if resumeID, ok := ParseResumeThreadCallbackData(callback.Data); ok { if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Thread selected."); err != nil { return err } return b.resumeThreadByID(ctx, callback.From.ID, callback.Message.Chat.ID, resumeID, callback.Message.MessageID) } if resumePage, ok := ParseResumePageCallbackData(callback.Data); ok { if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, ""); err != nil { return err } return b.sendResumeChoices(ctx, callback.From.ID, callback.Message.Chat.ID, resumePage, callback.Message.MessageID) } if modelID, ok := ParseModelCallbackData(callback.Data); ok { return b.handleModelCallback(ctx, callback, modelID) } if effort, ok := ParseEffortCallbackData(callback.Data); ok { return b.handleEffortCallback(ctx, callback, effort) } if approvalID, decision, ok := ParseApprovalCallbackData(callback.Data); ok { return b.handleApprovalCallback(ctx, callback, approvalID, decision) } return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Unknown action.") } func (b *Bot) handleApprovalCallback(ctx context.Context, callback *CallbackQuery, approvalID int64, decision string) error { approval, err := b.store.GetPendingApproval(ctx, callback.From.ID, approvalID) if err != nil { _ = b.tg.AnswerCallbackQuery(ctx, callback.ID, "Approval not found.") return nil } if decision == "details" { if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Details sent."); err != nil { return err } return b.sendLong(ctx, callback.Message.Chat.ID, approval.PayloadJSON) } if approval.Status != "pending" { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Already resolved.") } requestID, err := strconv.ParseInt(approval.CodexRequestID, 10, 64) if err != nil { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Invalid request id.") } if err := b.codex.RespondServerRequest(ctx, requestID, approvalResponse(approval, decision)); err != nil { _ = b.tg.AnswerCallbackQuery(ctx, callback.ID, "Could not answer Codex.") return b.sendError(ctx, callback.Message.Chat.ID, "Could not answer Codex approval", err) } if err := b.store.ResolvePendingApproval(ctx, callback.From.ID, approval.ID, decision); err != nil { return err } if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Sent to Codex."); err != nil { return err } updated := renderApprovalHTML(approval.Kind, json.RawMessage(approval.PayloadJSON), approvalStatusLine(decision)) _, err = b.tg.EditMessageText(ctx, callback.Message.Chat.ID, callback.Message.MessageID, updated, EditMessageTextOptions{ParseMode: "HTML"}) return err } func (b *Bot) handleCodexEvents(ctx context.Context) { for { select { case <-ctx.Done(): return case event := <-b.codex.Events(): if event.Err != nil { b.logger.Printf("codex event %s: %v", event.Method, event.Err) if event.Method == "connection/closed" { b.failActiveOutputs(ctx, "Codex connection closed: "+event.Err.Error()) } continue } if event.ServerRequest { if err := b.handleCodexServerRequest(ctx, event); err != nil { b.logger.Printf("server request %s: %v", event.Method, err) } continue } if err := b.handleCodexNotification(ctx, event); err != nil { b.logger.Printf("notification %s: %v", event.Method, err) } } } } func parseCodexThreadItem(raw json.RawMessage) (codexThreadItemView, error) { var item codexThreadItemView if len(raw) == 0 { return item, nil } if err := json.Unmarshal(raw, &item); err != nil { return item, err } return item, nil } func renderCodexItemStarted(item codexThreadItemView) string { switch item.Type { case "commandExecution": return SummaryDetailsHTMLLimited(joinNonEmpty("Tool call: command started", commandSummaryLine(item.Command)), commandStartedDetails(item), TelegramHTMLMessageLimit) case "fileChange": return "Tool call: file change started" case "mcpToolCall": return joinNonEmpty("Tool call: MCP started", "Tool: "+toolDisplayName(item.Server, item.Tool)) case "dynamicToolCall": return joinNonEmpty("Tool call: started", "Tool: "+toolDisplayName(item.Namespace, item.Tool)) case "webSearch": return joinNonEmpty("Tool call: web search started", "Query: "+item.Query) case "imageView": return joinNonEmpty("Tool call: image view", "Path: "+item.Path) case "imageGeneration": return "Tool call: image generation started" case "collabAgentToolCall": return joinNonEmpty("Tool call: agent started", "Tool: "+item.Tool) default: return "" } } func renderCodexItemCompleted(item codexThreadItemView) string { switch item.Type { case "commandExecution": status := "" if item.ExitCode != nil { status = fmt.Sprintf("Exit code: %d", *item.ExitCode) } return SummaryDetailsHTMLLimited(joinNonEmpty("Tool call: command finished", commandSummaryLine(item.Command), status), renderCodexItemDetails(item), TelegramHTMLMessageLimit) case "fileChange": return joinNonEmpty("Tool call: file change finished", fmt.Sprintf("Changed files: %d", len(item.Changes)), "Status: "+item.Status) case "mcpToolCall": return joinNonEmpty("Tool call: MCP finished", "Tool: "+toolDisplayName(item.Server, item.Tool), "Status: "+item.Status) case "dynamicToolCall": status := item.Status if item.Success != nil { status = fmt.Sprintf("success=%t", *item.Success) } return joinNonEmpty("Tool call: finished", "Tool: "+toolDisplayName(item.Namespace, item.Tool), "Status: "+status) case "webSearch": return joinNonEmpty("Tool call: web search finished", "Query: "+item.Query) case "imageView": return joinNonEmpty("Tool call: image view finished", "Path: "+item.Path) case "imageGeneration": return joinNonEmpty("Tool call: image generation finished", "Status: "+item.Status, "Saved path: "+item.SavedPath) case "collabAgentToolCall": return joinNonEmpty("Tool call: agent finished", "Tool: "+item.Tool, "Status: "+item.Status) default: return "" } } func commandSummaryLine(command string) string { command = strings.TrimSpace(command) if command == "" { return "" } runes := []rune(command) if len(runes) <= commandSummaryLimit { return "Command: " + command } return "Command: " + string(runes[:commandSummaryLimit]) + "..." } func commandStartedDetails(item codexThreadItemView) string { var lines []string if strings.TrimSpace(item.Command) != "" && len([]rune(strings.TrimSpace(item.Command))) > commandSummaryLimit { lines = append(lines, "command: "+strings.TrimSpace(item.Command)) } if strings.TrimSpace(item.CWD) != "" { lines = append(lines, "cwd: "+strings.TrimSpace(item.CWD)) } return strings.Join(lines, "\n") } func renderCodexItemDetails(item codexThreadItemView) string { var lines []string appendKV := func(key string, value any) { switch v := value.(type) { case string: if strings.TrimSpace(v) != "" { lines = append(lines, fmt.Sprintf("%s: %s", key, v)) } case *int: if v != nil { lines = append(lines, fmt.Sprintf("%s: %d", key, *v)) } case *int64: if v != nil { lines = append(lines, fmt.Sprintf("%s: %d", key, *v)) } case *bool: if v != nil { lines = append(lines, fmt.Sprintf("%s: %t", key, *v)) } } } appendKV("type", item.Type) appendKV("id", item.ID) appendKV("command", item.Command) appendKV("cwd", item.CWD) appendKV("status", item.Status) appendKV("tool", toolDisplayName(item.Namespace, item.Tool)) appendKV("server", item.Server) appendKV("query", item.Query) appendKV("path", item.Path) appendKV("savedPath", item.SavedPath) appendKV("exitCode", item.ExitCode) appendKV("durationMs", item.DurationMs) appendKV("success", item.Success) if len(item.Arguments) > 0 && string(item.Arguments) != "null" { lines = append(lines, "arguments: "+string(item.Arguments)) } if len(item.Changes) > 0 { for _, change := range item.Changes { if change.Path != "" { lines = append(lines, "changed: "+change.Path) } } } if item.AggregatedOutput != nil && strings.TrimSpace(*item.AggregatedOutput) != "" { lines = append(lines, "output:\n"+strings.TrimSpace(*item.AggregatedOutput)) } return strings.Join(lines, "\n") } func joinNonEmpty(lines ...string) string { out := make([]string, 0, len(lines)) for _, line := range lines { if strings.TrimSpace(line) != "" { out = append(out, line) } } return strings.Join(out, "\n") } func toolDisplayName(namespace, tool string) string { if namespace == "" { return tool } if tool == "" { return namespace } return namespace + "." + tool } func truncateForStatus(text string) string { runes := []rune(text) if len(runes) <= 1200 { return text } return string(runes[:1200]) + "\n..." } func (b *Bot) handleCodexNotification(ctx context.Context, event codexapp.Event) error { switch event.Method { case "error": var params struct { ThreadID string `json:"threadId"` TurnID string `json:"turnId"` WillRetry bool `json:"willRetry"` Error struct { Message string `json:"message"` AdditionalDetails string `json:"additionalDetails"` } `json:"error"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" && !params.WillRetry { if thread, err := b.store.GetThreadByCodexID(ctx, params.ThreadID); err == nil { _ = b.store.SetActiveTurn(ctx, thread.TelegramUserID, "") } message := "Codex error" if params.Error.Message != "" { message += ": " + params.Error.Message } if params.Error.AdditionalDetails != "" { message += "\n" + params.Error.AdditionalDetails } if err := b.flushAssistantMessage(ctx, params.ThreadID); err != nil { return err } if err := b.sendOutputBlock(ctx, params.ThreadID, message); err != nil { return err } b.clearOutput(params.ThreadID) return nil } case "item/started": var params struct { ThreadID string `json:"threadId"` Item json.RawMessage `json:"item"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } item, err := parseCodexThreadItem(params.Item) if err != nil { return err } if params.ThreadID != "" && item.Type == "agentMessage" && b.hasAssistantText(params.ThreadID) { return b.flushAssistantMessage(ctx, params.ThreadID) } if params.ThreadID != "" { return b.sendOutputHTMLBlock(ctx, params.ThreadID, renderCodexItemStarted(item)) } case "item/agentMessage/delta": var params struct { ThreadID string `json:"threadId"` Delta string `json:"delta"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" && params.Delta != "" { return b.appendAssistantDelta(ctx, params.ThreadID, params.Delta) } case "item/completed": var params struct { ThreadID string `json:"threadId"` Item json.RawMessage `json:"item"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } item, err := parseCodexThreadItem(params.Item) if err != nil { return err } if params.ThreadID != "" && item.Type == "agentMessage" { if item.Text != "" && !b.hasAssistantText(params.ThreadID) { if err := b.appendAssistantDelta(ctx, params.ThreadID, item.Text); err != nil { return err } } return b.flushAssistantMessage(ctx, params.ThreadID) } if params.ThreadID != "" { return b.sendOutputHTMLBlock(ctx, params.ThreadID, renderCodexItemCompleted(item)) } case "turn/diff/updated": var params struct { ThreadID string `json:"threadId"` Diff string `json:"diff"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" { b.mu.Lock() b.diffs[params.ThreadID] = params.Diff b.mu.Unlock() } case "turn/completed": var params struct { ThreadID string `json:"threadId"` Turn struct { ID string `json:"id"` Status string `json:"status"` } `json:"turn"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" { if thread, err := b.store.GetThreadByCodexID(ctx, params.ThreadID); err == nil { _ = b.store.SetActiveTurn(ctx, thread.TelegramUserID, "") _ = b.store.TouchThread(ctx, params.ThreadID) } return b.completeTurnOutput(ctx, params.ThreadID) } case "thread/name/updated": var params struct { ThreadID string `json:"threadId"` ThreadName *string `json:"threadName"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" { title := "" if params.ThreadName != nil { title = normalizeThreadTitle(*params.ThreadName) } return b.store.SyncThreadTitleByCodexID(ctx, params.ThreadID, title) } case "serverRequest/resolved": var params struct { ThreadID string `json:"threadId"` RequestID string `json:"requestId"` } _ = json.Unmarshal(event.Params, ¶ms) } return nil } func (b *Bot) handleCodexServerRequest(ctx context.Context, event codexapp.Event) error { if event.ID == nil { return nil } switch event.Method { case "item/commandExecution/requestApproval", "item/fileChange/requestApproval", "item/permissions/requestApproval": default: b.logger.Printf("unhandled server request: %s", event.Method) return nil } var params struct { ThreadID string `json:"threadId"` TurnID string `json:"turnId"` ItemID string `json:"itemId"` Reason string `json:"reason"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID == "" { return errors.New("approval request missing threadId") } thread, err := b.store.GetThreadByCodexID(ctx, params.ThreadID) if err != nil { return err } pretty, _ := json.MarshalIndent(json.RawMessage(event.Params), "", " ") if len(pretty) == 0 { pretty = event.Params } kind := event.Method approval, err := b.store.UpsertPendingApproval(ctx, store.PendingApproval{ TelegramUserID: thread.TelegramUserID, CodexRequestID: strconv.FormatInt(*event.ID, 10), CodexThreadID: params.ThreadID, TurnID: params.TurnID, ItemID: params.ItemID, Kind: kind, PayloadJSON: string(pretty), }) if err != nil { return err } text := renderApprovalHTML(kind, event.Params, "") msg, err := b.tg.SendMessage(ctx, thread.TelegramUserID, text, SendMessageOptions{ ParseMode: "HTML", ReplyMarkup: approvalMarkup(approval.ID), }) if err != nil { return err } return b.store.UpdatePendingApprovalMessage(ctx, approval.ID, msg.Chat.ID, msg.MessageID) } func (b *Bot) registerOutput(threadID string, chatID int64) { b.mu.Lock() defer b.mu.Unlock() b.outputs[threadID] = &outputState{chatID: chatID} } func (b *Bot) clearOutput(threadID string) { b.mu.Lock() defer b.mu.Unlock() delete(b.outputs, threadID) } func (b *Bot) hasAssistantText(threadID string) bool { b.mu.Lock() defer b.mu.Unlock() state := b.outputs[threadID] return state != nil && state.assistant.Len() > 0 } func (b *Bot) failActiveOutputs(ctx context.Context, message string) { b.mu.Lock() threadIDs := make([]string, 0, len(b.outputs)) for threadID := range b.outputs { threadIDs = append(threadIDs, threadID) } b.mu.Unlock() for _, threadID := range threadIDs { if thread, err := b.store.GetThreadByCodexID(ctx, threadID); err == nil { _ = b.store.SetActiveTurn(ctx, thread.TelegramUserID, "") } if err := b.flushAssistantMessage(ctx, threadID); err != nil { b.logger.Printf("flush failed output %s: %v", threadID, err) } if err := b.sendOutputBlock(ctx, threadID, message); err != nil { b.logger.Printf("send failed output %s: %v", threadID, err) } b.clearOutput(threadID) } } func (b *Bot) sendOutputBlock(ctx context.Context, threadID, block string) error { block = strings.TrimSpace(block) if block == "" { return nil } if err := b.flushAssistantMessage(ctx, threadID); err != nil { return err } chatID, err := b.outputChatID(ctx, threadID) if err != nil { return nil } if err := b.sendLong(ctx, chatID, block); err != nil { return err } b.markOutputSent(threadID) return nil } func (b *Bot) sendOutputHTMLBlock(ctx context.Context, threadID, htmlText string) error { htmlText = strings.TrimSpace(htmlText) if htmlText == "" { return nil } if err := b.flushAssistantMessage(ctx, threadID); err != nil { return err } chatID, err := b.outputChatID(ctx, threadID) if err != nil { return nil } if err := b.sendHTML(ctx, chatID, htmlText); err != nil { return err } b.markOutputSent(threadID) return nil } func (b *Bot) appendAssistantDelta(ctx context.Context, threadID, delta string) error { if delta == "" { return nil } if _, err := b.outputChatID(ctx, threadID); err != nil { return nil } b.mu.Lock() state := b.outputs[threadID] if state != nil { _, _ = state.assistant.WriteString(delta) } b.mu.Unlock() return nil } func (b *Bot) flushAssistantMessage(ctx context.Context, threadID string) error { b.mu.Lock() state := b.outputs[threadID] if state == nil || state.assistant.Len() == 0 { b.mu.Unlock() return nil } chatID := state.chatID text := state.assistant.String() state.assistant.Reset() b.mu.Unlock() if err := b.sendLong(ctx, chatID, text); err != nil { return err } b.markOutputSent(threadID) return nil } func (b *Bot) completeTurnOutput(ctx context.Context, threadID string) error { if err := b.flushAssistantMessage(ctx, threadID); err != nil { return err } b.mu.Lock() state := b.outputs[threadID] if state == nil { b.mu.Unlock() return nil } chatID := state.chatID sentAny := state.sentAny delete(b.outputs, threadID) b.mu.Unlock() if !sentAny { _, err := b.tg.SendMessage(ctx, chatID, "Done.", SendMessageOptions{}) return err } return nil } func (b *Bot) outputChatID(ctx context.Context, threadID string) (int64, error) { b.mu.Lock() state := b.outputs[threadID] if state != nil { chatID := state.chatID b.mu.Unlock() return chatID, nil } b.mu.Unlock() thread, err := b.store.GetThreadByCodexID(ctx, threadID) if err != nil { return 0, err } b.registerOutput(threadID, thread.TelegramUserID) return thread.TelegramUserID, nil } func (b *Bot) markOutputSent(threadID string) { b.mu.Lock() defer b.mu.Unlock() if state := b.outputs[threadID]; state != nil { state.sentAny = true } } func (b *Bot) sendLong(ctx context.Context, chatID int64, text string) error { for _, chunk := range ChunkText(text, TelegramHTMLMessageLimit) { if err := b.sendHTML(ctx, chatID, EscapeHTML(chunk)); err != nil { return err } } return nil } func (b *Bot) sendHTML(ctx context.Context, chatID int64, htmlText string) error { _, err := b.tg.SendMessage(ctx, chatID, htmlText, SendMessageOptions{ParseMode: "HTML"}) return err } func (b *Bot) sendError(ctx context.Context, chatID int64, prefix string, err error) error { _, sendErr := b.tg.SendMessage(ctx, chatID, EscapeHTML(prefix+": "+err.Error()), SendMessageOptions{ParseMode: "HTML"}) return sendErr } func (b *Bot) sendWorkspaceMissing(ctx context.Context, chatID int64) error { _, err := b.tg.SendMessage(ctx, chatID, "No workspace configured. Ask an admin to run scripts/add-workspace, then use /workspace.", SendMessageOptions{}) return err } func (b *Bot) sendNoActiveThread(ctx context.Context, chatID int64, err error) error { if errors.Is(err, sql.ErrNoRows) { _, sendErr := b.tg.SendMessage(ctx, chatID, "No active thread. Use /new.", SendMessageOptions{}) return sendErr } return err } func parseCommand(text string) (string, []string, bool) { text = strings.TrimSpace(text) if !strings.HasPrefix(text, "/") { return "", nil, false } fields := strings.Fields(text) if len(fields) == 0 { return "", nil, false } name := strings.TrimPrefix(fields[0], "/") if before, _, ok := strings.Cut(name, "@"); ok { name = before } return strings.ToLower(name), fields[1:], true } func resumeThreadListText(threads []store.Thread, page int) string { lines := []string{fmt.Sprintf("Resume a thread (page %d):", page+1), ""} for _, thread := range threads { lines = append(lines, fmt.Sprintf("Thread ID %d: %s", thread.ID, threadDisplayTitle(thread))) } lines = append(lines, "", "Choose a button below, or use /resume THREAD_ID directly.") return strings.Join(lines, "\n") } func resumeThreadMarkup(threads []store.Thread, page int, hasNext bool) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, 4) for _, thread := range threads { button := InlineKeyboardButton{ Text: fmt.Sprintf("ID %d", thread.ID), CallbackData: ResumeThreadCallbackData(thread.ID), } if len(keyboard) == 0 || len(keyboard[len(keyboard)-1]) >= 4 { keyboard = append(keyboard, []InlineKeyboardButton{button}) continue } keyboard[len(keyboard)-1] = append(keyboard[len(keyboard)-1], button) } var nav []InlineKeyboardButton if page > 0 { nav = append(nav, InlineKeyboardButton{Text: "Prev", CallbackData: ResumePageCallbackData(page - 1)}) } if hasNext { nav = append(nav, InlineKeyboardButton{Text: "Next", CallbackData: ResumePageCallbackData(page + 1)}) } if len(nav) > 0 { keyboard = append(keyboard, nav) } return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } func normalizeThreadTitle(title string) string { title = strings.Join(strings.Fields(title), " ") runes := []rune(title) if len(runes) > 80 { title = string(runes[:80]) } return title } func threadDisplayTitle(thread store.Thread) string { title := strings.Join(strings.Fields(thread.Title), " ") if title == "" { title = thread.CodexThreadID } runes := []rune(title) if len(runes) > 90 { title = string(runes[:90]) + "..." } return title } func workspaceMarkup(workspaces []store.Workspace) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, len(workspaces)) for _, ws := range workspaces { text := ws.Label if ws.IsDefault { text += " default" } keyboard = append(keyboard, []InlineKeyboardButton{{ Text: text, CallbackData: WorkspaceCallbackData(ws.ID), }}) } return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } func modelMarkup(models []codexapp.Model) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, len(models)) for _, model := range models { callbackData, ok := ModelCallbackData(model.ID) if !ok { continue } keyboard = append(keyboard, []InlineKeyboardButton{{ Text: modelLabel(model), CallbackData: callbackData, }}) } return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } func effortMarkup(model codexapp.Model) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, len(model.SupportedReasoningEfforts)) for _, option := range model.SupportedReasoningEfforts { label := option.ReasoningEffort if option.ReasoningEffort == model.DefaultReasoningEffort { label += " default" } keyboard = append(keyboard, []InlineKeyboardButton{{ Text: label, CallbackData: EffortCallbackData(option.ReasoningEffort), }}) } return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } func modelSupportsEffort(model codexapp.Model, effort string) bool { for _, option := range model.SupportedReasoningEfforts { if option.ReasoningEffort == effort { return true } } return false } func modelLabel(model codexapp.Model) string { label := model.DisplayName if label == "" { label = model.ID } if model.IsDefault { label += " default" } return label } func sessionModelLabel(models []codexapp.Model, modelID string) string { if modelID == "" { return "(Codex default)" } for _, model := range models { if model.ID == modelID { return modelLabel(model) } } return modelID } func settingsStatusText(model string, effort string) string { if effort == "" { effort = "(model default)" } return fmt.Sprintf("Current model: %s\nCurrent reasoning effort: %s", model, effort) } func (b *Bot) rememberSettingsMessage(ctx context.Context, userID int64, chatID int64, messageID int) { if messageID == 0 { return } if err := b.store.SetSessionSettingsMessage(ctx, userID, chatID, messageID); err != nil { b.logger.Printf("settings message store: %v", err) return } if err := b.tg.PinChatMessage(ctx, chatID, messageID, true); err != nil { b.logger.Printf("settings message pin: %v", err) } } func approvalMarkup(id int64) *InlineKeyboardMarkup { return &InlineKeyboardMarkup{InlineKeyboard: [][]InlineKeyboardButton{ { {Text: "Approve", CallbackData: ApprovalCallbackData(id, "accept")}, {Text: "Deny", CallbackData: ApprovalCallbackData(id, "decline")}, }, { {Text: "Details", CallbackData: ApprovalCallbackData(id, "details")}, {Text: "Cancel", CallbackData: ApprovalCallbackData(id, "cancel")}, }, }} } func approvalResponse(approval store.PendingApproval, decision string) any { if approval.Kind != "item/permissions/requestApproval" { return map[string]any{"decision": decision} } scope := "turn" if decision == "acceptForSession" { scope = "session" } permissions := map[string]any{} if decision == "accept" || decision == "acceptForSession" { var params struct { Permissions map[string]any `json:"permissions"` } _ = json.Unmarshal([]byte(approval.PayloadJSON), ¶ms) if params.Permissions != nil { permissions = params.Permissions } } return map[string]any{ "permissions": permissions, "scope": scope, } } func renderApprovalHTML(kind string, raw json.RawMessage, status string) string { title := "Codex approval requested" if strings.Contains(kind, "commandExecution") { title = "Codex requests command approval" } if strings.Contains(kind, "fileChange") { title = "Codex requests file change approval" } if strings.Contains(kind, "permissions") { title = "Codex requests permission approval" } var params map[string]any _ = json.Unmarshal(raw, ¶ms) lines := []string{title} if reason, _ := params["reason"].(string); reason != "" { lines = append(lines, "", reason) } for _, key := range []string{"command", "cwd", "grantRoot", "permissions"} { if value, ok := params[key]; ok { lines = append(lines, fmt.Sprintf("%s: %s", key, conciseValue(value))) } } summary := strings.Join(lines, "\n") details := prettyJSON(raw) limit := TelegramHTMLMessageLimit if status != "" { limit -= len([]rune(status)) + 1 } text := SummaryDetailsHTMLLimited(summary, details, limit) if status != "" { text += "\n" + EscapeHTML(status) } return text } func approvalStatusLine(decision string) string { switch decision { case "accept": return "Approved." case "acceptForSession": return "Approved for this session." case "decline": return "Disapproved." case "cancel": return "Canceled." default: return "Resolved: " + decision + "." } } func conciseValue(value any) string { text := fmt.Sprint(value) if data, err := json.Marshal(value); err == nil { text = string(data) } text = strings.Join(strings.Fields(text), " ") runes := []rune(text) if len(runes) > 180 { return string(runes[:180]) + "..." } return text } func prettyJSON(raw json.RawMessage) string { if len(raw) == 0 { return "" } var value any if err := json.Unmarshal(raw, &value); err != nil { return string(raw) } data, err := json.MarshalIndent(value, "", " ") if err != nil { return string(raw) } return string(data) } func largestPhoto(photos []PhotoSize) PhotoSize { best := photos[0] bestScore := best.Width * best.Height for _, photo := range photos[1:] { score := photo.Width * photo.Height if score > bestScore { best = photo bestScore = score } } return best } func isImage(mimeType, filename string) bool { if strings.HasPrefix(strings.ToLower(mimeType), "image/") { return true } switch strings.ToLower(filepath.Ext(filename)) { case ".jpg", ".jpeg", ".png", ".gif", ".webp": return true default: return false } } func extensionForMime(mimeType string) string { extensions, err := mime.ExtensionsByType(mimeType) if err != nil || len(extensions) == 0 { return "" } return extensions[0] } func safeFilename(filename string) string { base := filepath.Base(filename) var builder strings.Builder for _, r := range base { switch { case r >= 'a' && r <= 'z': builder.WriteRune(r) case r >= 'A' && r <= 'Z': builder.WriteRune(r) case r >= '0' && r <= '9': builder.WriteRune(r) case r == '.', r == '-', r == '_': builder.WriteRune(r) default: builder.WriteByte('_') } } if builder.Len() == 0 { return "upload" } return builder.String() }