Skip to content

Commit 17add5c

Browse files
committed
add tests for closeTransaction
1 parent 79f7257 commit 17add5c

4 files changed

Lines changed: 289 additions & 4 deletions

File tree

service/history/timer_queue_active_task_executor.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask(
158158

159159
timerSequence := t.getTimerSequence(mutableState)
160160
referenceTime := t.Now()
161-
if tsInfo := mutableState.GetExecutionInfo().GetTimeSkippingInfo(); tsInfo != nil && tsInfo.GetAccumulatedSkippedDuration() != nil {
162-
referenceTime = mutableState.GetTimeSkippingVirtualTime()
163-
}
164161
timerFired := false
165162
Loop:
166163
for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() {

service/history/workflow/mutable_state_impl_test.go

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6852,3 +6852,289 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping() {
68526852
}
68536853
})
68546854
}
6855+
6856+
// TestCloseTransactionPrepareTasks exercises closeTransactionPrepareTasks and
6857+
// closeTransactionHandleActivityUserTimerTasks together.
6858+
//
6859+
// closeTransactionHandleActivityUserTimerTasks is responsible for generating exactly one
6860+
// ActivityTimeoutTask and one UserTimerTask (the earliest-firing ones) per transaction.
6861+
// closeTransactionPrepareTasks orchestrates this call and then, when time-skipping is
6862+
// active, calls closeTransactionRegenerateTimerTasksForTimeSkipping afterward — testing
6863+
// that the ordering is correct.
6864+
func (s *mutableStateSuite) TestCloseTransactionPrepareTasks() {
6865+
failoverVersion := s.namespaceEntry.FailoverVersion(tests.WorkflowID)
6866+
now := s.mockShard.GetTimeSource().Now()
6867+
timerExpiry := now.Add(time.Hour)
6868+
6869+
// buildRunningState returns a minimal running workflow with no pending workflow task,
6870+
// activities, or timers. Individual sub-tests layer what they need on top.
6871+
buildRunningState := func() *persistencespb.WorkflowMutableState {
6872+
return &persistencespb.WorkflowMutableState{
6873+
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
6874+
NamespaceId: s.namespaceEntry.ID().String(),
6875+
WorkflowId: tests.WorkflowID,
6876+
TaskQueue: "testTaskQueue",
6877+
WorkflowTypeName: "testWorkflowType",
6878+
WorkflowExecutionTimerTaskStatus: TimerTaskStatusCreated,
6879+
VersionHistories: &historyspb.VersionHistories{
6880+
Histories: []*historyspb.VersionHistory{
6881+
{
6882+
BranchToken: []byte("token#1"),
6883+
Items: []*historyspb.VersionHistoryItem{{EventId: 2, Version: failoverVersion}},
6884+
},
6885+
},
6886+
},
6887+
TransitionHistory: []*persistencespb.VersionedTransition{
6888+
{NamespaceFailoverVersion: failoverVersion, TransitionCount: 1},
6889+
},
6890+
},
6891+
ExecutionState: &persistencespb.WorkflowExecutionState{
6892+
RunId: tests.RunID,
6893+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
6894+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
6895+
},
6896+
NextEventId: 3,
6897+
}
6898+
}
6899+
6900+
// pendingTimer returns a timer map with one timer whose TaskStatus is as given.
6901+
pendingTimer := func(taskStatus int64) map[string]*persistencespb.TimerInfo {
6902+
return map[string]*persistencespb.TimerInfo{
6903+
"t1": {
6904+
TimerId: "t1",
6905+
StartedEventId: 1,
6906+
ExpiryTime: timestamppb.New(timerExpiry),
6907+
TaskStatus: taskStatus,
6908+
},
6909+
}
6910+
}
6911+
6912+
// pendingScheduledActivity returns an activity map with one scheduled (not-started)
6913+
// activity whose TimerTaskStatus is as given.
6914+
// ScheduleToStart fires at now+1h, ScheduleToClose fires at now+2h — so
6915+
// ScheduleToStart is always the earliest timer for this activity.
6916+
pendingScheduledActivity := func(timerTaskStatus int32) map[int64]*persistencespb.ActivityInfo {
6917+
return map[int64]*persistencespb.ActivityInfo{
6918+
5: {
6919+
Version: failoverVersion,
6920+
ScheduledEventId: 5,
6921+
ScheduledTime: timestamppb.New(now),
6922+
StartedEventId: common.EmptyEventID,
6923+
ActivityId: "act1",
6924+
ScheduleToStartTimeout: durationpb.New(time.Hour),
6925+
ScheduleToCloseTimeout: durationpb.New(2 * time.Hour),
6926+
TimerTaskStatus: timerTaskStatus,
6927+
Stamp: 1,
6928+
},
6929+
}
6930+
}
6931+
6932+
collectUserTimerTasks := func(timerTasks []tasks.Task) []*tasks.UserTimerTask {
6933+
var result []*tasks.UserTimerTask
6934+
for _, task := range timerTasks {
6935+
if ut, ok := task.(*tasks.UserTimerTask); ok {
6936+
result = append(result, ut)
6937+
}
6938+
}
6939+
return result
6940+
}
6941+
6942+
collectActivityTimerTasks := func(timerTasks []tasks.Task) []*tasks.ActivityTimeoutTask {
6943+
var result []*tasks.ActivityTimeoutTask
6944+
for _, task := range timerTasks {
6945+
if at, ok := task.(*tasks.ActivityTimeoutTask); ok {
6946+
result = append(result, at)
6947+
}
6948+
}
6949+
return result
6950+
}
6951+
6952+
// ── closeTransactionHandleActivityUserTimerTasks scenarios ──────────────────
6953+
6954+
s.Run("HandleActivityUserTimerTasks/Active_Running_UserTimer_NotCreated", func() {
6955+
// TaskStatus=None: CreateNextUserTimer generates a UserTimerTask and marks the
6956+
// timer as created so the next transaction skips it.
6957+
dbState := buildRunningState()
6958+
dbState.TimerInfos = pendingTimer(TimerTaskStatusNone)
6959+
6960+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
6961+
s.Require().NoError(err)
6962+
_, err = ms.StartTransaction(s.namespaceEntry)
6963+
s.Require().NoError(err)
6964+
6965+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
6966+
s.Require().NoError(err)
6967+
6968+
utTasks := collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer])
6969+
s.Require().Len(utTasks, 1)
6970+
s.Equal(int64(1), utTasks[0].EventID)
6971+
s.Equal(timerExpiry, utTasks[0].VisibilityTimestamp)
6972+
6973+
// The timer must be marked as created in mutable state.
6974+
s.Equal(int64(TimerTaskStatusCreated), ms.pendingTimerInfoIDs["t1"].TaskStatus)
6975+
})
6976+
6977+
s.Run("HandleActivityUserTimerTasks/Active_Running_UserTimer_AlreadyCreated", func() {
6978+
// TaskStatus=Created: CreateNextUserTimer skips generation — no duplicate task.
6979+
dbState := buildRunningState()
6980+
dbState.TimerInfos = pendingTimer(TimerTaskStatusCreated)
6981+
6982+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
6983+
s.Require().NoError(err)
6984+
_, err = ms.StartTransaction(s.namespaceEntry)
6985+
s.Require().NoError(err)
6986+
6987+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
6988+
s.Require().NoError(err)
6989+
6990+
s.Empty(collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
6991+
})
6992+
6993+
s.Run("HandleActivityUserTimerTasks/Active_Running_Activity_NotCreated", func() {
6994+
// No timer task status bits set: CreateNextActivityTimer generates an
6995+
// ActivityTimeoutTask for the earliest-firing timeout (ScheduleToStart).
6996+
dbState := buildRunningState()
6997+
dbState.ActivityInfos = pendingScheduledActivity(0)
6998+
6999+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7000+
s.Require().NoError(err)
7001+
_, err = ms.StartTransaction(s.namespaceEntry)
7002+
s.Require().NoError(err)
7003+
7004+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7005+
s.Require().NoError(err)
7006+
7007+
atTasks := collectActivityTimerTasks(mutation.Tasks[tasks.CategoryTimer])
7008+
s.Require().Len(atTasks, 1)
7009+
s.Equal(int64(5), atTasks[0].EventID)
7010+
s.Equal(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, atTasks[0].TimeoutType)
7011+
})
7012+
7013+
s.Run("HandleActivityUserTimerTasks/Active_Running_Activity_AlreadyCreated", func() {
7014+
// ScheduleToStart bit set: that timer is already created, CreateNextActivityTimer
7015+
// returns without generating a task.
7016+
dbState := buildRunningState()
7017+
dbState.ActivityInfos = pendingScheduledActivity(TimerTaskStatusCreatedScheduleToStart)
7018+
7019+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7020+
s.Require().NoError(err)
7021+
_, err = ms.StartTransaction(s.namespaceEntry)
7022+
s.Require().NoError(err)
7023+
7024+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7025+
s.Require().NoError(err)
7026+
7027+
s.Empty(collectActivityTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7028+
})
7029+
7030+
s.Run("HandleActivityUserTimerTasks/Active_NotRunning", func() {
7031+
// Completed workflow: closeTransactionHandleActivityUserTimerTasks short-circuits
7032+
// on !IsWorkflowExecutionRunning(), generating no timer tasks.
7033+
dbState := buildRunningState()
7034+
dbState.ExecutionState.State = enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED
7035+
dbState.ExecutionState.Status = enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED
7036+
dbState.TimerInfos = pendingTimer(TimerTaskStatusNone)
7037+
dbState.ActivityInfos = pendingScheduledActivity(0)
7038+
7039+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7040+
s.Require().NoError(err)
7041+
// StartTransaction is needed to initialize currentVersion so closeTransactionWithPolicyCheck
7042+
// can call ClusterNameForFailoverVersion with the right version.
7043+
_, err = ms.StartTransaction(s.namespaceEntry)
7044+
s.Require().NoError(err)
7045+
7046+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7047+
s.Require().NoError(err)
7048+
7049+
s.Empty(collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7050+
s.Empty(collectActivityTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7051+
})
7052+
7053+
s.Run("HandleActivityUserTimerTasks/Passive", func() {
7054+
// Passive policy: closeTransactionHandleActivityUserTimerTasks returns immediately,
7055+
// generating no timer tasks regardless of pending timers or activities.
7056+
dbState := buildRunningState()
7057+
dbState.TimerInfos = pendingTimer(TimerTaskStatusNone)
7058+
dbState.ActivityInfos = pendingScheduledActivity(0)
7059+
7060+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7061+
s.Require().NoError(err)
7062+
7063+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyPassive)
7064+
s.Require().NoError(err)
7065+
7066+
s.Empty(collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7067+
s.Empty(collectActivityTimerTasks(mutation.Tasks[tasks.CategoryTimer]))
7068+
})
7069+
7070+
// ── Ordering: closeTransactionHandleActivityUserTimerTasks runs before
7071+
// closeTransactionRegenerateTimerTasksForTimeSkipping ───────────
7072+
7073+
s.Run("Ordering/UserTimer_NotCreated_WithTimeSkipping", func() {
7074+
// Timer has TaskStatus=None AND the workflow becomes eligible for time-skipping.
7075+
// closeTransactionHandleActivityUserTimerTasks runs first and generates a normal
7076+
// UserTimerTask at ExpiryTime. closeTransactionRegenerateTimerTasksForTimeSkipping
7077+
// then generates a second UserTimerTask at ExpiryTime-accumulatedDuration.
7078+
// Both tasks must be present; the time-skipped one has an earlier timestamp.
7079+
dbState := buildRunningState()
7080+
dbState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{
7081+
Config: &workflowpb.TimeSkippingConfig{Enabled: true},
7082+
}
7083+
dbState.TimerInfos = pendingTimer(TimerTaskStatusNone)
7084+
7085+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7086+
s.Require().NoError(err)
7087+
_, err = ms.StartTransaction(s.namespaceEntry)
7088+
s.Require().NoError(err)
7089+
7090+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7091+
s.Require().NoError(err)
7092+
7093+
accumulated := ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration
7094+
s.Require().NotNil(accumulated)
7095+
s.Require().Greater(accumulated.AsDuration(), time.Duration(0))
7096+
7097+
utTasks := collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer])
7098+
s.Require().Len(utTasks, 2, "expected one normal task and one time-skipped task")
7099+
7100+
// Sort by visibility timestamp so we can check order deterministically.
7101+
if utTasks[0].VisibilityTimestamp.After(utTasks[1].VisibilityTimestamp) {
7102+
utTasks[0], utTasks[1] = utTasks[1], utTasks[0]
7103+
}
7104+
7105+
expectedShifted := timerExpiry.Add(-accumulated.AsDuration())
7106+
s.Equal(expectedShifted, utTasks[0].VisibilityTimestamp, "time-skipped task must fire first")
7107+
s.Equal(timerExpiry, utTasks[1].VisibilityTimestamp, "normal task fires at real expiry")
7108+
s.Equal(int64(1), utTasks[0].EventID)
7109+
s.Equal(int64(1), utTasks[1].EventID)
7110+
})
7111+
7112+
s.Run("Ordering/UserTimer_AlreadyCreated_WithTimeSkipping", func() {
7113+
// Timer has TaskStatus=Created: closeTransactionHandleActivityUserTimerTasks skips
7114+
// it (already created). closeTransactionRegenerateTimerTasksForTimeSkipping still
7115+
// generates the time-shifted task. Only the time-skipped task appears.
7116+
dbState := buildRunningState()
7117+
dbState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{
7118+
Config: &workflowpb.TimeSkippingConfig{Enabled: true},
7119+
}
7120+
dbState.TimerInfos = pendingTimer(TimerTaskStatusCreated)
7121+
7122+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
7123+
s.Require().NoError(err)
7124+
_, err = ms.StartTransaction(s.namespaceEntry)
7125+
s.Require().NoError(err)
7126+
7127+
mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
7128+
s.Require().NoError(err)
7129+
7130+
accumulated := ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration
7131+
s.Require().NotNil(accumulated)
7132+
7133+
utTasks := collectUserTimerTasks(mutation.Tasks[tasks.CategoryTimer])
7134+
s.Require().Len(utTasks, 1, "only the time-skipped task must be present")
7135+
7136+
expectedShifted := timerExpiry.Add(-accumulated.AsDuration())
7137+
s.Equal(expectedShifted, utTasks[0].VisibilityTimestamp)
7138+
s.Equal(int64(1), utTasks[0].EventID)
7139+
})
7140+
}

service/history/workflow/task_generator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,13 +1036,15 @@ func isPathAffectedByDelete(deletePath []hsm.Key, timerPath []*persistencespb.St
10361036
// and the only difference is the TaskID.
10371037
// TODO@time-skipping: currently not safe to call in replication context
10381038
func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() {
1039+
10391040
if r.mutableState.GetExecutionInfo().TimeSkippingInfo == nil {
10401041
return
10411042
}
10421043
accumulatedSkippedDuration := r.mutableState.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration.AsDuration()
10431044
if accumulatedSkippedDuration == 0 {
10441045
return
10451046
}
1047+
10461048
userTimerSequenceIDs := r.getTimerSequence().LoadAndSortUserTimers()
10471049
if len(userTimerSequenceIDs) == 0 {
10481050
// This method maybe called when there are no user timers to regenerate,

service/history/workflow/task_generator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1178,7 +1178,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping(t *testing.T) {
11781178
now := time.Now().UTC()
11791179
skippedDuration := time.Hour
11801180

1181-
// Two pending timers: expiry at now+1h and now+2h.
1181+
// Two pending timers: expiry at now+1h and now+2h; only the earliest is regenerated.
11821182
timer1ExpiryTime := now.Add(1 * time.Hour)
11831183
timer2ExpiryTime := now.Add(2 * time.Hour)
11841184

0 commit comments

Comments
 (0)