Skip to content

Commit 5e70e98

Browse files
committed
add tests for closeTransaction and fix bugs
1 parent 79f7257 commit 5e70e98

8 files changed

Lines changed: 300 additions & 23 deletions

service/history/historybuilder/event_factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,15 +1081,15 @@ func (b *EventFactory) createHistoryEvent(
10811081
}
10821082

10831083
func (b *EventFactory) CreateWorkflowExecutionTimeSkippingTransitionedEvent(
1084-
targetTime *time.Time,
1084+
targetTime time.Time,
10851085
triggeredDisable bool,
10861086
) *historypb.HistoryEvent {
10871087
event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED, b.timeSource.Now())
10881088
transitionedAttr := &historypb.WorkflowExecutionTimeSkippingTransitionedEventAttributes{
10891089
WallClockTime: timestamppb.New(b.timeSource.Now()),
10901090
DisabledAfterBound: triggeredDisable,
10911091
}
1092-
if targetTime != nil {
1092+
if !targetTime.IsZero() {
10931093
transitionedAttr.TargetTime = timestamppb.New(targetTime.UTC())
10941094
}
10951095
event.Attributes = &historypb.HistoryEvent_WorkflowExecutionTimeSkippingTransitionedEventAttributes{

service/history/historybuilder/history_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ func (b *HistoryBuilder) AddActivityTaskScheduledEvent(
323323
}
324324

325325
func (b *HistoryBuilder) AddWorkflowExecutionTimeSkippingTransitionedEvent(
326-
targetTime *time.Time,
326+
targetTime time.Time,
327327
triggeredDisable bool,
328328
) *historypb.HistoryEvent {
329329
event := b.CreateWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, triggeredDisable)

service/history/historybuilder/history_builder_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2580,7 +2580,7 @@ func (s *historyBuilderSuite) TestAddWorkflowExecutionTimeSkippingTransitionedEv
25802580
targetTime := s.now.Add(2 * time.Hour)
25812581

25822582
s.Run("WorkerMayIgnoreAndAttributesPopulated", func() {
2583-
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(&targetTime, false)
2583+
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, false)
25842584

25852585
s.NotNil(event)
25862586
s.True(event.WorkerMayIgnore)
@@ -2595,7 +2595,7 @@ func (s *historyBuilderSuite) TestAddWorkflowExecutionTimeSkippingTransitionedEv
25952595
})
25962596

25972597
s.Run("DisabledAfterBoundPropagated", func() {
2598-
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(&targetTime, true)
2598+
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, true)
25992599

26002600
attrs := event.GetWorkflowExecutionTimeSkippingTransitionedEventAttributes()
26012601
s.NotNil(attrs)
@@ -2605,7 +2605,7 @@ func (s *historyBuilderSuite) TestAddWorkflowExecutionTimeSkippingTransitionedEv
26052605

26062606
// no targetTime and only disabledAfterBound is true
26072607
s.Run("NoTargetTimeAndOnlyDisabledAfterBoundIsTrue", func() {
2608-
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(nil, true)
2608+
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(time.Time{}, true)
26092609

26102610
attrs := event.GetWorkflowExecutionTimeSkippingTransitionedEventAttributes()
26112611
s.NotNil(attrs)

service/history/timer_queue_active_task_executor.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask(
158158

159159
timerSequence := t.getTimerSequence(mutableState)
160160
referenceTime := t.Now()
161-
if tsInfo := mutableState.GetExecutionInfo().GetTimeSkippingInfo(); tsInfo != nil && tsInfo.GetAccumulatedSkippedDuration() != nil {
162-
referenceTime = mutableState.GetTimeSkippingVirtualTime()
163-
}
164161
timerFired := false
165162
Loop:
166163
for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() {

service/history/workflow/mutable_state_impl.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4032,19 +4032,11 @@ func (ms *MutableStateImpl) AddWorkflowExecutionTimeSkippingTransitionedEvent(ct
40324032
return nil, serviceerror.NewInternal("time skipping is triggered when conditions are not met")
40334033
}
40344034

4035-
if nextTimePointToSkip == nil {
4036-
event := ms.hBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(
4037-
nil,
4038-
disabledAfterBound,
4039-
)
4040-
return event, ms.ApplyWorkflowExecutionTimeSkippingTransitionedEvent(ctx, event)
4035+
var targetTime time.Time
4036+
if nextTimePointToSkip != nil {
4037+
targetTime = nextTimePointToSkip.ExpiryTime.AsTime()
40414038
}
4042-
4043-
targetTime := nextTimePointToSkip.ExpiryTime.AsTime()
4044-
event := ms.hBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(
4045-
&targetTime,
4046-
disabledAfterBound,
4047-
)
4039+
event := ms.hBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, disabledAfterBound)
40484040
return event, ms.ApplyWorkflowExecutionTimeSkippingTransitionedEvent(ctx, event)
40494041
}
40504042

service/history/workflow/mutable_state_impl_test.go

Lines changed: 287 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6661,7 +6661,7 @@ func (s *mutableStateSuite) TestAddWorkflowExecutionTimeSkippingTransitionedEven
66616661
}
66626662
targetTime := s.mockShard.GetTimeSource().Now().Add(time.Hour)
66636663
// Build an event with DisabledAfterBound=true directly via the history builder.
6664-
event := s.mutableState.hBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(&targetTime, true)
6664+
event := s.mutableState.hBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, true)
66656665
err := s.mutableState.ApplyWorkflowExecutionTimeSkippingTransitionedEvent(context.Background(), event)
66666666
s.NoError(err)
66676667
s.False(s.mutableState.GetExecutionInfo().TimeSkippingInfo.Config.Enabled)
@@ -6852,3 +6852,289 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping() {
68526852
}
68536853
})
68546854
}
6855+
6856+
// TestCloseTransactionPrepareTasks exercises closeTransactionPrepareTasks and
6857+
// closeTransactionHandleActivityUserTimerTasks together.
6858+
//
6859+
// closeTransactionHandleActivityUserTimerTasks is responsible for generating exactly one
6860+
// ActivityTimeoutTask and one UserTimerTask (the earliest-firing ones) per transaction.
6861+
// closeTransactionPrepareTasks orchestrates this call and then, when time-skipping is
6862+
// active, calls closeTransactionRegenerateTimerTasksForTimeSkipping afterward — testing
6863+
// that the ordering is correct.
6864+
func (s *mutableStateSuite) TestCloseTransactionPrepareTasks() {
6865+
failoverVersion := s.namespaceEntry.FailoverVersion(tests.WorkflowID)
6866+
now := s.mockShard.GetTimeSource().Now()
6867+
timerExpiry := now.Add(time.Hour)
6868+
6869+
// buildRunningState returns a minimal running workflow with no pending workflow task,
6870+
// activities, or timers. Individual sub-tests layer what they need on top.
6871+
buildRunningState := func() *persistencespb.WorkflowMutableState {
6872+
return &persistencespb.WorkflowMutableState{
6873+
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
6874+
NamespaceId: s.namespaceEntry.ID().String(),
6875+
WorkflowId: tests.WorkflowID,
6876+
TaskQueue: "testTaskQueue",
6877+
WorkflowTypeName: "testWorkflowType",
6878+
WorkflowExecutionTimerTaskStatus: TimerTaskStatusCreated,
6879+
VersionHistories: &historyspb.VersionHistories{
6880+
Histories: []*historyspb.VersionHistory{
6881+
{
6882+
BranchToken: []byte("token#1"),
6883+
Items: []*historyspb.VersionHistoryItem{{EventId: 2, Version: failoverVersion}},
6884+
},
6885+
},
6886+
},
6887+
TransitionHistory: []*persistencespb.VersionedTransition{
6888+
{NamespaceFailoverVersion: failoverVersion, TransitionCount: 1},
6889+
},
6890+
},
6891+
ExecutionState: &persistencespb.WorkflowExecutionState{
6892+
RunId: tests.RunID,
6893+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
6894+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
6895+
},
6896+
NextEventId: 3,
6897+
}
6898+
}
6899+
6900+
// pendingTimer returns a timer map with one timer whose TaskStatus is as given.
6901+
pendingTimer := func(taskStatus int64) map[string]*persistencespb.TimerInfo {
6902+
return map[string]*persistencespb.TimerInfo{
6903+
"t1": {
6904+
TimerId: "t1",
6905+
StartedEventId: 1,
6906+
ExpiryTime: timestamppb.New(timerExpiry),
6907+
TaskStatus: taskStatus,
6908+
},
6909+
}
6910+
}
6911+
6912+
// pendingScheduledActivity returns an activity map with one scheduled (not-started)
6913+
// activity whose TimerTaskStatus is as given.
6914+
// ScheduleToStart fires at now+1h, ScheduleToClose fires at now+2h — so
6915+
// ScheduleToStart is always the earliest timer for this activity.
6916+
pendingScheduledActivity := func(timerTaskStatus int32) map[int64]*persistencespb.ActivityInfo {
6917+
return map[int64]*persistencespb.ActivityInfo{
6918+
5: {
6919+
Version: failoverVersion,
6920+
ScheduledEventId: 5,
6921+
ScheduledTime: timestamppb.New(now),
6922+
StartedEventId: common.EmptyEventID,
6923+
ActivityId: "act1",
6924+
ScheduleToStartTimeout: durationpb.New(time.Hour),
6925+
ScheduleToCloseTimeout: durationpb.New(2 * time.Hour),
6926+
TimerTaskStatus: timerTaskStatus,
6927+
Stamp: 1,
6928+
},
6929+
}
6930+
}
6931+
6932+
collectUserTimerTasks := func(timerTasks []tasks.Task) []*tasks.UserTimerTask {
6933+
var result []*tasks.UserTimerTask
6934+
for _, task := range timerTasks {
6935+
if ut, ok := task.(*tasks.UserTimerTask); ok {
6936+
result = append(result, ut)
6937+
}
6938+
}
6939+
return result
6940+
}
6941+
6942+
collectActivityTimerTasks := func(timerTasks []tasks.Task) []*tasks.ActivityTimeoutTask {
6943+
var result []*tasks.ActivityTimeoutTask
6944+
for _, task := range timerTasks {
6945+
if at, ok := task.(*tasks.ActivityTimeoutTask); ok {
6946+
result = append(result, at)
6947+
}
6948+
}
6949+
return result
6950+
}
6951+
6952+
// ── closeTransactionHandleActivityUserTimerTasks scenarios ──────────────────
6953+
6954+
s.Run("HandleActivityUserTimerTasks/Active_Running_UserTimer_NotCreated", func() {
6955+
// TaskStatus=None: CreateNextUserTimer generates a UserTimerTask and marks the
6956+
// timer as created so the next transaction skips it.
6957+
dbState := buildRunningState()
6958+
dbState.TimerInfos = pendingTimer(TimerTaskStatusNone)
6959+
6960+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
6961+
s.Require().NoError(err)
6962+
_, err = ms.StartTransaction(s.namespaceEntry)
6963+
s.Require().NoError(err)
6964+
6965+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
6966+
s.Require().NoError(err)
6967+
6968+
utTasks := collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer])
6969+
s.Require().Len(utTasks, 1)
6970+
s.Equal(int64(1), utTasks[0].EventID)
6971+
s.Equal(timerExpiry, utTasks[0].VisibilityTimestamp)
6972+
6973+
// The timer must be marked as created in mutable state.
6974+
s.Equal(int64(TimerTaskStatusCreated), ms.pendingTimerInfoIDs["t1"].TaskStatus)
6975+
})
6976+
6977+
s.Run("HandleActivityUserTimerTasks/Active_Running_UserTimer_AlreadyCreated", func() {
6978+
// TaskStatus=Created: CreateNextUserTimer skips generation — no duplicate task.
6979+
dbState := buildRunningState()
6980+
dbState.TimerInfos = pendingTimer(TimerTaskStatusCreated)
6981+
6982+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
6983+
s.Require().NoError(err)
6984+
_, err = ms.StartTransaction(s.namespaceEntry)
6985+
s.Require().NoError(err)
6986+
6987+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
6988+
s.Require().NoError(err)
6989+
6990+
s.Empty(collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
6991+
})
6992+
6993+
s.Run("HandleActivityUserTimerTasks/Active_Running_Activity_NotCreated", func() {
6994+
// No timer task status bits set: CreateNextActivityTimer generates an
6995+
// ActivityTimeoutTask for the earliest-firing timeout (ScheduleToStart).
6996+
dbState := buildRunningState()
6997+
dbState.ActivityInfos = pendingScheduledActivity(0)
6998+
6999+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7000+
s.Require().NoError(err)
7001+
_, err = ms.StartTransaction(s.namespaceEntry)
7002+
s.Require().NoError(err)
7003+
7004+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7005+
s.Require().NoError(err)
7006+
7007+
atTasks := collectActivityTimerTasks(mutation.Tasks[tasks.CategoryTimer])
7008+
s.Require().Len(atTasks, 1)
7009+
s.Equal(int64(5), atTasks[0].EventID)
7010+
s.Equal(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, atTasks[0].TimeoutType)
7011+
})
7012+
7013+
s.Run("HandleActivityUserTimerTasks/Active_Running_Activity_AlreadyCreated", func() {
7014+
// ScheduleToStart bit set: that timer is already created, CreateNextActivityTimer
7015+
// returns without generating a task.
7016+
dbState := buildRunningState()
7017+
dbState.ActivityInfos = pendingScheduledActivity(TimerTaskStatusCreatedScheduleToStart)
7018+
7019+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7020+
s.Require().NoError(err)
7021+
_, err = ms.StartTransaction(s.namespaceEntry)
7022+
s.Require().NoError(err)
7023+
7024+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7025+
s.Require().NoError(err)
7026+
7027+
s.Empty(collectActivityTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7028+
})
7029+
7030+
s.Run("HandleActivityUserTimerTasks/Active_NotRunning", func() {
7031+
// Completed workflow: closeTransactionHandleActivityUserTimerTasks short-circuits
7032+
// on !IsWorkflowExecutionRunning(), generating no timer tasks.
7033+
dbState := buildRunningState()
7034+
dbState.ExecutionState.State = enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED
7035+
dbState.ExecutionState.Status = enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED
7036+
dbState.TimerInfos = pendingTimer(TimerTaskStatusNone)
7037+
dbState.ActivityInfos = pendingScheduledActivity(0)
7038+
7039+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7040+
s.Require().NoError(err)
7041+
// StartTransaction is needed to initialize currentVersion so closeTransactionWithPolicyCheck
7042+
// can call ClusterNameForFailoverVersion with the right version.
7043+
_, err = ms.StartTransaction(s.namespaceEntry)
7044+
s.Require().NoError(err)
7045+
7046+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7047+
s.Require().NoError(err)
7048+
7049+
s.Empty(collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7050+
s.Empty(collectActivityTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7051+
})
7052+
7053+
s.Run("HandleActivityUserTimerTasks/Passive", func() {
7054+
// Passive policy: closeTransactionHandleActivityUserTimerTasks returns immediately,
7055+
// generating no timer tasks regardless of pending timers or activities.
7056+
dbState := buildRunningState()
7057+
dbState.TimerInfos = pendingTimer(TimerTaskStatusNone)
7058+
dbState.ActivityInfos = pendingScheduledActivity(0)
7059+
7060+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7061+
s.Require().NoError(err)
7062+
7063+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyPassive)
7064+
s.Require().NoError(err)
7065+
7066+
s.Empty(collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7067+
s.Empty(collectActivityTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7068+
})
7069+
7070+
// ── Ordering: closeTransactionHandleActivityUserTimerTasks runs before
7071+
// closeTransactionRegenerateTimerTasksForTimeSkipping ───────────
7072+
7073+
s.Run("Ordering/UserTimer_NotCreated_WithTimeSkipping", func() {
7074+
// Timer has TaskStatus=None AND the workflow becomes eligible for time-skipping.
7075+
// closeTransactionHandleActivityUserTimerTasks runs first and generates a normal
7076+
// UserTimerTask at ExpiryTime. closeTransactionRegenerateTimerTasksForTimeSkipping
7077+
// then generates a second UserTimerTask at ExpiryTime-accumulatedDuration.
7078+
// Both tasks must be present; the time-skipped one has an earlier timestamp.
7079+
dbState := buildRunningState()
7080+
dbState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{
7081+
Config: &workflowpb.TimeSkippingConfig{Enabled: true},
7082+
}
7083+
dbState.TimerInfos = pendingTimer(TimerTaskStatusNone)
7084+
7085+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7086+
s.Require().NoError(err)
7087+
_, err = ms.StartTransaction(s.namespaceEntry)
7088+
s.Require().NoError(err)
7089+
7090+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7091+
s.Require().NoError(err)
7092+
7093+
accumulated := ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration
7094+
s.Require().NotNil(accumulated)
7095+
s.Require().Greater(accumulated.AsDuration(), time.Duration(0))
7096+
7097+
utTasks := collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer])
7098+
s.Require().Len(utTasks, 2, "expected one normal task and one time-skipped task")
7099+
7100+
// Sort by visibility timestamp so we can check order deterministically.
7101+
if utTasks[0].VisibilityTimestamp.After(utTasks[1].VisibilityTimestamp) {
7102+
utTasks[0], utTasks[1] = utTasks[1], utTasks[0]
7103+
}
7104+
7105+
expectedShifted := timerExpiry.Add(-accumulated.AsDuration())
7106+
s.Equal(expectedShifted, utTasks[0].VisibilityTimestamp, "time-skipped task must fire first")
7107+
s.Equal(timerExpiry, utTasks[1].VisibilityTimestamp, "normal task fires at real expiry")
7108+
s.Equal(int64(1), utTasks[0].EventID)
7109+
s.Equal(int64(1), utTasks[1].EventID)
7110+
})
7111+
7112+
s.Run("Ordering/UserTimer_AlreadyCreated_WithTimeSkipping", func() {
7113+
// Timer has TaskStatus=Created: closeTransactionHandleActivityUserTimerTasks skips
7114+
// it (already created). closeTransactionRegenerateTimerTasksForTimeSkipping still
7115+
// generates the time-shifted task. Only the time-skipped task appears.
7116+
dbState := buildRunningState()
7117+
dbState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{
7118+
Config: &workflowpb.TimeSkippingConfig{Enabled: true},
7119+
}
7120+
dbState.TimerInfos = pendingTimer(TimerTaskStatusCreated)
7121+
7122+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7123+
s.Require().NoError(err)
7124+
_, err = ms.StartTransaction(s.namespaceEntry)
7125+
s.Require().NoError(err)
7126+
7127+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7128+
s.Require().NoError(err)
7129+
7130+
accumulated := ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration
7131+
s.Require().NotNil(accumulated)
7132+
7133+
utTasks := collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer])
7134+
s.Require().Len(utTasks, 1, "only the time-skipped task must be present")
7135+
7136+
expectedShifted := timerExpiry.Add(-accumulated.AsDuration())
7137+
s.Equal(expectedShifted, utTasks[0].VisibilityTimestamp)
7138+
s.Equal(int64(1), utTasks[0].EventID)
7139+
})
7140+
}

0 commit comments

Comments
 (0)