Skip to content

Commit b0e96ee

Browse files
committed
fix(bot): isolate todo and subtask tool messages
1 parent bd4504d commit b0e96ee

3 files changed

Lines changed: 151 additions & 29 deletions

File tree

src/bot/index.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ import { foregroundSessionState } from "../scheduled-task/foreground-state.js";
7979
import { scheduledTaskRuntime } from "../scheduled-task/runtime.js";
8080
import { ResponseStreamer } from "./streaming/response-streamer.js";
8181
import type { StreamingMessagePayload } from "./streaming/response-streamer.js";
82-
import { ToolCallStreamer } from "./streaming/tool-call-streamer.js";
82+
import { ToolCallStreamer, type ToolStreamKey } from "./streaming/tool-call-streamer.js";
8383
import {
8484
editMessageWithMarkdownFallback,
8585
sendMessageWithMarkdownFallback,
@@ -315,6 +315,14 @@ const toolCallStreamer = new ToolCallStreamer({
315315
},
316316
});
317317

318+
function getToolStreamKey(tool: string): ToolStreamKey {
319+
if (tool === "todowrite") {
320+
return "todo";
321+
}
322+
323+
return "default";
324+
}
325+
318326
async function ensureCommandsInitialized(ctx: Context, next: NextFunction): Promise<void> {
319327
if (commandsInitialized || !ctx.from || ctx.from.id !== config.telegram.allowedUserId) {
320328
await next();
@@ -482,7 +490,7 @@ async function ensureEventSubscription(directory: string): Promise<void> {
482490
try {
483491
const message = formatToolInfo(toolInfo);
484492
if (message) {
485-
toolCallStreamer.append(toolInfo.sessionId, message);
493+
toolCallStreamer.append(toolInfo.sessionId, message, getToolStreamKey(toolInfo.tool));
486494
}
487495
} catch (err) {
488496
logger.error("Failed to send tool notification to Telegram:", err);
@@ -509,7 +517,12 @@ async function ensureEventSubscription(directory: string): Promise<void> {
509517
return;
510518
}
511519

512-
toolCallStreamer.replaceByPrefix(sessionId, SUBAGENT_STREAM_PREFIX, renderedCards);
520+
toolCallStreamer.replaceByPrefix(
521+
sessionId,
522+
SUBAGENT_STREAM_PREFIX,
523+
renderedCards,
524+
"subagent",
525+
);
513526
} catch (err) {
514527
logger.error("Failed to render subagent activity for Telegram:", err);
515528
}

src/bot/streaming/tool-call-streamer.ts

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { logger } from "../../utils/logger.js";
22

33
const TELEGRAM_MESSAGE_SAFE_LENGTH = 4000;
4+
const DEFAULT_STREAM_KEY = "default";
5+
6+
export type ToolStreamKey = "default" | "subagent" | "todo";
47

58
interface ToolCallStreamerOptions {
69
throttleMs: number;
@@ -15,6 +18,7 @@ interface StreamEntry {
1518
}
1619

1720
interface StreamState {
21+
key: ToolStreamKey;
1822
sessionId: string;
1923
entries: StreamEntry[];
2024
latestParts: string[];
@@ -114,26 +118,31 @@ export class ToolCallStreamer {
114118
this.deleteText = options.deleteText;
115119
}
116120

117-
append(sessionId: string, text: string): void {
121+
append(sessionId: string, text: string, streamKey: ToolStreamKey = DEFAULT_STREAM_KEY): void {
118122
const normalizedText = text.trim();
119123
if (!sessionId || !normalizedText) {
120124
return;
121125
}
122126

123-
const state = this.getOrCreateState(sessionId);
127+
const state = this.getOrCreateState(sessionId, streamKey);
124128
state.entries.push({ text: normalizedText });
125129
state.latestParts = buildParts(state.entries);
126130
this.ensureTimer(state);
127131
}
128132

129-
replaceByPrefix(sessionId: string, prefix: string, text: string): void {
133+
replaceByPrefix(
134+
sessionId: string,
135+
prefix: string,
136+
text: string,
137+
streamKey: ToolStreamKey = DEFAULT_STREAM_KEY,
138+
): void {
130139
const normalizedPrefix = prefix.trim();
131140
const normalizedText = text.trim();
132141
if (!sessionId || !normalizedPrefix || !normalizedText) {
133142
return;
134143
}
135144

136-
const state = this.getOrCreateState(sessionId);
145+
const state = this.getOrCreateState(sessionId, streamKey);
137146
const existingEntry = state.entries.find((entry) => entry.prefix === normalizedPrefix);
138147
if (existingEntry) {
139148
existingEntry.text = normalizedText;
@@ -146,27 +155,24 @@ export class ToolCallStreamer {
146155
}
147156

148157
async flushSession(sessionId: string, reason: string): Promise<void> {
149-
const state = this.states.get(sessionId);
150-
if (!state) {
151-
return;
152-
}
153-
154-
this.clearTimer(state);
155-
await this.enqueueTask(state, () => this.syncState(state, reason));
158+
const states = this.getStatesForSession(sessionId);
159+
await Promise.all(
160+
states.map(async (state) => {
161+
this.clearTimer(state);
162+
await this.enqueueTask(state, () => this.syncState(state, reason));
163+
}),
164+
);
156165
}
157166

158167
async breakSession(sessionId: string, reason: string): Promise<void> {
159-
const state = this.states.get(sessionId);
160-
if (!state) {
161-
return;
168+
const states = this.getStatesForSession(sessionId);
169+
for (const state of states) {
170+
state.isBreaking = true;
171+
this.clearTimer(state);
172+
await this.enqueueTask(state, () => this.syncState(state, reason));
173+
this.cancelState(state);
174+
this.removeState(state);
162175
}
163-
164-
state.isBreaking = true;
165-
this.getOrCreateState(sessionId);
166-
this.clearTimer(state);
167-
await this.enqueueTask(state, () => this.syncState(state, reason));
168-
this.cancelState(state);
169-
this.removeState(state);
170176
logger.debug(`[ToolCallStreamer] Broke session stream: session=${sessionId}, reason=${reason}`);
171177
}
172178

@@ -202,8 +208,20 @@ export class ToolCallStreamer {
202208
}
203209
}
204210

205-
private getOrCreateState(sessionId: string): StreamState {
206-
const existing = this.states.get(sessionId);
211+
private getStateId(sessionId: string, streamKey: ToolStreamKey): string {
212+
return `${sessionId}:${streamKey}`;
213+
}
214+
215+
private getStatesForSession(sessionId: string): StreamState[] {
216+
return Array.from(this.allStates).filter((state) => state.sessionId === sessionId);
217+
}
218+
219+
private getOrCreateState(
220+
sessionId: string,
221+
streamKey: ToolStreamKey = DEFAULT_STREAM_KEY,
222+
): StreamState {
223+
const stateId = this.getStateId(sessionId, streamKey);
224+
const existing = this.states.get(stateId);
207225
if (existing && !existing.isBroken && !existing.cancelled && !existing.isBreaking) {
208226
return existing;
209227
}
@@ -214,6 +232,7 @@ export class ToolCallStreamer {
214232
}
215233

216234
const state: StreamState = {
235+
key: streamKey,
217236
sessionId,
218237
entries: [],
219238
latestParts: [],
@@ -228,7 +247,7 @@ export class ToolCallStreamer {
228247
fatalErrorLogged: false,
229248
};
230249

231-
this.states.set(sessionId, state);
250+
this.states.set(stateId, state);
232251
this.allStates.add(state);
233252
return state;
234253
}
@@ -279,8 +298,9 @@ export class ToolCallStreamer {
279298
}
280299

281300
private removeState(state: StreamState): void {
282-
if (this.states.get(state.sessionId) === state) {
283-
this.states.delete(state.sessionId);
301+
const stateId = this.getStateId(state.sessionId, state.key);
302+
if (this.states.get(stateId) === state) {
303+
this.states.delete(stateId);
284304
}
285305

286306
this.allStates.delete(state);

tests/bot/streaming/tool-call-streamer.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,72 @@ describe("bot/streaming/tool-call-streamer", () => {
5757
expect(editText).toHaveBeenCalledWith("s1", 10, "first\n\nsecond");
5858
});
5959

60+
it("keeps todo updates in a separate message stream", async () => {
61+
vi.useFakeTimers();
62+
63+
const sendText = vi.fn().mockResolvedValueOnce(10).mockResolvedValueOnce(11);
64+
const editText = vi.fn().mockResolvedValue(undefined);
65+
const deleteText = vi.fn().mockResolvedValue(undefined);
66+
const streamer = new ToolCallStreamer({
67+
throttleMs: 0,
68+
sendText,
69+
editText,
70+
deleteText,
71+
});
72+
73+
streamer.append("s1", "regular tool");
74+
await vi.waitFor(() => {
75+
expect(sendText).toHaveBeenCalledTimes(1);
76+
});
77+
78+
streamer.append("s1", "todo tool", "todo");
79+
await vi.waitFor(() => {
80+
expect(sendText).toHaveBeenCalledTimes(2);
81+
});
82+
83+
streamer.append("s1", "regular tool update");
84+
await vi.waitFor(() => {
85+
expect(editText).toHaveBeenCalledTimes(1);
86+
});
87+
88+
expect(sendText).toHaveBeenNthCalledWith(1, "s1", "regular tool");
89+
expect(sendText).toHaveBeenNthCalledWith(2, "s1", "todo tool");
90+
expect(editText).toHaveBeenCalledWith("s1", 10, "regular tool\n\nregular tool update");
91+
});
92+
93+
it("keeps subagent updates in a separate replace-by-prefix stream", async () => {
94+
vi.useFakeTimers();
95+
96+
const sendText = vi.fn().mockResolvedValueOnce(20).mockResolvedValueOnce(21);
97+
const editText = vi.fn().mockResolvedValue(undefined);
98+
const deleteText = vi.fn().mockResolvedValue(undefined);
99+
const streamer = new ToolCallStreamer({
100+
throttleMs: 0,
101+
sendText,
102+
editText,
103+
deleteText,
104+
});
105+
106+
streamer.append("s1", "regular tool");
107+
await vi.waitFor(() => {
108+
expect(sendText).toHaveBeenCalledTimes(1);
109+
});
110+
111+
streamer.replaceByPrefix("s1", "subagent", "subagent card", "subagent");
112+
await vi.waitFor(() => {
113+
expect(sendText).toHaveBeenCalledTimes(2);
114+
});
115+
116+
streamer.replaceByPrefix("s1", "subagent", "subagent card updated", "subagent");
117+
await vi.waitFor(() => {
118+
expect(editText).toHaveBeenCalledTimes(1);
119+
});
120+
121+
expect(sendText).toHaveBeenNthCalledWith(1, "s1", "regular tool");
122+
expect(sendText).toHaveBeenNthCalledWith(2, "s1", "subagent card");
123+
expect(editText).toHaveBeenCalledWith("s1", 21, "subagent card updated");
124+
});
125+
60126
it("creates continuation messages when the stream exceeds Telegram limits", async () => {
61127
vi.useFakeTimers();
62128

@@ -150,6 +216,29 @@ describe("bot/streaming/tool-call-streamer", () => {
150216
expect(sendText).toHaveBeenNthCalledWith(2, "s1", "after file");
151217
});
152218

219+
it("flushes all stream keys for the same session", async () => {
220+
vi.useFakeTimers();
221+
222+
const sendText = vi.fn().mockResolvedValueOnce(30).mockResolvedValueOnce(31);
223+
const editText = vi.fn().mockResolvedValue(undefined);
224+
const deleteText = vi.fn().mockResolvedValue(undefined);
225+
const streamer = new ToolCallStreamer({
226+
throttleMs: 200,
227+
sendText,
228+
editText,
229+
deleteText,
230+
});
231+
232+
streamer.append("s1", "regular tool");
233+
streamer.append("s1", "todo tool", "todo");
234+
235+
await streamer.flushSession("s1", "manual_flush");
236+
237+
expect(sendText).toHaveBeenCalledTimes(2);
238+
expect(sendText).toHaveBeenNthCalledWith(1, "s1", "regular tool");
239+
expect(sendText).toHaveBeenNthCalledWith(2, "s1", "todo tool");
240+
});
241+
153242
it("cancels throttled tool sends when clearing all streams", async () => {
154243
vi.useFakeTimers();
155244

0 commit comments

Comments
 (0)