Skip to content

Commit aa2045f

Browse files
authored
ref: Use diagnostics channel for claude agent sdk wrapper (#1678)
Moves the claude agent sdk wrapper over to use diagnostic channels.
1 parent 238afd1 commit aa2045f

3 files changed

Lines changed: 39 additions & 903 deletions

File tree

js/src/instrumentation/core/stream-patcher.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,19 @@ export interface StreamPatchOptions<TChunk = unknown, TFinal = unknown> {
2828
* Called for each chunk as it's yielded.
2929
* Optional - if not provided, chunks are just collected.
3030
*/
31-
onChunk?: (chunk: TChunk) => void;
31+
onChunk?: (chunk: TChunk) => void | Promise<void>;
3232

3333
/**
3434
* Called when the stream completes successfully.
3535
* Receives all collected chunks.
3636
*/
37-
onComplete: (chunks: TChunk[]) => TFinal | void;
37+
onComplete: (chunks: TChunk[]) => TFinal | void | Promise<TFinal | void>;
3838

3939
/**
4040
* Called if the stream errors.
4141
* If not provided, errors are re-thrown after collection stops.
4242
*/
43-
onError?: (error: Error, chunks: TChunk[]) => void;
43+
onError?: (error: Error, chunks: TChunk[]) => void | Promise<void>;
4444

4545
/**
4646
* Filter to decide whether to collect a chunk.
@@ -135,7 +135,7 @@ export function patchStreamIfNeeded<TChunk = unknown, TFinal = unknown>(
135135
if (!completed) {
136136
completed = true;
137137
try {
138-
options.onComplete(chunks);
138+
await options.onComplete(chunks);
139139
} catch (error) {
140140
console.error("Error in stream onComplete handler:", error);
141141
}
@@ -155,7 +155,7 @@ export function patchStreamIfNeeded<TChunk = unknown, TFinal = unknown>(
155155
// Call onChunk handler if provided
156156
if (options.onChunk) {
157157
try {
158-
options.onChunk(chunk);
158+
await options.onChunk(chunk);
159159
} catch (error) {
160160
console.error("Error in stream onChunk handler:", error);
161161
}
@@ -170,7 +170,7 @@ export function patchStreamIfNeeded<TChunk = unknown, TFinal = unknown>(
170170
completed = true;
171171
if (options.onError) {
172172
try {
173-
options.onError(
173+
await options.onError(
174174
error instanceof Error ? error : new Error(String(error)),
175175
chunks,
176176
);
@@ -191,7 +191,7 @@ export function patchStreamIfNeeded<TChunk = unknown, TFinal = unknown>(
191191
completed = true;
192192
// Stream was cancelled/returned early
193193
try {
194-
options.onComplete(chunks);
194+
await options.onComplete(chunks);
195195
} catch (error) {
196196
console.error("Error in stream onComplete handler:", error);
197197
}
@@ -213,7 +213,7 @@ export function patchStreamIfNeeded<TChunk = unknown, TFinal = unknown>(
213213
: new Error(String(rawError));
214214
if (options.onError) {
215215
try {
216-
options.onError(error, chunks);
216+
await options.onError(error, chunks);
217217
} catch (handlerError) {
218218
console.error("Error in stream onError handler:", handlerError);
219219
}

js/src/instrumentation/plugins/claude-agent-sdk-plugin.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -834,15 +834,14 @@ export class ClaudeAgentSDKPlugin extends BasePlugin {
834834
);
835835
});
836836
},
837-
onComplete: () => {
838-
void state.processing
837+
onComplete: () =>
838+
state.processing
839839
.then(() => finalizeQuerySpan(state))
840840
.finally(() => {
841841
spans.delete(event);
842-
});
843-
},
844-
onError: (error: Error) => {
845-
void state.processing
842+
}),
843+
onError: (error: Error) =>
844+
state.processing
846845
.then(() => {
847846
state.span.log({
848847
error: error.message,
@@ -851,8 +850,7 @@ export class ClaudeAgentSDKPlugin extends BasePlugin {
851850
.then(() => finalizeQuerySpan(state))
852851
.finally(() => {
853852
spans.delete(event);
854-
});
855-
},
853+
}),
856854
});
857855

858856
return;

0 commit comments

Comments
 (0)