Skip to content

Commit 5779a29

Browse files
tconley1428claude
andauthored
💥 Unify SimplePlugin interceptors into single parameter (#1328)
* 💥 Unify SimplePlugin interceptors into single parameter BREAKING CHANGE: SimplePlugin constructor now takes a single `interceptors` parameter instead of separate `client_interceptors` and `worker_interceptors` parameters. Changes: - Replace `client_interceptors` and `worker_interceptors` with unified `interceptors` parameter - Automatically separate client and worker interceptors based on type - Ensure exactly one instance of each interceptor per worker to prevent duplication - Update OpenTelemetryPlugin and OpenAIAgentsPlugin to use new parameter - Update all tests to use new unified parameter Migration guide for SimplePlugin users: - Before: `SimplePlugin("name", client_interceptors=[...], worker_interceptors=[...])` - After: `SimplePlugin("name", interceptors=[...])` 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * Update tests now that openai otel interceptor is on the client also * Only remove plugin provided duplicates * Remove callable form of interceptor from simple plugin. Usage is unclear and nonintuitive * Fix test flake --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 0f84f8c commit 5779a29

6 files changed

Lines changed: 106 additions & 77 deletions

File tree

‎temporalio/contrib/openai_agents/_temporal_openai_agents.py‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ async def run_context() -> AsyncIterator[None]:
252252
super().__init__(
253253
name="OpenAIAgentsPlugin",
254254
data_converter=_data_converter,
255-
worker_interceptors=[OpenAIAgentsTracingInterceptor()],
255+
interceptors=[OpenAIAgentsTracingInterceptor()],
256256
activities=add_activities,
257257
workflow_runner=workflow_runner,
258258
workflow_failure_exception_types=[AgentsWorkflowError],

‎temporalio/contrib/openai_agents/_trace_interceptor.py‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any:
347347
def handle_update_validator(
348348
self, input: temporalio.worker.HandleUpdateInput
349349
) -> None:
350+
_ensure_tracing_random()
350351
with context_from_header(
351352
"temporal:handleUpdateValidator",
352353
input,

‎temporalio/contrib/opentelemetry/_plugin.py‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,6 @@ def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner:
4848

4949
super().__init__(
5050
"OpenTelemetryPlugin",
51-
client_interceptors=interceptors,
51+
interceptors=interceptors,
5252
workflow_runner=workflow_runner,
5353
)

‎temporalio/plugin.py‎

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,14 @@ def __init__(
4242
name: str,
4343
*,
4444
data_converter: PluginParameter[temporalio.converter.DataConverter] = None,
45-
client_interceptors: PluginParameter[
46-
Sequence[temporalio.client.Interceptor]
47-
] = None,
45+
interceptors: Sequence[
46+
temporalio.client.Interceptor | temporalio.worker.Interceptor
47+
]
48+
| None = None,
4849
activities: PluginParameter[Sequence[Callable]] = None,
4950
nexus_service_handlers: PluginParameter[Sequence[Any]] = None,
5051
workflows: PluginParameter[Sequence[type]] = None,
5152
workflow_runner: PluginParameter[WorkflowRunner] = None,
52-
worker_interceptors: PluginParameter[
53-
Sequence[temporalio.worker.Interceptor]
54-
] = None,
5553
workflow_failure_exception_types: PluginParameter[
5654
Sequence[type[BaseException]]
5755
] = None,
@@ -66,9 +64,10 @@ def __init__(
6664
name: The name of the plugin.
6765
data_converter: Data converter for serialization, or callable to customize existing one.
6866
Applied to the Client and Replayer.
69-
client_interceptors: Client interceptors to append, or callable to customize existing ones.
70-
Applied to the Client. Note, if the provided interceptor is also a worker.Interceptor,
71-
it will be added to any worker which uses that client.
67+
interceptors: Interceptors to append.
68+
Client interceptors are applied to the Client, worker interceptors are applied
69+
to the Worker and Replayer. Interceptors that implement both interfaces will
70+
be applied to both, with exactly one instance used per worker to avoid duplication.
7271
activities: Activity functions to append, or callable to customize existing ones.
7372
Applied to the Worker.
7473
nexus_service_handlers: Nexus service handlers to append, or callable to customize existing ones.
@@ -77,8 +76,6 @@ def __init__(
7776
Applied to the Worker and Replayer.
7877
workflow_runner: Workflow runner, or callable to customize existing one.
7978
Applied to the Worker and Replayer.
80-
worker_interceptors: Worker interceptors to append, or callable to customize existing ones.
81-
Applied to the Worker and Replayer.
8279
workflow_failure_exception_types: Exception types for workflow failures to append,
8380
or callable to customize existing ones. Applied to the Worker and Replayer.
8481
run_context: A place to run custom code to wrap around the Worker (or Replayer) execution.
@@ -89,12 +86,11 @@ def __init__(
8986
"""
9087
self._name = name
9188
self.data_converter = data_converter
92-
self.client_interceptors = client_interceptors
89+
self.interceptors = interceptors
9390
self.activities = activities
9491
self.nexus_service_handlers = nexus_service_handlers
9592
self.workflows = workflows
9693
self.workflow_runner = workflow_runner
97-
self.worker_interceptors = worker_interceptors
9894
self.workflow_failure_exception_types = workflow_failure_exception_types
9995
self.run_context = run_context
10096

@@ -110,11 +106,22 @@ def configure_client(self, config: ClientConfig) -> ClientConfig:
110106
if data_converter:
111107
config["data_converter"] = data_converter
112108

113-
interceptors = _resolve_append_parameter(
114-
config.get("interceptors"), self.client_interceptors
109+
# Resolve the combined interceptors first, then filter to client ones
110+
all_interceptors = _resolve_append_parameter(
111+
cast(
112+
Sequence[temporalio.client.Interceptor | temporalio.worker.Interceptor]
113+
| None,
114+
config.get("interceptors"),
115+
),
116+
self.interceptors,
115117
)
116-
if interceptors is not None:
117-
config["interceptors"] = interceptors
118+
if all_interceptors is not None:
119+
client_interceptors = [
120+
interceptor
121+
for interceptor in all_interceptors
122+
if isinstance(interceptor, temporalio.client.Interceptor)
123+
]
124+
config["interceptors"] = client_interceptors
118125

119126
return config
120127

@@ -150,36 +157,24 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
150157
if workflow_runner:
151158
config["workflow_runner"] = workflow_runner
152159

153-
interceptors = list(
154-
_resolve_append_parameter(
155-
config.get("interceptors"), self.worker_interceptors
160+
if self.interceptors is not None:
161+
client_interceptors_list = (
162+
config["client"].config(active_config=True).get("interceptors", []) # type:ignore[reportTypedDictNotRequiredAccess]
156163
)
157-
or []
158-
)
159164

160-
# Only propagate client interceptors if they are provided as a simple list (not callable)
161-
if self.client_interceptors is not None and not callable(
162-
self.client_interceptors
163-
):
164-
client_worker_interceptors = [
165+
# Exclude any already registered interceptors and client only interceptors
166+
worker_interceptors = [
165167
interceptor
166-
for interceptor in self.client_interceptors
168+
for interceptor in self.interceptors
167169
if isinstance(interceptor, temporalio.worker.Interceptor)
170+
and interceptor not in client_interceptors_list
168171
]
169-
for interceptor in client_worker_interceptors:
170-
if interceptor not in interceptors:
171-
# Check if interceptor is already in client's interceptors to avoid duplication
172-
client_config = config.get("client")
173-
if client_config is not None:
174-
client_interceptors_list = client_config.config(
175-
active_config=True
176-
).get("interceptors", [])
177-
if interceptor not in client_interceptors_list:
178-
interceptors.append(interceptor)
179-
else:
180-
interceptors.append(interceptor)
181-
182-
config["interceptors"] = interceptors
172+
173+
provided_interceptors = _resolve_append_parameter(
174+
config.get("interceptors"), worker_interceptors
175+
)
176+
if provided_interceptors is not None:
177+
config["interceptors"] = provided_interceptors
183178

184179
failure_exception_types = _resolve_append_parameter(
185180
config.get("workflow_failure_exception_types"),
@@ -208,11 +203,21 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
208203
if workflow_runner:
209204
config["workflow_runner"] = workflow_runner
210205

211-
interceptors = _resolve_append_parameter(
212-
config.get("interceptors"), self.worker_interceptors
206+
all_interceptors = _resolve_append_parameter(
207+
cast(
208+
Sequence[temporalio.client.Interceptor | temporalio.worker.Interceptor]
209+
| None,
210+
config.get("interceptors"),
211+
),
212+
self.interceptors,
213213
)
214-
if interceptors is not None:
215-
config["interceptors"] = interceptors
214+
if all_interceptors is not None:
215+
worker_interceptors = [
216+
interceptor
217+
for interceptor in all_interceptors
218+
if isinstance(interceptor, temporalio.worker.Interceptor)
219+
]
220+
config["interceptors"] = worker_interceptors
216221

217222
failure_exception_types = _resolve_append_parameter(
218223
config.get("workflow_failure_exception_types"),

‎tests/contrib/openai_agents/test_openai_tracing.py‎

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,10 @@ async def test_tracing(client: Client):
6161
execution_timeout=timedelta(seconds=120),
6262
)
6363
await workflow_handle.result()
64+
print("\n".join([str({"name": t.name}) for t, _ in processor.trace_events]))
6465

65-
# There is one closed root trace
66-
assert len(processor.trace_events) == 2
66+
# There are two traces, one is created in the client because it is needed to start the temporal spans
67+
assert len(processor.trace_events) == 4
6768
assert (
6869
processor.trace_events[0][0].trace_id
6970
== processor.trace_events[1][0].trace_id
@@ -76,25 +77,48 @@ def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None:
7677
assert a[1]
7778
assert not b[1]
7879

80+
print(
81+
"\n".join(
82+
[
83+
str({"id": t.span_id, "data": t.span_data.export()})
84+
for t, _ in processor.span_events
85+
]
86+
)
87+
)
88+
89+
# Start workflow traces
90+
paired_span(processor.span_events[0], processor.span_events[1])
91+
assert (
92+
processor.span_events[0][0].span_data.export().get("name")
93+
== "temporal:startWorkflow:ResearchWorkflow"
94+
)
95+
96+
# Execute workflow
97+
paired_span(processor.span_events[2], processor.span_events[-1])
98+
assert (
99+
processor.span_events[2][0].span_data.export().get("name")
100+
== "temporal:executeWorkflow"
101+
)
102+
79103
# Initial planner spans - There are only 3 because we don't make an actual model call
80-
paired_span(processor.span_events[0], processor.span_events[5])
104+
paired_span(processor.span_events[3], processor.span_events[8])
81105
assert (
82-
processor.span_events[0][0].span_data.export().get("name") == "PlannerAgent"
106+
processor.span_events[3][0].span_data.export().get("name") == "PlannerAgent"
83107
)
84108

85-
paired_span(processor.span_events[1], processor.span_events[4])
109+
paired_span(processor.span_events[4], processor.span_events[7])
86110
assert (
87-
processor.span_events[1][0].span_data.export().get("name")
111+
processor.span_events[4][0].span_data.export().get("name")
88112
== "temporal:startActivity"
89113
)
90114

91-
paired_span(processor.span_events[2], processor.span_events[3])
115+
paired_span(processor.span_events[5], processor.span_events[6])
92116
assert (
93-
processor.span_events[2][0].span_data.export().get("name")
117+
processor.span_events[5][0].span_data.export().get("name")
94118
== "temporal:executeActivity"
95119
)
96120

97-
for span, start in processor.span_events[6:-6]:
121+
for span, start in processor.span_events[9:-7]:
98122
span_data = span.span_data.export()
99123

100124
# All spans should be closed
@@ -126,19 +150,19 @@ def paired_span(a: tuple[Span[Any], bool], b: tuple[Span[Any], bool]) -> None:
126150
)
127151

128152
# Final writer spans - There are only 3 because we don't make an actual model call
129-
paired_span(processor.span_events[-6], processor.span_events[-1])
153+
paired_span(processor.span_events[-7], processor.span_events[-2])
130154
assert (
131-
processor.span_events[-6][0].span_data.export().get("name") == "WriterAgent"
155+
processor.span_events[-7][0].span_data.export().get("name") == "WriterAgent"
132156
)
133157

134-
paired_span(processor.span_events[-5], processor.span_events[-2])
158+
paired_span(processor.span_events[-6], processor.span_events[-3])
135159
assert (
136-
processor.span_events[-5][0].span_data.export().get("name")
160+
processor.span_events[-6][0].span_data.export().get("name")
137161
== "temporal:startActivity"
138162
)
139163

140-
paired_span(processor.span_events[-4], processor.span_events[-3])
164+
paired_span(processor.span_events[-5], processor.span_events[-4])
141165
assert (
142-
processor.span_events[-4][0].span_data.export().get("name")
166+
processor.span_events[-5][0].span_data.export().get("name")
143167
== "temporal:executeActivity"
144168
)

‎tests/test_plugins.py‎

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -436,15 +436,15 @@ async def test_simple_plugin_worker_interceptor_only_used_on_worker(
436436
client: Client,
437437
) -> None:
438438
"""Test that when a combined client/worker interceptor is provided by SimplePlugin
439-
to client_interceptors, and the plugin is only used on a worker (not on the client
439+
to interceptors, and the plugin is only used on a worker (not on the client
440440
used to create that worker), the worker interceptor functionality is still provided."""
441441

442442
interceptor = CombinedClientWorkerInterceptor()
443443

444-
# Create SimplePlugin that provides the combined interceptor as client_interceptors
444+
# Create SimplePlugin that provides the combined interceptor
445445
plugin = SimplePlugin(
446446
"TestCombinedPlugin",
447-
client_interceptors=[interceptor],
447+
interceptors=[interceptor],
448448
)
449449

450450
# Create worker with the plugin (but don't add plugin to client)
@@ -468,23 +468,23 @@ async def test_simple_plugin_worker_interceptor_only_used_on_worker(
468468
), "Client interceptor should not have been used"
469469

470470
# The interceptor SHOULD have been used for worker interception
471-
# even though it was specified in client_interceptors
471+
# even though it was specified in interceptors
472472
assert interceptor.worker_intercepted, "Worker interceptor should have been used"
473473

474474

475475
async def test_simple_plugin_interceptor_duplication_when_used_on_client_and_worker(
476476
client: Client,
477477
) -> None:
478478
"""Test that when a combined client/worker interceptor is provided by SimplePlugin
479-
to client_interceptors, and the plugin is used on both client and worker,
479+
to interceptors, and the plugin is used on both client and worker,
480480
the interceptor is not duplicated in the worker."""
481481

482482
interceptor = CombinedClientWorkerInterceptor()
483483

484-
# Create SimplePlugin that provides the combined interceptor as client_interceptors
484+
# Create SimplePlugin that provides the combined interceptor
485485
plugin = SimplePlugin(
486486
"TestCombinedPlugin",
487-
client_interceptors=[interceptor],
487+
interceptors=[interceptor],
488488
)
489489

490490
# Add plugin to client first
@@ -535,16 +535,15 @@ async def test_simple_plugin_interceptor_duplication_when_used_on_client_and_wor
535535
async def test_simple_plugin_no_duplication_when_interceptor_in_both_client_and_worker_params(
536536
client: Client,
537537
) -> None:
538-
"""Test that when the same interceptor is provided to both client_interceptors
539-
and worker_interceptors in a SimplePlugin, it doesn't get duplicated."""
538+
"""Test that when the same interceptor is provided to the unified interceptors
539+
parameter in a SimplePlugin, it doesn't get duplicated."""
540540

541541
interceptor = CombinedClientWorkerInterceptor()
542542

543-
# Create SimplePlugin that provides the same interceptor to both client and worker
543+
# Create SimplePlugin that provides the interceptor once to the unified parameter
544544
plugin = SimplePlugin(
545545
"TestCombinedPlugin",
546-
client_interceptors=[interceptor],
547-
worker_interceptors=[interceptor], # Same interceptor in both places
546+
interceptors=[interceptor], # Single unified parameter
548547
)
549548

550549
# Create worker with plugin (not on client)
@@ -585,10 +584,10 @@ async def test_simple_plugin_no_duplication_in_interceptor_chain(
585584

586585
interceptor = CombinedClientWorkerInterceptor()
587586

588-
# Create SimplePlugin that provides the combined interceptor as client_interceptors only
587+
# Create SimplePlugin that provides the combined interceptor
589588
plugin = SimplePlugin(
590589
"CountingPlugin",
591-
client_interceptors=[interceptor],
590+
interceptors=[interceptor],
592591
)
593592

594593
# Add plugin to client (like OpenTelemetryPlugin does)

0 commit comments

Comments
 (0)