feat(sdk): add TypeScript streaming client for DeerFlow API#2102
feat(sdk): add TypeScript streaming client for DeerFlow API#2102mvanhorn wants to merge 2 commits intobytedance:mainfrom
Conversation
Adds @deerflow/client, a TypeScript SDK that mirrors the Python DeerFlowClient API with full SSE streaming support. Connects to the Gateway HTTP path documented in STREAMING.md. Methods: createThread, getThreadState, stream (AsyncGenerator), and chat (convenience). Zero external runtime dependencies, uses native fetch (Node 18+). This contribution was developed with AI assistance (Codex).
There was a problem hiding this comment.
Pull request overview
Adds a new sdk/typescript/ package (@deerflow/client) intended to provide a lightweight TypeScript client with SSE streaming support for DeerFlow’s HTTP APIs.
Changes:
- Introduces a
DeerFlowClientwith thread creation, state fetch, streaming, andchat()convenience APIs. - Adds a native
ReadableStream-based SSE parser and basic unit tests. - Defines TypeScript types and package build/test configuration for publishing.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/typescript/package.json | Defines @deerflow/client package metadata and build/test scripts. |
| sdk/typescript/tsconfig.json | TypeScript build configuration (NodeNext, declarations, strict). |
| sdk/typescript/src/index.ts | Public exports for the SDK. |
| sdk/typescript/src/types.ts | Streaming protocol and core SDK type definitions. |
| sdk/typescript/src/sse.ts | SSE stream parser implementation. |
| sdk/typescript/src/client.ts | HTTP client implementation (threads, streaming, chat aggregation). |
| sdk/typescript/tests/sse.test.ts | Unit tests for the SSE parser. |
| sdk/typescript/tests/types.test.ts | Type-shape smoke test for exported types and client. |
| sdk/typescript/README.md | Usage documentation and examples. |
| sdk/typescript/.gitignore | Ignores build output and dependencies. |
| for await (const rawEvent of parseSSEStream(response.body)) { | ||
| if (rawEvent.event !== "values" && rawEvent.event !== "messages-tuple" && rawEvent.event !== "custom" && rawEvent.event !== "end") { | ||
| continue; | ||
| } | ||
|
|
There was a problem hiding this comment.
The /api/langgraph SSE stream emits event: messages (see backend/docs/API.md), but this client only accepts messages-tuple and will silently drop messages events, breaking token-level streaming. Update the accepted event names (and/or map messages → messages-tuple) so message delta events are actually yielded/processed.
| const payload: RunStreamRequest = { | ||
| assistant_id: "lead_agent", | ||
| input: { | ||
| messages: [ | ||
| { | ||
| type: "human", | ||
| content: [{ type: "text", text: message }], | ||
| }, | ||
| ], | ||
| }, | ||
| stream_mode: ["values", "messages-tuple", "custom"], | ||
| stream_subgraphs: true, | ||
| context: { | ||
| thread_id: resolvedThreadId, | ||
| }, | ||
| }; |
There was a problem hiding this comment.
The request payload shape here doesn’t match the LangGraph API examples in backend/docs/API.md (which use input.messages[{role, content}] and do not include context.thread_id / stream_subgraphs). If this SDK targets /api/langgraph, align the request body with the documented schema (or make the schema configurable), otherwise the server may reject the request during validation.
| for await (const event of this.stream(message, threadId)) { | ||
| if (event.event === "values" && isValuesEvent(event.data)) { | ||
| lastValues = event.data; | ||
| continue; | ||
| } | ||
|
|
||
| if (event.event !== "messages-tuple" || !Array.isArray(event.data)) { | ||
| continue; | ||
| } | ||
|
|
||
| const tuple = event.data as MessagesTupleEvent; | ||
| const chunk = tuple[0]; | ||
| if (chunk?.type !== "ai" || typeof chunk.content !== "string") { | ||
| continue; | ||
| } | ||
|
|
||
| const id = typeof chunk.id === "string" ? chunk.id : "_final"; | ||
| textByMessageId.set(id, (textByMessageId.get(id) ?? "") + chunk.content); | ||
| } |
There was a problem hiding this comment.
chat() only accumulates text from messages-tuple events with tuple-shaped data. If the stream uses event: messages with object-shaped deltas (as shown in backend/docs/API.md), this path will never run and chat() will fall back to a less-streaming-friendly values snapshot. Handle the actual messages event format (and/or support both tuple and object forms) so chat() can reliably accumulate deltas.
| export type StreamEvent = { | ||
| event: "values" | "messages-tuple" | "custom" | "end"; | ||
| data: unknown; | ||
| }; | ||
|
|
There was a problem hiding this comment.
StreamEvent currently omits event names that the DeerFlow/LangGraph streaming endpoints can emit (e.g. messages, and gateway-compatible streams can also emit metadata and error). This makes correct handling awkward and encourages dropping events. Consider defining StreamEvent as a discriminated union that includes messages (and any other emitted event types you want to support), with data typed per event.
| export type StreamEvent = { | |
| event: "values" | "messages-tuple" | "custom" | "end"; | |
| data: unknown; | |
| }; | |
| export type MetadataEvent = Record<string, unknown>; | |
| export type ErrorEvent = { | |
| message?: string; | |
| [key: string]: unknown; | |
| }; | |
| export type StreamEvent = | |
| | { | |
| event: "values"; | |
| data: ValuesEvent; | |
| } | |
| | { | |
| event: "messages-tuple"; | |
| data: MessagesTupleEvent; | |
| } | |
| | { | |
| event: "messages"; | |
| data: MessagesTupleEvent; | |
| } | |
| | { | |
| event: "metadata"; | |
| data: MetadataEvent; | |
| } | |
| | { | |
| event: "error"; | |
| data: ErrorEvent; | |
| } | |
| | { | |
| event: "custom"; | |
| data: unknown; | |
| } | |
| | { | |
| event: "end"; | |
| data: unknown; | |
| }; |
| new TransformStream<Uint8Array, string>({ | ||
| transform(chunk, controller) { | ||
| controller.enqueue(utf8Decoder.decode(chunk, { stream: true })); | ||
| }, |
There was a problem hiding this comment.
The fallback decoder path (TransformStream + TextDecoder.decode(..., {stream:true})) never flushes the decoder at stream end, which can truncate the final code point when a multibyte UTF-8 sequence is split across chunks. Add a flush() handler to emit utf8Decoder.decode() at end (or avoid the custom transform when TextDecoderStream is unavailable).
| }, | |
| }, | |
| flush(controller) { | |
| const finalChunk = utf8Decoder.decode(); | |
| if (finalChunk.length > 0) { | |
| controller.enqueue(finalChunk); | |
| } | |
| }, |
| const reader = decodedStream.getReader(); | ||
| let pending = ""; | ||
| let currentEvent = "message"; | ||
| let dataLines: string[] = []; | ||
|
|
There was a problem hiding this comment.
parseSSEStream doesn’t cancel/release the underlying reader when the consumer stops iterating early (e.g. user breaks out of for await). That can keep the HTTP connection open longer than needed. Wrap the read loop in try/finally and reader.cancel()/reader.releaseLock() in the finally block.
| const streamEvent: StreamEvent = { | ||
| event: "messages-tuple", | ||
| data: tupleEvent, | ||
| }; |
There was a problem hiding this comment.
This test asserts a StreamEvent with event: "messages-tuple", but the documented /api/langgraph SSE stream uses event: messages (backend/docs/API.md). Once the client/types are corrected to handle the real event name(s), update this test to match the supported event contract.
| for await (const event of client.stream("Summarize the latest roadmap updates")) { | ||
| if (event.event === "messages-tuple") { | ||
| const [message] = event.data as [{ type?: string; content?: string }, Record<string, unknown>]; | ||
| if (message.type === "ai" && message.content) { | ||
| process.stdout.write(message.content); | ||
| } |
There was a problem hiding this comment.
The README’s streaming example checks for event.event === "messages-tuple", but DeerFlow’s documented /api/langgraph streaming response uses event: messages (backend/docs/API.md). Update the example (or note the compatibility mapping) so users can copy/paste a working snippet.
|
@mvanhorn thanks for your contribution, please take a look at the review comment of Copilot. |
- Expand StreamEvent to a discriminated union covering every event the gateway can emit (values, messages, messages-tuple, custom, metadata, error, end). Handlers can now branch on event.event without casts. - Fix TextDecoder flush on stream end in the fallback decode path. The TransformStream now implements flush() so a multibyte UTF-8 sequence split across the final two chunks is no longer truncated. - Release and cancel the SSE reader in a try/finally so early break (e.g. consumer leaving the for-await loop) releases the HTTP connection instead of keeping it open until GC. - Teach chat() to pull the message chunk from both "messages-tuple" (the useStream default) and plain "messages" mode so the convenience path works regardless of stream_mode. - Cover every branch of the StreamEvent union with a test. - Update README streaming example to match the new type narrowing and document the stream_mode / event name contract.
|
@WillemJiang thanks for the review. Addressed Copilot's feedback in fc6eb7d:
4 & 5. |
|
Thanks @WillemJiang! I went through Copilot's review - most of the findings are already addressed in the current code:
The one potentially valid note is about the request payload shape (client.ts:87) - the |
Summary
Adds
@deerflow/client, a TypeScript SDK that connects to DeerFlow's Gateway HTTP APIs with full SSE streaming support. Mirrors the PythonDeerFlowClientAPI shape documented inSTREAMING.md.Why this matters
DeerFlow's streaming protocol is well-documented (
backend/docs/STREAMING.md, 351 lines) and the PythonDeerFlowClientgot a major expansion recently (client.py, 1195 lines). But third-party developers building integrations or custom frontends in TypeScript have no official client library. OpenClaw ships a TypeScript SDK; DeerFlow should too.STREAMING.md(351 lines) documents the full protocolclient.pyexpanded to 1195 lines with streamingdeer-flow.code-workspaceChanges
New directory:
sdk/typescript/(not touching any existing code)src/client.ts-DeerFlowClientclass withcreateThread,getThreadState,stream(AsyncGenerator), andchatmethodssrc/types.ts- TypeScript types for the streaming protocol (StreamEvent, ValuesEvent, ThreadInfo, etc.)src/sse.ts- SSE parser using nativeReadableStream+TextDecoderStreamtests/sse.test.ts- SSE parser unit teststests/types.test.ts- Type validation testsREADME.md- Usage docs with examplesZero external runtime dependencies. Uses native
fetch(Node 18+).Testing
Build + 4 tests pass with strict TypeScript compilation.
Demo
This contribution was developed with AI assistance (Codex).