Skip to content

Commit f0f0f4b

Browse files
committed
add timeskipping to executeUserTimerTimeoutTask time check
1 parent 7e31ae7 commit f0f0f4b

5 files changed

Lines changed: 260 additions & 6 deletions

File tree

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,8 +328,6 @@ message TimeSkippingInfo {
328328
google.protobuf.Duration accumulated_skipped_duration = 2;
329329
}
330330

331-
332-
333331
// Internal wrapper message to distinguish "never notified" (nil wrapper) from
334332
// "notified about an unversioned target" (non-nil wrapper with nil deployment_version).
335333
// Used only within server persistence; never flows to the public API.

service/history/timer_queue_active_task_executor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask(
158158

159159
timerSequence := t.getTimerSequence(mutableState)
160160
referenceTime := t.Now()
161+
if tsInfo := mutableState.GetExecutionInfo().GetTimeSkippingInfo(); tsInfo != nil && tsInfo.GetAccumulatedSkippedDuration() != nil {
162+
referenceTime = mutableState.GetTimeSkippingVirtualTime()
163+
}
161164
timerFired := false
162165
Loop:
163166
for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() {

service/history/workflow/mutable_state_impl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4054,6 +4054,7 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionTimeSkippingTransitionedEvent(
40544054
}
40554055

40564056
func (ms *MutableStateImpl) GetTimeSkippingVirtualTime() time.Time {
4057+
// TODO@time-skipping: need to use this to adjust all timestamps for tasks sent to SDK worker.
40574058
offset := ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration
40584059
return ms.timeSource.Now().Add(offset.AsDuration())
40594060
}

service/history/workflow/mutable_state_impl_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6584,10 +6584,10 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping() {
65846584
buildEligibleState := func(timerExpiry time.Time) *persistencespb.WorkflowMutableState {
65856585
return &persistencespb.WorkflowMutableState{
65866586
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
6587-
NamespaceId: s.namespaceEntry.ID().String(),
6588-
WorkflowId: tests.WorkflowID,
6589-
TaskQueue: "testTaskQueue",
6590-
WorkflowTypeName: "testWorkflowType",
6587+
NamespaceId: s.namespaceEntry.ID().String(),
6588+
WorkflowId: tests.WorkflowID,
6589+
TaskQueue: "testTaskQueue",
6590+
WorkflowTypeName: "testWorkflowType",
65916591
WorkflowExecutionTimerTaskStatus: TimerTaskStatusCreated,
65926592
TimeSkippingInfo: &persistencespb.TimeSkippingInfo{
65936593
Config: &workflowpb.TimeSkippingConfig{Enabled: true},

tests/timeskipping_test.go

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"time"
66

77
"github.com/google/uuid"
8+
commandpb "go.temporal.io/api/command/v1"
89
commonpb "go.temporal.io/api/common/v1"
910
enumspb "go.temporal.io/api/enums/v1"
11+
historypb "go.temporal.io/api/history/v1"
1012
taskqueuepb "go.temporal.io/api/taskqueue/v1"
1113
updatepb "go.temporal.io/api/update/v1"
1214
workflowpb "go.temporal.io/api/workflow/v1"
@@ -16,6 +18,7 @@ import (
1618
"go.temporal.io/server/common/dynamicconfig"
1719
"go.temporal.io/server/common/persistence"
1820
"go.temporal.io/server/common/testing/parallelsuite"
21+
"go.temporal.io/server/common/testing/taskpoller"
1922
"go.temporal.io/server/common/testing/testvars"
2023
"go.temporal.io/server/tests/testcore"
2124
"google.golang.org/protobuf/proto"
@@ -174,6 +177,255 @@ func (s *TimeSkippingTestSuite) TestTimeSkipping_ExecuteMultiOperation_DCEnabled
174177
s.True(proto.Equal(inputConfig, ms.State.ExecutionInfo.GetTimeSkippingInfo().GetConfig()))
175178
}
176179

180+
// startTimeSkippingWorkflow is a small helper that starts a workflow with time-skipping
181+
// enabled and returns the run ID.
182+
func (s *TimeSkippingTestSuite) startTimeSkippingWorkflow(env *testcore.TestEnv, tv *testvars.TestVars) string {
183+
resp, err := env.FrontendClient().StartWorkflowExecution(testcore.NewContext(), &workflowservice.StartWorkflowExecutionRequest{
184+
RequestId: uuid.NewString(),
185+
Namespace: env.Namespace().String(),
186+
WorkflowId: tv.WorkflowID(),
187+
WorkflowType: tv.WorkflowType(),
188+
TaskQueue: tv.TaskQueue(),
189+
WorkflowRunTimeout: durationpb.New(300 * time.Second),
190+
WorkflowTaskTimeout: durationpb.New(10 * time.Second),
191+
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
192+
})
193+
s.NoError(err)
194+
return resp.GetRunId()
195+
}
196+
197+
// scheduleActivityCmd returns a ScheduleActivityTask command that uses tv for all names /
198+
// queue / timeout values.
199+
func scheduleActivityCmd(tv *testvars.TestVars) *commandpb.Command {
200+
return &commandpb.Command{
201+
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
202+
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
203+
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
204+
ActivityId: tv.ActivityID(),
205+
ActivityType: tv.ActivityType(),
206+
TaskQueue: tv.TaskQueue(),
207+
ScheduleToCloseTimeout: durationpb.New(30 * time.Second),
208+
},
209+
},
210+
}
211+
}
212+
213+
// startTimerCmd returns a StartTimer command with the given duration and timer ID.
214+
func startTimerCmd(timerID string, d time.Duration) *commandpb.Command {
215+
return &commandpb.Command{
216+
CommandType: enumspb.COMMAND_TYPE_START_TIMER,
217+
Attributes: &commandpb.Command_StartTimerCommandAttributes{
218+
StartTimerCommandAttributes: &commandpb.StartTimerCommandAttributes{
219+
TimerId: timerID,
220+
StartToFireTimeout: durationpb.New(d),
221+
},
222+
},
223+
}
224+
}
225+
226+
// completeWorkflowCmd returns a CompleteWorkflowExecution command.
227+
func completeWorkflowCmd() *commandpb.Command {
228+
return &commandpb.Command{
229+
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
230+
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
231+
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{},
232+
},
233+
}
234+
}
235+
236+
// hasEventType returns true if any event in the slice has the given type.
237+
func hasEventType(events []*historypb.HistoryEvent, t enumspb.EventType) bool {
238+
for _, e := range events {
239+
if e.GetEventType() == t {
240+
return true
241+
}
242+
}
243+
return false
244+
}
245+
246+
// TestTimeSkipping_ActivityOnly verifies that a workflow with time-skipping enabled but no
247+
// user timer runs to completion normally (time-skipping never triggers because
248+
// ShouldExecuteTimeSkipping requires a pending timer).
249+
func (s *TimeSkippingTestSuite) TestTimeSkipping_ActivityOnly() {
250+
env := testcore.NewEnv(s.T())
251+
env.OverrideDynamicConfig(dynamicconfig.TimeSkippingEnabled, true)
252+
tv := testvars.New(s.T())
253+
254+
runID := s.startTimeSkippingWorkflow(env, tv)
255+
poller := taskpoller.New(s.T(), env.FrontendClient(), env.Namespace().String())
256+
257+
// WT 1: schedule the activity.
258+
_, err := poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
259+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
260+
Commands: []*commandpb.Command{scheduleActivityCmd(tv)},
261+
}, nil
262+
})
263+
s.NoError(err)
264+
265+
// Activity: complete it immediately.
266+
_, err = poller.PollAndHandleActivityTask(tv, taskpoller.CompleteActivityTask(tv))
267+
s.NoError(err)
268+
269+
// WT 2: activity has completed; complete the workflow.
270+
_, err = poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
271+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
272+
Commands: []*commandpb.Command{completeWorkflowCmd()},
273+
}, nil
274+
})
275+
s.NoError(err)
276+
277+
// Verify: workflow completed; no time-skipping transitioned event (no timer was ever started).
278+
history := env.GetHistory(env.Namespace().String(), &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runID})
279+
s.True(hasEventType(history, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED), "workflow must complete")
280+
s.False(hasEventType(history, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED),
281+
"no time-skipping event expected when there is never a pending user timer")
282+
}
283+
284+
// TestTimeSkipping_TimerAndActivity verifies that when a workflow has both a long user
285+
// timer and a pending activity, time-skipping is blocked until the activity completes.
286+
// Once the activity is done and the workflow task is drained, time-skipping fires and
287+
// moves the timer's visibility timestamp to near-now, so the timer fires quickly.
288+
//
289+
// Sequence:
290+
//
291+
// WT1 → schedule activity + start 1-hour timer
292+
// AT1 → complete activity
293+
// WT2 → drain (return no commands; triggers time-skipping on close)
294+
// WT3 → complete workflow (timer has fired)
295+
func (s *TimeSkippingTestSuite) TestTimeSkipping_TimerAndActivity() {
296+
env := testcore.NewEnv(s.T())
297+
env.OverrideDynamicConfig(dynamicconfig.TimeSkippingEnabled, true)
298+
tv := testvars.New(s.T())
299+
300+
runID := s.startTimeSkippingWorkflow(env, tv)
301+
poller := taskpoller.New(s.T(), env.FrontendClient(), env.Namespace().String())
302+
303+
// WT 1: simultaneously schedule an activity and start a 1-hour timer.
304+
// Time-skipping cannot fire while both are pending.
305+
_, err := poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
306+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
307+
Commands: []*commandpb.Command{
308+
scheduleActivityCmd(tv),
309+
startTimerCmd("timer-1", time.Hour),
310+
},
311+
}, nil
312+
})
313+
s.NoError(err)
314+
315+
// Activity: complete it. After this, only the 1-hour timer is pending.
316+
_, err = poller.PollAndHandleActivityTask(tv, taskpoller.CompleteActivityTask(tv))
317+
s.NoError(err)
318+
319+
// WT 2: drain (return no commands). closeTransaction fires time-skipping here because
320+
// the workflow is now idle with a pending timer → regenerates the timer task at near-now.
321+
_, err = poller.PollAndHandleWorkflowTask(tv, taskpoller.DrainWorkflowTask)
322+
s.NoError(err)
323+
324+
// WT 3: timer has fired (due to time-skipping); complete the workflow.
325+
_, err = poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
326+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
327+
Commands: []*commandpb.Command{completeWorkflowCmd()},
328+
}, nil
329+
})
330+
s.NoError(err)
331+
332+
// Verify history.
333+
history := env.GetHistory(env.Namespace().String(), &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runID})
334+
s.True(hasEventType(history, enumspb.EVENT_TYPE_TIMER_FIRED), "timer must have fired via time-skipping")
335+
s.True(hasEventType(history, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED),
336+
"time-skipping transitioned event expected")
337+
s.True(hasEventType(history, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED), "workflow must complete")
338+
}
339+
340+
// TestTimeSkipping_ActivityTimerActivityTimer exercises two full time-skipping cycles:
341+
//
342+
// activity → timer → activity → timer → complete
343+
//
344+
// Each timer is set to 1 hour; time-skipping moves both to fire immediately so the
345+
// workflow completes in seconds rather than hours.
346+
//
347+
// Sequence:
348+
//
349+
// WT1 → schedule activity 1
350+
// AT1 → complete activity 1
351+
// WT2 → start 1-hour timer 1 (time-skipping fires on close)
352+
// WT3 → schedule activity 2 (timer 1 has fired)
353+
// AT2 → complete activity 2
354+
// WT4 → start 1-hour timer 2 (time-skipping fires on close)
355+
// WT5 → complete workflow (timer 2 has fired)
356+
func (s *TimeSkippingTestSuite) TestTimeSkipping_ActivityTimerActivityTimer() {
357+
env := testcore.NewEnv(s.T())
358+
env.OverrideDynamicConfig(dynamicconfig.TimeSkippingEnabled, true)
359+
tv := testvars.New(s.T())
360+
361+
runID := s.startTimeSkippingWorkflow(env, tv)
362+
poller := taskpoller.New(s.T(), env.FrontendClient(), env.Namespace().String())
363+
364+
// WT 1: schedule first activity.
365+
_, err := poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
366+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
367+
Commands: []*commandpb.Command{scheduleActivityCmd(tv.WithActivityIDNumber(1))},
368+
}, nil
369+
})
370+
s.NoError(err)
371+
372+
// AT 1: complete first activity.
373+
_, err = poller.PollAndHandleActivityTask(tv.WithActivityIDNumber(1), taskpoller.CompleteActivityTask(tv))
374+
s.NoError(err)
375+
376+
// WT 2: start a 1-hour timer. No pending activity → time-skipping fires on closeTransaction.
377+
_, err = poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
378+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
379+
Commands: []*commandpb.Command{startTimerCmd("timer-1", time.Hour)},
380+
}, nil
381+
})
382+
s.NoError(err)
383+
384+
// WT 3: timer 1 has fired; schedule second activity.
385+
_, err = poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
386+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
387+
Commands: []*commandpb.Command{scheduleActivityCmd(tv.WithActivityIDNumber(2))},
388+
}, nil
389+
})
390+
s.NoError(err)
391+
392+
// AT 2: complete second activity.
393+
_, err = poller.PollAndHandleActivityTask(tv.WithActivityIDNumber(2), taskpoller.CompleteActivityTask(tv))
394+
s.NoError(err)
395+
396+
// WT 4: start a second 1-hour timer. No pending activity → time-skipping fires on closeTransaction.
397+
_, err = poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
398+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
399+
Commands: []*commandpb.Command{startTimerCmd("timer-2", time.Hour)},
400+
}, nil
401+
})
402+
s.NoError(err)
403+
404+
// WT 5: timer 2 has fired; complete the workflow.
405+
_, err = poller.PollAndHandleWorkflowTask(tv, func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
406+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
407+
Commands: []*commandpb.Command{completeWorkflowCmd()},
408+
}, nil
409+
})
410+
s.NoError(err)
411+
412+
// Verify history: two timer-fired events and two time-skipping events.
413+
history := env.GetHistory(env.Namespace().String(), &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runID})
414+
415+
var timerFiredCount, timeSkippingCount int
416+
for _, e := range history {
417+
switch e.GetEventType() {
418+
case enumspb.EVENT_TYPE_TIMER_FIRED:
419+
timerFiredCount++
420+
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED:
421+
timeSkippingCount++
422+
}
423+
}
424+
s.Equal(2, timerFiredCount, "both timers must fire via time-skipping")
425+
s.Equal(2, timeSkippingCount, "two time-skipping transitioned events expected")
426+
s.True(hasEventType(history, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED), "workflow must complete")
427+
}
428+
177429
func (s *TimeSkippingTestSuite) getMutableState(env *testcore.TestEnv, workflowID, runID string) *persistence.GetWorkflowExecutionResponse {
178430
shardID := common.WorkflowIDToHistoryShard(
179431
env.NamespaceID().String(),

0 commit comments

Comments
 (0)