Skip to content

Commit bd63236

Browse files
authored
💥 Standalone activity support (#1307)
1 parent 3b0b1f9 commit bd63236

17 files changed

Lines changed: 5525 additions & 1271 deletions

‎temporalio/activity.py‎

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,16 +108,25 @@ class Info:
108108
heartbeat_details: Sequence[Any]
109109
heartbeat_timeout: timedelta | None
110110
is_local: bool
111+
namespace: str
111112
schedule_to_close_timeout: timedelta | None
112113
scheduled_time: datetime
113114
start_to_close_timeout: timedelta | None
114115
started_time: datetime
115116
task_queue: str
116117
task_token: bytes
117-
workflow_id: str
118-
workflow_namespace: str
119-
workflow_run_id: str
120-
workflow_type: str
118+
workflow_id: str | None
119+
"""ID of the workflow. None if the activity was not started by a workflow."""
120+
workflow_namespace: str | None
121+
"""Namespace of the workflow. None if the activity was not started by a workflow.
122+
123+
.. deprecated::
124+
Use :py:attr:`namespace` instead.
125+
"""
126+
workflow_run_id: str | None
127+
"""Run ID of the workflow. None if the activity was not started by a workflow."""
128+
workflow_type: str | None
129+
"""Type of the workflow. None if the activity was not started by a workflow."""
121130
priority: temporalio.common.Priority
122131
retry_policy: temporalio.common.RetryPolicy | None
123132
"""The retry policy of this activity.
@@ -126,14 +135,22 @@ class Info:
126135
If the value is None, it means the server didn't send information about retry policy (e.g. due to old server
127136
version), but it may still be defined server-side."""
128137

138+
activity_run_id: str | None = None
139+
"""Run ID of this activity. None for workflow activities."""
140+
141+
@property
142+
def in_workflow(self) -> bool:
143+
"""Was this activity started by a workflow?"""
144+
return self.workflow_id is not None
145+
129146
# TODO(cretz): Consider putting identity on here for "worker_id" for logger?
130147

131148
def _logger_details(self) -> Mapping[str, Any]:
132149
return {
133150
"activity_id": self.activity_id,
134151
"activity_type": self.activity_type,
135152
"attempt": self.attempt,
136-
"namespace": self.workflow_namespace,
153+
"namespace": self.namespace,
137154
"task_queue": self.task_queue,
138155
"workflow_id": self.workflow_id,
139156
"workflow_run_id": self.workflow_run_id,
@@ -242,7 +259,7 @@ def metric_meter(self) -> temporalio.common.MetricMeter:
242259
info = self.info()
243260
self._metric_meter = self.runtime_metric_meter.with_additional_attributes(
244261
{
245-
"namespace": info.workflow_namespace,
262+
"namespace": info.namespace,
246263
"task_queue": info.task_queue,
247264
"activity_type": info.activity_type,
248265
}
@@ -551,6 +568,20 @@ def must_from_callable(fn: Callable) -> _Definition:
551568
f"Activity {fn_name} missing attributes, was it decorated with @activity.defn?"
552569
)
553570

571+
@classmethod
572+
def get_name_and_result_type(
573+
cls, name_or_run_fn: str | Callable[..., Any]
574+
) -> tuple[str, type | None]:
575+
if isinstance(name_or_run_fn, str):
576+
return name_or_run_fn, None
577+
elif callable(name_or_run_fn):
578+
defn = cls.must_from_callable(name_or_run_fn)
579+
if not defn.name:
580+
raise ValueError(f"Activity {name_or_run_fn} definition has no name")
581+
return defn.name, defn.ret_type
582+
else:
583+
raise TypeError("Activity must be a string or callable") # type:ignore[reportUnreachable]
584+
554585
@staticmethod
555586
def _apply_to_callable(
556587
fn: Callable,

0 commit comments

Comments
 (0)