From 595e8aee0eea582eb93d21210f6311c9eb852a55 Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 23 Jun 2026 11:19:48 +0000 Subject: [PATCH] Refine Telegram thread commands --- README.md | 4 +- internal/codexapp/client.go | 16 ++ internal/codexapp/client_test.go | 69 ++++++ internal/store/store.go | 69 +++++- internal/store/store_test.go | 56 +++++ internal/telegram/api.go | 8 +- internal/telegram/bot.go | 393 ++++++++++++++++++++++++------- internal/telegram/render.go | 35 +++ internal/telegram/render_test.go | 80 ++++++- 9 files changed, 628 insertions(+), 102 deletions(-) diff --git a/README.md b/README.md index b2f263e..2cdcc6e 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,9 @@ Docker Compose runs only the Go Telegram bot. Codex runs on the host through `co The bot accepts one-to-one chats from allowlisted Telegram user IDs only. It rejects group, supergroup, and channel updates in code. -Supported commands: `/start`, `/help`, `/new`, `/thread`, `/rename`, `/fork`, `/archive`, `/status`, `/cancel`, `/workspaces`, `/workspace`, `/model`, `/sandbox`, `/pic`, `/diff`. `/model` lists available Codex models as inline buttons, then shows reasoning-effort buttons for the selected model. +Supported commands: `/start`, `/new`, `/resume`, `/rename`, `/fork`, `/archive [ID]`, `/unarchive [ID]`, `/delete [ID]`, `/status`, `/cancel`, `/workspace`, `/model`, `/sandbox`, `/pic`. `/resume`, `/archive`, `/unarchive`, `/delete`, `/workspace`, `/model`, and `/sandbox` show inline pickers when no ID or option is provided. `/model` lists available Codex models as inline buttons, then shows reasoning-effort buttons for the selected model. + +When changing `botCommands()`, rebuild and restart the bot so it republishes Telegram commands. The bot sets both the default command scope and the `all_private_chats` scope; private chats can keep showing stale commands if only the default scope is updated. Verify both scopes with `getMyCommands` after deployment. Plain text continues the active Codex thread and creates one if needed. `/pic PROMPT` starts a dedicated Codex image-generation turn and sends generated images back as Telegram photos. Telegram images are staged under `HOST_UPLOAD_DIR` and sent as `localImage` inputs. Other uploaded documents are staged and passed to Codex as host-visible file paths. diff --git a/internal/codexapp/client.go b/internal/codexapp/client.go index 2cd5df7..9fb5ed8 100644 --- a/internal/codexapp/client.go +++ b/internal/codexapp/client.go @@ -319,6 +319,22 @@ func (c *Client) ArchiveThread(ctx context.Context, threadID string) error { return c.call(ctx, "thread/archive", map[string]any{"threadId": threadID}, &ignored) } +func (c *Client) UnarchiveThread(ctx context.Context, threadID string) error { + if err := c.EnsureConnected(ctx); err != nil { + return err + } + var ignored json.RawMessage + return c.call(ctx, "thread/unarchive", map[string]any{"threadId": threadID}, &ignored) +} + +func (c *Client) DeleteThread(ctx context.Context, threadID string) error { + if err := c.EnsureConnected(ctx); err != nil { + return err + } + var ignored json.RawMessage + return c.call(ctx, "thread/delete", map[string]any{"threadId": threadID}, &ignored) +} + func (c *Client) SetThreadName(ctx context.Context, threadID, name string) error { if err := c.EnsureConnected(ctx); err != nil { return err diff --git a/internal/codexapp/client_test.go b/internal/codexapp/client_test.go index 45cf642..baaf976 100644 --- a/internal/codexapp/client_test.go +++ b/internal/codexapp/client_test.go @@ -159,6 +159,66 @@ func TestClientWebSocketUnixJSONRPC(t *testing.T) { return } + var archiveThread map[string]any + if err := conn.ReadJSON(&archiveThread); err != nil { + serverDone <- err + return + } + if archiveThread["method"] != "thread/archive" { + serverDone <- unexpectedMessage("thread/archive", archiveThread["method"]) + return + } + archiveParams := archiveThread["params"].(map[string]any) + if archiveParams["threadId"] != "thr_1" { + payload, _ := json.Marshal(archiveParams) + serverDone <- unexpectedMessage("thread/archive params", string(payload)) + return + } + if err := conn.WriteJSON(map[string]any{"id": archiveThread["id"], "result": map[string]any{}}); err != nil { + serverDone <- err + return + } + + var unarchiveThread map[string]any + if err := conn.ReadJSON(&unarchiveThread); err != nil { + serverDone <- err + return + } + if unarchiveThread["method"] != "thread/unarchive" { + serverDone <- unexpectedMessage("thread/unarchive", unarchiveThread["method"]) + return + } + unarchiveParams := unarchiveThread["params"].(map[string]any) + if unarchiveParams["threadId"] != "thr_1" { + payload, _ := json.Marshal(unarchiveParams) + serverDone <- unexpectedMessage("thread/unarchive params", string(payload)) + return + } + if err := conn.WriteJSON(map[string]any{"id": unarchiveThread["id"], "result": map[string]any{}}); err != nil { + serverDone <- err + return + } + + var deleteThread map[string]any + if err := conn.ReadJSON(&deleteThread); err != nil { + serverDone <- err + return + } + if deleteThread["method"] != "thread/delete" { + serverDone <- unexpectedMessage("thread/delete", deleteThread["method"]) + return + } + deleteParams := deleteThread["params"].(map[string]any) + if deleteParams["threadId"] != "thr_1" { + payload, _ := json.Marshal(deleteParams) + serverDone <- unexpectedMessage("thread/delete params", string(payload)) + return + } + if err := conn.WriteJSON(map[string]any{"id": deleteThread["id"], "result": map[string]any{}}); err != nil { + serverDone <- err + return + } + var response map[string]any if err := conn.ReadJSON(&response); err != nil { serverDone <- err @@ -218,6 +278,15 @@ func TestClientWebSocketUnixJSONRPC(t *testing.T) { if err := client.SetThreadName(ctx, "thr_1", "Short title"); err != nil { t.Fatal(err) } + if err := client.ArchiveThread(ctx, "thr_1"); err != nil { + t.Fatal(err) + } + if err := client.UnarchiveThread(ctx, "thr_1"); err != nil { + t.Fatal(err) + } + if err := client.DeleteThread(ctx, "thr_1"); err != nil { + t.Fatal(err) + } if err := client.RespondServerRequest(ctx, approvalRequestID, "accept"); err != nil { t.Fatal(err) } diff --git a/internal/store/store.go b/internal/store/store.go index 8a5b07f..7a1c1b9 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -339,6 +339,13 @@ WHERE telegram_user_id = ?`, threadID, telegramUserID) return err } +func (s *Store) ClearActiveThread(ctx context.Context, telegramUserID, threadID int64) error { + _, err := s.db.ExecContext(ctx, ` +UPDATE sessions SET active_thread_id = NULL, active_turn_id = '', updated_at = datetime('now') +WHERE telegram_user_id = ? AND active_thread_id = ?`, telegramUserID, threadID) + return err +} + func (s *Store) SetActiveTurn(ctx context.Context, telegramUserID int64, turnID string) error { _, err := s.db.ExecContext(ctx, "UPDATE sessions SET active_turn_id = ?, updated_at = datetime('now') WHERE telegram_user_id = ?", turnID, telegramUserID) return err @@ -413,6 +420,18 @@ func (s *Store) ListThreads(ctx context.Context, telegramUserID int64, includeAr } func (s *Store) ListThreadsPage(ctx context.Context, telegramUserID int64, includeArchived bool, limit, offset int) ([]Thread, error) { + archivedFilter := "" + if !includeArchived { + archivedFilter = "archived = 0" + } + return s.listThreadsPage(ctx, telegramUserID, archivedFilter, limit, offset) +} + +func (s *Store) ListArchivedThreadsPage(ctx context.Context, telegramUserID int64, limit, offset int) ([]Thread, error) { + return s.listThreadsPage(ctx, telegramUserID, "archived = 1", limit, offset) +} + +func (s *Store) listThreadsPage(ctx context.Context, telegramUserID int64, archivedFilter string, limit, offset int) ([]Thread, error) { if limit <= 0 { limit = 20 } @@ -423,8 +442,8 @@ func (s *Store) ListThreadsPage(ctx context.Context, telegramUserID int64, inclu SELECT id, telegram_user_id, codex_thread_id, workspace_id, title, archived, created_at, updated_at FROM threads WHERE telegram_user_id = ?` args := []any{telegramUserID} - if !includeArchived { - query += " AND archived = 0" + if archivedFilter != "" { + query += " AND " + archivedFilter } query += " ORDER BY updated_at DESC, id DESC LIMIT ? OFFSET ?" args = append(args, limit, offset) @@ -453,6 +472,52 @@ WHERE telegram_user_id = ? AND id = ?`, telegramUserID, id) return err } +func (s *Store) UnarchiveThread(ctx context.Context, telegramUserID, id int64) error { + _, err := s.db.ExecContext(ctx, ` +UPDATE threads SET archived = 0, updated_at = datetime('now') +WHERE telegram_user_id = ? AND id = ?`, telegramUserID, id) + return err +} + +func (s *Store) DeleteThread(ctx context.Context, telegramUserID, id int64) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + if _, err := tx.ExecContext(ctx, ` +UPDATE sessions SET active_thread_id = NULL, active_turn_id = '', updated_at = datetime('now') +WHERE telegram_user_id = ? AND active_thread_id = ?`, telegramUserID, id); err != nil { + _ = tx.Rollback() + return err + } + if _, err := tx.ExecContext(ctx, ` +DELETE FROM threads +WHERE telegram_user_id = ? AND id = ?`, telegramUserID, id); err != nil { + _ = tx.Rollback() + return err + } + return tx.Commit() +} + +func (s *Store) DeleteThreadByCodexID(ctx context.Context, codexThreadID string) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + if _, err := tx.ExecContext(ctx, ` +UPDATE sessions +SET active_thread_id = NULL, active_turn_id = '', updated_at = datetime('now') +WHERE active_thread_id IN (SELECT id FROM threads WHERE codex_thread_id = ?)`, codexThreadID); err != nil { + _ = tx.Rollback() + return err + } + if _, err := tx.ExecContext(ctx, "DELETE FROM threads WHERE codex_thread_id = ?", codexThreadID); err != nil { + _ = tx.Rollback() + return err + } + return tx.Commit() +} + func (s *Store) TouchThread(ctx context.Context, codexThreadID string) error { _, err := s.db.ExecContext(ctx, "UPDATE threads SET updated_at = datetime('now') WHERE codex_thread_id = ?", codexThreadID) return err diff --git a/internal/store/store_test.go b/internal/store/store_test.go index bb5553a..9b88de9 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -109,6 +109,42 @@ func TestStoreUsersWorkspacesSessions(t *testing.T) { if session.ActiveTurnID != "" { t.Fatalf("active turn not cleared: %+v", session) } + if err := st.SetActiveThread(ctx, 42, thread.ID); err != nil { + t.Fatal(err) + } + if err := st.SetActiveTurn(ctx, 42, "turn-delete"); err != nil { + t.Fatal(err) + } + if err := st.DeleteThread(ctx, 42, thread.ID); err != nil { + t.Fatal(err) + } + session, err = st.GetSession(ctx, 42) + if err != nil { + t.Fatal(err) + } + if session.ActiveThreadID != 0 || session.ActiveTurnID != "" { + t.Fatalf("delete should clear active thread and turn: %+v", session) + } + if _, err := st.GetThreadByID(ctx, 42, thread.ID); err == nil { + t.Fatal("deleted thread should not be found") + } + thread, err = st.CreateThread(ctx, 42, "codex-thread-delete-by-id", ws.ID, "delete by codex id") + if err != nil { + t.Fatal(err) + } + if err := st.SetActiveThread(ctx, 42, thread.ID); err != nil { + t.Fatal(err) + } + if err := st.DeleteThreadByCodexID(ctx, "codex-thread-delete-by-id"); err != nil { + t.Fatal(err) + } + session, err = st.GetSession(ctx, 42) + if err != nil { + t.Fatal(err) + } + if session.ActiveThreadID != 0 { + t.Fatalf("delete by codex id should clear active thread: %+v", session) + } } func TestListThreadsPage(t *testing.T) { @@ -142,6 +178,26 @@ func TestListThreadsPage(t *testing.T) { if len(threads) != 1 { t.Fatalf("got %d threads on second page, want 1", len(threads)) } + if err := st.ArchiveThread(ctx, 42, threads[0].ID); err != nil { + t.Fatal(err) + } + archived, err := st.ListArchivedThreadsPage(ctx, 42, 10, 0) + if err != nil { + t.Fatal(err) + } + if len(archived) != 1 || !archived[0].Archived { + t.Fatalf("archived threads = %+v, want one archived thread", archived) + } + if err := st.UnarchiveThread(ctx, 42, archived[0].ID); err != nil { + t.Fatal(err) + } + archived, err = st.ListArchivedThreadsPage(ctx, 42, 10, 0) + if err != nil { + t.Fatal(err) + } + if len(archived) != 0 { + t.Fatalf("archived threads after unarchive = %+v, want none", archived) + } } func TestRenameThread(t *testing.T) { diff --git a/internal/telegram/api.go b/internal/telegram/api.go index 84e26c6..574ac06 100644 --- a/internal/telegram/api.go +++ b/internal/telegram/api.go @@ -52,7 +52,13 @@ func (c *Client) redact(text string) string { func (c *Client) SetMyCommands(ctx context.Context, commands []BotCommand) error { var ok bool - return c.postJSON(ctx, "setMyCommands", map[string]any{"commands": commands}, &ok) + if err := c.postJSON(ctx, "setMyCommands", map[string]any{"commands": commands}, &ok); err != nil { + return err + } + return c.postJSON(ctx, "setMyCommands", map[string]any{ + "commands": commands, + "scope": map[string]any{"type": "all_private_chats"}, + }, &ok) } func (c *Client) GetUpdates(ctx context.Context, offset int, timeoutSeconds int) ([]Update, error) { diff --git a/internal/telegram/bot.go b/internal/telegram/bot.go index 0f14ac1..4d8bbe2 100644 --- a/internal/telegram/bot.go +++ b/internal/telegram/bot.go @@ -33,6 +33,10 @@ const ( telegramDirectiveEnd = " -->" telegramCaptionLimit = 1024 pictureMediaGroupLimit = 10 + threadActionResume = "resume" + threadActionArchive = "archive" + threadActionUnarchive = "unarchive" + threadActionDelete = "delete" ) type Bot struct { @@ -50,7 +54,6 @@ type Bot struct { mu sync.Mutex outputs map[string]*outputState - diffs map[string]string } type assistantMessageSegment struct { @@ -150,7 +153,6 @@ func NewBot(tg *Client, st *store.Store, codex *codexapp.Client, uploadDir, code defaultSandbox: defaultSandbox, pollTimeout: pollTimeout, outputs: make(map[string]*outputState), - diffs: make(map[string]string), } } @@ -227,19 +229,20 @@ func (b *Bot) clearStaleActiveTurn(ctx context.Context, userID int64, thread sto func botCommands() []BotCommand { return []BotCommand{ + {Command: "start", Description: "Show help"}, {Command: "new", Description: "Start a new thread"}, - {Command: "thread", Description: "List or switch threads"}, + {Command: "resume", Description: "List or switch threads"}, {Command: "rename", Description: "Rename a thread"}, {Command: "fork", Description: "Fork the active thread"}, {Command: "archive", Description: "Archive a thread"}, + {Command: "unarchive", Description: "Restore an archived thread"}, + {Command: "delete", Description: "Delete a thread"}, {Command: "status", Description: "Show active settings"}, {Command: "cancel", Description: "Interrupt the active turn"}, {Command: "workspace", Description: "Select workspace"}, {Command: "model", Description: "Choose model"}, {Command: "sandbox", Description: "Choose sandbox"}, {Command: "pic", Description: "Generate images"}, - {Command: "diff", Description: "Show latest diff"}, - {Command: "help", Description: "Show help"}, } } @@ -297,27 +300,27 @@ func (b *Bot) handleCommand(ctx context.Context, message *Message, session store chatID := message.Chat.ID switch command { - case "start", "help": - return true, b.sendHelp(ctx, chatID) + case "start": + return true, b.sendStart(ctx, chatID) case "new": _, _, err := b.createNewThread(ctx, userID, chatID, session, true) return true, err - case "thread": - return true, b.threadCommand(ctx, userID, chatID, args) - case "threads", "resume": - return true, b.legacyThreadCommand(ctx, userID, chatID, args) + case "resume": + return true, b.resumeCommand(ctx, userID, chatID, args) case "rename": return true, b.renameThread(ctx, userID, chatID, session, args) case "fork": return true, b.forkThread(ctx, userID, chatID, session) case "archive": return true, b.archiveThread(ctx, userID, chatID, session, args) + case "unarchive": + return true, b.unarchiveThread(ctx, userID, chatID, session, args) + case "delete": + return true, b.deleteThread(ctx, userID, chatID, session, args) case "status": return true, b.sendStatus(ctx, userID, chatID, session) case "cancel": return true, b.cancelTurn(ctx, userID, chatID, session) - case "workspaces": - return true, b.sendWorkspaces(ctx, userID, chatID) case "workspace": return true, b.handleWorkspaceCommand(ctx, userID, chatID, session, args) case "model": @@ -326,44 +329,42 @@ func (b *Bot) handleCommand(ctx context.Context, message *Message, session store return true, b.handleSandboxCommand(ctx, userID, chatID, session, args) case "pic": return true, b.handlePictureCommand(ctx, userID, chatID, session, args) - case "diff": - return true, b.sendDiff(ctx, chatID, session) default: - _, err := b.tg.SendMessage(ctx, chatID, "Unknown command. Use /help.", SendMessageOptions{}) + _, err := b.tg.SendMessage(ctx, chatID, "Unknown command.", SendMessageOptions{}) return true, err } } -func (b *Bot) sendHelp(ctx context.Context, chatID int64) error { +func (b *Bot) sendStart(ctx context.Context, chatID int64) error { text := strings.Join([]string{ "Codex Telegram Bot", "", "/new - start a new Codex thread", - "/thread - list recent threads", - "/thread ID - switch to a thread", + "/resume - list recent threads", + "/resume ID - switch to a thread", "/rename TITLE or /rename ID TITLE - rename a thread", "/fork - fork the active thread", - "/archive [ID] - archive a thread", + "/archive [ID] - choose or archive a thread", + "/unarchive [ID] - choose or restore an archived thread", + "/delete [ID] - choose or delete a thread", "/status - show active settings", "/cancel - interrupt the active turn", - "/workspaces - list workspaces", "/workspace [ID] - select workspace", "/model - choose model and reasoning effort", "/sandbox - choose sandbox", "/pic PROMPT - generate image(s) from a prompt", - "/diff - show the latest streamed diff", "", "Plain text continues the active thread. Images are staged as local Codex image inputs; other files are staged and sent as paths.", }, "\n") return b.sendLong(ctx, chatID, text) } -func (b *Bot) threadCommand(ctx context.Context, userID, chatID int64, args []string) error { +func (b *Bot) resumeCommand(ctx context.Context, userID, chatID int64, args []string) error { if len(args) == 0 { return b.sendResumeChoices(ctx, userID, chatID, 0, 0) } if len(args) != 1 { - _, err := b.tg.SendMessage(ctx, chatID, "Use /thread to choose a thread, or /thread ID to switch directly.", SendMessageOptions{}) + _, err := b.tg.SendMessage(ctx, chatID, "Use /resume to choose a thread, or /resume ID to switch directly.", SendMessageOptions{}) return err } id, err := strconv.ParseInt(args[0], 10, 64) @@ -374,26 +375,29 @@ func (b *Bot) threadCommand(ctx context.Context, userID, chatID int64, args []st return b.resumeThreadByID(ctx, userID, chatID, id, 0) } -func (b *Bot) legacyThreadCommand(ctx context.Context, userID, chatID int64, args []string) error { - if len(args) == 0 { - return b.sendResumeChoices(ctx, userID, chatID, 0, 0) - } - return b.threadCommand(ctx, userID, chatID, args) +func (b *Bot) sendResumeChoices(ctx context.Context, userID, chatID int64, page int, messageID int) error { + return b.sendThreadActionChoices(ctx, userID, chatID, threadActionResume, page, messageID) } -func (b *Bot) sendResumeChoices(ctx context.Context, userID, chatID int64, page int, messageID int) error { +func (b *Bot) sendThreadActionChoices(ctx context.Context, userID, chatID int64, action string, page int, messageID int) error { if page < 0 { page = 0 } - threads, err := b.store.ListThreadsPage(ctx, userID, false, resumeThreadPageSize+1, page*resumeThreadPageSize) + var threads []store.Thread + var err error + if action == threadActionUnarchive { + threads, err = b.store.ListArchivedThreadsPage(ctx, userID, resumeThreadPageSize+1, page*resumeThreadPageSize) + } else { + threads, err = b.store.ListThreadsPage(ctx, userID, false, resumeThreadPageSize+1, page*resumeThreadPageSize) + } if err != nil { return err } if len(threads) == 0 && page > 0 { - return b.sendResumeChoices(ctx, userID, chatID, page-1, messageID) + return b.sendThreadActionChoices(ctx, userID, chatID, action, page-1, messageID) } if len(threads) == 0 { - text := "No threads yet. Use /new." + text := noThreadActionChoicesText(action) if messageID != 0 { _, err := b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{}) return err @@ -406,8 +410,8 @@ func (b *Bot) sendResumeChoices(ctx context.Context, userID, chatID int64, page if hasNext { threads = threads[:resumeThreadPageSize] } - text := resumeThreadListText(threads, page) - markup := resumeThreadMarkup(threads, page, hasNext) + text := threadActionListText(threads, page, action) + markup := threadActionMarkup(threads, page, hasNext, action) if messageID != 0 { _, err := b.tg.EditMessageText(ctx, chatID, messageID, EscapeHTML(text), EditMessageTextOptions{ParseMode: "HTML", ReplyMarkup: editReplyMarkup(markup)}) return err @@ -537,28 +541,149 @@ func (b *Bot) forkThread(ctx context.Context, userID, chatID int64, session stor } func (b *Bot) archiveThread(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { - var thread store.Thread - var err error - if len(args) > 0 { - id, parseErr := strconv.ParseInt(args[0], 10, 64) - if parseErr != nil { - _, sendErr := b.tg.SendMessage(ctx, chatID, "Thread ID must be a number.", SendMessageOptions{}) - return sendErr - } - thread, err = b.store.GetThreadByID(ctx, userID, id) - } else { - thread, err = b.activeThread(ctx, userID, session) + _ = session + if len(args) == 0 { + return b.sendThreadActionChoices(ctx, userID, chatID, threadActionArchive, 0, 0) } + id, handled, err := b.threadIDFromArgs(ctx, chatID, args, "Use /archive to choose a thread, or /archive ID.") + if handled { + return err + } + return b.archiveThreadByID(ctx, userID, chatID, id, 0) +} + +func (b *Bot) archiveThreadByID(ctx context.Context, userID, chatID int64, id int64, messageID int) error { + thread, err := b.store.GetThreadByID(ctx, userID, id) if err != nil { - return b.sendNoActiveThread(ctx, chatID, err) + if !errors.Is(err, sql.ErrNoRows) { + return err + } + return b.sendThreadActionNotFound(ctx, chatID, messageID) } if err := b.codex.ArchiveThread(ctx, thread.CodexThreadID); err != nil { - return b.sendError(ctx, chatID, "Could not archive Codex thread", err) + if !isMissingCodexThreadError(err) { + return b.sendError(ctx, chatID, "Could not archive Codex thread", err) + } + b.logger.Printf("archive stale local thread #%d codex_thread_id=%s: %v", thread.ID, thread.CodexThreadID, err) } if err := b.store.ArchiveThread(ctx, userID, thread.ID); err != nil { return err } - _, err = b.tg.SendMessage(ctx, chatID, fmt.Sprintf("Archived thread #%d.", thread.ID), SendMessageOptions{}) + text := fmt.Sprintf("Archived thread #%d.", thread.ID) + if messageID != 0 { + _, err = b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{ReplyMarkup: clearInlineKeyboardMarkup()}) + return err + } + _, err = b.tg.SendMessage(ctx, chatID, text, SendMessageOptions{}) + return err +} + +func (b *Bot) unarchiveThread(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { + _ = session + if len(args) == 0 { + return b.sendThreadActionChoices(ctx, userID, chatID, threadActionUnarchive, 0, 0) + } + id, handled, err := b.threadIDFromArgs(ctx, chatID, args, "Use /unarchive to choose a thread, or /unarchive ID.") + if handled { + return err + } + return b.unarchiveThreadByID(ctx, userID, chatID, id, 0) +} + +func (b *Bot) unarchiveThreadByID(ctx context.Context, userID, chatID int64, id int64, messageID int) error { + thread, err := b.store.GetThreadByID(ctx, userID, id) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return err + } + return b.sendThreadActionNotFound(ctx, chatID, messageID) + } + if err := b.codex.UnarchiveThread(ctx, thread.CodexThreadID); err != nil { + return b.sendError(ctx, chatID, "Could not unarchive Codex thread", err) + } + if err := b.store.UnarchiveThread(ctx, userID, thread.ID); err != nil { + return err + } + text := fmt.Sprintf("Restored thread #%d.", thread.ID) + if messageID != 0 { + _, err = b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{ReplyMarkup: clearInlineKeyboardMarkup()}) + return err + } + _, err = b.tg.SendMessage(ctx, chatID, text, SendMessageOptions{}) + return err +} + +func (b *Bot) deleteThread(ctx context.Context, userID, chatID int64, session store.Session, args []string) error { + _ = session + if len(args) == 0 { + return b.sendThreadActionChoices(ctx, userID, chatID, threadActionDelete, 0, 0) + } + id, handled, err := b.threadIDFromArgs(ctx, chatID, args, "Use /delete to choose a thread, or /delete ID.") + if handled { + return err + } + return b.deleteThreadByID(ctx, userID, chatID, id, 0) +} + +func (b *Bot) deleteThreadByID(ctx context.Context, userID, chatID int64, id int64, messageID int) error { + thread, err := b.store.GetThreadByID(ctx, userID, id) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return err + } + return b.sendThreadActionNotFound(ctx, chatID, messageID) + } + if err := b.codex.DeleteThread(ctx, thread.CodexThreadID); err != nil { + if !isMissingCodexThreadError(err) { + return b.sendError(ctx, chatID, "Could not delete Codex thread", err) + } + b.logger.Printf("delete stale local thread #%d codex_thread_id=%s: %v", thread.ID, thread.CodexThreadID, err) + } + b.clearOutput(thread.CodexThreadID) + if err := b.store.DeleteThread(ctx, userID, thread.ID); err != nil { + return err + } + text := fmt.Sprintf("Deleted thread #%d.", thread.ID) + if messageID != 0 { + _, err = b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{ReplyMarkup: clearInlineKeyboardMarkup()}) + return err + } + _, err = b.tg.SendMessage(ctx, chatID, text, SendMessageOptions{}) + return err +} + +func isMissingCodexThreadError(err error) bool { + var rpcErr codexapp.RPCError + if !errors.As(err, &rpcErr) { + return false + } + if rpcErr.Code != -32600 { + return false + } + message := strings.ToLower(rpcErr.Message) + return strings.Contains(message, "no rollout found") || strings.Contains(message, "thread not loaded") +} + +func (b *Bot) threadIDFromArgs(ctx context.Context, chatID int64, args []string, usage string) (int64, bool, error) { + if len(args) != 1 { + _, err := b.tg.SendMessage(ctx, chatID, usage, SendMessageOptions{}) + return 0, true, err + } + id, err := strconv.ParseInt(args[0], 10, 64) + if err != nil || id <= 0 { + _, sendErr := b.tg.SendMessage(ctx, chatID, "Thread ID must be a number.", SendMessageOptions{}) + return 0, true, sendErr + } + return id, false, nil +} + +func (b *Bot) sendThreadActionNotFound(ctx context.Context, chatID int64, messageID int) error { + text := "Thread not found." + if messageID != 0 { + _, err := b.tg.EditMessageText(ctx, chatID, messageID, text, EditMessageTextOptions{ReplyMarkup: clearInlineKeyboardMarkup()}) + return err + } + _, err := b.tg.SendMessage(ctx, chatID, text, SendMessageOptions{}) return err } @@ -842,26 +967,6 @@ func isPicturePath(path string) bool { } } -func (b *Bot) sendDiff(ctx context.Context, chatID int64, session store.Session) error { - if session.ActiveThreadID == 0 { - _, err := b.tg.SendMessage(ctx, chatID, "No active thread.", SendMessageOptions{}) - return err - } - thread, err := b.store.GetThreadByID(ctx, chatID, session.ActiveThreadID) - if err != nil { - _, sendErr := b.tg.SendMessage(ctx, chatID, "No active thread.", SendMessageOptions{}) - return sendErr - } - b.mu.Lock() - diff := b.diffs[thread.CodexThreadID] - b.mu.Unlock() - if diff == "" { - _, err := b.tg.SendMessage(ctx, chatID, "No diff has been streamed for this thread.", SendMessageOptions{}) - return err - } - return b.sendLong(ctx, chatID, diff) -} - func (b *Bot) continueThread(ctx context.Context, message *Message, session store.Session) error { userID := message.From.ID chatID := message.Chat.ID @@ -1221,6 +1326,15 @@ func (b *Bot) handleCallback(ctx context.Context, callback *CallbackQuery) error } return b.sendResumeChoices(ctx, callback.From.ID, callback.Message.Chat.ID, resumePage, callback.Message.MessageID) } + if action, threadID, ok := ParseThreadActionCallbackData(callback.Data); ok { + return b.handleThreadActionCallback(ctx, callback, action, threadID) + } + if action, page, ok := ParseThreadActionPageCallbackData(callback.Data); ok { + if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, ""); err != nil { + return err + } + return b.sendThreadActionChoices(ctx, callback.From.ID, callback.Message.Chat.ID, action, page, callback.Message.MessageID) + } if modelID, ok := ParseModelCallbackData(callback.Data); ok { return b.handleModelCallback(ctx, callback, modelID) } @@ -1236,6 +1350,33 @@ func (b *Bot) handleCallback(ctx context.Context, callback *CallbackQuery) error return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Unknown action.") } +func (b *Bot) handleThreadActionCallback(ctx context.Context, callback *CallbackQuery, action string, threadID int64) error { + switch action { + case threadActionResume: + if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Thread selected."); err != nil { + return err + } + return b.resumeThreadByID(ctx, callback.From.ID, callback.Message.Chat.ID, threadID, callback.Message.MessageID) + case threadActionArchive: + if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Archiving thread."); err != nil { + return err + } + return b.archiveThreadByID(ctx, callback.From.ID, callback.Message.Chat.ID, threadID, callback.Message.MessageID) + case threadActionUnarchive: + if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Restoring thread."); err != nil { + return err + } + return b.unarchiveThreadByID(ctx, callback.From.ID, callback.Message.Chat.ID, threadID, callback.Message.MessageID) + case threadActionDelete: + if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Deleting thread."); err != nil { + return err + } + return b.deleteThreadByID(ctx, callback.From.ID, callback.Message.Chat.ID, threadID, callback.Message.MessageID) + default: + return b.tg.AnswerCallbackQuery(ctx, callback.ID, "Unknown thread action.") + } +} + func (b *Bot) handleApprovalCallback(ctx context.Context, callback *CallbackQuery, approvalID int64, decision string) error { approval, err := b.store.GetPendingApproval(ctx, callback.From.ID, approvalID) if err != nil { @@ -1550,6 +1691,8 @@ func argumentLabel(key string) string { return "CWD" case "cmd": return "cmd" + case "environmentid": + return "Environment ID" } label := strings.ReplaceAll(key, "_", " ") return strings.ToUpper(label[:1]) + label[1:] @@ -1804,20 +1947,6 @@ func (b *Bot) handleCodexNotification(ctx context.Context, event codexapp.Event) if params.ThreadID != "" && b.hasOutputThread(params.ThreadID) { return b.sendOutputBlock(ctx, params.ThreadID, "Codex warning: "+params.Message) } - case "turn/diff/updated": - var params struct { - ThreadID string `json:"threadId"` - TurnID string `json:"turnId"` - Diff string `json:"diff"` - } - if err := json.Unmarshal(event.Params, ¶ms); err != nil { - return err - } - if params.ThreadID != "" && b.shouldHandleOutputEvent(params.ThreadID, params.TurnID) { - b.mu.Lock() - b.diffs[params.ThreadID] = params.Diff - b.mu.Unlock() - } case "turn/completed": var params struct { ThreadID string `json:"threadId"` @@ -1854,6 +1983,17 @@ func (b *Bot) handleCodexNotification(ctx context.Context, event codexapp.Event) } return b.store.SyncThreadTitleByCodexID(ctx, params.ThreadID, title) } + case "thread/deleted": + var params struct { + ThreadID string `json:"threadId"` + } + if err := json.Unmarshal(event.Params, ¶ms); err != nil { + return err + } + if params.ThreadID != "" { + b.clearOutput(params.ThreadID) + return b.store.DeleteThreadByCodexID(ctx, params.ThreadID) + } case "thread/settings/updated": var params struct { ThreadID string `json:"threadId"` @@ -3090,20 +3230,28 @@ func parseCommand(text string) (string, []string, bool) { } func resumeThreadListText(threads []store.Thread, page int) string { - lines := []string{fmt.Sprintf("Threads (page %d):", page+1), ""} + return threadActionListText(threads, page, threadActionResume) +} + +func threadActionListText(threads []store.Thread, page int, action string) string { + lines := []string{fmt.Sprintf("%s (page %d):", threadActionListTitle(action), page+1), ""} for _, thread := range threads { lines = append(lines, fmt.Sprintf("Thread ID %d: %s", thread.ID, threadDisplayTitle(thread))) } - lines = append(lines, "", "Choose a button below, or use /thread THREAD_ID directly.") + lines = append(lines, "", threadActionListFooter(action)) return strings.Join(lines, "\n") } func resumeThreadMarkup(threads []store.Thread, page int, hasNext bool) *InlineKeyboardMarkup { + return threadActionMarkup(threads, page, hasNext, threadActionResume) +} + +func threadActionMarkup(threads []store.Thread, page int, hasNext bool, action string) *InlineKeyboardMarkup { keyboard := make([][]InlineKeyboardButton, 0, 4) for _, thread := range threads { button := InlineKeyboardButton{ - Text: fmt.Sprintf("ID %d", thread.ID), - CallbackData: ResumeThreadCallbackData(thread.ID), + Text: threadActionButtonLabel(action, thread.ID), + CallbackData: threadActionButtonCallback(action, thread.ID), } if len(keyboard) == 0 || len(keyboard[len(keyboard)-1]) >= 4 { keyboard = append(keyboard, []InlineKeyboardButton{button}) @@ -3113,10 +3261,10 @@ func resumeThreadMarkup(threads []store.Thread, page int, hasNext bool) *InlineK } var nav []InlineKeyboardButton if page > 0 { - nav = append(nav, InlineKeyboardButton{Text: "Prev", CallbackData: ResumePageCallbackData(page - 1)}) + nav = append(nav, InlineKeyboardButton{Text: "Prev", CallbackData: threadActionPageCallback(action, page-1)}) } if hasNext { - nav = append(nav, InlineKeyboardButton{Text: "Next", CallbackData: ResumePageCallbackData(page + 1)}) + nav = append(nav, InlineKeyboardButton{Text: "Next", CallbackData: threadActionPageCallback(action, page+1)}) } if len(nav) > 0 { keyboard = append(keyboard, nav) @@ -3124,6 +3272,72 @@ func resumeThreadMarkup(threads []store.Thread, page int, hasNext bool) *InlineK return &InlineKeyboardMarkup{InlineKeyboard: keyboard} } +func noThreadActionChoicesText(action string) string { + switch action { + case threadActionArchive: + return "No threads to archive." + case threadActionUnarchive: + return "No archived threads to restore." + case threadActionDelete: + return "No threads to delete." + default: + return "No threads yet. Use /new." + } +} + +func threadActionListTitle(action string) string { + switch action { + case threadActionArchive: + return "Choose a thread to archive" + case threadActionUnarchive: + return "Choose an archived thread to restore" + case threadActionDelete: + return "Choose a thread to delete" + default: + return "Threads" + } +} + +func threadActionListFooter(action string) string { + switch action { + case threadActionArchive: + return "Choose a button below, or use /archive THREAD_ID directly." + case threadActionUnarchive: + return "Choose a button below, or use /unarchive THREAD_ID directly." + case threadActionDelete: + return "Choose a button below, or use /delete THREAD_ID directly." + default: + return "Choose a button below, or use /resume THREAD_ID directly." + } +} + +func threadActionButtonLabel(action string, id int64) string { + switch action { + case threadActionArchive: + return fmt.Sprintf("Archive %d", id) + case threadActionUnarchive: + return fmt.Sprintf("Restore %d", id) + case threadActionDelete: + return fmt.Sprintf("Delete %d", id) + default: + return fmt.Sprintf("ID %d", id) + } +} + +func threadActionButtonCallback(action string, id int64) string { + if action == threadActionResume { + return ResumeThreadCallbackData(id) + } + return ThreadActionCallbackData(action, id) +} + +func threadActionPageCallback(action string, page int) string { + if action == threadActionResume { + return ResumePageCallbackData(page) + } + return ThreadActionPageCallbackData(action, page) +} + func normalizeThreadTitle(title string) string { title = strings.Join(strings.Fields(title), " ") runes := []rune(title) @@ -3610,6 +3824,7 @@ func renderApprovalPayloadDetailsHTML(raw json.RawMessage, params map[string]any } appendPart(renderApprovalFieldHTML("cwd", params["cwd"])) + appendPart(renderApprovalFieldHTML("environmentId", params["environmentId"])) appendPart(renderApprovalFieldHTML("command", params["command"])) appendPart(renderApprovalFieldHTML("parsedCmd", params["parsedCmd"])) appendPart(renderApprovalFieldHTML("additionalPermissions", params["additionalPermissions"])) diff --git a/internal/telegram/render.go b/internal/telegram/render.go index b023927..89c35f3 100644 --- a/internal/telegram/render.go +++ b/internal/telegram/render.go @@ -496,6 +496,41 @@ func ParseResumePageCallbackData(data string) (int, bool) { return page, err == nil && page >= 0 } +func ThreadActionCallbackData(action string, id int64) string { + return fmt.Sprintf("thread:%s:%d", action, id) +} + +func ParseThreadActionCallbackData(data string) (string, int64, bool) { + parts := strings.Split(data, ":") + if len(parts) != 3 || parts[0] != "thread" || !isThreadAction(parts[1]) { + return "", 0, false + } + id, err := strconv.ParseInt(parts[2], 10, 64) + return parts[1], id, err == nil && id > 0 +} + +func ThreadActionPageCallbackData(action string, page int) string { + return fmt.Sprintf("threadpage:%s:%d", action, page) +} + +func ParseThreadActionPageCallbackData(data string) (string, int, bool) { + parts := strings.Split(data, ":") + if len(parts) != 3 || parts[0] != "threadpage" || !isThreadAction(parts[1]) { + return "", 0, false + } + page, err := strconv.Atoi(parts[2]) + return parts[1], page, err == nil && page >= 0 +} + +func isThreadAction(action string) bool { + switch action { + case threadActionResume, threadActionArchive, threadActionUnarchive, threadActionDelete: + return true + default: + return false + } +} + func ModelCallbackData(modelID string) (string, bool) { encoded := base64.RawURLEncoding.EncodeToString([]byte(modelID)) data := "model:" + encoded diff --git a/internal/telegram/render_test.go b/internal/telegram/render_test.go index 95a79d4..6e2f41b 100644 --- a/internal/telegram/render_test.go +++ b/internal/telegram/render_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + "codex-telegram-bot/internal/codexapp" "codex-telegram-bot/internal/store" ) @@ -197,16 +198,18 @@ func TestEditReplyMarkupClearsInlineKeyboard(t *testing.T) { } } -func TestBotCommandsUseSingleThreadCommand(t *testing.T) { +func TestBotCommandsExposeCurrentPromptList(t *testing.T) { commands := botCommands() seen := map[string]bool{} for _, command := range commands { seen[command.Command] = true } - if !seen["thread"] { - t.Fatal("bot command list should include /thread") + for _, command := range []string{"start", "new", "resume", "rename", "fork", "archive", "unarchive", "delete", "status", "cancel", "workspace", "model", "sandbox", "pic"} { + if !seen[command] { + t.Fatalf("bot command list should include /%s", command) + } } - for _, removed := range []string{"threads", "resume"} { + for _, removed := range []string{"help", "thread", "threads", "workspaces", "diff"} { if seen[removed] { t.Fatalf("bot command list should not include /%s", removed) } @@ -214,8 +217,8 @@ func TestBotCommandsUseSingleThreadCommand(t *testing.T) { } func TestParseCommand(t *testing.T) { - name, args, ok := parseCommand("/thread@my_bot 123") - if !ok || name != "thread" || len(args) != 1 || args[0] != "123" { + name, args, ok := parseCommand("/resume@my_bot 123") + if !ok || name != "resume" || len(args) != 1 || args[0] != "123" { t.Fatalf("unexpected command parse: %q %#v %v", name, args, ok) } } @@ -394,9 +397,9 @@ func TestRenderDynamicToolDetailsSelectsUsefulArguments(t *testing.T) { } func TestRenderApprovalDetailsAvoidsRawJSONDump(t *testing.T) { - raw := json.RawMessage(`{"command":"go test ./...","cwd":"/workspace/project","unused":{"nested":true}}`) + raw := json.RawMessage(`{"command":"go test ./...","cwd":"/workspace/project","environmentId":"env_123","unused":{"nested":true}}`) text := renderApprovalHTML("item/commandExecution/requestApproval", raw, "") - for _, want := range []string{"Codex requests command approval", "language-bash", "go test ./...", "CWD"} { + for _, want := range []string{"Codex requests command approval", "language-bash", "go test ./...", "CWD", "Environment ID", "env_123"} { if !strings.Contains(text, want) { t.Fatalf("approval render missing %q in %q", want, text) } @@ -534,10 +537,39 @@ func TestResumeCallbackData(t *testing.T) { } } +func TestThreadActionCallbackData(t *testing.T) { + action, threadID, ok := ParseThreadActionCallbackData(ThreadActionCallbackData(threadActionDelete, 123)) + if !ok || action != threadActionDelete || threadID != 123 { + t.Fatalf("unexpected thread action callback: action=%q id=%d ok=%v", action, threadID, ok) + } + action, page, ok := ParseThreadActionPageCallbackData(ThreadActionPageCallbackData(threadActionUnarchive, 2)) + if !ok || action != threadActionUnarchive || page != 2 { + t.Fatalf("unexpected thread action page callback: action=%q page=%d ok=%v", action, page, ok) + } + if _, _, ok := ParseThreadActionCallbackData("thread:unknown:123"); ok { + t.Fatal("unknown thread action should not parse") + } +} + +func TestIsMissingCodexThreadError(t *testing.T) { + for _, message := range []string{ + "no rollout found for thread id 019ef2ea", + "thread not loaded: 019ef2ea", + } { + err := codexapp.RPCError{Code: -32600, Message: message} + if !isMissingCodexThreadError(err) { + t.Fatalf("expected stale thread error for %q", message) + } + } + if isMissingCodexThreadError(codexapp.RPCError{Code: -32600, Message: "permission denied"}) { + t.Fatal("unrelated -32600 error should not be treated as stale thread") + } +} + func TestResumeThreadListText(t *testing.T) { threads := []store.Thread{{ID: 42, Title: "do xyz"}, {ID: 43, Title: "executed xxx command"}} text := resumeThreadListText(threads, 0) - for _, want := range []string{"Thread ID 42: do xyz", "Thread ID 43: executed xxx command"} { + for _, want := range []string{"Thread ID 42: do xyz", "Thread ID 43: executed xxx command", "/resume THREAD_ID"} { if !strings.Contains(text, want) { t.Fatalf("resume list missing %q in %q", want, text) } @@ -554,6 +586,36 @@ func TestResumeThreadListText(t *testing.T) { if !ok || secondID != 43 { t.Fatalf("second resume button targets id=%d ok=%v", secondID, ok) } + + deleteText := threadActionListText(threads, 0, threadActionDelete) + if !strings.Contains(deleteText, "Choose a thread to delete") || !strings.Contains(deleteText, "/delete THREAD_ID") { + t.Fatalf("delete list text missing action copy: %q", deleteText) + } + deleteMarkup := threadActionMarkup(threads, 0, true, threadActionDelete) + if deleteMarkup.InlineKeyboard[0][0].Text != "Delete 42" { + t.Fatalf("unexpected delete button label: %#v", deleteMarkup.InlineKeyboard) + } + action, deleteID, ok := ParseThreadActionCallbackData(deleteMarkup.InlineKeyboard[0][0].CallbackData) + if !ok || action != threadActionDelete || deleteID != 42 { + t.Fatalf("delete button targets action=%q id=%d ok=%v", action, deleteID, ok) + } + action, page, ok := ParseThreadActionPageCallbackData(deleteMarkup.InlineKeyboard[1][0].CallbackData) + if !ok || action != threadActionDelete || page != 1 { + t.Fatalf("delete next button targets action=%q page=%d ok=%v", action, page, ok) + } + + unarchiveText := threadActionListText(threads, 0, threadActionUnarchive) + if !strings.Contains(unarchiveText, "Choose an archived thread to restore") || !strings.Contains(unarchiveText, "/unarchive THREAD_ID") { + t.Fatalf("unarchive list text missing action copy: %q", unarchiveText) + } + unarchiveMarkup := threadActionMarkup(threads, 0, false, threadActionUnarchive) + if unarchiveMarkup.InlineKeyboard[0][0].Text != "Restore 42" { + t.Fatalf("unexpected unarchive button label: %#v", unarchiveMarkup.InlineKeyboard) + } + action, unarchiveID, ok := ParseThreadActionCallbackData(unarchiveMarkup.InlineKeyboard[0][0].CallbackData) + if !ok || action != threadActionUnarchive || unarchiveID != 42 { + t.Fatalf("unarchive button targets action=%q id=%d ok=%v", action, unarchiveID, ok) + } } func TestModelEffortAndSandboxCallbackData(t *testing.T) {