Skip to content

Commit f3dcfb0

Browse files
committed
camel-telemetry: fix trace context propagation for messaging components
AbstractMessagingSpanDecorator used the default CamelHeadersSpanContextPropagationExtractor which only handles String-valued headers. Messaging transports like Kafka deliver headers as byte[], so trace context headers (e.g. traceparent) were silently dropped, breaking distributed trace propagation. Add CamelMessagingHeadersSpanContextPropagationExtractor that handles both String and byte[] headers, and override getExtractor() in AbstractMessagingSpanDecorator so all messaging components (Kafka, AMQP, SJMS, STOMP, Spring RabbitMQ, Azure Service Bus, etc.) benefit from the fix. The issue was discovered while upgrading the camel-spring-boot opentelemetry example from camel-opentelemetry to camel-opentelemetry2.
1 parent 3d7a109 commit f3dcfb0

2 files changed

Lines changed: 48 additions & 3 deletions

File tree

components/camel-telemetry/src/main/java/org/apache/camel/telemetry/propagation/CamelHeadersSpanContextPropagationExtractor.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.camel.telemetry.propagation;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.util.Iterator;
2021
import java.util.Map;
2122
import java.util.Set;
@@ -27,9 +28,16 @@ public final class CamelHeadersSpanContextPropagationExtractor implements SpanCo
2728
private final Map<String, Object> map = new CaseInsensitiveMap();
2829

2930
public CamelHeadersSpanContextPropagationExtractor(final Map<String, Object> map) {
30-
// Extract string valued map entries
31-
map.entrySet().stream().filter(e -> e.getValue() instanceof String)
32-
.forEach(e -> this.map.put(e.getKey(), e.getValue()));
31+
// Extract string and byte[] valued map entries.
32+
// Messaging transports (Kafka, AMQP, etc.) may deliver headers as byte arrays,
33+
// so we convert them to String for the W3C propagator to extract trace context.
34+
map.entrySet().stream().filter(e -> e.getValue() instanceof String || e.getValue() instanceof byte[]).forEach(e -> {
35+
if (e.getValue() instanceof byte[] bytes) {
36+
this.map.put(e.getKey(), new String(bytes, StandardCharsets.UTF_8));
37+
} else {
38+
this.map.put(e.getKey(), e.getValue());
39+
}
40+
});
3341
}
3442

3543
@Override

components/camel-telemetry/src/test/java/org/apache/camel/telemetry/propagation/CamelHeadersExtractAdapterTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.camel.telemetry.propagation;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.util.HashMap;
2021
import java.util.Iterator;
2122
import java.util.Map;
@@ -26,6 +27,8 @@
2627

2728
import static org.junit.jupiter.api.Assertions.assertEquals;
2829
import static org.junit.jupiter.api.Assertions.assertFalse;
30+
import static org.junit.jupiter.api.Assertions.assertNull;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
2932

3033
public class CamelHeadersExtractAdapterTest {
3134

@@ -69,4 +72,38 @@ public void keyWithDifferentCase() {
6972
SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map);
7073
assertEquals("value", adapter.get("KeY"));
7174
}
75+
76+
@Test
77+
public void byteArrayProperty() {
78+
map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8));
79+
SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map);
80+
assertEquals("00-abc123-def456-01", adapter.get("traceparent"));
81+
}
82+
83+
@Test
84+
public void mixedStringAndByteArrayProperties() {
85+
map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8));
86+
map.put("custom-header", "custom-value");
87+
SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map);
88+
assertEquals("00-abc123-def456-01", adapter.get("traceparent"));
89+
assertEquals("custom-value", adapter.get("custom-header"));
90+
}
91+
92+
@Test
93+
public void nonStringNonByteArrayPropertyIsFiltered() {
94+
map.put("integer-header", 42);
95+
map.put("key", "value");
96+
SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map);
97+
assertNull(adapter.get("integer-header"));
98+
assertEquals("value", adapter.get("key"));
99+
assertTrue(adapter.keys().contains("key"));
100+
assertFalse(adapter.keys().contains("integer-header"));
101+
}
102+
103+
@Test
104+
public void byteArrayKeyWithDifferentCase() {
105+
map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8));
106+
SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map);
107+
assertEquals("00-abc123-def456-01", adapter.get("TraceParent"));
108+
}
72109
}

0 commit comments

Comments
 (0)