Skip to content
Merged
6 changes: 5 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@
"dotenv": "^17.2.3",
"grammy": "^1.39.2",
"https-proxy-agent": "^7.0.6",
"mdast-util-to-string": "^4.0.0",
"remark-gfm": "^4.0.1",
"remark-parse": "^11.0.0",
"socks-proxy-agent": "^8.0.5",
"telegram-markdown-v2": "^0.0.4"
"telegram-markdown-v2": "^0.0.4",
"unified": "^11.0.5"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
Expand Down
81 changes: 26 additions & 55 deletions src/bot/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@ import { clearAllInteractionState } from "../interaction/cleanup.js";
import { keyboardManager } from "../keyboard/manager.js";
import { subscribeToEvents } from "../opencode/events.js";
import { summaryAggregator } from "../summary/aggregator.js";
import {
formatSummary,
formatSummaryWithMode,
formatToolInfo,
getAssistantParseMode,
} from "../summary/formatter.js";
import { formatToolInfo } from "../summary/formatter.js";
import { renderSubagentCards } from "../summary/subagent-formatter.js";
import { ToolMessageBatcher } from "../summary/tool-message-batcher.js";
import { getCurrentSession } from "../session/manager.js";
Expand All @@ -72,7 +67,12 @@ import { downloadTelegramFile, toDataUri } from "./utils/file-download.js";
import { finalizeAssistantResponse } from "./utils/finalize-assistant-response.js";
import { sendTtsResponseForSession } from "./utils/send-tts-response.js";
import { deliverThinkingMessage } from "./utils/thinking-message.js";
import { sendBotText } from "./utils/telegram-text.js";
import {
editRenderedBotPart,
getTelegramRenderedPartSignature,
sendBotText,
sendRenderedBotPart,
} from "./utils/telegram-text.js";
import { formatAssistantRunFooter } from "./utils/assistant-run-footer.js";
import { getModelCapabilities, supportsInput } from "../model/capabilities.js";
import { getStoredModel } from "../model/manager.js";
Expand All @@ -84,9 +84,10 @@ import { ResponseStreamer } from "./streaming/response-streamer.js";
import type { StreamingMessagePayload } from "./streaming/response-streamer.js";
import { ToolCallStreamer, type ToolStreamKey } from "./streaming/tool-call-streamer.js";
import {
editMessageWithMarkdownFallback,
sendMessageWithMarkdownFallback,
} from "./utils/send-with-markdown-fallback.js";
prepareAssistantFinalStreamingPayload,
prepareAssistantStreamingPayload,
renderAssistantFinalPartsSafe,
} from "./utils/assistant-rendering.js";

let botInstance: Bot<Context> | null = null;
let chatIdInstance: number | null = null;
Expand Down Expand Up @@ -124,32 +125,11 @@ function prepareDocumentCaption(caption: string): string {
}

function prepareStreamingPayload(messageText: string): StreamingMessagePayload | null {
const parts = formatSummaryWithMode(messageText, "raw", RESPONSE_STREAM_TEXT_LIMIT);
if (parts.length === 0) {
return null;
}

return {
parts,
format: "raw",
};
return prepareAssistantStreamingPayload(messageText, RESPONSE_STREAM_TEXT_LIMIT);
}

function prepareFinalStreamingPayload(messageText: string): StreamingMessagePayload | null {
const format = getAssistantParseMode() === "MarkdownV2" ? "markdown_v2" : "raw";
const parts = formatSummaryWithMode(
messageText,
format === "markdown_v2" ? "markdown" : "raw",
RESPONSE_STREAM_TEXT_LIMIT,
);
if (parts.length === 0) {
return null;
}

return {
parts,
format,
};
return prepareAssistantFinalStreamingPayload(messageText, RESPONSE_STREAM_TEXT_LIMIT);
}

function enqueueSessionCompletionTask(sessionId: string, task: () => Promise<void>): Promise<void> {
Expand Down Expand Up @@ -220,43 +200,38 @@ const toolMessageBatcher = new ToolMessageBatcher({

const responseStreamer = new ResponseStreamer({
throttleMs: RESPONSE_STREAM_THROTTLE_MS,
sendText: async (text, format, options) => {
sendPart: async (part, options) => {
if (!botInstance || !chatIdInstance || chatIdInstance <= 0) {
throw new Error("Bot context missing for streamed send");
}

const parseMode = format === "markdown_v2" ? "MarkdownV2" : undefined;
const sentMessage = await sendMessageWithMarkdownFallback({
return sendRenderedBotPart({
api: botInstance.api,
chatId: chatIdInstance,
text,
part,
options,
parseMode,
});

return sentMessage.message_id;
},
editText: async (messageId, text, format, options) => {
editPart: async (messageId, part, options) => {
if (!botInstance || !chatIdInstance || chatIdInstance <= 0) {
throw new Error("Bot context missing for streamed edit");
}

const parseMode = format === "markdown_v2" ? "MarkdownV2" : undefined;

try {
await editMessageWithMarkdownFallback({
return await editRenderedBotPart({
api: botInstance.api,
chatId: chatIdInstance,
messageId,
text,
part,
options,
parseMode,
});
} catch (error) {
const errorMessage =
error instanceof Error ? error.message.toLowerCase() : String(error).toLowerCase();
if (errorMessage.includes("message is not modified")) {
return;
return {
deliveredSignature: getTelegramRenderedPartSignature(part),
};
}

throw error;
Expand Down Expand Up @@ -464,18 +439,14 @@ async function ensureEventSubscription(directory: string): Promise<void> {
toolCallStreamer.breakSession(sessionId, "assistant_message_completed"),
]).then(() => undefined),
prepareStreamingPayload: prepareFinalStreamingPayload,
formatSummary,
formatRawSummary: (text) => formatSummaryWithMode(text, "raw"),
resolveFormat: () => (getAssistantParseMode() === "MarkdownV2" ? "markdown_v2" : "raw"),
renderFinalParts: (text) => renderAssistantFinalPartsSafe(text),
getReplyKeyboard: getCurrentReplyKeyboard,
sendText: async (text, rawFallbackText, options, format) => {
await sendBotText({
sendRenderedPart: async (part, options) => {
await sendRenderedBotPart({
api: botApi,
chatId,
text,
rawFallbackText,
part,
options: options as Parameters<typeof sendBotText>[0]["options"],
format,
});
},
});
Expand Down
70 changes: 41 additions & 29 deletions src/bot/streaming/response-streamer.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import type { Api, RawApi } from "grammy";
import { logger } from "../../utils/logger.js";
import type { TelegramRenderedPart } from "../../telegram/render/types.js";

type SendMessageApi = Pick<Api<RawApi>, "sendMessage">;
type EditMessageApi = Pick<Api<RawApi>, "editMessageText">;

type TelegramSendMessageOptions = Parameters<SendMessageApi["sendMessage"]>[2];
type TelegramEditMessageOptions = Parameters<EditMessageApi["editMessageText"]>[3];

export type StreamingMessageFormat = "raw" | "markdown_v2";

export interface StreamingMessagePayload {
parts: string[];
format: StreamingMessageFormat;
parts: TelegramRenderedPart[];
sendOptions?: TelegramSendMessageOptions;
editOptions?: TelegramEditMessageOptions;
}
Expand All @@ -27,17 +25,15 @@ interface ResponseStreamerCompleteOptions {

interface ResponseStreamerOptions {
throttleMs: number;
sendText: (
text: string,
format: StreamingMessageFormat,
sendPart: (
part: TelegramRenderedPart,
options?: TelegramSendMessageOptions,
) => Promise<number>;
editText: (
) => Promise<{ messageId: number; deliveredSignature: string }>;
editPart: (
messageId: number,
text: string,
format: StreamingMessageFormat,
part: TelegramRenderedPart,
options?: TelegramEditMessageOptions,
) => Promise<void>;
) => Promise<{ deliveredSignature: string }>;
deleteText: (messageId: number) => Promise<void>;
}

Expand All @@ -60,17 +56,24 @@ function buildStateKey(sessionId: string, messageId: string): string {
return `${sessionId}:${messageId}`;
}

function clonePart(part: TelegramRenderedPart): TelegramRenderedPart {
return {
text: part.text,
entities: part.entities ? [...part.entities] : undefined,
fallbackText: part.fallbackText,
source: part.source,
};
}

function normalizePayload(payload: StreamingMessagePayload): StreamingMessagePayload | null {
const normalizedParts = payload.parts
.map((part) => part.trim())
.filter((part) => part.length > 0);
const normalizedParts = payload.parts.map(clonePart).filter((part) => part.text.length > 0);
if (normalizedParts.length === 0) {
logger.debug("[ResponseStreamer] Dropped empty streaming payload after normalization");
return null;
}

return {
parts: normalizedParts,
format: payload.format,
sendOptions: payload.sendOptions,
editOptions: payload.editOptions,
};
Expand Down Expand Up @@ -103,8 +106,8 @@ function getRetryAfterMs(error: unknown): number | null {
return seconds * 1000;
}

function createSignature(text: string, format: StreamingMessageFormat): string {
return `${format}\n${text}`;
function createSignature(part: Pick<TelegramRenderedPart, "text" | "entities">): string {
return `${part.text}\n${JSON.stringify(part.entities ?? null)}`;
}

function delay(ms: number): Promise<void> {
Expand All @@ -115,15 +118,15 @@ function delay(ms: number): Promise<void> {

export class ResponseStreamer {
private readonly throttleMs: number;
private readonly sendText: ResponseStreamerOptions["sendText"];
private readonly editText: ResponseStreamerOptions["editText"];
private readonly sendPart: ResponseStreamerOptions["sendPart"];
private readonly editPart: ResponseStreamerOptions["editPart"];
private readonly deleteText: ResponseStreamerOptions["deleteText"];
private readonly states: Map<string, StreamState> = new Map();

constructor(options: ResponseStreamerOptions) {
this.throttleMs = Math.max(0, Math.floor(options.throttleMs));
this.sendText = options.sendText;
this.editText = options.editText;
this.sendPart = options.sendPart;
this.editPart = options.editPart;
this.deleteText = options.deleteText;
}

Expand Down Expand Up @@ -152,6 +155,9 @@ export class ResponseStreamer {

const state = this.states.get(buildStateKey(sessionId, messageId));
if (!state) {
logger.debug(
`[ResponseStreamer] Complete skipped, no active stream state: session=${sessionId}, message=${messageId}`,
);
return notStreamed;
}

Expand All @@ -174,6 +180,9 @@ export class ResponseStreamer {
}

if (state.telegramMessageIds.length === 0) {
logger.debug(
`[ResponseStreamer] Complete returned not streamed: session=${sessionId}, message=${messageId}, reason=no_visible_partials`,
);
this.cancelState(state);
this.states.delete(state.key);
return notStreamed;
Expand Down Expand Up @@ -324,12 +333,15 @@ export class ResponseStreamer {
return state.telegramMessageIds.length > 0;
}

const targetSignatures = payload.parts.map((part) => createSignature(part, payload.format));
const targetSignatures = payload.parts.map((part) => createSignature(part));
const unchanged =
targetSignatures.length === state.lastSentSignatures.length &&
targetSignatures.every((signature, index) => signature === state.lastSentSignatures[index]);

if (unchanged) {
logger.debug(
`[ResponseStreamer] Skipped unchanged payload: session=${state.sessionId}, message=${state.messageId}, parts=${payload.parts.length}`,
);
return state.telegramMessageIds.length > 0;
}

Expand Down Expand Up @@ -407,7 +419,7 @@ export class ResponseStreamer {
targetSignatures: string[],
): Promise<void> {
for (let index = 0; index < payload.parts.length; index++) {
const text = payload.parts[index];
const part = payload.parts[index];
const nextSignature = targetSignatures[index];
const currentMessageId = state.telegramMessageIds[index];

Expand All @@ -416,14 +428,14 @@ export class ResponseStreamer {
continue;
}

await this.editText(currentMessageId, text, payload.format, payload.editOptions);
state.lastSentSignatures[index] = nextSignature;
const result = await this.editPart(currentMessageId, part, payload.editOptions);
state.lastSentSignatures[index] = result.deliveredSignature;
continue;
}

const messageId = await this.sendText(text, payload.format, payload.sendOptions);
state.telegramMessageIds[index] = messageId;
state.lastSentSignatures[index] = nextSignature;
const result = await this.sendPart(part, payload.sendOptions);
state.telegramMessageIds[index] = result.messageId;
state.lastSentSignatures[index] = result.deliveredSignature;
}

for (let index = state.telegramMessageIds.length - 1; index >= payload.parts.length; index--) {
Expand Down
Loading
Loading