11import type { Api , RawApi } from "grammy" ;
22import { logger } from "../../utils/logger.js" ;
3+ import type { TelegramRenderedPart } from "../../telegram/render/types.js" ;
34
45type SendMessageApi = Pick < Api < RawApi > , "sendMessage" > ;
56type EditMessageApi = Pick < Api < RawApi > , "editMessageText" > ;
67
78type TelegramSendMessageOptions = Parameters < SendMessageApi [ "sendMessage" ] > [ 2 ] ;
89type TelegramEditMessageOptions = Parameters < EditMessageApi [ "editMessageText" ] > [ 3 ] ;
910
10- export type StreamingMessageFormat = "raw" | "markdown_v2" ;
11-
1211export interface StreamingMessagePayload {
13- parts : string [ ] ;
14- format : StreamingMessageFormat ;
12+ parts : TelegramRenderedPart [ ] ;
1513 sendOptions ?: TelegramSendMessageOptions ;
1614 editOptions ?: TelegramEditMessageOptions ;
1715}
@@ -27,17 +25,15 @@ interface ResponseStreamerCompleteOptions {
2725
2826interface ResponseStreamerOptions {
2927 throttleMs : number ;
30- sendText : (
31- text : string ,
32- format : StreamingMessageFormat ,
28+ sendPart : (
29+ part : TelegramRenderedPart ,
3330 options ?: TelegramSendMessageOptions ,
34- ) => Promise < number > ;
35- editText : (
31+ ) => Promise < { messageId : number ; deliveredSignature : string } > ;
32+ editPart : (
3633 messageId : number ,
37- text : string ,
38- format : StreamingMessageFormat ,
34+ part : TelegramRenderedPart ,
3935 options ?: TelegramEditMessageOptions ,
40- ) => Promise < void > ;
36+ ) => Promise < { deliveredSignature : string } > ;
4137 deleteText : ( messageId : number ) => Promise < void > ;
4238}
4339
@@ -60,17 +56,24 @@ function buildStateKey(sessionId: string, messageId: string): string {
6056 return `${ sessionId } :${ messageId } ` ;
6157}
6258
59+ function clonePart ( part : TelegramRenderedPart ) : TelegramRenderedPart {
60+ return {
61+ text : part . text ,
62+ entities : part . entities ? [ ...part . entities ] : undefined ,
63+ fallbackText : part . fallbackText ,
64+ source : part . source ,
65+ } ;
66+ }
67+
6368function normalizePayload ( payload : StreamingMessagePayload ) : StreamingMessagePayload | null {
64- const normalizedParts = payload . parts
65- . map ( ( part ) => part . trim ( ) )
66- . filter ( ( part ) => part . length > 0 ) ;
69+ const normalizedParts = payload . parts . map ( clonePart ) . filter ( ( part ) => part . text . length > 0 ) ;
6770 if ( normalizedParts . length === 0 ) {
71+ logger . debug ( "[ResponseStreamer] Dropped empty streaming payload after normalization" ) ;
6872 return null ;
6973 }
7074
7175 return {
7276 parts : normalizedParts ,
73- format : payload . format ,
7477 sendOptions : payload . sendOptions ,
7578 editOptions : payload . editOptions ,
7679 } ;
@@ -103,8 +106,8 @@ function getRetryAfterMs(error: unknown): number | null {
103106 return seconds * 1000 ;
104107}
105108
106- function createSignature ( text : string , format : StreamingMessageFormat ) : string {
107- return `${ format } \n${ text } ` ;
109+ function createSignature ( part : Pick < TelegramRenderedPart , "text" | "entities" > ) : string {
110+ return `${ part . text } \n${ JSON . stringify ( part . entities ?? null ) } ` ;
108111}
109112
110113function delay ( ms : number ) : Promise < void > {
@@ -115,15 +118,15 @@ function delay(ms: number): Promise<void> {
115118
116119export class ResponseStreamer {
117120 private readonly throttleMs : number ;
118- private readonly sendText : ResponseStreamerOptions [ "sendText " ] ;
119- private readonly editText : ResponseStreamerOptions [ "editText " ] ;
121+ private readonly sendPart : ResponseStreamerOptions [ "sendPart " ] ;
122+ private readonly editPart : ResponseStreamerOptions [ "editPart " ] ;
120123 private readonly deleteText : ResponseStreamerOptions [ "deleteText" ] ;
121124 private readonly states : Map < string , StreamState > = new Map ( ) ;
122125
123126 constructor ( options : ResponseStreamerOptions ) {
124127 this . throttleMs = Math . max ( 0 , Math . floor ( options . throttleMs ) ) ;
125- this . sendText = options . sendText ;
126- this . editText = options . editText ;
128+ this . sendPart = options . sendPart ;
129+ this . editPart = options . editPart ;
127130 this . deleteText = options . deleteText ;
128131 }
129132
@@ -152,6 +155,9 @@ export class ResponseStreamer {
152155
153156 const state = this . states . get ( buildStateKey ( sessionId , messageId ) ) ;
154157 if ( ! state ) {
158+ logger . debug (
159+ `[ResponseStreamer] Complete skipped, no active stream state: session=${ sessionId } , message=${ messageId } ` ,
160+ ) ;
155161 return notStreamed ;
156162 }
157163
@@ -174,6 +180,9 @@ export class ResponseStreamer {
174180 }
175181
176182 if ( state . telegramMessageIds . length === 0 ) {
183+ logger . debug (
184+ `[ResponseStreamer] Complete returned not streamed: session=${ sessionId } , message=${ messageId } , reason=no_visible_partials` ,
185+ ) ;
177186 this . cancelState ( state ) ;
178187 this . states . delete ( state . key ) ;
179188 return notStreamed ;
@@ -324,12 +333,15 @@ export class ResponseStreamer {
324333 return state . telegramMessageIds . length > 0 ;
325334 }
326335
327- const targetSignatures = payload . parts . map ( ( part ) => createSignature ( part , payload . format ) ) ;
336+ const targetSignatures = payload . parts . map ( ( part ) => createSignature ( part ) ) ;
328337 const unchanged =
329338 targetSignatures . length === state . lastSentSignatures . length &&
330339 targetSignatures . every ( ( signature , index ) => signature === state . lastSentSignatures [ index ] ) ;
331340
332341 if ( unchanged ) {
342+ logger . debug (
343+ `[ResponseStreamer] Skipped unchanged payload: session=${ state . sessionId } , message=${ state . messageId } , parts=${ payload . parts . length } ` ,
344+ ) ;
333345 return state . telegramMessageIds . length > 0 ;
334346 }
335347
@@ -407,7 +419,7 @@ export class ResponseStreamer {
407419 targetSignatures : string [ ] ,
408420 ) : Promise < void > {
409421 for ( let index = 0 ; index < payload . parts . length ; index ++ ) {
410- const text = payload . parts [ index ] ;
422+ const part = payload . parts [ index ] ;
411423 const nextSignature = targetSignatures [ index ] ;
412424 const currentMessageId = state . telegramMessageIds [ index ] ;
413425
@@ -416,14 +428,14 @@ export class ResponseStreamer {
416428 continue ;
417429 }
418430
419- await this . editText ( currentMessageId , text , payload . format , payload . editOptions ) ;
420- state . lastSentSignatures [ index ] = nextSignature ;
431+ const result = await this . editPart ( currentMessageId , part , payload . editOptions ) ;
432+ state . lastSentSignatures [ index ] = result . deliveredSignature ;
421433 continue ;
422434 }
423435
424- const messageId = await this . sendText ( text , payload . format , payload . sendOptions ) ;
425- state . telegramMessageIds [ index ] = messageId ;
426- state . lastSentSignatures [ index ] = nextSignature ;
436+ const result = await this . sendPart ( part , payload . sendOptions ) ;
437+ state . telegramMessageIds [ index ] = result . messageId ;
438+ state . lastSentSignatures [ index ] = result . deliveredSignature ;
427439 }
428440
429441 for ( let index = state . telegramMessageIds . length - 1 ; index >= payload . parts . length ; index -- ) {
0 commit comments