Use Telegram drafts for assistant streaming
This commit is contained in:
@@ -24,6 +24,8 @@ import (
|
|||||||
const (
|
const (
|
||||||
telegramDownloadLimit = 20 * 1024 * 1024
|
telegramDownloadLimit = 20 * 1024 * 1024
|
||||||
resumeThreadPageSize = 8
|
resumeThreadPageSize = 8
|
||||||
|
assistantStreamEditInterval = 1200 * time.Millisecond
|
||||||
|
assistantStreamInitialRunes = 24
|
||||||
telegramPhotoDirectiveStart = "<!-- telegram-photo "
|
telegramPhotoDirectiveStart = "<!-- telegram-photo "
|
||||||
telegramThreadRenameDirectiveStart = "<!-- codex-thread-rename "
|
telegramThreadRenameDirectiveStart = "<!-- codex-thread-rename "
|
||||||
telegramThreadCWDDirectiveStart = "<!-- codex-thread-cwd "
|
telegramThreadCWDDirectiveStart = "<!-- codex-thread-cwd "
|
||||||
@@ -76,12 +78,19 @@ type outputState struct {
|
|||||||
assistant strings.Builder
|
assistant strings.Builder
|
||||||
sentAny bool
|
sentAny bool
|
||||||
pictureRequest bool
|
pictureRequest bool
|
||||||
|
assistantStream assistantStreamState
|
||||||
tools map[string]toolMessageState
|
tools map[string]toolMessageState
|
||||||
sentImages map[string]bool
|
sentImages map[string]bool
|
||||||
generatedImages []generatedImageOutput
|
generatedImages []generatedImageOutput
|
||||||
workingIndicatorOff context.CancelFunc
|
workingIndicatorOff context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type assistantStreamState struct {
|
||||||
|
draftID int64
|
||||||
|
text string
|
||||||
|
sentAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
type generatedImageOutput struct {
|
type generatedImageOutput struct {
|
||||||
Path string
|
Path string
|
||||||
}
|
}
|
||||||
@@ -1246,6 +1255,9 @@ func (b *Bot) handleApprovalCallback(ctx context.Context, callback *CallbackQuer
|
|||||||
if err := b.store.ResolvePendingApproval(ctx, callback.From.ID, approval.ID, decision); err != nil {
|
if err := b.store.ResolvePendingApproval(ctx, callback.From.ID, approval.ID, decision); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if decisionApproves(decision) {
|
||||||
|
b.resumeWorkingIndicator(approval.CodexThreadID)
|
||||||
|
}
|
||||||
if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Sent to Codex."); err != nil {
|
if err := b.tg.AnswerCallbackQuery(ctx, callback.ID, "Sent to Codex."); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -1254,6 +1266,18 @@ func (b *Bot) handleApprovalCallback(ctx context.Context, callback *CallbackQuer
|
|||||||
return ignoreTelegramMessageNotModified(err)
|
return ignoreTelegramMessageNotModified(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decisionApproves(decision string) bool {
|
||||||
|
if strings.HasPrefix(decision, "networkPolicy") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
switch decision {
|
||||||
|
case "accept", "acceptForSession", "acceptWithExecpolicyAmendment":
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Bot) handleCodexEvents(ctx context.Context) {
|
func (b *Bot) handleCodexEvents(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -1935,6 +1959,7 @@ func (b *Bot) handleCodexServerRequest(ctx context.Context, event codexapp.Event
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
b.pauseWorkingIndicator(threadID)
|
||||||
b.logger.Printf("codex approval telegram sent: request=%s approval_id=%d chat=%d message=%d", event.ID.Key(), approval.ID, msg.Chat.ID, msg.MessageID)
|
b.logger.Printf("codex approval telegram sent: request=%s approval_id=%d chat=%d message=%d", event.ID.Key(), approval.ID, msg.Chat.ID, msg.MessageID)
|
||||||
return b.store.UpdatePendingApprovalMessage(ctx, approval.ID, msg.Chat.ID, msg.MessageID)
|
return b.store.UpdatePendingApprovalMessage(ctx, approval.ID, msg.Chat.ID, msg.MessageID)
|
||||||
}
|
}
|
||||||
@@ -2029,42 +2054,48 @@ func (b *Bot) clearOutput(threadID string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Bot) pauseWorkingIndicator(threadID string) {
|
||||||
|
b.mu.Lock()
|
||||||
|
state := b.outputs[threadID]
|
||||||
|
if state == nil || state.workingIndicatorOff == nil {
|
||||||
|
b.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cancel := state.workingIndicatorOff
|
||||||
|
state.workingIndicatorOff = nil
|
||||||
|
b.mu.Unlock()
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Bot) resumeWorkingIndicator(threadID string) {
|
||||||
|
b.mu.Lock()
|
||||||
|
state := b.outputs[threadID]
|
||||||
|
if state == nil || state.workingIndicatorOff != nil {
|
||||||
|
b.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
state.workingIndicatorOff = b.startWorkingIndicator(state.chatID)
|
||||||
|
b.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Bot) startWorkingIndicator(chatID int64) context.CancelFunc {
|
func (b *Bot) startWorkingIndicator(chatID int64) context.CancelFunc {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
draftID := time.Now().UnixNano()
|
|
||||||
go func() {
|
go func() {
|
||||||
useDraft := true
|
|
||||||
sendDraft := func() bool {
|
|
||||||
return b.tg.SendMessageDraft(ctx, chatID, draftID, "") == nil
|
|
||||||
}
|
|
||||||
sendTyping := func() {
|
sendTyping := func() {
|
||||||
if err := b.tg.SendChatAction(ctx, chatID, "typing"); err != nil && ctx.Err() == nil {
|
if err := b.tg.SendChatAction(ctx, chatID, "typing"); err != nil && ctx.Err() == nil {
|
||||||
b.logger.Printf("send typing action: %v", err)
|
b.logger.Printf("send typing action: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !sendDraft() {
|
sendTyping()
|
||||||
useDraft = false
|
|
||||||
sendTyping()
|
|
||||||
}
|
|
||||||
|
|
||||||
draftTicker := time.NewTicker(25 * time.Second)
|
|
||||||
typingTicker := time.NewTicker(4 * time.Second)
|
typingTicker := time.NewTicker(4 * time.Second)
|
||||||
defer draftTicker.Stop()
|
|
||||||
defer typingTicker.Stop()
|
defer typingTicker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-draftTicker.C:
|
|
||||||
if useDraft && !sendDraft() {
|
|
||||||
useDraft = false
|
|
||||||
sendTyping()
|
|
||||||
}
|
|
||||||
case <-typingTicker.C:
|
case <-typingTicker.C:
|
||||||
if !useDraft {
|
sendTyping()
|
||||||
sendTyping()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -2723,6 +2754,100 @@ func truncateTelegramPhotoCaption(caption string) string {
|
|||||||
return string(runes[:telegramPhotoCaptionLimit-3]) + "..."
|
return string(runes[:telegramPhotoCaptionLimit-3]) + "..."
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func assistantStreamPreview(text string) string {
|
||||||
|
var visible strings.Builder
|
||||||
|
for _, line := range strings.SplitAfter(text, "\n") {
|
||||||
|
body := strings.TrimSuffix(strings.TrimSuffix(line, "\n"), "\r")
|
||||||
|
trimmed := strings.TrimSpace(body)
|
||||||
|
if isAssistantDirectiveStart(trimmed) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
visible.WriteString(line)
|
||||||
|
}
|
||||||
|
return visible.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func isAssistantDirectiveStart(line string) bool {
|
||||||
|
return strings.HasPrefix(line, telegramPhotoDirectiveStart) ||
|
||||||
|
strings.HasPrefix(line, strings.TrimSpace(telegramPhotoDirectiveStart)) ||
|
||||||
|
strings.HasPrefix(line, telegramThreadRenameDirectiveStart) ||
|
||||||
|
strings.HasPrefix(line, strings.TrimSpace(telegramThreadRenameDirectiveStart)) ||
|
||||||
|
strings.HasPrefix(line, telegramThreadCWDDirectiveStart) ||
|
||||||
|
strings.HasPrefix(line, strings.TrimSpace(telegramThreadCWDDirectiveStart))
|
||||||
|
}
|
||||||
|
|
||||||
|
func assistantStreamText(preview string) string {
|
||||||
|
preview = strings.TrimSpace(preview)
|
||||||
|
if preview == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
const suffix = "\n..."
|
||||||
|
runes := []rune(preview)
|
||||||
|
if len(runes) <= TelegramMessageLimit {
|
||||||
|
return preview
|
||||||
|
}
|
||||||
|
limit := TelegramMessageLimit - len([]rune(suffix))
|
||||||
|
if limit < 0 {
|
||||||
|
limit = TelegramMessageLimit
|
||||||
|
}
|
||||||
|
return strings.TrimRight(string(runes[:limit]), "\n") + suffix
|
||||||
|
}
|
||||||
|
|
||||||
|
func assistantStreamReady(preview string) bool {
|
||||||
|
if strings.Contains(preview, "\n") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return len([]rune(strings.TrimSpace(preview))) >= assistantStreamInitialRunes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Bot) updateAssistantStream(ctx context.Context, threadID string, force bool) error {
|
||||||
|
b.mu.Lock()
|
||||||
|
state := b.outputs[threadID]
|
||||||
|
if state == nil || state.pictureRequest || state.assistant.Len() == 0 {
|
||||||
|
b.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
preview := assistantStreamPreview(state.assistant.String())
|
||||||
|
if strings.TrimSpace(preview) == "" {
|
||||||
|
b.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if state.assistantStream.draftID == 0 && !force && !assistantStreamReady(preview) {
|
||||||
|
b.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
draftText := assistantStreamText(preview)
|
||||||
|
if draftText == "" || draftText == state.assistantStream.text {
|
||||||
|
b.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
stream := state.assistantStream
|
||||||
|
chatID := state.chatID
|
||||||
|
now := time.Now()
|
||||||
|
if stream.draftID != 0 && !force && now.Sub(stream.sentAt) < assistantStreamEditInterval {
|
||||||
|
b.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if stream.draftID == 0 {
|
||||||
|
stream.draftID = now.UnixNano()
|
||||||
|
if stream.draftID == 0 {
|
||||||
|
stream.draftID = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.mu.Unlock()
|
||||||
|
|
||||||
|
if err := b.tg.SendMessageDraft(ctx, chatID, stream.draftID, draftText); err != nil {
|
||||||
|
b.logger.Printf("send assistant draft: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b.mu.Lock()
|
||||||
|
if state := b.outputs[threadID]; state != nil && (state.assistantStream.draftID == 0 || state.assistantStream.draftID == stream.draftID) {
|
||||||
|
state.assistantStream = assistantStreamState{draftID: stream.draftID, text: draftText, sentAt: now}
|
||||||
|
}
|
||||||
|
b.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Bot) appendAssistantDelta(ctx context.Context, threadID, delta string) error {
|
func (b *Bot) appendAssistantDelta(ctx context.Context, threadID, delta string) error {
|
||||||
if delta == "" {
|
if delta == "" {
|
||||||
return nil
|
return nil
|
||||||
@@ -2736,7 +2861,7 @@ func (b *Bot) appendAssistantDelta(ctx context.Context, threadID, delta string)
|
|||||||
_, _ = state.assistant.WriteString(delta)
|
_, _ = state.assistant.WriteString(delta)
|
||||||
}
|
}
|
||||||
b.mu.Unlock()
|
b.mu.Unlock()
|
||||||
return nil
|
return b.updateAssistantStream(ctx, threadID, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Bot) flushAssistantMessage(ctx context.Context, threadID string) error {
|
func (b *Bot) flushAssistantMessage(ctx context.Context, threadID string) error {
|
||||||
@@ -2749,12 +2874,19 @@ func (b *Bot) flushAssistantMessage(ctx context.Context, threadID string) error
|
|||||||
chatID := state.chatID
|
chatID := state.chatID
|
||||||
text := state.assistant.String()
|
text := state.assistant.String()
|
||||||
pictureRequest := state.pictureRequest
|
pictureRequest := state.pictureRequest
|
||||||
|
stream := state.assistantStream
|
||||||
state.assistant.Reset()
|
state.assistant.Reset()
|
||||||
|
state.assistantStream = assistantStreamState{}
|
||||||
b.mu.Unlock()
|
b.mu.Unlock()
|
||||||
|
|
||||||
if pictureRequest {
|
if pictureRequest {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if stream.draftID != 0 {
|
||||||
|
if err := b.tg.SendMessageDraft(ctx, chatID, stream.draftID, ""); err != nil {
|
||||||
|
b.logger.Printf("clear assistant draft: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := b.sendAssistantText(ctx, threadID, chatID, text); err != nil {
|
if err := b.sendAssistantText(ctx, threadID, chatID, text); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -260,6 +260,29 @@ func TestInvalidPhotoDirectiveStaysVisible(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAssistantStreamPreviewHidesDirectives(t *testing.T) {
|
||||||
|
text := "before\n<!-- telegram-photo {\"path\":\"/workspace/photo.jpg\"} -->\nafter\n<!-- codex-thread-rename "
|
||||||
|
got := assistantStreamPreview(text)
|
||||||
|
want := "before\nafter\n"
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("assistantStreamPreview() = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAssistantStreamTextFitsLimit(t *testing.T) {
|
||||||
|
text := strings.Repeat("<>&", TelegramMessageLimit)
|
||||||
|
got := assistantStreamText(text)
|
||||||
|
if got == "" {
|
||||||
|
t.Fatal("assistantStreamText returned empty text")
|
||||||
|
}
|
||||||
|
if len([]rune(got)) > TelegramMessageLimit {
|
||||||
|
t.Fatalf("stream text length = %d, want <= %d", len([]rune(got)), TelegramMessageLimit)
|
||||||
|
}
|
||||||
|
if strings.Contains(got, "<") {
|
||||||
|
t.Fatalf("draft stream should use plain text, got %q", got[:20])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSplitAssistantMessageSegmentsWithThreadDirectives(t *testing.T) {
|
func TestSplitAssistantMessageSegmentsWithThreadDirectives(t *testing.T) {
|
||||||
cwd := filepath.Join(string(filepath.Separator), "workspace", "project")
|
cwd := filepath.Join(string(filepath.Separator), "workspace", "project")
|
||||||
text := fmt.Sprintf("<!-- codex-thread-rename {\"title\":\" A Better Thread Title \"} -->\n<!-- codex-thread-cwd {\"cwd\":%q} -->", cwd)
|
text := fmt.Sprintf("<!-- codex-thread-rename {\"title\":\" A Better Thread Title \"} -->\n<!-- codex-thread-cwd {\"cwd\":%q} -->", cwd)
|
||||||
|
|||||||
Reference in New Issue
Block a user