Skip to content

Commit 26ec196

Browse files
chore: Port PushNotification sender test from Python (#296)
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 8dc6f2e commit 26ec196

2 files changed

Lines changed: 268 additions & 1 deletion

File tree

server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ private CompletableFuture<Boolean> dispatch(Task task, PushNotificationConfig pu
6969
private boolean dispatchNotification(Task task, PushNotificationConfig pushInfo) {
7070
String url = pushInfo.url();
7171

72-
// TODO auth
72+
// TODO: Implement authentication and token header support
73+
// The Python implementation adds X-A2A-Notification-Token header when pushInfo.token is present
74+
// See: https://github.com/a2aproject/a2a-python/blob/main/src/a2a/server/tasks/base_push_notification_sender.py#L55-57
7375

7476
String body;
7577
try {
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package io.a2a.server.tasks;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
6+
import java.io.IOException;
7+
import java.util.ArrayList;
8+
import java.util.Collections;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.function.Consumer;
15+
16+
import org.junit.jupiter.api.BeforeEach;
17+
import org.junit.jupiter.api.Test;
18+
19+
import io.a2a.client.http.A2AHttpClient;
20+
import io.a2a.client.http.A2AHttpResponse;
21+
import io.a2a.util.Utils;
22+
import io.a2a.spec.PushNotificationConfig;
23+
import io.a2a.spec.Task;
24+
import io.a2a.spec.TaskState;
25+
import io.a2a.spec.TaskStatus;
26+
27+
public class PushNotificationSenderTest {
28+
29+
private TestHttpClient testHttpClient;
30+
private InMemoryPushNotificationConfigStore configStore;
31+
private BasePushNotificationSender sender;
32+
33+
/**
34+
* Simple test implementation of A2AHttpClient that captures HTTP calls for verification
35+
*/
36+
private static class TestHttpClient implements A2AHttpClient {
37+
final List<Task> tasks = Collections.synchronizedList(new ArrayList<>());
38+
final List<String> urls = Collections.synchronizedList(new ArrayList<>());
39+
final List<Map<String, String>> headers = Collections.synchronizedList(new ArrayList<>());
40+
volatile CountDownLatch latch;
41+
volatile boolean shouldThrowException = false;
42+
43+
@Override
44+
public GetBuilder createGet() {
45+
return null;
46+
}
47+
48+
@Override
49+
public PostBuilder createPost() {
50+
return new TestPostBuilder();
51+
}
52+
53+
@Override
54+
public DeleteBuilder createDelete() {
55+
return null;
56+
}
57+
58+
class TestPostBuilder implements A2AHttpClient.PostBuilder {
59+
private volatile String body;
60+
private volatile String url;
61+
private final Map<String, String> requestHeaders = new java.util.HashMap<>();
62+
63+
@Override
64+
public PostBuilder body(String body) {
65+
this.body = body;
66+
return this;
67+
}
68+
69+
@Override
70+
public A2AHttpResponse post() throws IOException, InterruptedException {
71+
if (shouldThrowException) {
72+
throw new IOException("Simulated network error");
73+
}
74+
75+
try {
76+
Task task = Utils.OBJECT_MAPPER.readValue(body, Task.TYPE_REFERENCE);
77+
tasks.add(task);
78+
urls.add(url);
79+
headers.add(new java.util.HashMap<>(requestHeaders));
80+
81+
return new A2AHttpResponse() {
82+
@Override
83+
public int status() {
84+
return 200;
85+
}
86+
87+
@Override
88+
public boolean success() {
89+
return true;
90+
}
91+
92+
@Override
93+
public String body() {
94+
return "";
95+
}
96+
};
97+
} finally {
98+
if (latch != null) {
99+
latch.countDown();
100+
}
101+
}
102+
}
103+
104+
@Override
105+
public CompletableFuture<Void> postAsyncSSE(Consumer<String> messageConsumer, Consumer<Throwable> errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException {
106+
return null;
107+
}
108+
109+
@Override
110+
public PostBuilder url(String url) {
111+
this.url = url;
112+
return this;
113+
}
114+
115+
@Override
116+
public PostBuilder addHeader(String name, String value) {
117+
requestHeaders.put(name, value);
118+
return this;
119+
}
120+
121+
@Override
122+
public PostBuilder addHeaders(Map<String, String> headers) {
123+
requestHeaders.putAll(headers);
124+
return this;
125+
}
126+
}
127+
}
128+
129+
@BeforeEach
130+
public void setUp() {
131+
testHttpClient = new TestHttpClient();
132+
configStore = new InMemoryPushNotificationConfigStore();
133+
sender = new BasePushNotificationSender(configStore, testHttpClient);
134+
}
135+
136+
private Task createSampleTask(String taskId, TaskState state) {
137+
return new Task.Builder()
138+
.id(taskId)
139+
.contextId("ctx456")
140+
.status(new TaskStatus(state))
141+
.build();
142+
}
143+
144+
private PushNotificationConfig createSamplePushConfig(String url, String configId, String token) {
145+
return new PushNotificationConfig.Builder()
146+
.url(url)
147+
.id(configId)
148+
.token(token)
149+
.build();
150+
}
151+
152+
@Test
153+
public void testSendNotificationSuccess() throws InterruptedException {
154+
String taskId = "task_send_success";
155+
Task taskData = createSampleTask(taskId, TaskState.COMPLETED);
156+
PushNotificationConfig config = createSamplePushConfig("http://notify.me/here", "cfg1", null);
157+
158+
// Set up the configuration in the store
159+
configStore.setInfo(taskId, config);
160+
161+
// Set up latch to wait for async completion
162+
testHttpClient.latch = new CountDownLatch(1);
163+
164+
sender.sendNotification(taskData);
165+
166+
// Wait for the async operation to complete
167+
assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds");
168+
169+
// Verify the task was sent via HTTP
170+
assertEquals(1, testHttpClient.tasks.size());
171+
Task sentTask = testHttpClient.tasks.get(0);
172+
assertEquals(taskData.getId(), sentTask.getId());
173+
assertEquals(taskData.getContextId(), sentTask.getContextId());
174+
assertEquals(taskData.getStatus().state(), sentTask.getStatus().state());
175+
}
176+
177+
@Test
178+
public void testSendNotificationWithTokenSuccess() throws InterruptedException {
179+
String taskId = "task_send_with_token";
180+
Task taskData = createSampleTask(taskId, TaskState.COMPLETED);
181+
PushNotificationConfig config = createSamplePushConfig("http://notify.me/here", "cfg1", "unique_token");
182+
183+
// Set up the configuration in the store
184+
configStore.setInfo(taskId, config);
185+
186+
// Set up latch to wait for async completion
187+
testHttpClient.latch = new CountDownLatch(1);
188+
189+
sender.sendNotification(taskData);
190+
191+
// Wait for the async operation to complete
192+
assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds");
193+
194+
// Verify the task was sent via HTTP
195+
assertEquals(1, testHttpClient.tasks.size());
196+
Task sentTask = testHttpClient.tasks.get(0);
197+
assertEquals(taskData.getId(), sentTask.getId());
198+
199+
// TODO: When authentication is implemented in BasePushNotificationSender, verify that the
200+
// X-A2A-Notification-Token header is sent.
201+
}
202+
203+
@Test
204+
public void testSendNotificationNoConfig() {
205+
String taskId = "task_send_no_config";
206+
Task taskData = createSampleTask(taskId, TaskState.COMPLETED);
207+
208+
// Don't set any configuration in the store
209+
sender.sendNotification(taskData);
210+
211+
// Verify no HTTP calls were made
212+
assertEquals(0, testHttpClient.tasks.size());
213+
}
214+
215+
@Test
216+
public void testSendNotificationMultipleConfigs() throws InterruptedException {
217+
String taskId = "task_multiple_configs";
218+
Task taskData = createSampleTask(taskId, TaskState.COMPLETED);
219+
PushNotificationConfig config1 = createSamplePushConfig("http://notify.me/cfg1", "cfg1", null);
220+
PushNotificationConfig config2 = createSamplePushConfig("http://notify.me/cfg2", "cfg2", null);
221+
222+
// Set up multiple configurations in the store
223+
configStore.setInfo(taskId, config1);
224+
configStore.setInfo(taskId, config2);
225+
226+
// Set up latch to wait for async completion (2 calls expected)
227+
testHttpClient.latch = new CountDownLatch(2);
228+
229+
sender.sendNotification(taskData);
230+
231+
// Wait for the async operations to complete
232+
assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP calls should complete within 5 seconds");
233+
234+
// Verify both tasks were sent via HTTP
235+
assertEquals(2, testHttpClient.tasks.size());
236+
assertEquals(2, testHttpClient.urls.size());
237+
assertTrue(testHttpClient.urls.containsAll(java.util.List.of("http://notify.me/cfg1", "http://notify.me/cfg2")));
238+
239+
// Both tasks should be identical (same task sent to different endpoints)
240+
for (Task sentTask : testHttpClient.tasks) {
241+
assertEquals(taskData.getId(), sentTask.getId());
242+
assertEquals(taskData.getContextId(), sentTask.getContextId());
243+
assertEquals(taskData.getStatus().state(), sentTask.getStatus().state());
244+
}
245+
}
246+
247+
@Test
248+
public void testSendNotificationHttpError() {
249+
String taskId = "task_send_http_err";
250+
Task taskData = createSampleTask(taskId, TaskState.COMPLETED);
251+
PushNotificationConfig config = createSamplePushConfig("http://notify.me/http_error", "cfg1", null);
252+
253+
// Set up the configuration in the store
254+
configStore.setInfo(taskId, config);
255+
256+
// Configure the test client to throw an exception
257+
testHttpClient.shouldThrowException = true;
258+
259+
// This should not throw an exception - errors should be handled gracefully
260+
sender.sendNotification(taskData);
261+
262+
// Verify no tasks were successfully processed due to the error
263+
assertEquals(0, testHttpClient.tasks.size());
264+
}
265+
}

0 commit comments

Comments
 (0)