Skip to content

Commit a0db01b

Browse files
committed
camel-telemetry: fix trace context propagation for messaging components
AbstractMessagingSpanDecorator used the default CamelHeadersSpanContextPropagationExtractor which only handles String-valued headers. Messaging transports like Kafka deliver headers as byte[], so trace context headers (e.g. traceparent) were silently dropped, breaking distributed trace propagation. Add CamelMessagingHeadersSpanContextPropagationExtractor that handles both String and byte[] headers, and override getExtractor() in AbstractMessagingSpanDecorator so all messaging components (Kafka, AMQP, SJMS, STOMP, Spring RabbitMQ, Azure Service Bus, etc.) benefit from the fix. The issue was discovered while upgrading the camel-spring-boot opentelemetry example from camel-opentelemetry to camel-opentelemetry2.
1 parent c099847 commit a0db01b

6 files changed

Lines changed: 173 additions & 51 deletions

File tree

components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/AbstractMessagingSpanDecorator.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
*/
1717
package org.apache.camel.telemetry.decorators;
1818

19+
import java.util.Map;
20+
1921
import org.apache.camel.Endpoint;
2022
import org.apache.camel.Exchange;
2123
import org.apache.camel.telemetry.Span;
24+
import org.apache.camel.telemetry.SpanContextPropagationExtractor;
2225
import org.apache.camel.telemetry.TagConstants;
26+
import org.apache.camel.telemetry.propagation.CamelMessagingHeadersSpanContextPropagationExtractor;
2327

2428
public abstract class AbstractMessagingSpanDecorator extends AbstractSpanDecorator {
2529

@@ -40,6 +44,22 @@ public void beforeTracingEvent(Span span, Exchange exchange, Endpoint endpoint)
4044
}
4145
}
4246

47+
@Override
48+
public SpanContextPropagationExtractor getExtractor(Exchange exchange) {
49+
return new CamelMessagingHeadersSpanContextPropagationExtractor(processHeaders(exchange.getIn().getHeaders()));
50+
}
51+
52+
/**
53+
* Hook for subclasses to transform headers before span context extraction. For example, JMS needs to decode
54+
* dash-encoded header keys ({@code __dash__} back to {@code -}).
55+
*
56+
* @param headers the raw exchange headers
57+
* @return the processed headers (default: unchanged)
58+
*/
59+
protected Map<String, Object> processHeaders(Map<String, Object> headers) {
60+
return headers;
61+
}
62+
4363
/**
4464
* This method identifies the destination from the supplied exchange and/or endpoint.
4565
*

components/camel-telemetry/src/main/java/org/apache/camel/telemetry/decorators/JmsSpanDecorator.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616
*/
1717
package org.apache.camel.telemetry.decorators;
1818

19+
import java.util.HashMap;
20+
import java.util.Map;
21+
1922
import org.apache.camel.Endpoint;
2023
import org.apache.camel.Exchange;
21-
import org.apache.camel.telemetry.SpanContextPropagationExtractor;
2224
import org.apache.camel.telemetry.SpanContextPropagationInjector;
23-
import org.apache.camel.telemetry.propagation.CamelJMSHeadersSpanContextPropagationExtractor;
2425
import org.apache.camel.telemetry.propagation.CamelJMSHeadersSpanContextPropagationInjector;
2526

2627
public class JmsSpanDecorator extends AbstractMessagingSpanDecorator {
@@ -53,8 +54,10 @@ protected String getMessageId(Exchange exchange) {
5354
}
5455

5556
@Override
56-
public SpanContextPropagationExtractor getExtractor(Exchange exchange) {
57-
return new CamelJMSHeadersSpanContextPropagationExtractor(exchange.getIn().getHeaders());
57+
protected Map<String, Object> processHeaders(Map<String, Object> headers) {
58+
Map<String, Object> decoded = new HashMap<>();
59+
headers.forEach((k, v) -> decoded.put(k.replace(CamelJMSHeadersSpanContextPropagationInjector.JMS_DASH, "-"), v));
60+
return decoded;
5861
}
5962

6063
@Override

components/camel-telemetry/src/main/java/org/apache/camel/telemetry/propagation/CamelJMSHeadersSpanContextPropagationInjector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public final class CamelJMSHeadersSpanContextPropagationInjector implements Span
2727
// This means that any header names that contain illegal characters ("-", for
2828
// example) should be handled correctly,
2929
// Opentracing java-jms does it as follows.
30-
static final String JMS_DASH = "__dash__";
30+
public static final String JMS_DASH = "__dash__";
3131
private final Map<String, Object> map;
3232

3333
public CamelJMSHeadersSpanContextPropagationInjector(final Map<String, Object> map) {

components/camel-telemetry/src/main/java/org/apache/camel/telemetry/propagation/CamelJMSHeadersSpanContextPropagationExtractor.java renamed to components/camel-telemetry/src/main/java/org/apache/camel/telemetry/propagation/CamelMessagingHeadersSpanContextPropagationExtractor.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,22 @@
2424
import org.apache.camel.telemetry.SpanContextPropagationExtractor;
2525
import org.apache.camel.util.CaseInsensitiveMap;
2626

27-
public final class CamelJMSHeadersSpanContextPropagationExtractor implements SpanContextPropagationExtractor {
27+
/**
28+
* Messaging-aware span context propagation extractor that handles both String and byte[] header values. Many messaging
29+
* transports (Kafka, AMQP, JMS via SJMS, STOMP, Azure Service Bus, etc.) may deliver headers as raw byte arrays, so
30+
* trace context headers like {@code traceparent} need to be converted from byte[] to String for the W3C propagator to
31+
* extract them.
32+
*/
33+
public final class CamelMessagingHeadersSpanContextPropagationExtractor implements SpanContextPropagationExtractor {
2834

2935
private final Map<String, Object> map = new CaseInsensitiveMap();
3036

31-
public CamelJMSHeadersSpanContextPropagationExtractor(final Map<String, Object> map) {
37+
public CamelMessagingHeadersSpanContextPropagationExtractor(final Map<String, Object> map) {
3238
map.entrySet().stream().filter(e -> e.getValue() instanceof String || e.getValue() instanceof byte[]).forEach(e -> {
3339
if (e.getValue() instanceof byte[] bytes) {
34-
this.map.put(decodeDash(e.getKey()), new String(bytes, StandardCharsets.UTF_8));
40+
this.map.put(e.getKey(), new String(bytes, StandardCharsets.UTF_8));
3541
} else {
36-
this.map.put(decodeDash(e.getKey()), e.getValue());
42+
this.map.put(e.getKey(), e.getValue());
3743
}
3844
});
3945
}
@@ -52,16 +58,4 @@ public Object get(String key) {
5258
public Set<String> keys() {
5359
return map.keySet();
5460
}
55-
56-
/**
57-
* Decode dashes (encoded in {@link CamelJMSHeadersSpanContextPropagationInjector} Dash encoding and decoding is
58-
* required by JMS. This is implemented here rather than specifically to JMS so that other Camel messaging endpoints
59-
* can take part in traces where the peer is using JMS.
60-
*/
61-
private String decodeDash(String key) {
62-
if (key != null) {
63-
return key.replace(CamelJMSHeadersSpanContextPropagationInjector.JMS_DASH, "-");
64-
}
65-
return key;
66-
}
6761
}

components/camel-telemetry/src/test/java/org/apache/camel/telemetry/propagation/CamelMessagingHeadersExtractAdapterTest.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,57 +17,60 @@
1717
package org.apache.camel.telemetry.propagation;
1818

1919
import java.util.HashMap;
20-
import java.util.Iterator;
2120
import java.util.Map;
2221

22+
import org.apache.camel.Exchange;
23+
import org.apache.camel.Message;
2324
import org.apache.camel.telemetry.SpanContextPropagationExtractor;
24-
import org.junit.jupiter.api.BeforeEach;
25+
import org.apache.camel.telemetry.decorators.JmsSpanDecorator;
2526
import org.junit.jupiter.api.Test;
27+
import org.mockito.Mockito;
2628

2729
import static org.apache.camel.telemetry.propagation.CamelJMSHeadersSpanContextPropagationInjector.JMS_DASH;
2830
import static org.junit.jupiter.api.Assertions.assertEquals;
29-
import static org.junit.jupiter.api.Assertions.assertFalse;
3031

32+
/**
33+
* Tests that JMS dash-encoded header keys are properly decoded when extracting span context via JmsSpanDecorator.
34+
*/
3135
public class CamelMessagingHeadersExtractAdapterTest {
3236

33-
private Map<String, Object> map;
34-
35-
@BeforeEach
36-
public void before() {
37-
map = new HashMap<>();
37+
private SpanContextPropagationExtractor createJmsExtractor(Map<String, Object> headers) {
38+
Exchange exchange = Mockito.mock(Exchange.class);
39+
Message message = Mockito.mock(Message.class);
40+
Mockito.when(exchange.getIn()).thenReturn(message);
41+
Mockito.when(message.getHeaders()).thenReturn(headers);
42+
return new JmsSpanDecorator().getExtractor(exchange);
3843
}
3944

4045
@Test
41-
public void noProperties() {
42-
SpanContextPropagationExtractor adapter = new CamelJMSHeadersSpanContextPropagationExtractor(map);
43-
Iterator<Map.Entry<String, Object>> iterator = adapter.iterator();
44-
assertFalse(iterator.hasNext());
45-
}
46-
47-
@Test
48-
public void oneProperty() {
49-
map.put("key", "value");
50-
SpanContextPropagationExtractor adapter = new CamelJMSHeadersSpanContextPropagationExtractor(map);
51-
Iterator<Map.Entry<String, Object>> iterator = adapter.iterator();
52-
Map.Entry<String, Object> entry = iterator.next();
53-
assertEquals("key", entry.getKey());
54-
assertEquals("value", entry.getValue());
46+
public void propertyWithDash() {
47+
Map<String, Object> map = new HashMap<>();
48+
map.put(JMS_DASH + "key" + JMS_DASH + "1" + JMS_DASH, "value1");
49+
SpanContextPropagationExtractor adapter = createJmsExtractor(map);
50+
assertEquals("value1", adapter.get("-key-1-"));
5551
}
5652

5753
@Test
58-
public void propertyWithDash() {
59-
map.put(JMS_DASH + "key" + JMS_DASH + "1" + JMS_DASH, "value1");
60-
SpanContextPropagationExtractor adapter = new CamelJMSHeadersSpanContextPropagationExtractor(map);
61-
Iterator<Map.Entry<String, Object>> iterator = adapter.iterator();
62-
Map.Entry<String, Object> entry = iterator.next();
63-
assertEquals("-key-1-", entry.getKey());
64-
assertEquals("value1", entry.getValue());
54+
public void traceparentWithEncodedDashes() {
55+
Map<String, Object> map = new HashMap<>();
56+
map.put("traceparent", "00-abc123-def456-01");
57+
SpanContextPropagationExtractor adapter = createJmsExtractor(map);
58+
assertEquals("00-abc123-def456-01", adapter.get("traceparent"));
6559
}
6660

6761
@Test
6862
public void keyWithDifferentCase() {
63+
Map<String, Object> map = new HashMap<>();
6964
map.put("key", "value");
70-
SpanContextPropagationExtractor adapter = new CamelJMSHeadersSpanContextPropagationExtractor(map);
65+
SpanContextPropagationExtractor adapter = createJmsExtractor(map);
7166
assertEquals("value", adapter.get("KeY"));
7267
}
68+
69+
@Test
70+
public void byteArrayPropertyWithDashDecode() {
71+
Map<String, Object> map = new HashMap<>();
72+
map.put(JMS_DASH + "traceparent", "00-abc-def-01".getBytes());
73+
SpanContextPropagationExtractor adapter = createJmsExtractor(map);
74+
assertEquals("00-abc-def-01", adapter.get("-traceparent"));
75+
}
7376
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.telemetry.propagation;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.HashMap;
21+
import java.util.Iterator;
22+
import java.util.Map;
23+
24+
import org.apache.camel.telemetry.SpanContextPropagationExtractor;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertFalse;
30+
import static org.junit.jupiter.api.Assertions.assertNull;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
public class CamelMessagingHeadersSpanContextPropagationExtractorTest {
34+
35+
private Map<String, Object> map;
36+
37+
@BeforeEach
38+
public void before() {
39+
map = new HashMap<>();
40+
}
41+
42+
@Test
43+
public void noProperties() {
44+
SpanContextPropagationExtractor adapter = new CamelMessagingHeadersSpanContextPropagationExtractor(map);
45+
Iterator<Map.Entry<String, Object>> iterator = adapter.iterator();
46+
assertFalse(iterator.hasNext());
47+
}
48+
49+
@Test
50+
public void stringProperty() {
51+
map.put("key", "value");
52+
SpanContextPropagationExtractor adapter = new CamelMessagingHeadersSpanContextPropagationExtractor(map);
53+
Iterator<Map.Entry<String, Object>> iterator = adapter.iterator();
54+
Map.Entry<String, Object> entry = iterator.next();
55+
assertEquals("key", entry.getKey());
56+
assertEquals("value", entry.getValue());
57+
}
58+
59+
@Test
60+
public void byteArrayProperty() {
61+
map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8));
62+
SpanContextPropagationExtractor adapter = new CamelMessagingHeadersSpanContextPropagationExtractor(map);
63+
Iterator<Map.Entry<String, Object>> iterator = adapter.iterator();
64+
Map.Entry<String, Object> entry = iterator.next();
65+
assertEquals("traceparent", entry.getKey());
66+
assertEquals("00-abc123-def456-01", entry.getValue());
67+
}
68+
69+
@Test
70+
public void mixedStringAndByteArrayProperties() {
71+
map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8));
72+
map.put("custom-header", "custom-value");
73+
SpanContextPropagationExtractor adapter = new CamelMessagingHeadersSpanContextPropagationExtractor(map);
74+
assertEquals("00-abc123-def456-01", adapter.get("traceparent"));
75+
assertEquals("custom-value", adapter.get("custom-header"));
76+
}
77+
78+
@Test
79+
public void nonStringNonByteArrayPropertyIsFiltered() {
80+
map.put("integer-header", 42);
81+
map.put("key", "value");
82+
SpanContextPropagationExtractor adapter = new CamelMessagingHeadersSpanContextPropagationExtractor(map);
83+
assertNull(adapter.get("integer-header"));
84+
assertEquals("value", adapter.get("key"));
85+
assertTrue(adapter.keys().contains("key"));
86+
assertFalse(adapter.keys().contains("integer-header"));
87+
}
88+
89+
@Test
90+
public void keyWithDifferentCase() {
91+
map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8));
92+
SpanContextPropagationExtractor adapter = new CamelMessagingHeadersSpanContextPropagationExtractor(map);
93+
assertEquals("00-abc123-def456-01", adapter.get("TraceParent"));
94+
}
95+
96+
@Test
97+
public void propertyWithDashInKey() {
98+
map.put("x-custom-header", "value".getBytes(StandardCharsets.UTF_8));
99+
SpanContextPropagationExtractor adapter = new CamelMessagingHeadersSpanContextPropagationExtractor(map);
100+
assertEquals("value", adapter.get("x-custom-header"));
101+
}
102+
}

0 commit comments

Comments
 (0)