package telegram import ( "context" "database/sql" "encoding/json" "errors" "fmt" "log" "mime" "os" "path/filepath" "strconv" "strings" "sync" "time" "codex-telegram-bot/internal/codexapp" "codex-telegram-bot/internal/codexstate" "codex-telegram-bot/internal/store" ) const ( telegramDownloadLimit = 20 * 1024 * 1024 resumeThreadPageSize = 8 telegramPhotoDirectiveStart = "" telegramPhotoCaptionLimit = 1024 pictureMediaGroupLimit = 10 ) type Bot struct { tg *Client store *store.Store codex *codexapp.Client logger *log.Logger uploadDir string codexHome string codexStateDB string defaultModel string defaultSandbox string pollTimeout time.Duration mu sync.Mutex outputs map[string]*outputState diffs map[string]string } type assistantMessageSegment struct { Text string Photo *assistantPhotoDirective ThreadRename *assistantThreadRenameDirective ThreadCWD *assistantThreadCWDDirective } type assistantPhotoDirective struct { Path string `json:"path"` Caption string `json:"caption,omitempty"` } type assistantThreadRenameDirective struct { Title string `json:"title"` } type assistantThreadCWDDirective struct { CWD string `json:"cwd"` } type outputState struct { chatID int64 assistant strings.Builder sentAny bool pictureRequest bool tools map[string]toolMessageState sentImages map[string]bool generatedImages []generatedImageOutput workingIndicatorOff context.CancelFunc } type generatedImageOutput struct { Path string } type toolMessageState struct { chatID int64 messageID int toolHTML string approvalHTML string approvalMarkup *InlineKeyboardMarkup editedAt string } type codexThreadItemView struct { Type string `json:"type"` ID string `json:"id"` Command string `json:"command"` CWD string `json:"cwd"` Status string `json:"status"` Tool string `json:"tool"` Server string `json:"server"` Namespace string `json:"namespace"` Query string `json:"query"` Path string `json:"path"` SavedPath string `json:"savedPath"` Text string `json:"text"` AggregatedOutput *string `json:"aggregatedOutput"` ExitCode *int `json:"exitCode"` DurationMs *int64 `json:"durationMs"` Success *bool `json:"success"` Arguments json.RawMessage `json:"arguments"` Changes []struct { Path string `json:"path"` } `json:"changes"` } func NewBot(tg *Client, st *store.Store, codex *codexapp.Client, uploadDir, codexHome, codexStateDB, defaultModel, defaultSandbox string, pollTimeout time.Duration, logger *log.Logger) *Bot { if logger == nil { logger = log.Default() } return &Bot{ tg: tg, store: st, codex: codex, logger: logger, uploadDir: uploadDir, codexHome: codexHome, codexStateDB: codexStateDB, defaultModel: defaultModel, defaultSandbox: defaultSandbox, pollTimeout: pollTimeout, outputs: make(map[string]*outputState), diffs: make(map[string]string), } } func (b *Bot) Run(ctx context.Context) error { if err := b.store.ClearActiveTurns(ctx); err != nil { return err } go b.handleCodexEvents(ctx) offset := 0 for ctx.Err() == nil { updates, err := b.tg.GetUpdates(ctx, offset, int(b.pollTimeout.Seconds())) if err != nil { if ctx.Err() != nil { break } b.logger.Printf("getUpdates: %v", err) time.Sleep(2 * time.Second) continue } for _, update := range updates { if update.UpdateID >= offset { offset = update.UpdateID + 1 } if err := b.handleUpdate(ctx, update); err != nil { b.logger.Printf("handle update %d: %v", update.UpdateID, err) } } } return ctx.Err() } func (b *Bot) handleUpdate(ctx context.Context, update Update) error { switch { case update.Message != nil: return b.handleMessage(ctx, update.Message) case update.CallbackQuery != nil: return b.handleCallback(ctx, update.CallbackQuery) default: return nil } } func (b *Bot) handleMessage(ctx context.Context, message *Message) error { if message.PinnedMessage != nil { return nil } if message.Chat.Type != "private" { if message.From != nil { _ = b.store.Audit(ctx, message.From.ID, "reject_non_private", message.Chat.Type) } _, err := b.tg.SendMessage(ctx, message.Chat.ID, "This bot only supports one-to-one chats.", SendMessageOptions{}) return err } if message.From == nil { return nil } allowed, err := b.store.IsAllowed(ctx, message.From.ID) if err != nil { return err } if !allowed { _ = b.store.Audit(ctx, message.From.ID, "reject_not_allowed", message.From.Username) _, err := b.tg.SendMessage(ctx, message.Chat.ID, "Access denied.", SendMessageOptions{}) return err } session, err := b.store.GetOrCreateSession(ctx, message.From.ID, b.defaultModel, b.defaultSandbox) if err != nil { return err } if handled, err := b.handleCommand(ctx, message, session); handled || err != nil { return err } return b.continueThread(ctx, message, session) } func (b *Bot) handleCommand(ctx context.Context, message *Message, session store.Session) (bool, error) { command, args, ok := parseCommand(message.Text) if !ok { return false, nil } userID := message.From.ID chatID := message.Chat.ID switch command { case "start", "help": return true, b.sendHelp(ctx, chatID) case "new": _, _, err := b.createNewThread(ctx, userID, chatID, session, true) return true, err case "thread", "threads": return true, b.sendThreads(ctx, userID, chatID) case "resume": return true, b.resumeThread(ctx, userID, chatID, args) case "rename": return true, b.renameThread(ctx, userID, chatID, session, args) case "fork": return true, b.forkThread(ctx, userID, chatID, session) case "archive": return true, b.archiveThread(ctx, userID, chatID, session, args) case "status": return true, b.sendStatus(ctx, userID, chatID, session) case "cancel": return true, b.cancelTurn(ctx, userID, chatID, session) case "workspaces": return true, b.sendWorkspaces(ctx, userID, chatID) case "workspace": return true, b.handleWorkspaceCommand(ctx, userID, chatID, session, args) case "model": return true, b.handleModelCommand(ctx, userID, chatID, session, args) case "sandbox": return true, b.handleSandboxCommand(ctx, userID, chatID, session, args) case "pic": return true, b.handlePictureCommand(ctx, userID, chatID, session, args) case "diff": return true, b.sendDiff(ctx, chatID, session) default: _, err := b.tg.SendMessage(ctx, chatID, "Unknown command. Use /help.", SendMessageOptions{}) return true, err } } func (b *Bot) sendHelp(ctx context.Context, chatID int64) error { text := strings.Join([]string{ "Codex Telegram Bot", "", "/new - start a new Codex thread", "/thread or /threads - list recent threads", "/resume - choose a recent thread", "/resume ID - resume a thread", "/rename TITLE or /rename ID TITLE - rename a thread", "/fork - fork the active thread", "/archive [ID] - archive a thread", "/status - show active settings", "/cancel - interrupt the active turn", "/workspaces - list workspaces", "/workspace [ID] - select workspace", "/model - choose model and reasoning effort", "/sandbox [read-only|workspace-write|danger-full-access] - show or set sandbox", "/pic PROMPT - generate image(s) from a prompt", "/diff - show the latest streamed diff", "", "Plain text continues the active thread. Images are staged as local Codex image inputs; other files are staged and sent as paths.", }, "\n") return b.sendLong(ctx, chatID, text) } func (b *Bot) sendThreads(ctx context.Context, userID, chatID int64) error { return b.sendResumeChoices(ctx, userID, chatID, 0, 0) } func (b *Bot) resumeThread(ctx context.Context, userID, chatID int64, args []string) error { if len(args) == 0 { return b.sendResumeChoices(ctx, userID, chatID, 0, 0) } if len(args) != 1 { _, err := b.tg.SendMessage(ctx, chatID, "Use /resume to choose a thread, or /resume ID to resume directly.", SendMessageOptions{}) return err } id, err := strconv.ParseInt(args[0], 10, 64) if err != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "Thread ID must be a number.", SendMessageOptions{}) return sendErr } return b.resumeThreadByID(ctx, userID, chatID, id, 0) } func (b *Bot) sendResumeChoices(ctx context.Context, userID, chatID int64, page int, messageID int) error { if page < 0 { page = 0 } threads, err := b.store.ListThreadsPage(ctx, userID, false, resumeThreadPageSize+1, page*resumeThreadPageSize) if err != nil { return err } if len(threads) == 0 && page > 0 { return b.sendResumeChoices(ctx, userID, chatID, page-1, messageID) } if len(threads) == 0 { text := "No threads yet. Use /new." if messageID != 0 { _, err := b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{}) return err } _, err := b.tg.SendMessage(ctx, chatID, text, SendMessageOptions{}) return err } threads = b.syncThreadStates(ctx, threads) hasNext := len(threads) > resumeThreadPageSize if hasNext { threads = threads[:resumeThreadPageSize] } text := resumeThreadListText(threads, page) markup := resumeThreadMarkup(threads, page, hasNext) if messageID != 0 { _, err := b.tg.EditMessageText(ctx, chatID, messageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML", ReplyMarkup: editReplyMarkup(markup)}) return err } _, err = b.tg.SendMessage(ctx, chatID, EscapeHTML(text), SendMessageOptions{ParseMode: "HTML", ReplyMarkup: markup}) return err } func (b *Bot) resumeThreadByID(ctx context.Context, userID, chatID int64, id int64, messageID int) error { thread, err := b.store.GetThreadByID(ctx, userID, id) if err != nil { if errors.Is(err, sql.ErrNoRows) { text := "Thread not found." if messageID != 0 { _, editErr := b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{}) return editErr } _, sendErr := b.tg.SendMessage(ctx, chatID, text, SendMessageOptions{}) return sendErr } return err } resumed, err := b.codex.ResumeThread(ctx, thread.CodexThreadID) if err != nil { return b.sendError(ctx, chatID, "Could not resume Codex thread", err) } thread, err = b.applyCodexThreadState(ctx, thread, resumed) if err != nil { return err } if err := b.store.SetActiveThread(ctx, userID, thread.ID); err != nil { return err } if err := b.store.SetSessionWorkspace(ctx, userID, thread.WorkspaceID); err != nil { return err } text := fmt.Sprintf("Active thread ID %d: %s", thread.ID, threadDisplayTitle(thread)) if messageID != 0 { _, err = b.tg.EditMessageText(ctx, chatID, messageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML", ReplyMarkup: clearInlineKeyboardMarkup()}) return err } _, err = b.tg.SendMessage(ctx, chatID, EscapeHTML(text), SendMessageOptions{ParseMode: "HTML"}) return err } func (b *Bot) renameThread(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { if len(args) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Use /rename TITLE for the active thread, or /rename THREAD_ID TITLE.", SendMessageOptions{}) return err } var thread store.Thread titleArgs := args var err error if len(args) > 1 { if id, parseErr := strconv.ParseInt(args[0], 10, 64); parseErr == nil { thread, err = b.store.GetThreadByID(ctx, userID, id) if err != nil { if errors.Is(err, sql.ErrNoRows) { _, sendErr := b.tg.SendMessage(ctx, chatID, "Thread not found.", SendMessageOptions{}) return sendErr } return err } titleArgs = args[1:] } } if thread.ID == 0 { thread, err = b.activeThread(ctx, userID, session) } if err != nil { return b.sendNoActiveThread(ctx, chatID, err) } title := normalizeThreadTitle(strings.Join(titleArgs, " ")) if title == "" { _, err := b.tg.SendMessage(ctx, chatID, "Thread title cannot be empty.", SendMessageOptions{}) return err } if err := b.codex.SetThreadName(ctx, thread.CodexThreadID, title); err != nil { return b.sendError(ctx, chatID, "Could not rename Codex thread", err) } if codexThread, readErr := b.codex.ReadThread(ctx, thread.CodexThreadID); readErr == nil { thread, err = b.applyCodexThreadState(ctx, thread, codexThread) if err != nil { return err } title = thread.Title } else { b.logger.Printf("read renamed thread %s: %v", thread.CodexThreadID, readErr) if err := b.store.SyncThreadTitle(ctx, userID, thread.ID, title); err != nil { return err } } _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("Renamed thread ID %d: %s", thread.ID, title), SendMessageOptions{}) return err } func (b *Bot) forkThread(ctx context.Context, userID, chatID int64, session store.Session) error { thread, err := b.activeThread(ctx, userID, session) if err != nil { return b.sendNoActiveThread(ctx, chatID, err) } forked, err := b.codex.ForkThread(ctx, thread.CodexThreadID) if err != nil { return b.sendError(ctx, chatID, "Could not fork Codex thread", err) } workspaceID := thread.WorkspaceID if workspace, ok, workspaceErr := b.workspaceForCodexCWD(ctx, forked.CWD); workspaceErr == nil && ok { workspaceID = workspace.ID } else if workspaceErr != nil { b.logger.Printf("sync fork cwd %s: %v", forked.CWD, workspaceErr) } title := codexThreadTitle(forked, "fork of ID "+strconv.FormatInt(thread.ID, 10)) local, err := b.store.CreateThread(ctx, userID, forked.ID, workspaceID, title) if err != nil { return err } if err := b.store.SetActiveThread(ctx, userID, local.ID); err != nil { return err } if err := b.store.SetSessionWorkspace(ctx, userID, local.WorkspaceID); err != nil { return err } _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("Forked active thread to #%d.", local.ID), SendMessageOptions{}) return err } func (b *Bot) archiveThread(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { var thread store.Thread var err error if len(args) > 0 { id, parseErr := strconv.ParseInt(args[0], 10, 64) if parseErr != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "Thread ID must be a number.", SendMessageOptions{}) return sendErr } thread, err = b.store.GetThreadByID(ctx, userID, id) } else { thread, err = b.activeThread(ctx, userID, session) } if err != nil { return b.sendNoActiveThread(ctx, chatID, err) } if err := b.codex.ArchiveThread(ctx, thread.CodexThreadID); err != nil { return b.sendError(ctx, chatID, "Could not archive Codex thread", err) } if err := b.store.ArchiveThread(ctx, userID, thread.ID); err != nil { return err } _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("Archived thread #%d.", thread.ID), SendMessageOptions{}) return err } func (b *Bot) sendStatus(ctx context.Context, userID, chatID int64, session store.Session) error { workspace := "(none)" if session.ActiveWorkspaceID != 0 { if ws, err := b.store.GetWorkspaceByID(ctx, session.ActiveWorkspaceID); err == nil { workspace = fmt.Sprintf("%s (%s)", ws.Label, ws.Path) } } thread := "(none)" if session.ActiveThreadID != 0 { thread = fmt.Sprintf("ID %d", session.ActiveThreadID) if active, err := b.store.GetThreadByID(ctx, userID, session.ActiveThreadID); err == nil { if synced, syncErr := b.syncThreadState(ctx, active); syncErr == nil { active = synced } else { b.logger.Printf("sync status thread state %s: %v", active.CodexThreadID, syncErr) } if ws, wsErr := b.store.GetWorkspaceByID(ctx, active.WorkspaceID); wsErr == nil { workspace = fmt.Sprintf("%s (%s)", ws.Label, ws.Path) } thread = fmt.Sprintf("ID %d: %s", active.ID, threadDisplayTitle(active)) } } model := session.Model if model == "" { model = "(Codex default)" } turn := session.ActiveTurnID if turn == "" { turn = "(none)" } effort := session.ReasoningEffort if effort == "" { effort = "(model default)" } text := fmt.Sprintf("Workspace: %s\nThread: %s\nModel: %s\nReasoning effort: %s\nSandbox: %s\nActive turn: %s", workspace, thread, model, effort, session.Sandbox, turn) _ = userID return b.sendLong(ctx, chatID, text) } func (b *Bot) cancelTurn(ctx context.Context, userID, chatID int64, session store.Session) error { if session.ActiveTurnID == "" { _, err := b.tg.SendMessage(ctx, chatID, "No active turn to cancel.", SendMessageOptions{}) return err } thread, err := b.activeThread(ctx, userID, session) if err != nil { return b.sendNoActiveThread(ctx, chatID, err) } if err := b.codex.InterruptTurn(ctx, thread.CodexThreadID, session.ActiveTurnID); err != nil { return b.sendError(ctx, chatID, "Could not interrupt turn", err) } _, err = b.tg.SendMessage(ctx, chatID, "Cancellation requested.", SendMessageOptions{}) return err } func (b *Bot) sendWorkspaces(ctx context.Context, userID, chatID int64) error { if err := b.syncUserThreadStates(ctx, userID); err != nil { b.logger.Printf("sync workspaces for user %d: %v", userID, err) } workspaces, err := b.store.ListWorkspaces(ctx) if err != nil { return err } if len(workspaces) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "No workspaces configured. Ask an admin to run scripts/add-workspace.", SendMessageOptions{}) return err } var lines []string lines = append(lines, "Workspaces:") for _, ws := range workspaces { marker := "" if ws.IsDefault { marker = " default" } lines = append(lines, fmt.Sprintf("#%d %s%s\n%s", ws.ID, ws.Label, marker, ws.Path)) } _, err = b.tg.SendMessage(ctx, chatID, EscapeHTML(strings.Join(lines, "\n")), SendMessageOptions{ ParseMode: "HTML", ReplyMarkup: workspaceMarkup(workspaces), }) return err } func (b *Bot) handleWorkspaceCommand(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { if len(args) == 0 { return b.sendWorkspaces(ctx, userID, chatID) } id, err := strconv.ParseInt(args[0], 10, 64) if err != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "Workspace ID must be a number.", SendMessageOptions{}) return sendErr } return b.setWorkspace(ctx, userID, chatID, session, id) } func (b *Bot) setWorkspace(ctx context.Context, userID, chatID int64, session store.Session, workspaceID int64) error { workspace, err := b.store.GetWorkspaceByID(ctx, workspaceID) if err != nil { if errors.Is(err, sql.ErrNoRows) { _, sendErr := b.tg.SendMessage(ctx, chatID, "Workspace not found.", SendMessageOptions{}) return sendErr } return err } if err := b.store.SetSessionWorkspace(ctx, userID, workspace.ID); err != nil { return err } _ = session _, err = b.tg.SendMessage(ctx, chatID, "Workspace set to "+workspace.Label+".", SendMessageOptions{}) return err } func (b *Bot) handleModelCommand(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { _ = userID if len(args) > 0 { _, err := b.tg.SendMessage(ctx, chatID, "Use /model and choose from the buttons.", SendMessageOptions{}) return err } return b.sendModelChoices(ctx, chatID, session) } func (b *Bot) sendModelChoices(ctx context.Context, chatID int64, session store.Session) error { models, err := b.codex.ListModels(ctx) if err != nil { return b.sendError(ctx, chatID, "Could not list Codex models", err) } if len(models) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Codex returned no available models.", SendMessageOptions{}) return err } model := sessionModelLabel(models, session.Model) text := settingsStatusText(model, session.ReasoningEffort) + "\nChoose a model:" message, err := b.tg.SendMessage(ctx, chatID, EscapeHTML(text), SendMessageOptions{ ParseMode: "HTML", ReplyMarkup: modelMarkup(models), }) if err != nil { return err } b.rememberSettingsMessage(ctx, session.TelegramUserID, chatID, message.MessageID) return nil } func (b *Bot) handleModelCallback(ctx context.Context, callback *CallbackQuery, modelID string) error { models, err := b.codex.ListModels(ctx) if err != nil { _ = b.tg.AnswerCallbackQuery(ctx, callback.ID, "Could not list models.") return b.sendError(ctx, callback.Message.Chat.ID, "Could not list Codex models", err) } var selected codexapp.Model found := false for _, model := range models { if model.ID == modelID { selected = model found = true break } } if !found { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Model is no longer available.") } if err := b.store.SetSessionModel(ctx, callback.From.ID, selected.ID); err != nil { return err } if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Model selected."); err != nil { return err } label := modelLabel(selected) if len(selected.SupportedReasoningEfforts) == 0 { text := settingsStatusText(label, "") + "\nNo reasoning effort choices are advertised for this model." message, err := b.tg.EditMessageText(ctx, callback.Message.Chat.ID, callback.Message.MessageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML"}) if err == nil { b.rememberSettingsMessage(ctx, callback.From.ID, callback.Message.Chat.ID, message.MessageID) } return err } text := settingsStatusText(label, "") + "\nChoose reasoning effort:" message, err := b.tg.EditMessageText(ctx, callback.Message.Chat.ID, callback.Message.MessageID, EscapeHTML(text), EditMessageTextOptions{ ParseMode: "HTML", ReplyMarkup: effortMarkup(selected), }) if err == nil { b.rememberSettingsMessage(ctx, callback.From.ID, callback.Message.Chat.ID, message.MessageID) } return err } func (b *Bot) handleEffortCallback(ctx context.Context, callback *CallbackQuery, effort string) error { session, err := b.store.GetOrCreateSession(ctx, callback.From.ID, b.defaultModel, b.defaultSandbox) if err != nil { return err } if session.Model == "" { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Choose a model first.") } models, err := b.codex.ListModels(ctx) if err != nil { _ = b.tg.AnswerCallbackQuery(ctx, callback.ID, "Could not validate effort.") return b.sendError(ctx, callback.Message.Chat.ID, "Could not list Codex models", err) } var selected codexapp.Model found := false for _, model := range models { if model.ID == session.Model { selected = model found = true break } } if !found { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Selected model is no longer available.") } if !modelSupportsEffort(selected, effort) { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Effort is no longer available for this model.") } if err := b.store.SetSessionReasoningEffort(ctx, callback.From.ID, effort); err != nil { return err } if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Reasoning effort selected."); err != nil { return err } text := settingsStatusText(modelLabel(selected), effort) message, err := b.tg.EditMessageText(ctx, callback.Message.Chat.ID, callback.Message.MessageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML"}) if err == nil { b.rememberSettingsMessage(ctx, callback.From.ID, callback.Message.Chat.ID, message.MessageID) } return err } func (b *Bot) handleSandboxCommand(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { if len(args) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Current sandbox: "+session.Sandbox, SendMessageOptions{}) return err } sandbox, err := codexapp.NormalizeSandbox(args[0]) if err != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "Use one of: read-only, workspace-write, danger-full-access.", SendMessageOptions{}) return sendErr } if err := b.store.SetSessionSandbox(ctx, userID, sandbox); err != nil { return err } _, err = b.tg.SendMessage(ctx, chatID, "Sandbox set to "+sandbox+".", SendMessageOptions{}) return err } func isPicturePath(path string) bool { switch strings.ToLower(filepath.Ext(path)) { case ".jpg", ".jpeg", ".png", ".webp", ".gif": return true default: return false } } func (b *Bot) sendDiff(ctx context.Context, chatID int64, session store.Session) error { if session.ActiveThreadID == 0 { _, err := b.tg.SendMessage(ctx, chatID, "No active thread.", SendMessageOptions{}) return err } thread, err := b.store.GetThreadByID(ctx, chatID, session.ActiveThreadID) if err != nil { _, sendErr := b.tg.SendMessage(ctx, chatID, "No active thread.", SendMessageOptions{}) return sendErr } b.mu.Lock() diff := b.diffs[thread.CodexThreadID] b.mu.Unlock() if diff == "" { _, err := b.tg.SendMessage(ctx, chatID, "No diff has been streamed for this thread.", SendMessageOptions{}) return err } return b.sendLong(ctx, chatID, diff) } func (b *Bot) continueThread(ctx context.Context, message *Message, session store.Session) error { userID := message.From.ID chatID := message.Chat.ID input, err := b.messageInput(ctx, userID, message) if err != nil { return b.sendError(ctx, chatID, "Could not stage Telegram input", err) } if len(input) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Send text, an image, or a document.", SendMessageOptions{}) return err } thread, _, err := b.ensureThread(ctx, userID, chatID, session) if err != nil { return err } if session.ActiveTurnID != "" { if err := b.codex.SteerTurn(ctx, thread.CodexThreadID, session.ActiveTurnID, input); err != nil { return b.sendError(ctx, chatID, "Could not append to active turn", err) } _, err := b.tg.SendMessage(ctx, chatID, "Added to the running turn.", SendMessageOptions{}) return err } b.registerOutput(thread.CodexThreadID, chatID) turn, err := b.codex.StartTurn(ctx, thread.CodexThreadID, "", session.Model, session.ReasoningEffort, session.Sandbox, input) if err != nil { b.clearOutput(thread.CodexThreadID) return b.sendError(ctx, chatID, "Codex turn failed", err) } if err := b.store.SetActiveTurn(ctx, userID, turn.ID); err != nil { return err } _ = b.store.TouchThread(ctx, thread.CodexThreadID) return nil } func codexThreadTitle(thread codexapp.Thread, fallback string) string { if title := normalizeThreadTitle(thread.Name); title != "" { return title } if title := normalizeThreadTitle(thread.Preview); title != "" { return title } return normalizeThreadTitle(fallback) } func (b *Bot) applyCodexThreadState(ctx context.Context, thread store.Thread, codexThread codexapp.Thread) (store.Thread, error) { title := codexThreadTitle(codexThread, "") if title != thread.Title { if err := b.store.SyncThreadTitle(ctx, thread.TelegramUserID, thread.ID, title); err != nil { return thread, err } thread.Title = title } workspace, ok, err := b.workspaceForCodexCWD(ctx, codexThread.CWD) if err != nil { return thread, err } if ok && workspace.ID != thread.WorkspaceID { if err := b.store.SyncThreadWorkspace(ctx, thread.TelegramUserID, thread.ID, workspace.ID); err != nil { return thread, err } thread.WorkspaceID = workspace.ID } return thread, nil } func (b *Bot) workspaceForCodexCWD(ctx context.Context, cwd string) (store.Workspace, bool, error) { cwd = strings.TrimSpace(cwd) if cwd == "" { return store.Workspace{}, false, nil } clean, err := store.ValidateWorkspacePath(cwd) if err != nil { return store.Workspace{}, false, err } workspace, err := b.store.GetWorkspaceByPath(ctx, clean) if err == nil { return workspace, true, nil } if !errors.Is(err, sql.ErrNoRows) { return store.Workspace{}, false, err } label := filepath.Base(clean) workspace, err = b.store.AddWorkspace(ctx, clean, label, false) if err != nil { return store.Workspace{}, false, err } return workspace, true, nil } func (b *Bot) syncThreadState(ctx context.Context, thread store.Thread) (store.Thread, error) { if thread.CodexThreadID == "" { return thread, nil } codexThread, err := b.codex.ReadThread(ctx, thread.CodexThreadID) if err != nil { return thread, err } return b.applyCodexThreadState(ctx, thread, codexThread) } func (b *Bot) syncThreadStates(ctx context.Context, threads []store.Thread) []store.Thread { for i := range threads { synced, err := b.syncThreadState(ctx, threads[i]) if err != nil { b.logger.Printf("sync thread state %s: %v", threads[i].CodexThreadID, err) continue } threads[i] = synced } return threads } func (b *Bot) syncUserThreadStates(ctx context.Context, userID int64) error { threads, err := b.store.ListThreadsPage(ctx, userID, false, 200, 0) if err != nil { return err } b.syncThreadStates(ctx, threads) return nil } func (b *Bot) createNewThread(ctx context.Context, userID, chatID int64, session store.Session, announce bool) (store.Thread, store.Workspace, error) { workspace, err := b.resolveWorkspace(ctx, userID, session) if err != nil { return store.Thread{}, store.Workspace{}, b.sendWorkspaceMissing(ctx, chatID) } codexThread, err := b.codex.StartThread(ctx, workspace.Path, session.Model, session.Sandbox) if err != nil { return store.Thread{}, store.Workspace{}, b.sendError(ctx, chatID, "Could not start Codex thread", err) } threadWorkspace := workspace if codexWorkspace, ok, workspaceErr := b.workspaceForCodexCWD(ctx, codexThread.CWD); workspaceErr == nil && ok { threadWorkspace = codexWorkspace } else if workspaceErr != nil { b.logger.Printf("sync new thread cwd %s: %v", codexThread.CWD, workspaceErr) } title := codexThreadTitle(codexThread, threadWorkspace.Label) thread, err := b.store.CreateThread(ctx, userID, codexThread.ID, threadWorkspace.ID, title) if err != nil { return store.Thread{}, store.Workspace{}, err } if err := b.store.SetActiveThread(ctx, userID, thread.ID); err != nil { return store.Thread{}, store.Workspace{}, err } if err := b.store.SetSessionWorkspace(ctx, userID, thread.WorkspaceID); err != nil { return store.Thread{}, store.Workspace{}, err } if announce { _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("New thread #%d in %s.", thread.ID, threadWorkspace.Label), SendMessageOptions{}) } return thread, threadWorkspace, err } func (b *Bot) ensureThread(ctx context.Context, userID, chatID int64, session store.Session) (store.Thread, store.Workspace, error) { if session.ActiveThreadID != 0 { thread, err := b.store.GetThreadByID(ctx, userID, session.ActiveThreadID) if err == nil && !thread.Archived { if synced, syncErr := b.syncThreadState(ctx, thread); syncErr == nil { thread = synced } else { b.logger.Printf("sync active thread state %s: %v", thread.CodexThreadID, syncErr) } workspace, err := b.store.GetWorkspaceByID(ctx, thread.WorkspaceID) return thread, workspace, err } } return b.createNewThread(ctx, userID, chatID, session, true) } func (b *Bot) ensureThreadForPicture(ctx context.Context, userID, chatID int64, session store.Session) (store.Thread, store.Workspace, error) { if session.ActiveThreadID != 0 { return b.ensureThread(ctx, userID, chatID, session) } return b.createNewThread(ctx, userID, chatID, session, false) } func (b *Bot) handlePictureCommand(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { prompt := strings.TrimSpace(strings.Join(args, " ")) if prompt == "" { _, err := b.tg.SendMessage(ctx, chatID, "Use /pic PROMPT to generate image(s).", SendMessageOptions{}) return err } if session.ActiveTurnID != "" { _, err := b.tg.SendMessage(ctx, chatID, "A Codex turn is already running. Use /cancel first, or wait for it to finish.", SendMessageOptions{}) return err } thread, _, err := b.ensureThreadForPicture(ctx, userID, chatID, session) if err != nil { return err } input := []codexapp.InputItem{{Type: "text", Text: pictureGenerationInstruction(prompt)}} b.registerPictureOutput(thread.CodexThreadID, chatID) turn, err := b.codex.StartTurn(ctx, thread.CodexThreadID, "", session.Model, session.ReasoningEffort, session.Sandbox, input) if err != nil { b.clearOutput(thread.CodexThreadID) return b.sendError(ctx, chatID, "Codex image generation failed", err) } if err := b.store.SetActiveTurn(ctx, userID, turn.ID); err != nil { return err } _ = b.store.TouchThread(ctx, thread.CodexThreadID) return nil } func pictureGenerationInstruction(prompt string) string { return strings.Join([]string{ "You are handling a Telegram /pic command.", "Use only the built-in image generation capability to create image(s) from the user prompt below.", "Do not browse the web, run shell commands, call MCP tools, edit files, or ask follow-up questions.", "Avoid extra explanatory text; the Telegram bot will send generated image files automatically.", "", "User image prompt:", strings.TrimSpace(prompt), }, "\n") } func (b *Bot) activeThread(ctx context.Context, userID int64, session store.Session) (store.Thread, error) { if session.ActiveThreadID == 0 { return store.Thread{}, sql.ErrNoRows } return b.store.GetThreadByID(ctx, userID, session.ActiveThreadID) } func (b *Bot) resolveWorkspace(ctx context.Context, userID int64, session store.Session) (store.Workspace, error) { if session.ActiveWorkspaceID != 0 { return b.store.GetWorkspaceByID(ctx, session.ActiveWorkspaceID) } workspace, err := b.store.DefaultWorkspace(ctx) if err != nil { return store.Workspace{}, err } if err := b.store.SetSessionWorkspace(ctx, userID, workspace.ID); err != nil { return store.Workspace{}, err } return workspace, nil } func (b *Bot) messageInput(ctx context.Context, userID int64, message *Message) ([]codexapp.InputItem, error) { var input []codexapp.InputItem text := strings.TrimSpace(message.Text) if text == "" { text = strings.TrimSpace(message.Caption) } if text != "" { input = append(input, codexapp.InputItem{Type: "text", Text: text}) } if len(message.Photo) > 0 { photo := largestPhoto(message.Photo) path, err := b.stageFile(ctx, userID, photo.FileID, "photo.jpg", "image/jpeg", photo.FileSize) if err != nil { return nil, err } input = append(input, codexapp.InputItem{Type: "localImage", Path: path}) } if message.Document != nil { path, err := b.stageFile(ctx, userID, message.Document.FileID, message.Document.FileName, message.Document.MimeType, message.Document.FileSize) if err != nil { return nil, err } if isImage(message.Document.MimeType, message.Document.FileName) { input = append(input, codexapp.InputItem{Type: "localImage", Path: path}) } else { input = append(input, codexapp.InputItem{Type: "text", Text: "User uploaded a file staged at: " + path}) } } return input, nil } func (b *Bot) stageFile(ctx context.Context, userID int64, fileID, filename, mimeType string, size int64) (string, error) { if size > telegramDownloadLimit { return "", fmt.Errorf("file exceeds Telegram cloud download limit of 20 MB") } file, err := b.tg.GetFile(ctx, fileID) if err != nil { return "", err } if file.FileSize > telegramDownloadLimit { return "", fmt.Errorf("file exceeds Telegram cloud download limit of 20 MB") } if file.FilePath == "" { return "", errors.New("telegram did not return a file path") } data, err := b.tg.DownloadFilePath(ctx, file.FilePath) if err != nil { return "", err } if len(data) > telegramDownloadLimit { return "", fmt.Errorf("file exceeds Telegram cloud download limit of 20 MB") } if filename == "" { filename = filepath.Base(file.FilePath) } if filename == "." || filename == string(filepath.Separator) || filename == "" { filename = fileID if ext := extensionForMime(mimeType); ext != "" { filename += ext } } if err := os.MkdirAll(b.uploadDir, 0o755); err != nil { return "", err } name := fmt.Sprintf("%d_%d_%s", userID, time.Now().UnixNano(), safeFilename(filename)) path := filepath.Join(b.uploadDir, name) if err := os.WriteFile(path, data, 0o644); err != nil { return "", err } return path, nil } func (b *Bot) handleCallback(ctx context.Context, callback *CallbackQuery) error { if callback.Message == nil { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Unsupported callback.") } if callback.Message.Chat.Type != "private" { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Use a private chat.") } allowed, err := b.store.IsAllowed(ctx, callback.From.ID) if err != nil { return err } if !allowed { _ = b.store.Audit(ctx, callback.From.ID, "reject_callback_not_allowed", callback.From.Username) return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Access denied.") } session, err := b.store.GetOrCreateSession(ctx, callback.From.ID, b.defaultModel, b.defaultSandbox) if err != nil { return err } if workspaceID, ok := ParseWorkspaceCallbackData(callback.Data); ok { if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, ""); err != nil { return err } return b.setWorkspace(ctx, callback.From.ID, callback.Message.Chat.ID, session, workspaceID) } if resumeID, ok := ParseResumeThreadCallbackData(callback.Data); ok { if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Thread selected."); err != nil { return err } return b.resumeThreadByID(ctx, callback.From.ID, callback.Message.Chat.ID, resumeID, callback.Message.MessageID) } if resumePage, ok := ParseResumePageCallbackData(callback.Data); ok { if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, ""); err != nil { return err } return b.sendResumeChoices(ctx, callback.From.ID, callback.Message.Chat.ID, resumePage, callback.Message.MessageID) } if modelID, ok := ParseModelCallbackData(callback.Data); ok { return b.handleModelCallback(ctx, callback, modelID) } if effort, ok := ParseEffortCallbackData(callback.Data); ok { return b.handleEffortCallback(ctx, callback, effort) } if approvalID, decision, ok := ParseApprovalCallbackData(callback.Data); ok { return b.handleApprovalCallback(ctx, callback, approvalID, decision) } return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Unknown action.") } func (b *Bot) handleApprovalCallback(ctx context.Context, callback *CallbackQuery, approvalID int64, decision string) error { approval, err := b.store.GetPendingApproval(ctx, callback.From.ID, approvalID) if err != nil { _ = b.tg.AnswerCallbackQuery(ctx, callback.ID, "Approval not found.") return nil } if decision == "details" { if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Details sent."); err != nil { return err } return b.sendHTML(ctx, callback.Message.Chat.ID, renderApprovalHTML(approval.Kind, json.RawMessage(approval.PayloadJSON), "")) } if approval.Status != "pending" { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Already resolved.") } requestID, err := strconv.ParseInt(approval.CodexRequestID, 10, 64) if err != nil { return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Invalid request id.") } if err := b.codex.RespondServerRequest(ctx, requestID, approvalResponse(approval, decision)); err != nil { _ = b.tg.AnswerCallbackQuery(ctx, callback.ID, "Could not answer Codex.") return b.sendError(ctx, callback.Message.Chat.ID, "Could not answer Codex approval", err) } if err := b.store.ResolvePendingApproval(ctx, callback.From.ID, approval.ID, decision); err != nil { return err } if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Sent to Codex."); err != nil { return err } updated := b.resolveApprovalMessageHTML(approval, decision) _, err = b.tg.EditMessageText(ctx, callback.Message.Chat.ID, callback.Message.MessageID, updated, EditMessageTextOptions{ParseMode: "HTML", ReplyMarkup: clearInlineKeyboardMarkup()}) return ignoreTelegramMessageNotModified(err) } func (b *Bot) handleCodexEvents(ctx context.Context) { for { select { case <-ctx.Done(): return case event := <-b.codex.Events(): if event.Err != nil { b.logger.Printf("codex event %s: %v", event.Method, event.Err) if event.Method == "connection/closed" { b.failActiveOutputs(ctx, "Codex connection closed: "+event.Err.Error()) } continue } if event.ServerRequest { if err := b.handleCodexServerRequest(ctx, event); err != nil { b.logger.Printf("server request %s: %v", event.Method, err) } continue } if err := b.handleCodexNotification(ctx, event); err != nil { b.logger.Printf("notification %s: %v", event.Method, err) } } } } func parseCodexThreadItem(raw json.RawMessage) (codexThreadItemView, error) { var item codexThreadItemView if len(raw) == 0 { return item, nil } if err := json.Unmarshal(raw, &item); err != nil { return item, err } return item, nil } func renderCodexItemStarted(item codexThreadItemView) string { switch item.Type { case "commandExecution": return SummaryDetailsRawHTMLLimited("Tool call: command started", commandStartedDetailsHTML(item), TelegramHTMLMessageLimit) case "fileChange": return "Tool call: file change started" case "mcpToolCall": return joinNonEmpty("Tool call: MCP started", "Tool: "+toolDisplayName(item.Server, item.Tool)) case "dynamicToolCall": return joinNonEmpty("Tool call: started", "Tool: "+toolDisplayName(item.Namespace, item.Tool)) case "webSearch": return joinNonEmpty("Tool call: web search started", "Query: "+item.Query) case "imageView": return joinNonEmpty("Tool call: image view", "Path: "+item.Path) case "imageGeneration": return "Tool call: image generation started" case "collabAgentToolCall": return joinNonEmpty("Tool call: agent started", "Tool: "+item.Tool) default: return "" } } func renderCodexItemCompleted(item codexThreadItemView) string { switch item.Type { case "commandExecution": status := "" if item.ExitCode != nil { status = fmt.Sprintf("Exit code: %d", *item.ExitCode) } return SummaryDetailsRawHTMLLimited(joinNonEmpty("Tool call: command finished", status), renderCodexItemDetailsHTML(item), TelegramHTMLMessageLimit) case "fileChange": return joinNonEmpty("Tool call: file change finished", fmt.Sprintf("Changed files: %d", len(item.Changes)), "Status: "+item.Status) case "mcpToolCall": summary := joinNonEmpty("Tool call: MCP finished", "Tool: "+toolDisplayName(item.Server, item.Tool), "Status: "+item.Status) return SummaryDetailsRawHTMLLimited(summary, renderCodexItemDetailsHTML(item), TelegramHTMLMessageLimit) case "dynamicToolCall": status := item.Status if item.Success != nil { status = fmt.Sprintf("success=%t", *item.Success) } summary := joinNonEmpty("Tool call: finished", "Tool: "+toolDisplayName(item.Namespace, item.Tool), "Status: "+status) return SummaryDetailsRawHTMLLimited(summary, renderCodexItemDetailsHTML(item), TelegramHTMLMessageLimit) case "webSearch": return joinNonEmpty("Tool call: web search finished", "Query: "+item.Query) case "imageView": return joinNonEmpty("Tool call: image view finished", "Path: "+item.Path) case "imageGeneration": return joinNonEmpty("Tool call: image generation finished", "Status: "+item.Status, "Saved path: "+item.SavedPath) case "collabAgentToolCall": return joinNonEmpty("Tool call: agent finished", "Tool: "+item.Tool, "Status: "+item.Status) default: return "" } } func commandStartedDetailsHTML(item codexThreadItemView) string { var parts []string if command := strings.TrimSpace(item.Command); command != "" { parts = append(parts, "Command", CodeBlockHTML("bash", command)) } if cwd := strings.TrimSpace(item.CWD); cwd != "" { parts = append(parts, FieldHTML("CWD", cwd)) } return strings.Join(parts, "\n") } func renderCodexItemDetailsHTML(item codexThreadItemView) string { var parts []string appendField := func(label, value string) { if html := FieldHTML(label, value); html != "" { parts = append(parts, html) } } appendInt := func(label string, value *int) { if value != nil { appendField(label, strconv.Itoa(*value)) } } appendInt64 := func(label string, value *int64) { if value != nil { appendField(label, strconv.FormatInt(*value, 10)) } } appendBool := func(label string, value *bool) { if value != nil { appendField(label, strconv.FormatBool(*value)) } } switch item.Type { case "commandExecution": appendField("CWD", item.CWD) if command := strings.TrimSpace(item.Command); command != "" { parts = append(parts, "Command", CodeBlockHTML("bash", command)) } appendInt("Exit code", item.ExitCode) appendInt64("Duration ms", item.DurationMs) if item.AggregatedOutput != nil && strings.TrimSpace(*item.AggregatedOutput) != "" { parts = append(parts, "Output", CodeBlockHTML("text", *item.AggregatedOutput)) } case "fileChange": appendField("Status", item.Status) for _, change := range item.Changes { appendField("Changed", change.Path) } case "mcpToolCall": appendField("Tool", toolDisplayName(item.Server, item.Tool)) appendField("Status", item.Status) parts = append(parts, renderArgumentsDetailsHTML(item.Arguments)...) case "dynamicToolCall": appendField("Tool", toolDisplayName(item.Namespace, item.Tool)) appendField("Status", item.Status) appendBool("Success", item.Success) parts = append(parts, renderArgumentsDetailsHTML(item.Arguments)...) case "webSearch": appendField("Query", item.Query) case "imageView": appendField("Path", item.Path) case "imageGeneration": appendField("Status", item.Status) appendField("Saved path", item.SavedPath) case "collabAgentToolCall": appendField("Tool", item.Tool) appendField("Status", item.Status) default: appendField("Type", item.Type) appendField("Status", item.Status) parts = append(parts, renderArgumentsDetailsHTML(item.Arguments)...) } return strings.Join(nonEmptyHTML(parts), "\n") } func renderArgumentsDetailsHTML(raw json.RawMessage) []string { if len(raw) == 0 || string(raw) == "null" { return nil } var value any if err := json.Unmarshal(raw, &value); err != nil { return []string{"Arguments", CodeBlockHTML("json", string(raw))} } object, ok := value.(map[string]any) if !ok { return []string{"Arguments", CodeBlockHTML("json", compactPrettyJSON(value))} } var parts []string for _, key := range preferredArgumentKeys(object) { part := renderArgumentFieldHTML(key, object[key]) if strings.TrimSpace(part) != "" { parts = append(parts, part) } } if len(parts) == 0 && len(object) > 0 { parts = append(parts, "Arguments", CodeBlockHTML("json", compactPrettyJSON(object))) } return parts } func preferredArgumentKeys(object map[string]any) []string { preferred := []string{"cmd", "command", "script", "code", "content", "input", "query", "path", "file", "filename", "cwd", "args", "config", "patch"} seen := make(map[string]bool, len(object)) var keys []string for _, key := range preferred { if _, ok := object[key]; ok { keys = append(keys, key) seen[key] = true } } for key := range object { if !seen[key] && isMeaningfulArgumentKey(key) { keys = append(keys, key) } } return keys } func isMeaningfulArgumentKey(key string) bool { key = strings.ToLower(key) for _, part := range []string{"command", "cmd", "code", "content", "path", "file", "query", "input", "config", "patch", "cwd"} { if strings.Contains(key, part) { return true } } return false } func renderArgumentFieldHTML(key string, value any) string { label := argumentLabel(key) text, complex := argumentValueText(value) if strings.TrimSpace(text) == "" { return "" } if complex || shouldUseCodeBlock(key, text) { return "" + EscapeHTML(label) + "\n" + CodeBlockHTML(languageForArgument(key, text), text) } return FieldHTML(label, text) } func argumentLabel(key string) string { key = strings.TrimSpace(key) if key == "" { return "Argument" } switch strings.ToLower(key) { case "cwd": return "CWD" case "cmd": return "cmd" } label := strings.ReplaceAll(key, "_", " ") return strings.ToUpper(label[:1]) + label[1:] } func argumentValueText(value any) (string, bool) { switch v := value.(type) { case string: return v, false case float64: return strconv.FormatFloat(v, 'f', -1, 64), false case bool: return strconv.FormatBool(v), false case nil: return "", false default: return compactPrettyJSON(v), true } } func shouldUseCodeBlock(key, text string) bool { key = strings.ToLower(key) if strings.Contains(text, "\n") { return true } for _, marker := range []string{"command", "cmd", "script", "code", "content", "config", "patch"} { if strings.Contains(key, marker) { return true } } return false } func languageForArgument(key, text string) string { key = strings.ToLower(key) switch { case strings.Contains(key, "command") || strings.Contains(key, "cmd") || strings.Contains(key, "script"): return "bash" case strings.Contains(key, "config") || looksLikeJSON(text): return "json" case strings.Contains(key, "patch"): return "diff" case strings.Contains(key, "code") || strings.Contains(key, "content"): return languageForContent(text) default: return "text" } } func languageForContent(text string) string { trimmed := strings.TrimSpace(text) switch { case looksLikeJSON(trimmed): return "json" case strings.HasPrefix(trimmed, "package ") || strings.Contains(trimmed, "func "): return "go" case strings.Contains(trimmed, "#!/bin/sh") || strings.Contains(trimmed, "#!/usr/bin/env bash"): return "bash" case strings.Contains(trimmed, "apiVersion:") || strings.Contains(trimmed, "kind:"): return "yaml" default: return "text" } } func looksLikeJSON(text string) bool { text = strings.TrimSpace(text) return (strings.HasPrefix(text, "{") && strings.HasSuffix(text, "}")) || (strings.HasPrefix(text, "[") && strings.HasSuffix(text, "]")) } func compactPrettyJSON(value any) string { data, err := json.MarshalIndent(value, "", " ") if err != nil { return fmt.Sprint(value) } return string(data) } func nonEmptyHTML(parts []string) []string { out := make([]string, 0, len(parts)) for _, part := range parts { if strings.TrimSpace(part) != "" { out = append(out, part) } } return out } func joinNonEmpty(lines ...string) string { out := make([]string, 0, len(lines)) for _, line := range lines { if strings.TrimSpace(line) != "" { out = append(out, line) } } return strings.Join(out, "\n") } func toolDisplayName(namespace, tool string) string { if namespace == "" { return tool } if tool == "" { return namespace } return namespace + "." + tool } func truncateForStatus(text string) string { runes := []rune(text) if len(runes) <= 1200 { return text } return string(runes[:1200]) + "\n..." } func (b *Bot) handleCodexNotification(ctx context.Context, event codexapp.Event) error { switch event.Method { case "error": var params struct { ThreadID string `json:"threadId"` TurnID string `json:"turnId"` WillRetry bool `json:"willRetry"` Error struct { Message string `json:"message"` AdditionalDetails string `json:"additionalDetails"` } `json:"error"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" && !params.WillRetry { if thread, err := b.store.GetThreadByCodexID(ctx, params.ThreadID); err == nil { _ = b.store.SetActiveTurn(ctx, thread.TelegramUserID, "") } message := "Codex error" if params.Error.Message != "" { message += ": " + params.Error.Message } if params.Error.AdditionalDetails != "" { message += "\n" + params.Error.AdditionalDetails } if err := b.flushAssistantMessage(ctx, params.ThreadID); err != nil { return err } if err := b.sendOutputBlock(ctx, params.ThreadID, message); err != nil { return err } b.clearOutput(params.ThreadID) return nil } case "item/started": var params struct { ThreadID string `json:"threadId"` Item json.RawMessage `json:"item"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } item, err := parseCodexThreadItem(params.Item) if err != nil { return err } if params.ThreadID != "" && item.Type == "agentMessage" && b.hasAssistantText(params.ThreadID) { return b.flushAssistantMessage(ctx, params.ThreadID) } if params.ThreadID != "" { if b.shouldSuppressPictureToolMessage(params.ThreadID, item) { return nil } return b.upsertToolMessage(ctx, params.ThreadID, item.ID, renderCodexItemStarted(item)) } case "item/agentMessage/delta": var params struct { ThreadID string `json:"threadId"` Delta string `json:"delta"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" && params.Delta != "" { return b.appendAssistantDelta(ctx, params.ThreadID, params.Delta) } case "item/completed": var params struct { ThreadID string `json:"threadId"` Item json.RawMessage `json:"item"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } item, err := parseCodexThreadItem(params.Item) if err != nil { return err } if params.ThreadID != "" && item.Type == "agentMessage" { if item.Text != "" && !b.hasAssistantText(params.ThreadID) { if err := b.appendAssistantDelta(ctx, params.ThreadID, item.Text); err != nil { return err } } return b.flushAssistantMessage(ctx, params.ThreadID) } if params.ThreadID != "" { if b.queuePictureImageOutput(params.ThreadID, item) { return nil } if err := b.upsertToolMessage(ctx, params.ThreadID, item.ID, renderCodexItemCompleted(item)); err != nil { return err } return b.sendImageOutput(ctx, params.ThreadID, item) } case "turn/diff/updated": var params struct { ThreadID string `json:"threadId"` Diff string `json:"diff"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" { b.mu.Lock() b.diffs[params.ThreadID] = params.Diff b.mu.Unlock() } case "turn/completed": var params struct { ThreadID string `json:"threadId"` Turn struct { ID string `json:"id"` Status string `json:"status"` } `json:"turn"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" { if thread, err := b.store.GetThreadByCodexID(ctx, params.ThreadID); err == nil { _ = b.store.SetActiveTurn(ctx, thread.TelegramUserID, "") _ = b.store.TouchThread(ctx, params.ThreadID) } return b.completeTurnOutput(ctx, params.ThreadID) } case "thread/name/updated": var params struct { ThreadID string `json:"threadId"` ThreadName *string `json:"threadName"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID != "" { title := "" if params.ThreadName != nil { title = normalizeThreadTitle(*params.ThreadName) } return b.store.SyncThreadTitleByCodexID(ctx, params.ThreadID, title) } case "serverRequest/resolved": var params struct { ThreadID string `json:"threadId"` RequestID string `json:"requestId"` } _ = json.Unmarshal(event.Params, ¶ms) } return nil } func (b *Bot) handleCodexServerRequest(ctx context.Context, event codexapp.Event) error { if event.ID == nil { return nil } switch event.Method { case "item/commandExecution/requestApproval", "item/fileChange/requestApproval", "item/permissions/requestApproval": default: b.logger.Printf("unhandled server request: %s", event.Method) return nil } var params struct { ThreadID string `json:"threadId"` TurnID string `json:"turnId"` ItemID string `json:"itemId"` Reason string `json:"reason"` } if err := json.Unmarshal(event.Params, ¶ms); err != nil { return err } if params.ThreadID == "" { return errors.New("approval request missing threadId") } thread, err := b.store.GetThreadByCodexID(ctx, params.ThreadID) if err != nil { return err } pretty, _ := json.MarshalIndent(json.RawMessage(event.Params), "", " ") if len(pretty) == 0 { pretty = event.Params } kind := event.Method approval, err := b.store.UpsertPendingApproval(ctx, store.PendingApproval{ TelegramUserID: thread.TelegramUserID, CodexRequestID: strconv.FormatInt(*event.ID, 10), CodexThreadID: params.ThreadID, TurnID: params.TurnID, ItemID: params.ItemID, Kind: kind, PayloadJSON: string(pretty), }) if err != nil { return err } if approval.Status != "pending" { return nil } text := renderApprovalHTML(kind, event.Params, "") markup := approvalMarkup(approval.ID) if msg, ok, err := b.attachApprovalToToolMessage(ctx, params.ThreadID, params.ItemID, text, markup); err != nil { return err } else if ok { return b.store.UpdatePendingApprovalMessage(ctx, approval.ID, msg.Chat.ID, msg.MessageID) } msg, err := b.tg.SendMessage(ctx, thread.TelegramUserID, text, SendMessageOptions{ ParseMode: "HTML", ReplyMarkup: markup, }) if err != nil { return err } return b.store.UpdatePendingApprovalMessage(ctx, approval.ID, msg.Chat.ID, msg.MessageID) } func (b *Bot) newOutputState(chatID int64) *outputState { return &outputState{ chatID: chatID, tools: make(map[string]toolMessageState), sentImages: make(map[string]bool), workingIndicatorOff: b.startWorkingIndicator(chatID), } } func (b *Bot) registerOutput(threadID string, chatID int64) { b.mu.Lock() defer b.mu.Unlock() if state := b.outputs[threadID]; state != nil && state.workingIndicatorOff != nil { state.workingIndicatorOff() } b.outputs[threadID] = b.newOutputState(chatID) } func (b *Bot) registerPictureOutput(threadID string, chatID int64) { b.registerOutput(threadID, chatID) b.mu.Lock() defer b.mu.Unlock() if state := b.outputs[threadID]; state != nil { state.pictureRequest = true } } func (b *Bot) clearOutput(threadID string) { b.mu.Lock() state := b.outputs[threadID] delete(b.outputs, threadID) b.mu.Unlock() if state != nil && state.workingIndicatorOff != nil { state.workingIndicatorOff() } } func (b *Bot) startWorkingIndicator(chatID int64) context.CancelFunc { ctx, cancel := context.WithCancel(context.Background()) draftID := time.Now().UnixNano() go func() { useDraft := true sendDraft := func() bool { return b.tg.SendMessageDraft(ctx, chatID, draftID, "") == nil } sendTyping := func() { if err := b.tg.SendChatAction(ctx, chatID, "typing"); err != nil && ctx.Err() == nil { b.logger.Printf("send typing action: %v", err) } } if !sendDraft() { useDraft = false sendTyping() } draftTicker := time.NewTicker(25 * time.Second) typingTicker := time.NewTicker(4 * time.Second) defer draftTicker.Stop() defer typingTicker.Stop() for { select { case <-ctx.Done(): return case <-draftTicker.C: if useDraft && !sendDraft() { useDraft = false sendTyping() } case <-typingTicker.C: if !useDraft { sendTyping() } } } }() return cancel } func (b *Bot) hasAssistantText(threadID string) bool { b.mu.Lock() defer b.mu.Unlock() state := b.outputs[threadID] return state != nil && state.assistant.Len() > 0 } func (b *Bot) failActiveOutputs(ctx context.Context, message string) { b.mu.Lock() threadIDs := make([]string, 0, len(b.outputs)) for threadID := range b.outputs { threadIDs = append(threadIDs, threadID) } b.mu.Unlock() for _, threadID := range threadIDs { if thread, err := b.store.GetThreadByCodexID(ctx, threadID); err == nil { _ = b.store.SetActiveTurn(ctx, thread.TelegramUserID, "") } if err := b.flushAssistantMessage(ctx, threadID); err != nil { b.logger.Printf("flush failed output %s: %v", threadID, err) } if err := b.sendOutputBlock(ctx, threadID, message); err != nil { b.logger.Printf("send failed output %s: %v", threadID, err) } b.clearOutput(threadID) } } func (b *Bot) shouldSuppressPictureToolMessage(threadID string, item codexThreadItemView) bool { b.mu.Lock() defer b.mu.Unlock() state := b.outputs[threadID] return state != nil && state.pictureRequest && item.Type == "imageGeneration" } func (b *Bot) queuePictureImageOutput(threadID string, item codexThreadItemView) bool { if item.Type != "imageGeneration" { return false } b.mu.Lock() defer b.mu.Unlock() state := b.outputs[threadID] if state == nil || !state.pictureRequest { return false } path := strings.TrimSpace(item.SavedPath) if path == "" { return true } if state.sentImages == nil { state.sentImages = make(map[string]bool) } if state.sentImages[path] { return true } state.sentImages[path] = true state.generatedImages = append(state.generatedImages, generatedImageOutput{Path: path}) return true } func (b *Bot) sendImageOutput(ctx context.Context, threadID string, item codexThreadItemView) error { if item.Type != "imageGeneration" || strings.TrimSpace(item.SavedPath) == "" { return nil } path := strings.TrimSpace(item.SavedPath) if !b.markImageOutputPending(threadID, path) { return nil } data, err := os.ReadFile(path) if err != nil { b.logger.Printf("read generated image %s: %v", path, err) return nil } chatID, err := b.outputChatID(ctx, threadID) if err != nil { return nil } caption := "Generated image" if item.Status != "" { caption += ": " + item.Status } if _, err := b.tg.SendPhotoBytes(ctx, chatID, path, data, caption); err != nil { b.logger.Printf("send generated image %s: %v", path, err) return nil } b.markOutputSent(threadID) return nil } func (b *Bot) markImageOutputPending(threadID, path string) bool { b.mu.Lock() defer b.mu.Unlock() state := b.outputs[threadID] if state == nil { return false } if state.sentImages == nil { state.sentImages = make(map[string]bool) } if state.sentImages[path] { return false } state.sentImages[path] = true return true } func (b *Bot) sendOutputBlock(ctx context.Context, threadID, block string) error { block = strings.TrimSpace(block) if block == "" { return nil } if err := b.flushAssistantMessage(ctx, threadID); err != nil { return err } chatID, err := b.outputChatID(ctx, threadID) if err != nil { return nil } if err := b.sendLong(ctx, chatID, block); err != nil { return err } b.markOutputSent(threadID) return nil } func (b *Bot) sendOutputHTMLBlock(ctx context.Context, threadID, htmlText string) error { htmlText = strings.TrimSpace(htmlText) if htmlText == "" { return nil } if err := b.flushAssistantMessage(ctx, threadID); err != nil { return err } chatID, err := b.outputChatID(ctx, threadID) if err != nil { return nil } if err := b.sendHTML(ctx, chatID, htmlText); err != nil { return err } b.markOutputSent(threadID) return nil } func (s toolMessageState) html() string { return FitHTMLMessage(addEditedAtLine(combineToolApprovalHTML(s.toolHTML, s.approvalHTML), s.editedAt), TelegramHTMLMessageLimit) } func combineToolApprovalHTML(toolHTML, approvalHTML string) string { toolHTML = strings.TrimSpace(toolHTML) approvalHTML = strings.TrimSpace(approvalHTML) switch { case toolHTML == "": return approvalHTML case approvalHTML == "": return toolHTML default: return toolHTML + "\n\n" + approvalHTML } } func addEditedAtLine(htmlText, editedAt string) string { htmlText = strings.TrimSpace(htmlText) if htmlText == "" || editedAt == "" { return htmlText } line := EscapeHTML("Edited at: " + editedAt) quoteIndex := strings.Index(htmlText, "
") if quoteIndex < 0 { return htmlText + "\n" + line } summary := strings.TrimRight(htmlText[:quoteIndex], "\n") details := strings.TrimLeft(htmlText[quoteIndex:], "\n") if summary == "" { return line + "\n" + details } return summary + "\n" + line + "\n" + details } func editedAtTimestamp() string { return time.Now().UTC().Format("2006-01-02 15:04:05 MST") } func (b *Bot) upsertToolMessage(ctx context.Context, threadID, itemID, htmlText string) error { htmlText = strings.TrimSpace(htmlText) if htmlText == "" { return nil } if itemID == "" { return b.sendOutputHTMLBlock(ctx, threadID, htmlText) } if err := b.flushAssistantMessage(ctx, threadID); err != nil { return err } chatID, err := b.outputChatID(ctx, threadID) if err != nil { return nil } b.mu.Lock() state := b.outputs[threadID] if state != nil && state.tools == nil { state.tools = make(map[string]toolMessageState) } if state != nil { tool, ok := state.tools[itemID] if ok && tool.messageID != 0 { tool.toolHTML = htmlText tool.editedAt = editedAtTimestamp() state.tools[itemID] = tool combined := tool.html() msgChatID := tool.chatID msgID := tool.messageID markup := tool.approvalMarkup b.mu.Unlock() _, err := b.tg.EditMessageText(ctx, msgChatID, msgID, combined, EditMessageTextOptions{ParseMode: "HTML", ReplyMarkup: editReplyMarkup(markup)}) if err := ignoreTelegramMessageNotModified(err); err != nil { return err } b.markOutputSent(threadID) return nil } } b.mu.Unlock() msg, err := b.sendHTMLMessage(ctx, chatID, htmlText, nil) if err != nil { return err } b.mu.Lock() state = b.outputs[threadID] if state != nil { if state.tools == nil { state.tools = make(map[string]toolMessageState) } state.tools[itemID] = toolMessageState{chatID: msg.Chat.ID, messageID: msg.MessageID, toolHTML: htmlText} } b.mu.Unlock() b.markOutputSent(threadID) return nil } func (b *Bot) attachApprovalToToolMessage(ctx context.Context, threadID, itemID, approvalHTML string, markup *InlineKeyboardMarkup) (Message, bool, error) { approvalHTML = strings.TrimSpace(approvalHTML) if threadID == "" || itemID == "" || approvalHTML == "" { return Message{}, false, nil } if err := b.flushAssistantMessage(ctx, threadID); err != nil { return Message{}, false, err } b.mu.Lock() state := b.outputs[threadID] if state == nil { b.mu.Unlock() return Message{}, false, nil } tool, ok := state.tools[itemID] if !ok || tool.messageID == 0 { b.mu.Unlock() return Message{}, false, nil } tool.approvalHTML = approvalHTML tool.approvalMarkup = markup tool.editedAt = editedAtTimestamp() state.tools[itemID] = tool combined := tool.html() msg := Message{MessageID: tool.messageID, Chat: Chat{ID: tool.chatID}} b.mu.Unlock() _, err := b.tg.EditMessageText(ctx, msg.Chat.ID, msg.MessageID, combined, EditMessageTextOptions{ParseMode: "HTML", ReplyMarkup: editReplyMarkup(markup)}) if err := ignoreTelegramMessageNotModified(err); err != nil { b.clearToolApproval(threadID, itemID) b.logger.Printf("edit tool approval message %s/%s: %v", threadID, itemID, err) return Message{}, false, nil } b.markOutputSent(threadID) return msg, true, nil } func (b *Bot) clearToolApproval(threadID, itemID string) { b.mu.Lock() defer b.mu.Unlock() if state := b.outputs[threadID]; state != nil { if tool, ok := state.tools[itemID]; ok { tool.approvalHTML = "" tool.approvalMarkup = nil state.tools[itemID] = tool } } } func (b *Bot) resolveApprovalMessageHTML(approval store.PendingApproval, decision string) string { approvalHTML := renderApprovalHTML(approval.Kind, json.RawMessage(approval.PayloadJSON), approvalStatusLine(decision)) if approval.ItemID == "" { return approvalHTML } b.mu.Lock() defer b.mu.Unlock() state := b.outputs[approval.CodexThreadID] if state == nil { return approvalHTML } tool, ok := state.tools[approval.ItemID] if !ok || tool.messageID == 0 || tool.messageID != approval.MessageID { return approvalHTML } tool.approvalHTML = approvalHTML tool.approvalMarkup = nil tool.editedAt = editedAtTimestamp() state.tools[approval.ItemID] = tool return tool.html() } func ignoreTelegramMessageNotModified(err error) error { if err != nil && strings.Contains(err.Error(), "message is not modified") { return nil } return err } func splitAssistantMessageSegments(text string) []assistantMessageSegment { var segments []assistantMessageSegment var visible strings.Builder flushVisible := func() { if visible.Len() == 0 { return } segments = append(segments, assistantMessageSegment{Text: visible.String()}) visible.Reset() } for _, line := range strings.SplitAfter(text, "\n") { body := strings.TrimSuffix(line, "\n") body = strings.TrimSuffix(body, "\r") if directive, ok := parseAssistantPhotoDirectiveLine(body); ok { flushVisible() segments = append(segments, assistantMessageSegment{Photo: &directive}) continue } if directive, ok := parseAssistantThreadRenameDirectiveLine(body); ok { flushVisible() segments = append(segments, assistantMessageSegment{ThreadRename: &directive}) continue } if directive, ok := parseAssistantThreadCWDDirectiveLine(body); ok { flushVisible() segments = append(segments, assistantMessageSegment{ThreadCWD: &directive}) continue } visible.WriteString(line) } flushVisible() return segments } func parseAssistantThreadRenameDirectiveLine(line string) (assistantThreadRenameDirective, bool) { trimmed := strings.TrimSpace(line) if !strings.HasPrefix(trimmed, telegramThreadRenameDirectiveStart) || !strings.HasSuffix(trimmed, telegramDirectiveEnd) { return assistantThreadRenameDirective{}, false } raw := strings.TrimSuffix(strings.TrimPrefix(trimmed, telegramThreadRenameDirectiveStart), telegramDirectiveEnd) raw = strings.TrimSpace(raw) var directive assistantThreadRenameDirective if err := json.Unmarshal([]byte(raw), &directive); err != nil { return assistantThreadRenameDirective{}, false } directive.Title = normalizeThreadTitle(directive.Title) return directive, true } func parseAssistantThreadCWDDirectiveLine(line string) (assistantThreadCWDDirective, bool) { trimmed := strings.TrimSpace(line) if !strings.HasPrefix(trimmed, telegramThreadCWDDirectiveStart) || !strings.HasSuffix(trimmed, telegramDirectiveEnd) { return assistantThreadCWDDirective{}, false } raw := strings.TrimSuffix(strings.TrimPrefix(trimmed, telegramThreadCWDDirectiveStart), telegramDirectiveEnd) raw = strings.TrimSpace(raw) var directive assistantThreadCWDDirective if err := json.Unmarshal([]byte(raw), &directive); err != nil { return assistantThreadCWDDirective{}, false } directive.CWD = strings.TrimSpace(directive.CWD) return directive, true } func parseAssistantPhotoDirectiveLine(line string) (assistantPhotoDirective, bool) { trimmed := strings.TrimSpace(line) if !strings.HasPrefix(trimmed, telegramPhotoDirectiveStart) || !strings.HasSuffix(trimmed, telegramDirectiveEnd) { return assistantPhotoDirective{}, false } raw := strings.TrimSuffix(strings.TrimPrefix(trimmed, telegramPhotoDirectiveStart), telegramDirectiveEnd) raw = strings.TrimSpace(raw) var directive assistantPhotoDirective if err := json.Unmarshal([]byte(raw), &directive); err != nil { return assistantPhotoDirective{}, false } directive.Path = strings.TrimSpace(directive.Path) directive.Caption = strings.TrimSpace(directive.Caption) return directive, true } func (b *Bot) sendAssistantText(ctx context.Context, threadID string, chatID int64, text string) error { for _, segment := range splitAssistantMessageSegments(text) { if segment.Text != "" && strings.TrimSpace(segment.Text) != "" { if err := b.sendLong(ctx, chatID, segment.Text); err != nil { return err } } if segment.Photo != nil { if err := b.sendAssistantPhoto(ctx, chatID, *segment.Photo); err != nil { b.logger.Printf("send assistant photo: %v", err) if sendErr := b.sendLong(ctx, chatID, "Could not send photo: "+err.Error()); sendErr != nil { return sendErr } } } if segment.ThreadRename != nil { if err := b.applyAssistantThreadRename(ctx, threadID, *segment.ThreadRename); err != nil { b.logger.Printf("apply assistant thread rename: %v", err) if sendErr := b.sendLong(ctx, chatID, "Could not rename thread: "+err.Error()); sendErr != nil { return sendErr } } } if segment.ThreadCWD != nil { if err := b.applyAssistantThreadCWD(ctx, threadID, *segment.ThreadCWD); err != nil { b.logger.Printf("apply assistant thread cwd: %v", err) if sendErr := b.sendLong(ctx, chatID, "Could not change thread cwd: "+err.Error()); sendErr != nil { return sendErr } } } } return nil } func (b *Bot) applyAssistantThreadRename(ctx context.Context, threadID string, directive assistantThreadRenameDirective) error { title := normalizeThreadTitle(directive.Title) if title == "" { return errors.New("thread title cannot be empty") } if err := b.codex.SetThreadName(ctx, threadID, title); err != nil { return err } return b.store.SyncThreadTitleByCodexID(ctx, threadID, title) } func (b *Bot) applyAssistantThreadCWD(ctx context.Context, threadID string, directive assistantThreadCWDDirective) error { cwd, err := store.ValidateWorkspacePath(directive.CWD) if err != nil { return err } if _, err := codexstate.SetThreadCWD(ctx, b.codexHome, b.codexStateDB, threadID, cwd); err != nil { return err } workspace, ok, err := b.workspaceForCodexCWD(ctx, cwd) if err != nil { return err } if !ok { return nil } thread, err := b.store.GetThreadByCodexID(ctx, threadID) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil } return err } if err := b.store.SyncThreadWorkspace(ctx, thread.TelegramUserID, thread.ID, workspace.ID); err != nil { return err } return b.store.SetSessionWorkspace(ctx, thread.TelegramUserID, workspace.ID) } func (b *Bot) sendAssistantPhoto(ctx context.Context, chatID int64, directive assistantPhotoDirective) error { path := strings.TrimSpace(directive.Path) if path == "" { return errors.New("photo directive is missing a path") } if !filepath.IsAbs(path) { return fmt.Errorf("photo path must be absolute: %s", path) } if !isPicturePath(path) { return fmt.Errorf("unsupported photo type: %s", filepath.Base(path)) } data, err := os.ReadFile(path) if err != nil { return fmt.Errorf("read %s: %v", filepath.Base(path), err) } caption := truncateTelegramPhotoCaption(directive.Caption) if _, err := b.tg.SendPhotoBytes(ctx, chatID, path, data, caption); err != nil { return fmt.Errorf("send %s: %v", filepath.Base(path), err) } return nil } func truncateTelegramPhotoCaption(caption string) string { runes := []rune(caption) if len(runes) <= telegramPhotoCaptionLimit { return caption } if telegramPhotoCaptionLimit <= 3 { return string(runes[:telegramPhotoCaptionLimit]) } return string(runes[:telegramPhotoCaptionLimit-3]) + "..." } func (b *Bot) appendAssistantDelta(ctx context.Context, threadID, delta string) error { if delta == "" { return nil } if _, err := b.outputChatID(ctx, threadID); err != nil { return nil } b.mu.Lock() state := b.outputs[threadID] if state != nil { _, _ = state.assistant.WriteString(delta) } b.mu.Unlock() return nil } func (b *Bot) flushAssistantMessage(ctx context.Context, threadID string) error { b.mu.Lock() state := b.outputs[threadID] if state == nil || state.assistant.Len() == 0 { b.mu.Unlock() return nil } chatID := state.chatID text := state.assistant.String() pictureRequest := state.pictureRequest state.assistant.Reset() b.mu.Unlock() if pictureRequest { return nil } if err := b.sendAssistantText(ctx, threadID, chatID, text); err != nil { return err } b.markOutputSent(threadID) return nil } func (b *Bot) completeTurnOutput(ctx context.Context, threadID string) error { if err := b.flushAssistantMessage(ctx, threadID); err != nil { return err } b.mu.Lock() state := b.outputs[threadID] if state == nil { b.mu.Unlock() return nil } chatID := state.chatID sentAny := state.sentAny pictureRequest := state.pictureRequest generatedImages := append([]generatedImageOutput(nil), state.generatedImages...) workingIndicatorOff := state.workingIndicatorOff delete(b.outputs, threadID) b.mu.Unlock() if workingIndicatorOff != nil { workingIndicatorOff() } if pictureRequest { if len(generatedImages) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "No image was generated.", SendMessageOptions{}) return err } return b.sendGeneratedImageOutputs(ctx, chatID, generatedImages) } if !sentAny { _, err := b.tg.SendMessage(ctx, chatID, "Done.", SendMessageOptions{}) return err } return nil } func (b *Bot) sendGeneratedImageOutputs(ctx context.Context, chatID int64, images []generatedImageOutput) error { uploads := make([]PhotoUpload, 0, len(images)) for _, image := range images { path := strings.TrimSpace(image.Path) if path == "" { continue } data, err := os.ReadFile(path) if err != nil { b.logger.Printf("read generated image %s: %v", path, err) continue } uploads = append(uploads, PhotoUpload{Filename: path, Data: data}) } if len(uploads) == 0 { _, err := b.tg.SendMessage(ctx, chatID, "Generated image file was not readable by the bot.", SendMessageOptions{}) return err } for len(uploads) > 0 { count := len(uploads) if count > pictureMediaGroupLimit { count = pictureMediaGroupLimit } if _, err := b.tg.SendPhotoGroupBytes(ctx, chatID, uploads[:count]); err != nil { return err } uploads = uploads[count:] } return nil } func (b *Bot) outputChatID(ctx context.Context, threadID string) (int64, error) { b.mu.Lock() state := b.outputs[threadID] if state != nil { chatID := state.chatID b.mu.Unlock() return chatID, nil } b.mu.Unlock() thread, err := b.store.GetThreadByCodexID(ctx, threadID) if err != nil { return 0, err } b.registerOutput(threadID, thread.TelegramUserID) return thread.TelegramUserID, nil } func (b *Bot) markOutputSent(threadID string) { b.mu.Lock() defer b.mu.Unlock() if state := b.outputs[threadID]; state != nil { state.sentAny = true } } func (b *Bot) sendLong(ctx context.Context, chatID int64, text string) error { for _, chunk := range ChunkText(text, TelegramHTMLMessageLimit) { if err := b.sendHTML(ctx, chatID, EscapeHTML(chunk)); err != nil { return err } } return nil } func (b *Bot) sendHTML(ctx context.Context, chatID int64, htmlText string) error { _, err := b.sendHTMLMessage(ctx, chatID, htmlText, nil) return err } func (b *Bot) sendHTMLMessage(ctx context.Context, chatID int64, htmlText string, markup *InlineKeyboardMarkup) (Message, error) { return b.tg.SendMessage(ctx, chatID, htmlText, SendMessageOptions{ParseMode: "HTML", ReplyMarkup: markup}) } func (b *Bot) sendError(ctx context.Context, chatID int64, prefix string, err error) error { _, sendErr := b.tg.SendMessage(ctx, chatID, EscapeHTML(prefix+": "+err.Error()), SendMessageOptions{ParseMode: "HTML"}) return sendErr } func (b *Bot) sendWorkspaceMissing(ctx context.Context, chatID int64) error { _, err := b.tg.SendMessage(ctx, chatID, "No workspace configured. Ask an admin to run scripts/add-workspace, then use /workspace.", SendMessageOptions{}) return err } func (b *Bot) sendNoActiveThread(ctx context.Context, chatID int64, err error) error { if errors.Is(err, sql.ErrNoRows) { _, sendErr := b.tg.SendMessage(ctx, chatID, "No active thread. Use /new.", SendMessageOptions{}) return sendErr } return err } func parseCommand(text string) (string, []string, bool) { text = strings.TrimSpace(text) if !strings.HasPrefix(text, "/") { return "", nil, false } fields := strings.Fields(text) if len(fields) == 0 { return "", nil, false } name := strings.TrimPrefix(fields[0], "/") if before, _, ok := strings.Cut(name, "@"); ok { name = before } return strings.ToLower(name), fields[1:], true } func resumeThreadListText(threads []store.Thread, page int) string { lines := []string{fmt.Sprintf("Resume a thread (page %d):", page+1), ""} for _, thread := range threads { lines = append(lines, fmt.Sprintf("Thread ID %d: %s", thread.ID, threadDisplayTitle(thread))) } lines = append(lines, "", "Choose a button below, or use /resume THREAD_ID directly.") return strings.Join(lines, "\n") } func resumeThreadMarkup(threads []store.Thread, page int, hasNext bool) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, 4) for _, thread := range threads { button := InlineKeyboardButton{ Text: fmt.Sprintf("ID %d", thread.ID), CallbackData: ResumeThreadCallbackData(thread.ID), } if len(keyboard) == 0 || len(keyboard[len(keyboard)-1]) >= 4 { keyboard = append(keyboard, []InlineKeyboardButton{button}) continue } keyboard[len(keyboard)-1] = append(keyboard[len(keyboard)-1], button) } var nav []InlineKeyboardButton if page > 0 { nav = append(nav, InlineKeyboardButton{Text: "Prev", CallbackData: ResumePageCallbackData(page - 1)}) } if hasNext { nav = append(nav, InlineKeyboardButton{Text: "Next", CallbackData: ResumePageCallbackData(page + 1)}) } if len(nav) > 0 { keyboard = append(keyboard, nav) } return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } func normalizeThreadTitle(title string) string { title = strings.Join(strings.Fields(title), " ") runes := []rune(title) if len(runes) > 80 { title = string(runes[:80]) } return title } func threadDisplayTitle(thread store.Thread) string { title := strings.Join(strings.Fields(thread.Title), " ") if title == "" { title = thread.CodexThreadID } runes := []rune(title) if len(runes) > 90 { title = string(runes[:90]) + "..." } return title } func workspaceMarkup(workspaces []store.Workspace) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, len(workspaces)) for _, ws := range workspaces { text := ws.Label if ws.IsDefault { text += " default" } keyboard = append(keyboard, []InlineKeyboardButton{{ Text: text, CallbackData: WorkspaceCallbackData(ws.ID), }}) } return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } func modelMarkup(models []codexapp.Model) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, len(models)) for _, model := range models { callbackData, ok := ModelCallbackData(model.ID) if !ok { continue } keyboard = append(keyboard, []InlineKeyboardButton{{ Text: modelLabel(model), CallbackData: callbackData, }}) } return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } func effortMarkup(model codexapp.Model) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, len(model.SupportedReasoningEfforts)) for _, option := range model.SupportedReasoningEfforts { label := option.ReasoningEffort if option.ReasoningEffort == model.DefaultReasoningEffort { label += " default" } keyboard = append(keyboard, []InlineKeyboardButton{{ Text: label, CallbackData: EffortCallbackData(option.ReasoningEffort), }}) } return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } func modelSupportsEffort(model codexapp.Model, effort string) bool { for _, option := range model.SupportedReasoningEfforts { if option.ReasoningEffort == effort { return true } } return false } func modelLabel(model codexapp.Model) string { label := model.DisplayName if label == "" { label = model.ID } if model.IsDefault { label += " default" } return label } func sessionModelLabel(models []codexapp.Model, modelID string) string { if modelID == "" { return "(Codex default)" } for _, model := range models { if model.ID == modelID { return modelLabel(model) } } return modelID } func settingsStatusText(model string, effort string) string { if effort == "" { effort = "(model default)" } return fmt.Sprintf("Current model: %s\nCurrent reasoning effort: %s", model, effort) } func (b *Bot) rememberSettingsMessage(ctx context.Context, userID int64, chatID int64, messageID int) { if messageID == 0 { return } if err := b.store.SetSessionSettingsMessage(ctx, userID, chatID, messageID); err != nil { b.logger.Printf("settings message store: %v", err) return } if err := b.tg.PinChatMessage(ctx, chatID, messageID, true); err != nil { b.logger.Printf("settings message pin: %v", err) } } func clearInlineKeyboardMarkup() *InlineKeyboardMarkup { return &InlineKeyboardMarkup{InlineKeyboard: [][]InlineKeyboardButton{}} } func editReplyMarkup(markup *InlineKeyboardMarkup) *InlineKeyboardMarkup { if markup != nil { return markup } return clearInlineKeyboardMarkup() } func approvalMarkup(id int64) *InlineKeyboardMarkup { return &InlineKeyboardMarkup{InlineKeyboard: [][]InlineKeyboardButton{ { {Text: "Approve", CallbackData: ApprovalCallbackData(id, "accept")}, {Text: "Deny", CallbackData: ApprovalCallbackData(id, "decline")}, }, { {Text: "Details", CallbackData: ApprovalCallbackData(id, "details")}, {Text: "Cancel", CallbackData: ApprovalCallbackData(id, "cancel")}, }, }} } func approvalResponse(approval store.PendingApproval, decision string) any { if approval.Kind != "item/permissions/requestApproval" { return map[string]any{"decision": decision} } scope := "turn" if decision == "acceptForSession" { scope = "session" } permissions := map[string]any{} if decision == "accept" || decision == "acceptForSession" { var params struct { Permissions map[string]any `json:"permissions"` } _ = json.Unmarshal([]byte(approval.PayloadJSON), ¶ms) if params.Permissions != nil { permissions = params.Permissions } } return map[string]any{ "permissions": permissions, "scope": scope, } } func renderApprovalHTML(kind string, raw json.RawMessage, status string) string { title := "Codex approval requested" if strings.Contains(kind, "commandExecution") { title = "Codex requests command approval" } if strings.Contains(kind, "fileChange") { title = "Codex requests file change approval" } if strings.Contains(kind, "permissions") { title = "Codex requests permission approval" } var params map[string]any _ = json.Unmarshal(raw, ¶ms) lines := []string{title} if reason, _ := params["reason"].(string); reason != "" { lines = append(lines, "", reason) } for _, key := range []string{"command", "cwd", "grantRoot", "permissions"} { if value, ok := params[key]; ok { lines = append(lines, fmt.Sprintf("%s: %s", argumentLabel(key), conciseValue(value))) } } summary := strings.Join(lines, "\n") details := renderApprovalDetailsHTML(kind, raw) limit := TelegramHTMLMessageLimit if status != "" { limit -= len([]rune(status)) + 1 } text := SummaryDetailsRawHTMLLimited(summary, details, limit) if status != "" { text += "\n" + EscapeHTML(status) } return text } func renderApprovalDetailsHTML(kind string, raw json.RawMessage) string { var params map[string]any if err := json.Unmarshal(raw, ¶ms); err != nil { return CodeBlockHTML("json", string(raw)) } var parts []string appendValue := func(label string, value any) { text, complex := argumentValueText(value) if strings.TrimSpace(text) == "" { return } if complex || strings.Contains(text, "\n") || strings.EqualFold(label, "Command") || strings.EqualFold(label, "Permissions") { language := "text" if strings.EqualFold(label, "Command") { language = "bash" } else if complex || strings.EqualFold(label, "Permissions") || looksLikeJSON(text) { language = "json" } parts = append(parts, ""+EscapeHTML(label)+"", CodeBlockHTML(language, text)) return } parts = append(parts, FieldHTML(label, text)) } for _, key := range []string{"command", "cwd", "grantRoot", "permissions", "reason"} { if value, ok := params[key]; ok { appendValue(argumentLabel(key), value) } } if len(parts) == 0 { return CodeBlockHTML("json", prettyJSON(raw)) } return strings.Join(nonEmptyHTML(parts), "\n") } func approvalStatusLine(decision string) string { switch decision { case "accept": return "Approved." case "acceptForSession": return "Approved for this session." case "decline": return "Disapproved." case "cancel": return "Canceled." default: return "Resolved: " + decision + "." } } func conciseValue(value any) string { text := fmt.Sprint(value) if stringValue, ok := value.(string); ok { text = stringValue } else if data, err := json.Marshal(value); err == nil { text = string(data) } text = strings.Join(strings.Fields(text), " ") runes := []rune(text) if len(runes) > 180 { return string(runes[:180]) + "..." } return text } func prettyJSON(raw json.RawMessage) string { if len(raw) == 0 { return "" } var value any if err := json.Unmarshal(raw, &value); err != nil { return string(raw) } data, err := json.MarshalIndent(value, "", " ") if err != nil { return string(raw) } return string(data) } func largestPhoto(photos []PhotoSize) PhotoSize { best := photos[0] bestScore := best.Width * best.Height for _, photo := range photos[1:] { score := photo.Width * photo.Height if score > bestScore { best = photo bestScore = score } } return best } func isImage(mimeType, filename string) bool { if strings.HasPrefix(strings.ToLower(mimeType), "image/") { return true } switch strings.ToLower(filepath.Ext(filename)) { case ".jpg", ".jpeg", ".png", ".gif", ".webp": return true default: return false } } func extensionForMime(mimeType string) string { extensions, err := mime.ExtensionsByType(mimeType) if err != nil || len(extensions) == 0 { return "" } return extensions[0] } func safeFilename(filename string) string { base := filepath.Base(filename) var builder strings.Builder for _, r := range base { switch { case r >= 'a' && r <= 'z': builder.WriteRune(r) case r >= 'A' && r <= 'Z': builder.WriteRune(r) case r >= '0' && r <= '9': builder.WriteRune(r) case r == '.', r == '-', r == '_': builder.WriteRune(r) default: builder.WriteByte('_') } } if builder.Len() == 0 { return "upload" } return builder.String() }