Skip to content

Commit 5a48d7b

Browse files
authored
fix: flaky Agent to Agent tests (#672)
1 parent e8fd7f2 commit 5a48d7b

2 files changed

Lines changed: 32 additions & 31 deletions

File tree

tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2491,7 +2491,7 @@ public void testAgentToAgentDelegation() throws Exception {
24912491
BiConsumer<ClientEvent, AgentCard> delegationConsumer =
24922492
AgentToAgentClientFactory.createTaskCaptureConsumer(delegationResultRef, delegationLatch);
24932493

2494-
getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> {
2494+
getNonStreamingClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> {
24952495
delegationErrorRef.set(error);
24962496
delegationLatch.countDown();
24972497
});

tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
import static io.a2a.server.ServerCallContext.TRANSPORT_KEY;
44

55
import java.util.List;
6-
import java.util.concurrent.CountDownLatch;
7-
import java.util.concurrent.TimeUnit;
86
import java.util.concurrent.atomic.AtomicReference;
9-
import java.util.function.BiConsumer;
107

118
import jakarta.enterprise.context.ApplicationScoped;
129
import jakarta.enterprise.inject.Produces;
@@ -176,49 +173,56 @@ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEm
176173

177174
/**
178175
* Handles delegation by forwarding to another agent via client.
176+
* <p>
177+
* Uses blocking client call (streaming=false) which should return the final task state
178+
* synchronously without requiring async callbacks and latches. This simplified approach
179+
* avoids race conditions between event consumption and callback invocation.
179180
*/
180181
private void handleDelegation(String userInput, TransportProtocol transportProtocol,
181182
AgentEmitter agentEmitter) {
182183
// Strip "delegate:" prefix
183184
String delegatedContent = userInput.substring("delegate:".length()).trim();
184185

185-
// Create client for same transport
186+
// Create client for same transport (streaming=false for blocking behavior)
186187
try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol)) {
187188
agentEmitter.startWork();
188189

189-
// Set up consumer to capture task result
190-
CountDownLatch latch = new CountDownLatch(1);
191-
AtomicReference<Task> resultRef = new AtomicReference<>();
192-
AtomicReference<Throwable> errorRef = new AtomicReference<>();
193-
194-
BiConsumer<ClientEvent, AgentCard> consumer =
195-
AgentToAgentClientFactory.createTaskCaptureConsumer(resultRef, latch);
190+
// Store the result task from blocking call
191+
AtomicReference<Task> taskRef = new AtomicReference<>();
196192

197193
// Delegate to another agent (new task on same server)
198194
// Add a marker so the receiving agent knows to complete the task
199195
Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent);
200-
client.sendMessage(delegatedMessage, List.of(consumer), error -> {
201-
errorRef.set(error);
202-
latch.countDown();
203-
});
204196

205-
// Wait for response
206-
if (!latch.await(30, TimeUnit.SECONDS)) {
207-
agentEmitter.fail(new InternalError("Timeout waiting for delegated response"));
208-
return;
209-
}
197+
// Blocking call should return final task synchronously
198+
client.sendMessage(delegatedMessage, List.of((event, card) -> {
199+
if (event instanceof TaskEvent te) {
200+
taskRef.set(te.getTask());
201+
} else if (event instanceof TaskUpdateEvent tue) {
202+
taskRef.set(tue.getTask());
203+
}
204+
}), null);
210205

211-
Task delegatedResult = resultRef.get();
206+
// Blocking call should have completed before returning
207+
Task delegatedResult = taskRef.get();
212208

213-
// Check for error only if we didn't get a successful result
214-
// (errors can occur after completion due to stream cleanup)
215-
if (delegatedResult == null && errorRef.get() != null) {
216-
agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage()));
209+
if (delegatedResult == null) {
210+
agentEmitter.fail(new InternalError("No result received from blocking delegation call"));
217211
return;
218212
}
219213

220-
if (delegatedResult == null) {
221-
agentEmitter.fail(new InternalError("No result received from delegation"));
214+
// DIAGNOSTIC: Check if task is actually final
215+
// If blocking call returns non-final task, it indicates a server-side race condition
216+
if (!delegatedResult.status().state().isFinal()) {
217+
String diagnostic = String.format(
218+
"RACE CONDITION DETECTED: Blocking call returned non-final task! " +
219+
"State: %s, TaskId: %s, Artifacts: %d. " +
220+
"This indicates DefaultRequestHandler wait logic failed to synchronize with MainEventBusProcessor.",
221+
delegatedResult.status().state(),
222+
delegatedResult.id(),
223+
delegatedResult.artifacts() != null ? delegatedResult.artifacts().size() : 0);
224+
System.err.println(diagnostic); // Also print to stderr for CI visibility
225+
agentEmitter.fail(new InternalError(diagnostic));
222226
return;
223227
}
224228

@@ -234,9 +238,6 @@ private void handleDelegation(String userInput, TransportProtocol transportProto
234238
agentEmitter.complete();
235239
} catch (A2AClientException e) {
236240
agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage()));
237-
} catch (InterruptedException e) {
238-
Thread.currentThread().interrupt();
239-
agentEmitter.fail(new InternalError("Interrupted while waiting for response"));
240241
}
241242
}
242243

0 commit comments

Comments
 (0)