Commit b69ac92

tconley1428 and claude authored
Add OpenTelemetry v2 integration with enhanced features and comprehensive testing (#1314)
* Add OpenTelemetry v2 interceptor with enhanced features

  This commit adds a new OpenTelemetry interceptor (opentelemetryv2) with enhanced capabilities for Temporal workflow integration:

  Features:
  - Deterministic ID generation for spans/traces in workflows using TemporalIdGenerator
  - Context propagation across workflow and activity boundaries
  - Support for workflow-level span creation via workflow.start_as_current_span
  - Enhanced interceptor with context propagation to activities and nexus operations
  - Compatible with existing opentelemetry module while providing additional functionality

  Implementation:
  - New TemporalIdGenerator uses workflow.random() for deterministic IDs in workflows
  - TracingInterceptor handles client, worker, activity, workflow, and nexus operations
  - Workflow-safe span creation context manager in workflow module
  - Comprehensive test coverage for trace propagation scenarios

  This is separate from the OpenAI agents OTEL integration and provides general-purpose OpenTelemetry improvements for Temporal workflows.

  🤖 Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* Enhance OpenTelemetry v2 integration with comprehensive testing and linting fixes

  This commit significantly improves the OpenTelemetry v2 integration for the Temporal SDK with the following enhancements:

  ## Core Features Added:
  - **Comprehensive test coverage**: Added `test_opentelemetryv2_comprehensive_tracing` covering all workflow operations including activities, local activities, child workflows, timers, signals, updates, queries, and Nexus operations
  - **Read-only mode detection**: Implemented `workflow.unsafe.is_read_only()` to prevent span ID generation errors during queries and update validators
  - **Test isolation**: Added pytest fixture to reset OpenTelemetry tracer provider state between test runs
  - **Span hierarchy validation**: Refactored tests to use `dump_spans()` hierarchy validation for better maintainability

  ## Linting and Documentation:
  - Fixed all import path issues for OpenTelemetry ID generators
  - Added comprehensive docstrings for all public classes and methods
  - Fixed type annotations and null handling throughout the codebase
  - Resolved Nexus headers access issues with proper type protocols
  - Achieved complete pydocstyle compliance

  ## Technical Improvements:
  - Enhanced `TemporalSpanProcessor` with proper replay handling
  - Improved `TemporalIdGenerator` with deterministic workflow-safe random generation
  - Updated span parenting validation to ensure proper trace relationships
  - Added max_cached_workflows=0 to all test workers for deterministic behavior

  🤖 Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* Add debugging to comprehensive test
* Fix formatting
* Skip test on timeskipping
* Merge opentelemetry contribs
* Switch to batch processor
* Use new workflow random functionality for id generation
* Update to remove global state modification from plugin, and span processor
* Remove inaccurate comment
* Move otel tests
* Fix test import
* PR Feedback
* Rely on the global and pass it through. All uses of a global tracer provider in a workflow will use a replay safe version. Care should still be taken if creating one from scratch inside a workflow
* Fix rebase issue with standalone activities context
* Clean up some unused code paths
* Change tracerprovider return type
* Remove is read only
* Clean up test prints
* Return ReplaySafeTracerProvider
* Fix lint
* Add readme
* Debugging
* More debugging
* More debugging logs
* Change debugging
* Switch to is_read_only because updatevalidators are not technically during replay
* Linting
* Remove interceptor tracer members
* Change plugin interceptor logic to propagate client/worker combined interceptors if not present in the worker's client
* Pass through all of otel
* Remove debug file

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 5baf94e commit b69ac92

14 files changed

Lines changed: 2168 additions & 44 deletions
Lines changed: 259 additions & 0 deletions
@@ -0,0 +1,259 @@
# OpenTelemetry Integration for Temporal Python SDK

This package provides OpenTelemetry tracing integration for Temporal workflows, activities, and other operations. It includes automatic span creation and propagation for distributed tracing across your Temporal applications.

## Overview

There are **two different approaches** for integrating OpenTelemetry with the Temporal Python SDK:

1. **🆕 New Approach (Recommended)**: `OpenTelemetryPlugin` - Provides accurate duration spans and direct OpenTelemetry usage within workflows
2. **📊 Legacy Approach**: `TracingInterceptor` - Provides immediate span visibility but with zero-duration workflow spans

## Quick Start

### New Approach (OpenTelemetryPlugin)

```python
import opentelemetry.trace
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from temporalio.client import Client
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider
from temporalio.worker import Worker

# Create a replay-safe tracer provider
provider = create_tracer_provider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
opentelemetry.trace.set_tracer_provider(provider)

# Register plugin on CLIENT (automatically applies to workers using this client)
# The awaits below must run inside an async function.
client = await Client.connect(
    "localhost:7233",
    plugins=[OpenTelemetryPlugin()]
)

# Workers created with this client automatically get the plugin
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity]
    # NO NEED to specify plugins here - they come from the client
)
```

### Legacy Approach (TracingInterceptor)

```python
from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.worker import Worker

# Register interceptor on CLIENT (automatically applies to workers using this client)
# The awaits below must run inside an async function.
client = await Client.connect(
    "localhost:7233",
    interceptors=[TracingInterceptor()]
)

# Workers created with this client automatically get the interceptor
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity]
    # NO NEED to specify interceptors here - they come from the client
)
```

## Detailed Comparison

### New Approach: OpenTelemetryPlugin

#### ✅ Advantages:
- **Accurate Duration Spans**: Workflow spans have real durations reflecting actual execution time
- **Direct OpenTelemetry Usage**: Use `opentelemetry.trace.get_tracer()` directly within workflows
- **Better Span Hierarchy**: More accurate parent-child relationships within workflows
- **Workflow Context Access**: Access spans within workflows using `temporalio.contrib.opentelemetry.workflow.tracer()`

#### ⚠️ Considerations:
- **Experimental Status**: Subject to breaking changes in future versions
- **Delayed Span Visibility**: Workflow spans only appear after workflow completion
- **Different Trace Structure**: Migration from legacy approach may break dependencies on specific trace structures

#### Usage Example:
```python
from datetime import timedelta

from opentelemetry.trace import get_tracer
from temporalio import workflow


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self):
        # Direct OpenTelemetry usage works correctly
        tracer = get_tracer(__name__)
        with tracer.start_as_current_span("workflow-operation"):
            # This span will have accurate duration
            await workflow.execute_activity(
                my_activity,
                start_to_close_timeout=timedelta(seconds=30)
            )
```

### Legacy Approach: TracingInterceptor

**File**: `temporalio/contrib/opentelemetry/_interceptor.py`

#### ✅ Advantages:
- **Immediate Span Visibility**: Spans appear as soon as they're created
- **Stable API**: Well-established interface, not subject to experimental changes
- **Workflow Progress Tracking**: Can see workflow spans even before workflow completes

#### ⚠️ Limitations:
- **Zero-Duration Workflow Spans**: All workflow spans are immediately ended with 0ms duration
- **No Direct OpenTelemetry Usage**: Cannot use standard OpenTelemetry APIs within workflows
- **Limited Workflow Span Creation**: Must use `temporalio.contrib.opentelemetry.workflow.completed_span()`

#### Usage Example:
```python
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self):
        # Must use Temporal-specific span creation
        temporalio.contrib.opentelemetry.workflow.completed_span(
            "workflow-operation",
            attributes={"custom": "attribute"}
        )
        # Standard OpenTelemetry APIs don't work properly here
```
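
The practical difference between the two span models (zero-duration versus real-duration) can be illustrated without either library. The following is a stdlib-only toy, not the SDK's implementation: `ToyRecorder`, its `completed_span` and `span` methods, and the tick-based fake clock are all hypothetical names introduced here for illustration.

```python
from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class ToyRecorder:
    """Hypothetical stand-in for a tracer; records (name, start, end) tuples."""

    now: int = 0  # fake clock, advanced manually
    spans: list = field(default_factory=list)

    def tick(self, amount: int = 1) -> None:
        """Advance the fake clock, simulating work being done."""
        self.now += amount

    def completed_span(self, name: str) -> None:
        """Legacy-style span: started and ended at the same instant."""
        self.spans.append((name, self.now, self.now))

    @contextmanager
    def span(self, name: str):
        """Plugin-style span: stays open while the body runs."""
        start = self.now
        try:
            yield
        finally:
            self.spans.append((name, start, self.now))


recorder = ToyRecorder()
recorder.completed_span("legacy-operation")
with recorder.span("plugin-operation"):
    recorder.tick(30)  # pretend an activity took 30 ticks

durations = {name: end - start for name, start, end in recorder.spans}
```

In this toy, the span recorded with `completed_span` has no duration because it never stays open, while the context-manager span covers the simulated work. Note that it is also only recorded once its body finishes, which mirrors the delayed-visibility trade-off listed above.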

## When to Use Each Approach

### Choose OpenTelemetryPlugin When:
- You need accurate span durations for performance analysis
- You want to use standard OpenTelemetry APIs within workflows
- You're building new applications
- You can tolerate experimental API changes

### Choose TracingInterceptor When:
- You need immediate visibility into workflow progress
- You have existing dependencies on the current trace structure
- You require a stable, non-experimental API
- You primarily need basic tracing without complex workflow span hierarchies

## Configuration Options

### OpenTelemetryPlugin Options

```python
plugin = OpenTelemetryPlugin(
    add_temporal_spans=False  # Whether to add additional Temporal-specific spans
)
```

### TracingInterceptor Options

```python
interceptor = TracingInterceptor(
    tracer=None,  # Custom tracer (defaults to global tracer)
    always_create_workflow_spans=False  # Create spans even without parent context
)
```

## Migration Guide

### From TracingInterceptor to OpenTelemetryPlugin

1. **Replace interceptor with plugin on client**:
   ```python
   # Old
   client = await Client.connect(
       "localhost:7233",
       interceptors=[TracingInterceptor()]
   )

   # New
   provider = create_tracer_provider()
   opentelemetry.trace.set_tracer_provider(provider)
   client = await Client.connect(
       "localhost:7233",
       plugins=[OpenTelemetryPlugin()]
   )
   ```

2. **Update workflow span creation**:
   ```python
   # Old
   temporalio.contrib.opentelemetry.workflow.completed_span("my-span")

   # New - use standard OpenTelemetry
   tracer = get_tracer(__name__)
   with tracer.start_as_current_span("my-span"):
       # Your workflow logic
       pass
   ```

3. **Test trace structure changes**: Verify that any monitoring or analysis tools still work with the new trace structure.
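
One way to approach step 3 is to render the exported spans as an indented tree before and after migrating, then diff the two renderings. The commit message mentions a `dump_spans()` test helper, but its implementation is not part of this diff, so the sketch below is a hypothetical stdlib-only stand-in: `dump_tree`, the `(span_id, parent_id, name)` tuple shape, and the span names are assumptions made for illustration.

```python
from collections import defaultdict


def dump_tree(spans):
    """Render (span_id, parent_id, name) tuples as indented lines.

    Hypothetical helper for illustration; not the SDK's test utility.
    """
    children = defaultdict(list)
    for span_id, parent_id, name in spans:
        children[parent_id].append((span_id, name))

    lines = []

    def walk(parent_id, depth):
        # Sort siblings by name so the output is stable across runs.
        for span_id, name in sorted(children[parent_id], key=lambda c: c[1]):
            lines.append("  " * depth + name)
            walk(span_id, depth + 1)

    walk(None, 0)  # root spans have no parent
    return lines


# Made-up span data, standing in for what an in-memory exporter would collect.
exported = [
    (1, None, "StartWorkflow:MyWorkflow"),
    (2, 1, "RunWorkflow:MyWorkflow"),
    (3, 2, "my-span"),
    (4, 3, "RunActivity:my_activity"),
]
tree = dump_tree(exported)
```

Comparing such a rendering across the two approaches makes nesting changes visible at a glance, which is usually what dashboards and alert queries depend on.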

## Advanced Usage

### Creating Custom Spans in Workflows (New Approach)

```python
from datetime import timedelta

from opentelemetry.trace import get_tracer
from temporalio import workflow


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self):
        tracer = get_tracer(__name__)

        # Create spans with accurate durations
        with tracer.start_as_current_span("business-logic") as span:
            span.set_attribute("workflow.step", "processing")

            # Nested spans work correctly
            with tracer.start_as_current_span("data-validation"):
                await self.validate_input()

            await workflow.execute_activity(
                process_data,
                start_to_close_timeout=timedelta(seconds=60)
            )
```

### Custom Span Attributes

Both approaches support adding custom attributes to spans:

```python
# Legacy approach
temporalio.contrib.opentelemetry.workflow.completed_span(
    "my-operation",
    attributes={
        "business.unit": "payments",
        "request.id": "req-123"
    }
)

# New approach
tracer = get_tracer(__name__)
with tracer.start_as_current_span("my-operation") as span:
    span.set_attributes({
        "business.unit": "payments",
        "request.id": "req-123"
    })
```

## Best Practices

1. **Register on Client**: Always register plugins/interceptors on the client, not the worker, to ensure proper context propagation

2. **Use create_tracer_provider()**: Always use the provided function to create replay-safe tracer providers when using the new approach

3. **Set Global Tracer Provider**: Ensure the tracer provider is set globally before creating clients

4. **Avoid Duplication**: Never register the same plugin/interceptor on both client and worker
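
The "context propagation" that client-side registration protects boils down to writing the active span's identity into outgoing headers and reading it back on the receiving side. The sketch below shows that round trip using the W3C Trace Context `traceparent` format with only the standard library. The header key and payload encoding the SDK actually uses are not shown in this diff, so `inject`, `extract`, and the plain-dict carrier are assumptions for illustration.

```python
import re

# W3C Trace Context: version "00", 32-hex trace ID, 16-hex span ID, 2-hex flags.
TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject(trace_id: int, span_id: int, carrier: dict) -> None:
    """Write the current span's identity into an outgoing header carrier."""
    carrier["traceparent"] = f"00-{trace_id:032x}-{span_id:016x}-01"


def extract(carrier: dict):
    """Read (trace_id, parent_span_id) from an incoming carrier, or None."""
    match = TRACEPARENT.match(carrier.get("traceparent", ""))
    if match is None:
        return None
    return int(match.group(1), 16), int(match.group(2), 16)


# "Client" side: a span is active when the workflow is started.
headers = {}
inject(
    trace_id=0x4BF92F3577B34DA6A3CE929D0E0E4736,
    span_id=0x00F067AA0BA902B7,
    carrier=headers,
)

# "Worker" side: the same headers arrive with the task.
parent = extract(headers)
```

If only the worker side were instrumented, nothing would call `inject`, `extract` would return `None`, and worker spans would start new, disconnected traces. That is the failure mode the first best practice guards against.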

## Troubleshooting

### Common Issues

1. **"ReplaySafeTracerProvider required" error**: Make sure you're using `create_tracer_provider()` when using OpenTelemetryPlugin

2. **Missing spans**: Verify that the tracer provider is set before creating clients, and that plugins/interceptors are registered on the client

3. **Duplicate spans**: Check that you haven't registered the same plugin/interceptor on both client and worker

4. **Zero-duration spans**: This is expected behavior with TracingInterceptor for workflow spans
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
"""OpenTelemetry v2 integration for Temporal SDK.

This package provides OpenTelemetry tracing integration for Temporal workflows,
activities, and other operations. It includes automatic span creation and
propagation for distributed tracing.
"""

from temporalio.contrib.opentelemetry._interceptor import (
    TracingInterceptor,
    TracingWorkflowInboundInterceptor,
)
from temporalio.contrib.opentelemetry._otel_interceptor import OpenTelemetryInterceptor
from temporalio.contrib.opentelemetry._plugin import OpenTelemetryPlugin
from temporalio.contrib.opentelemetry._tracer_provider import create_tracer_provider

__all__ = [
    "TracingInterceptor",
    "TracingWorkflowInboundInterceptor",
    "OpenTelemetryInterceptor",
    "OpenTelemetryPlugin",
    "create_tracer_provider",
]
Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
import random

from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.trace import (
    INVALID_SPAN_ID,
    INVALID_TRACE_ID,
)

import temporalio.workflow


def _get_workflow_random() -> random.Random | None:
    if (
        temporalio.workflow.in_workflow()
        and not temporalio.workflow.unsafe.is_read_only()
    ):
        if (
            getattr(temporalio.workflow.instance(), "__temporal_otel_id_random", None)
            is None
        ):
            setattr(
                temporalio.workflow.instance(),
                "__temporal_otel_id_random",
                temporalio.workflow.new_random(),
            )
        return getattr(temporalio.workflow.instance(), "__temporal_otel_id_random")

    return None


class TemporalIdGenerator(IdGenerator):
    """OpenTelemetry ID generator that uses Temporal's deterministic random generator.

    .. warning::
        This class is experimental and may change in future versions.
        Use with caution in production environments.

    This generator uses Temporal's workflow-safe random number generator when
    inside a workflow execution, ensuring deterministic span and trace IDs
    across workflow replays. Falls back to standard random generation outside
    of workflows.
    """

    def __init__(self, id_generator: IdGenerator):
        """Initialize a TemporalIdGenerator."""
        self._id_generator = id_generator

    def generate_span_id(self) -> int:
        """Generate a span ID using Temporal's deterministic random when in workflow.

        Returns:
            A 64-bit span ID.
        """
        if workflow_random := _get_workflow_random():
            span_id = workflow_random.getrandbits(64)
            while span_id == INVALID_SPAN_ID:
                span_id = workflow_random.getrandbits(64)
            return span_id
        return self._id_generator.generate_span_id()

    def generate_trace_id(self) -> int:
        """Generate a trace ID using Temporal's deterministic random when in workflow.

        Returns:
            A 128-bit trace ID.
        """
        if workflow_random := _get_workflow_random():
            trace_id = workflow_random.getrandbits(128)
            while trace_id == INVALID_TRACE_ID:
                trace_id = workflow_random.getrandbits(128)
            return trace_id
        return self._id_generator.generate_trace_id()
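
The reason a deterministic random source matters here is that a replayed workflow must regenerate the same span and trace IDs it produced the first time. The following is a minimal stdlib-only sketch of that property, assuming the workflow's random generator behaves like a `random.Random` seeded identically on each replay. `SeededIdGenerator` and the fixed seed are hypothetical stand-ins, not part of the SDK.

```python
import random

INVALID_ID = 0  # OpenTelemetry treats an all-zero span or trace ID as invalid


class SeededIdGenerator:
    """Toy ID generator driven by a seeded RNG (hypothetical, for illustration)."""

    def __init__(self, seed: int) -> None:
        self._random = random.Random(seed)

    def _nonzero_bits(self, bits: int) -> int:
        value = self._random.getrandbits(bits)
        while value == INVALID_ID:  # draw again if the invalid ID comes up
            value = self._random.getrandbits(bits)
        return value

    def generate_span_id(self) -> int:
        """Return a non-zero 64-bit span ID."""
        return self._nonzero_bits(64)

    def generate_trace_id(self) -> int:
        """Return a non-zero 128-bit trace ID."""
        return self._nonzero_bits(128)


# The same seed on the "original run" and the "replay" yields the same sequence.
first_run = SeededIdGenerator(seed=42)
replay = SeededIdGenerator(seed=42)
original_ids = [first_run.generate_trace_id(), first_run.generate_span_id()]
replayed_ids = [replay.generate_trace_id(), replay.generate_span_id()]
```

The sketch omits the fallback path. In the diff above, the generator delegates to the wrapped `IdGenerator` whenever it is outside a workflow or the workflow is read-only, so queries and update validators never touch the per-workflow random source.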

temporalio/contrib/opentelemetry.py renamed to temporalio/contrib/opentelemetry/_interceptor.py

Lines changed: 0 additions & 40 deletions
@@ -827,43 +827,3 @@ def _carrier_to_nexus_headers(
         else:
             out[k] = v
     return out
-
-
-class workflow:
-    """Contains static methods that are safe to call from within a workflow.
-
-    .. warning::
-        Using any other ``opentelemetry`` API could cause non-determinism.
-    """
-
-    def __init__(self) -> None:  # noqa: D107
-        raise NotImplementedError
-
-    @staticmethod
-    def completed_span(
-        name: str,
-        *,
-        attributes: opentelemetry.util.types.Attributes = None,
-        exception: Exception | None = None,
-    ) -> None:
-        """Create and end an OpenTelemetry span.
-
-        Note, this will only create and record when the workflow is not
-        replaying and if there is a current span (meaning the client started a
-        span and this interceptor is configured on the worker and the span is on
-        the context).
-
-        There is currently no way to create a long-running span or to create a
-        span that actually spans other code.
-
-        Args:
-            name: Name of the span.
-            attributes: Attributes to set on the span if any. Workflow ID and
-                run ID are automatically added.
-            exception: Optional exception to record on the span.
-        """
-        interceptor = TracingWorkflowInboundInterceptor._from_context()
-        if interceptor:
-            interceptor._completed_span(
-                name, additional_attributes=attributes, exception=exception
-            )
