Skip to content

Commit 0580a73

Browse files
committed
fix(streaming): finalize replies in place and split tool streams after interim answers
1 parent e85380f commit 0580a73

2 files changed

Lines changed: 32 additions & 1 deletion

File tree

src/bot/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ async function ensureEventSubscription(directory: string): Promise<void> {
461461
flushPendingServiceMessages: () =>
462462
Promise.all([
463463
toolMessageBatcher.flushSession(sessionId, "assistant_message_completed"),
464-
toolCallStreamer.flushSession(sessionId, "assistant_message_completed"),
464+
toolCallStreamer.breakSession(sessionId, "assistant_message_completed"),
465465
]).then(() => undefined),
466466
prepareStreamingPayload: prepareFinalStreamingPayload,
467467
formatSummary,

tests/bot/streaming/tool-call-streamer.test.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,37 @@ describe("bot/streaming/tool-call-streamer", () => {
216216
expect(sendText).toHaveBeenNthCalledWith(2, "s1", "after file");
217217
});
218218

219+
it("starts a new tool stream after an assistant reply boundary break", async () => {
220+
vi.useFakeTimers();
221+
222+
let nextMessageId = 60;
223+
const sendText = vi.fn(async () => nextMessageId++);
224+
const editText = vi.fn().mockResolvedValue(undefined);
225+
const deleteText = vi.fn().mockResolvedValue(undefined);
226+
const streamer = new ToolCallStreamer({
227+
throttleMs: 0,
228+
sendText,
229+
editText,
230+
deleteText,
231+
});
232+
233+
streamer.append("s1", "before reply");
234+
await vi.waitFor(() => {
235+
expect(sendText).toHaveBeenCalledTimes(1);
236+
});
237+
238+
await streamer.breakSession("s1", "assistant_message_completed");
239+
240+
streamer.append("s1", "after reply");
241+
await vi.waitFor(() => {
242+
expect(sendText).toHaveBeenCalledTimes(2);
243+
});
244+
245+
expect(editText).not.toHaveBeenCalled();
246+
expect(deleteText).not.toHaveBeenCalled();
247+
expect(sendText).toHaveBeenNthCalledWith(2, "s1", "after reply");
248+
});
249+
219250
it("flushes all stream keys for the same session", async () => {
220251
vi.useFakeTimers();
221252

0 commit comments

Comments
 (0)