-
Notifications
You must be signed in to change notification settings - Fork 102
Expand file tree
/
Copy pathhello_cancellation.py
More file actions
102 lines (86 loc) · 3.28 KB
/
hello_cancellation.py
File metadata and controls
102 lines (86 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from typing import NoReturn
from temporalio import activity, workflow
from temporalio.client import Client, WorkflowFailureError
from temporalio.exceptions import ActivityError, CancelledError
from temporalio.worker import Worker
@activity.defn
def never_complete_activity() -> NoReturn:
    """Heartbeat once per second forever, until cancellation is raised.

    The function never returns normally. When a ``CancelledError`` surfaces,
    its cause chain is printed and the same exception is re-raised.

    Raises:
        CancelledError: re-raised unchanged after being reported.
    """
    # All long-running activities should heartbeat. Heartbeat is how
    # cancellation is delivered from the server.
    try:
        while True:
            print("Heartbeating activity")
            activity.heartbeat()
            time.sleep(1)
    except CancelledError as cancelled:
        chain = format_exception_cause_chain(cancelled)
        print(f"Got expected exception in activity. Cause chain is {chain}")
        # Re-raise so the cancellation is not swallowed by this handler.
        raise
@activity.defn
def cleanup_activity() -> None:
    """Print a marker line showing that the cleanup activity ran."""
    message = "Executing cleanup activity"
    print(message)
@workflow.defn
class CancellationWorkflow:
    """Workflow that runs ``never_complete_activity`` and then always cleans up."""

    @workflow.run
    async def run(self) -> None:
        """Run the never-ending activity, then the cleanup activity.

        ``cleanup_activity`` is executed from a ``finally`` clause, so it is
        scheduled whether the first activity finishes, fails or is cancelled.

        Raises:
            ActivityError: re-raised after its cause chain has been printed.
        """
        long_running_timeout = timedelta(seconds=1000)
        # Always set a heartbeat timeout for long-running activities
        heartbeat_limit = timedelta(seconds=2)
        cleanup_timeout = timedelta(seconds=5)

        # Execute the forever running activity, and do a cleanup activity when
        # it is complete (on error or cancel)
        try:
            await workflow.execute_activity(
                never_complete_activity,
                start_to_close_timeout=long_running_timeout,
                heartbeat_timeout=heartbeat_limit,
            )
        except ActivityError as activity_failure:
            chain = format_exception_cause_chain(activity_failure)
            print(f"Got expected exception in workflow. Cause chain is {chain}")
            raise
        finally:
            await workflow.execute_activity(
                cleanup_activity,
                start_to_close_timeout=cleanup_timeout,
            )
async def main():
    """Run the cancellation sample end to end.

    Connects to ``localhost:7233``, hosts a worker for the workflow and both
    activities, starts the workflow, cancels it after two seconds, and then
    checks that waiting on the result fails with ``WorkflowFailureError``.

    Raises:
        RuntimeError: if the cancelled workflow unexpectedly returns a result.
    """
    # Start client
    client = await Client.connect("localhost:7233")

    # Run a worker for the workflow
    worker = Worker(
        client,
        task_queue="hello-cancellation-task-queue",
        workflows=[CancellationWorkflow],
        activities=[never_complete_activity, cleanup_activity],
        activity_executor=ThreadPoolExecutor(5),
    )
    async with worker:
        # While the worker is running, use the client to start the workflow.
        # Note, in many production setups, the client would be in a completely
        # separate process from the worker.
        handle = await client.start_workflow(
            CancellationWorkflow.run,
            id="hello-cancellation-workflow-id",
            task_queue="hello-cancellation-task-queue",
        )

        # Now that we've started, wait a couple of seconds then cancel it
        await asyncio.sleep(2)
        await handle.cancel()

        # Now wait on the result which we expect will fail since it was
        # cancelled
        try:
            await handle.result()
        except WorkflowFailureError as failure:
            chain = format_exception_cause_chain(failure)
            print(f"Got expected exception in client. Cause chain is {chain}")
        else:
            raise RuntimeError("Should not succeed")
def format_exception_cause_chain(err: BaseException) -> str:
    """Render an exception and its explicit causes as ``"A -> B -> C"``.

    Starting at ``err``, follows ``__cause__`` links (set by
    ``raise ... from ...``) and joins the class name of each exception with
    ``" -> "``. Implicit ``__context__`` links are not followed.

    Args:
        err: The outermost exception of the chain.

    Returns:
        The class names from ``err`` down to the root cause, separated by
        ``" -> "``. An exception without a cause yields just its own name.
    """
    causes = [err]
    # Track identities already visited so a cyclic __cause__ chain terminates
    # instead of looping forever.
    seen = {id(err)}
    # Compare against None explicitly: an exception type that defines
    # __len__/__bool__ may be falsy and must not truncate the chain.
    while (cause := causes[-1].__cause__) is not None and id(cause) not in seen:
        seen.add(id(cause))
        causes.append(cause)
    return " -> ".join(type(exc).__name__ for exc in causes)
# Only run the sample when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())