Skip to content

Commit c6a3406

Browse files
authored
feat: Move the sendMessage with a MessageSendParams to the AbstractClient (#665)
and made it public. Thought it might not be widely used, it allows full control of the parameters sent by the request (including the tenant). This fixes #663 Signed-off-by: Jeff Mesnil <jmesnil@ibm.com>
1 parent 5a48d7b commit c6a3406

2 files changed

Lines changed: 59 additions & 39 deletions

File tree

client/base/src/main/java/io/a2a/client/AbstractClient.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.a2a.spec.ListTaskPushNotificationConfigResult;
1818
import io.a2a.spec.ListTasksParams;
1919
import io.a2a.spec.Message;
20+
import io.a2a.spec.MessageSendParams;
2021
import io.a2a.spec.PushNotificationConfig;
2122
import io.a2a.spec.Task;
2223
import io.a2a.spec.TaskIdParams;
@@ -160,6 +161,26 @@ public abstract void sendMessage(@NonNull Message request,
160161
@Nullable Map<String, Object> metadata,
161162
@Nullable ClientCallContext context) throws A2AClientException;
162163

164+
/**
165+
* Send a message to the remote agent. This method will automatically use
166+
* the streaming or non-streaming approach as determined by the server's
167+
* agent card and the client configuration. The specified client consumers
168+
* will be used to handle messages, tasks, and update events received
169+
* from the remote agent. The specified streaming error handler will be used
170+
* if an error occurs during streaming. The configured client push notification
171+
* configuration will get used for streaming.
172+
*
173+
* @param params the request parameters
174+
* @param consumers a list of consumers to pass responses from the remote agent to
175+
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
176+
* @param context optional client call context for the request
177+
* @throws A2AClientException if sending the message fails for any reason
178+
*/
179+
public abstract void sendMessage(@NonNull MessageSendParams params,
180+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
181+
@Nullable Consumer<Throwable> streamingErrorHandler,
182+
@Nullable ClientCallContext context) throws A2AClientException;
183+
163184
/**
164185
* Retrieve the current state and history of a specific task.
165186
*

client/base/src/main/java/io/a2a/client/Client.java

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,13 @@ public void sendMessage(@NonNull Message request,
234234
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
235235
@Nullable Consumer<Throwable> streamingErrorHandler,
236236
@Nullable ClientCallContext context) throws A2AClientException {
237-
MessageSendParams messageSendParams = getMessageSendParams(request, clientConfig);
237+
MessageSendConfiguration messageSendConfiguration = createMessageSendConfiguration(clientConfig.getPushNotificationConfig());
238+
239+
MessageSendParams messageSendParams = MessageSendParams.builder()
240+
.message(request)
241+
.configuration(messageSendConfiguration)
242+
.metadata(clientConfig.getMetadata())
243+
.build();
238244
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
239245
}
240246

@@ -289,7 +295,6 @@ public void sendMessage(@NonNull Message request,
289295
@Nullable Map<String, Object> metadata,
290296
@Nullable ClientCallContext context) throws A2AClientException {
291297
MessageSendConfiguration messageSendConfiguration = createMessageSendConfiguration(pushNotificationConfiguration);
292-
293298
MessageSendParams messageSendParams = MessageSendParams.builder()
294299
.message(request)
295300
.configuration(messageSendConfiguration)
@@ -299,6 +304,37 @@ public void sendMessage(@NonNull Message request,
299304
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
300305
}
301306

307+
@Override
308+
public void sendMessage(@NonNull MessageSendParams messageSendParams,
309+
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
310+
@Nullable Consumer<Throwable> streamingErrorHandler,
311+
@Nullable ClientCallContext context) throws A2AClientException {
312+
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
313+
EventKind eventKind = clientTransport.sendMessage(messageSendParams, context);
314+
ClientEvent clientEvent;
315+
if (eventKind instanceof Task task) {
316+
clientEvent = new TaskEvent(task);
317+
} else {
318+
// must be a message
319+
clientEvent = new MessageEvent((Message) eventKind);
320+
}
321+
consume(clientEvent, agentCard, consumers);
322+
} else {
323+
ClientTaskManager tracker = new ClientTaskManager();
324+
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(streamingErrorHandler);
325+
Consumer<StreamingEventKind> eventHandler = event -> {
326+
try {
327+
ClientEvent clientEvent = getClientEvent(event, tracker);
328+
consume(clientEvent, agentCard, consumers);
329+
} catch (A2AClientError e) {
330+
overriddenErrorHandler.accept(e);
331+
}
332+
};
333+
clientTransport.sendMessageStreaming(messageSendParams, eventHandler, overriddenErrorHandler, context);
334+
}
335+
}
336+
337+
302338
/**
303339
* Retrieve a specific task by ID.
304340
* <p>
@@ -666,33 +702,6 @@ private MessageSendConfiguration createMessageSendConfiguration(@Nullable PushNo
666702
.build();
667703
}
668704

669-
private void sendMessage(@NonNull MessageSendParams messageSendParams, @NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
670-
@Nullable Consumer<Throwable> errorHandler, @Nullable ClientCallContext context) throws A2AClientException {
671-
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
672-
EventKind eventKind = clientTransport.sendMessage(messageSendParams, context);
673-
ClientEvent clientEvent;
674-
if (eventKind instanceof Task task) {
675-
clientEvent = new TaskEvent(task);
676-
} else {
677-
// must be a message
678-
clientEvent = new MessageEvent((Message) eventKind);
679-
}
680-
consume(clientEvent, agentCard, consumers);
681-
} else {
682-
ClientTaskManager tracker = new ClientTaskManager();
683-
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(errorHandler);
684-
Consumer<StreamingEventKind> eventHandler = event -> {
685-
try {
686-
ClientEvent clientEvent = getClientEvent(event, tracker);
687-
consume(clientEvent, agentCard, consumers);
688-
} catch (A2AClientError e) {
689-
overriddenErrorHandler.accept(e);
690-
}
691-
};
692-
clientTransport.sendMessageStreaming(messageSendParams, eventHandler, overriddenErrorHandler, context);
693-
}
694-
}
695-
696705
private @NonNull Consumer<Throwable> getOverriddenErrorHandler(@Nullable Consumer<Throwable> errorHandler) {
697706
return e -> {
698707
if (errorHandler != null) {
@@ -710,14 +719,4 @@ private void consume(ClientEvent clientEvent, AgentCard agentCard, @NonNull List
710719
consumer.accept(clientEvent, agentCard);
711720
}
712721
}
713-
714-
private MessageSendParams getMessageSendParams(Message request, ClientConfig clientConfig) {
715-
MessageSendConfiguration messageSendConfiguration = createMessageSendConfiguration(clientConfig.getPushNotificationConfig());
716-
717-
return MessageSendParams.builder()
718-
.message(request)
719-
.configuration(messageSendConfiguration)
720-
.metadata(clientConfig.getMetadata())
721-
.build();
722-
}
723722
}

0 commit comments

Comments
 (0)