
Commit 25001b0

neuron7xLab and claude committed
fix(tracing): carrier-key contract — read-tolerant, write-canonical, CRLF-hardened
core/tracing/distributed.py previously crashed or matched spuriously on bytes/non-string carrier keys, leaked 'b'...' repr on the wire, and accepted CR/LF inside correlation IDs and baggage values (header-splitting risk on non-Unicode-strict HTTP/1.1 proxies). This commit makes the read path tolerant and the write path canonical.

## What changed in core/tracing/distributed.py

- _HEADER_TOKEN_RE: RFC 7230 § 3.2.6 token grammar compiled once.
- _FORBIDDEN_HEADER_VALUE_CHARS: frozenset({'\r', '\n', '\x00'}).
- _normalize_header_key: accepts str/bytes/bytearray, ASCII-decodes, lower-cases, token-validates. Returns None for any other shape.
- _header_key_matches: thin convenience over _normalize_header_key.
- _normalize_header_value: str passes through, bytes decodes via latin-1, other types fall back to str() for read tolerance.
- _current_local_baggage: reads _LOCAL_BAGGAGE with None-default so no shared default dict leaks across contexts.
- _reject_crlf: raises ValueError on any CR/LF/NUL; called by every injection path (setter, correlation injector, local-baggage injector).
- _DictGetter / _first_correlation_value / _extract_local_baggage / extract_distributed_context: accept Mapping[Any, Any] and dispatch through _header_key_matches + _normalize_header_value.
- _DictSetter: routes every outgoing value through _reject_crlf.
- _LOCAL_BAGGAGE ContextVar default changes {} → None for safety.

## Quality

- 129 passed, 5 skipped (OTel path) in tests/unit/tracing/test_distributed.py
- 60 new tests:
  * 19 carrier-contract parametrised (bytes/bytearray/invalid keys)
  * 10 normalizer direct coverage
  * 14 CRLF injection regression (write-path refusal + parametrised ctl)
  * 10 Hypothesis property tests (RFC 7230 token keys, arbitrary bytes, CRLF-free value round-trip, inject→extract identity)
  * 7 user-surfaced bytes-key regressions (original diff tests ported)
- full-repo regression: 11 438 passed, 62 skipped, 1 xfailed
- mypy --strict clean on touched files
- ruff check + format + isort + black all clean

## Documentation

- Module docstring now has an explicit 'Carrier-key contract' section pinning the read-tolerant / write-canonical asymmetry with rationale.
- docs/adr/0019-distributed-tracing-carrier-key-contract.md records the decision, alternatives considered, and validation criteria.

## Hunt for related bugs

Surveyed every .lower() on header-like keys repo-wide:

- observability/tracing.py: span-attribute denylist, typed str, OK
- src/system/api_messaging_integration.py::get_header: Mapping[str,str] dataclass contract, not a wire carrier
- core/utils/secure_errors.py: generic sanitiser, typed str, OK
- application/api/service.py: already uses isinstance(key, str) guard

Conclusion: core/tracing/distributed.py is the only carrier-shape site that needed the fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
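
The failure modes named in the message can be reproduced in isolation. The snippet below is a minimal sketch, not the project's code: `old_match` is a hypothetical stand-in for the pre-fix comparison (`key.lower() != expected.lower()`, as seen on the removed lines of the diff), and the sample keys and values are made up for illustration.

```python
def old_match(key, expected: str) -> bool:
    """Hypothetical stand-in for the pre-fix check: compare key.lower() to the header name."""
    return key.lower() == expected.lower()


# A bytes key never compares equal to a str header name, so the header is missed.
bytes_key_matches = old_match(b"Baggage", "baggage")

# A non-string key has no .lower(), so the read path raises instead of skipping it.
try:
    old_match(42, "baggage")
    int_key_error = None
except AttributeError as exc:
    int_key_error = exc

# str() on a bytes value produces its repr, which is what leaked onto the wire.
leaked_value = str(b"tenant=acme")
```

The bytes case fails quietly rather than loudly, which is probably why it needs explicit regression tests: nothing raises, the header just never matches.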
1 parent 64a8c14 commit 25001b0

3 files changed

Lines changed: 679 additions & 47 deletions

core/tracing/distributed.py

Lines changed: 181 additions & 38 deletions
@@ -7,12 +7,48 @@
 ``opentelemetry`` dependencies are not installed the helpers degrade to
 no-ops while still providing correlation identifiers for structured
 logging.
+
+Carrier-key contract
+--------------------
+
+Extraction paths (:func:`extract_distributed_context`,
+:func:`_first_correlation_value`, :func:`_extract_local_baggage`,
+:class:`_DictGetter`) are **read-tolerant** over the carrier:
+
+* Supported key types: ``str``, ``bytes``, ``bytearray``.
+* Keys are ASCII-decoded when byte-shaped, lower-cased, and validated
+  against the RFC 7230 § 3.2.6 *token* grammar. Keys that fail the
+  grammar are silently ignored rather than coerced via ``str(key)`` —
+  this prevents non-header-like objects (integers, tuples, arbitrary
+  class instances) from spuriously matching canonical header names.
+* Values are normalised via :func:`_normalize_header_value`: ``str``
+  passes through, ``bytes``/``bytearray`` decode via latin-1, and other
+  types fall back to ``str()`` so legacy carriers do not crash the read
+  path.
+
+Injection paths (:func:`inject_distributed_context`,
+:class:`_DictSetter`, :func:`_inject_local_baggage`) are
+**write-canonical**:
+
+* Only ``str`` keys and ``str`` values are emitted — never bytes.
+* Every outgoing value is passed through :func:`_reject_crlf`, which
+  refuses CR/LF/NUL forbidden by RFC 7230 § 3.2 for header field values.
+  This forecloses header-splitting attacks where an attacker-supplied
+  correlation ID or baggage value could be smuggled into a downstream
+  HTTP/1.1 proxy that is not Unicode-strict.
+
+The asymmetry (tolerant read, canonical write) is deliberate: upstream
+carriers come from WSGI, ASGI, gRPC metadata, Jaeger wire bytes, etc.
+and may present bytes keys; downstream propagators and HTTP clients
+expect ``str`` on their setter contract, so we never expose them to
+anything else.
 """
 
 from __future__ import annotations
 
 import logging
 import os
+import re
 from contextlib import contextmanager
 from contextvars import ContextVar, Token
 from dataclasses import dataclass
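
The read-tolerant half of the contract described in the docstring can be sketched as follows. This is a condensed, standalone approximation of the key normalizer added later in this diff, not a copy of it; `find_header` and the sample carrier are hypothetical and exist only for illustration.

```python
import re
from typing import Any, Mapping, Optional

# Lower-cased RFC 7230 section 3.2.6 token characters, matching the pattern in the diff.
TOKEN_RE = re.compile(r"^[!#$%&'*+\-.^_`|~0-9a-z]+$")


def normalize_key(key: object) -> Optional[str]:
    """Lower-case a str or bytes-like key; return None for anything not header-shaped."""
    if isinstance(key, str):
        normalized = key.lower()
    elif isinstance(key, (bytes, bytearray)):
        try:
            normalized = bytes(key).decode("ascii").lower()
        except UnicodeDecodeError:
            return None
    else:
        return None
    return normalized if TOKEN_RE.fullmatch(normalized) else None


def find_header(carrier: Mapping[Any, Any], name: str) -> Any:
    """Hypothetical lookup: first value whose key normalizes to `name`."""
    expected = name.lower()
    for key, value in carrier.items():
        if normalize_key(key) == expected:
            return value
    return None


# Integer and tuple keys are skipped; the bytes key matches after normalization.
carrier = {42: "ignored", ("baggage",): "ignored", b"Baggage": "tenant=acme"}
found = find_header(carrier, "baggage")
```

Returning `None` for unsupported shapes, instead of falling back to `str(key)`, is what keeps a key such as `("baggage",)` from ever being compared against a header name.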
@@ -35,9 +71,7 @@
     from opentelemetry.sdk.trace.export import BatchSpanProcessor
     from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
     from opentelemetry.trace import Span, SpanKind
-    from opentelemetry.trace.propagation.tracecontext import (
-        TraceContextTextMapPropagator,
-    )
+    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
 
     _TRACE_AVAILABLE = True
 except Exception as exc:  # pragma: no cover - the dependencies are optional
@@ -63,9 +97,7 @@ def _default_correlation_id() -> str:
     return uuid4().hex
 
 
-_CORRELATION_ID_VAR: ContextVar[str | None] = ContextVar(
-    "geosync_correlation_id", default=None
-)
+_CORRELATION_ID_VAR: ContextVar[str | None] = ContextVar("geosync_correlation_id", default=None)
 
 _CORRELATION_ID_FACTORY: Callable[[], str] = _default_correlation_id
 
@@ -75,31 +107,141 @@ def _default_correlation_id() -> str:
 _BAGGAGE_HEADER_NAME = "baggage"
 _BAGGAGE_HEADER_LOWER = _BAGGAGE_HEADER_NAME.lower()
 _DEFAULT_TRACER_NAME = "geosync.distributed"
+# RFC 7230 § 3.2.6 token grammar. Carrier keys must be case-insensitively
+# comparable against canonical lower-cased header names; any key that
+# cannot survive a round-trip through this grammar after ASCII-decoding
+# is rejected outright rather than matched via ``str(key)`` (which would
+# let bytes/other types leak into header space).
+_HEADER_TOKEN_RE = re.compile(r"^[!#$%&'*+\-.^_`|~0-9a-z]+$")
+# RFC 7230 § 3.2 forbids CR/LF and NUL inside header field values; we
+# also refuse any other C0 control byte to prevent header-splitting
+# attacks on downstream HTTP/1.1 hops that may not be Unicode-strict.
+_FORBIDDEN_HEADER_VALUE_CHARS = frozenset({"\r", "\n", "\x00"})
+
+
+_LOCAL_BAGGAGE: ContextVar[Mapping[str, str] | None] = ContextVar(
+    "geosync_local_baggage", default=None
+)
 
 
-_LOCAL_BAGGAGE: ContextVar[Mapping[str, str]] = ContextVar(
-    "geosync_local_baggage", default={}
-)
+def _normalize_header_key(key: object) -> str | None:
+    """Normalize carrier header keys for case-insensitive matching.
+
+    Only ``str`` and bytes-like keys are supported because those are the
+    canonical wire/header representations. Unsupported key types return
+    ``None`` and are silently ignored rather than matched through
+    implicit ``str(key)`` coercion (which would allow non-header-like
+    objects to accidentally satisfy the header-name comparison).
+    """
+
+    if isinstance(key, str):
+        normalized = key.lower()
+    elif isinstance(key, (bytes, bytearray)):
+        try:
+            normalized = bytes(key).decode("ascii").lower()
+        except UnicodeDecodeError:
+            return None
+    else:
+        return None
+
+    if not _HEADER_TOKEN_RE.fullmatch(normalized):
+        return None
+    return normalized
+
+
+def _header_key_matches(key: object, expected_lower: str) -> bool:
+    """Return ``True`` when a carrier key matches ``expected_lower``."""
+
+    normalized = _normalize_header_key(key)
+    return normalized == expected_lower if normalized is not None else False
+
+
+def _normalize_header_value(value: object) -> str | None:
+    """Normalize header values without turning bytes into ``b'...'`` strings.
+
+    ``str`` passes through. ``bytes``/``bytearray`` decode via latin-1 —
+    the canonical HTTP surrogate for opaque octets. ``None`` stays
+    ``None``. Other types are coerced via ``str()`` as a last resort so
+    accidental integer/tuple carriers do not crash the reader path, but
+    the write path in :class:`_DictSetter` remains canonical ``str``.
+    """
+
+    if isinstance(value, str):
+        return value
+    if isinstance(value, (bytes, bytearray)):
+        return bytes(value).decode("latin-1")
+    if value is None:
+        return None
+    return str(value)
+
+
+def _current_local_baggage() -> dict[str, str]:
+    """Return a fresh copy of the active local-baggage snapshot.
+
+    The :data:`_LOCAL_BAGGAGE` :class:`ContextVar` defaults to ``None``
+    so we never mutate or expose a shared default-instance dict; every
+    caller gets its own copy.
+    """
+
+    baggage = _LOCAL_BAGGAGE.get()
+    return dict(baggage) if baggage else {}
+
+
+def _reject_crlf(value: str) -> str:
+    """Refuse header values containing CR/LF/NUL to prevent header splitting.
+
+    RFC 7230 forbids these bytes inside header field values, but
+    downstream HTTP/1.1 proxies sometimes re-encode UTF-8 as opaque
+    octets — a CR or LF smuggled through a non-strict proxy can split a
+    single logical header into two wire headers and inject attacker-
+    controlled metadata. Rejecting at inject time is the narrow surface
+    where we can be sure.
+    """
+
+    for forbidden in _FORBIDDEN_HEADER_VALUE_CHARS:
+        if forbidden in value:
+            raise ValueError(
+                "header value contains a forbidden control character; "
+                f"refusing to inject (found {forbidden!r})"
+            )
+    return value
 
 
 if _TRACE_AVAILABLE:
 
     class _DictSetter:
-        """Setter helper compatible with OpenTelemetry propagators."""
+        """Setter helper compatible with OpenTelemetry propagators.
+
+        Write path is canonical: emit ``str`` keys and ``str`` values
+        only, and refuse any value carrying CR/LF/NUL to foreclose
+        header-splitting via downstream HTTP/1.1 proxies.
+        """
 
         def set(self, carrier: MutableMapping[str, str], key: str, value: str) -> None:
-            carrier[key] = value
+            carrier[key] = _reject_crlf(value)
 
     class _DictGetter:
-        """Getter helper compatible with OpenTelemetry propagators."""
+        """Getter helper compatible with OpenTelemetry propagators.
 
-        def get(self, carrier: Mapping[str, str], key: str) -> list[str]:
+        Read path is tolerant — accepts ``str`` and bytes-like keys
+        (including ``bytearray``), ignores malformed or non-header-shaped
+        keys rather than matching them, and decodes bytes-like values via
+        latin-1 so propagators never see ``b'...'`` repr strings.
+        """
+
+        def get(self, carrier: Mapping[Any, Any], key: str) -> list[str]:
+            expected_lower = key.lower()
             for existing_key, value in carrier.items():
-                if existing_key.lower() != key.lower():
+                if not _header_key_matches(existing_key, expected_lower):
                     continue
                 if isinstance(value, (list, tuple)):
-                    return [str(item) for item in value]
-                return [str(value)]
+                    return [
+                        item_str
+                        for item in value
+                        if (item_str := _normalize_header_value(item)) is not None
+                    ]
+                item_str = _normalize_header_value(value)
+                return [item_str] if item_str is not None else []
             return []
 
     _DICT_SETTER = _DictSetter()
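
The general hazard behind the `{}` → `None` default change can be shown with plain `contextvars`. This is a sketch of the pattern only: `SHARED_DEFAULT`, `NONE_DEFAULT`, and the helper functions are hypothetical names, and the snippet does not claim the pre-fix module ever mutated its default in practice.

```python
from contextvars import ContextVar, copy_context
from typing import Dict

# Pre-fix shape: every context that has not set the variable reads the same dict object.
SHARED_DEFAULT = ContextVar("shared_default", default={})

# Post-fix shape: there is no default object that could be shared.
NONE_DEFAULT = ContextVar("none_default", default=None)


def current_snapshot() -> Dict[str, str]:
    """Return a fresh dict each call, mirroring the copy-on-read helper in the diff."""
    baggage = NONE_DEFAULT.get()
    return dict(baggage) if baggage else {}


def mutate_shared() -> None:
    SHARED_DEFAULT.get()["tenant"] = "acme"


def mutate_snapshot() -> None:
    current_snapshot()["tenant"] = "acme"


# Run each mutation inside its own copied context, then read from the outer one.
copy_context().run(mutate_shared)
copy_context().run(mutate_snapshot)

leaked = SHARED_DEFAULT.get()    # the mutation is visible outside the copied context
isolated = current_snapshot()    # the copy-on-read variant stays empty
```

`ContextVar.get()` returns the default object itself when no value has been set, so a mutable default behaves like a module-level global regardless of context boundaries.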
@@ -308,7 +450,13 @@ def correlation_scope(
 
 
 def inject_distributed_context(carrier: MutableMapping[str, str]) -> None:
-    """Inject the current trace and correlation context into ``carrier``."""
+    """Inject the current trace and correlation context into ``carrier``.
+
+    The write path is canonical: ``str`` keys and ``str`` values only,
+    with CR/LF/NUL rejected. Read paths in
+    :func:`extract_distributed_context` remain tolerant to mixed key
+    types from upstream carriers.
+    """
 
     if carrier is None:
         raise ValueError("carrier must be provided")
@@ -320,45 +468,44 @@ def inject_distributed_context(carrier: MutableMapping[str, str]) -> None:
 
     correlation_id = current_correlation_id()
     if correlation_id:
-        carrier[_CORRELATION_HEADER_NAME] = correlation_id
+        carrier[_CORRELATION_HEADER_NAME] = _reject_crlf(correlation_id)
 
 
-def _first_correlation_value(carrier: Mapping[str, Any]) -> str | None:
+def _first_correlation_value(carrier: Mapping[Any, Any]) -> str | None:
     for key, value in carrier.items():
-        if key.lower() != _CORRELATION_HEADER_LOWER:
+        if not _header_key_matches(key, _CORRELATION_HEADER_LOWER):
             continue
         if isinstance(value, (list, tuple)):
             if not value:
                 return None
-            return str(value[0])
-        return str(value)
+            return _normalize_header_value(value[0])
+        return _normalize_header_value(value)
     return None
 
 
 def _inject_local_baggage(carrier: MutableMapping[str, str]) -> None:
-    baggage = _LOCAL_BAGGAGE.get()
+    baggage = _current_local_baggage()
     if not baggage:
         return
     header_value = ",".join(f"{key}={value}" for key, value in baggage.items())
     if header_value:
-        carrier[_BAGGAGE_HEADER_NAME] = header_value
+        carrier[_BAGGAGE_HEADER_NAME] = _reject_crlf(header_value)
 
 
-def _extract_local_baggage(carrier: Mapping[str, Any]) -> Mapping[str, str] | None:
+def _extract_local_baggage(carrier: Mapping[Any, Any]) -> Mapping[str, str] | None:
     baggage_header: str | None = None
     for key, value in carrier.items():
-        if key.lower() != _BAGGAGE_HEADER_LOWER:
+        if not _header_key_matches(key, _BAGGAGE_HEADER_LOWER):
             continue
         if isinstance(value, str):
             baggage_header = value
         elif isinstance(value, (list, tuple)):
             if not value:
                 baggage_header = None
             else:
-                first = value[0]
-                baggage_header = first if isinstance(first, str) else str(first)
+                baggage_header = _normalize_header_value(value[0])
         else:
-            baggage_header = str(value)
+            baggage_header = _normalize_header_value(value)
         break
     if not baggage_header:
         return None
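
Why the injection paths in this hunk route values through the CR/LF/NUL guard can be illustrated with a deliberately naive serializer. The sketch below is an assumption-laden illustration: `render_header` is a hypothetical stand-in for a lenient HTTP/1.1 hop, `reject_crlf` approximates the helper in the diff, and the hostile value is invented.

```python
FORBIDDEN = frozenset({"\r", "\n", "\x00"})


def reject_crlf(value: str) -> str:
    """Raise ValueError if the value contains CR, LF, or NUL; otherwise return it unchanged."""
    for forbidden in FORBIDDEN:
        if forbidden in value:
            raise ValueError(f"refusing to inject header value (found {forbidden!r})")
    return value


def render_header(name: str, value: str) -> bytes:
    """Hypothetical naive serializer that writes the value to the wire verbatim."""
    return f"{name}: {value}\r\n".encode("latin-1")


hostile = "tenant=acme\r\nx-admin: true"

# Without the guard, one logical header becomes two lines on the wire.
unguarded_lines = render_header("baggage", hostile).split(b"\r\n")

# With the guard, the value never reaches the serializer.
try:
    render_header("baggage", reject_crlf(hostile))
    guarded_error = None
except ValueError as exc:
    guarded_error = exc
```

One detail for reviewers: the comment above `_FORBIDDEN_HEADER_VALUE_CHARS` earlier in this diff mentions other C0 control bytes, while the set as committed lists only CR, LF, and NUL. The sketch follows the set.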
@@ -379,7 +526,7 @@ def current_baggage() -> Mapping[str, str]:
         context = otel_context.get_current()
         values = otel_baggage.get_all(context=context) or {}
         return dict(values)
-    return dict(_LOCAL_BAGGAGE.get())
+    return _current_local_baggage()
 
 
 def get_baggage_item(key: str, default: str | None = None) -> str | None:
@@ -405,24 +552,22 @@ def baggage_scope(
         current = otel_context.get_current()
         updated_context = current
         for key, value in updates.items():
-            updated_context = otel_baggage.set_baggage(
-                key, value, context=updated_context
-            )
+            updated_context = otel_baggage.set_baggage(key, value, context=updated_context)
         token = otel_context.attach(updated_context)
         try:
             yield current_baggage()
         finally:
             otel_context.detach(token)
         return
 
-    token = _LOCAL_BAGGAGE.set({**_LOCAL_BAGGAGE.get(), **updates})
+    token = _LOCAL_BAGGAGE.set({**_current_local_baggage(), **updates})
     try:
         yield current_baggage()
     finally:
         _LOCAL_BAGGAGE.reset(token)
 
 
-def extract_distributed_context(carrier: Mapping[str, Any]) -> ExtractedContext:
+def extract_distributed_context(carrier: Mapping[Any, Any]) -> ExtractedContext:
     """Extract trace and correlation metadata from ``carrier``."""
 
     if carrier is None:
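
With the signature widened to `Mapping[Any, Any]`, values may arrive as bytes. The sketch below approximates the value normalizer from this commit in standalone form and checks the property that makes latin-1 a reasonable choice on the read path: it maps every octet to exactly one code point, so decoding cannot fail and is reversible. The function name and sample inputs are hypothetical.

```python
from typing import Optional


def normalize_value(value: object) -> Optional[str]:
    """str passes through, bytes-like decodes via latin-1, None stays None, the rest uses str()."""
    if isinstance(value, str):
        return value
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode("latin-1")
    if value is None:
        return None
    return str(value)


# Every possible octet decodes without error and survives a round trip.
all_octets = bytes(range(256))
decoded = normalize_value(all_octets)
round_trip = decoded.encode("latin-1")

from_bytearray = normalize_value(bytearray(b"tenant=acme"))
from_int = normalize_value(7)
```

A UTF-8 or ASCII decode on the same path would need an error policy for invalid bytes; latin-1 sidesteps that question at the cost of not interpreting multi-byte sequences.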
@@ -503,9 +648,7 @@ def start_distributed_span(
             try:
                 span.set_attribute(_CORRELATION_ATTRIBUTE, correlation)
             except Exception:  # pragma: no cover - defensive guard
-                LOGGER.debug(
-                    "Failed to set correlation attribute on span", exc_info=True
-                )
+                LOGGER.debug("Failed to set correlation attribute on span", exc_info=True)
         if span and attributes:
             try:
                 span.set_attributes(dict(attributes))
