Skip to content

Commit 7e31ae7

Browse files
committed
add ut for ms methods for timeskipping
1 parent f58b67e commit 7e31ae7

2 files changed

Lines changed: 187 additions & 3 deletions

File tree

service/history/workflow/mutable_state_impl.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7137,9 +7137,6 @@ func (ms *MutableStateImpl) CloseTransactionAsSnapshot(
71377137
Checksum: result.checksum,
71387138
}
71397139

7140-
// todo: close transaction as snapshot may also need time skipping
7141-
// for features like start-with-delay feature that has something to skip before the first workflow task is scheduled.
7142-
71437140
ms.checksum = result.checksum
71447141
if err := ms.cleanupTransaction(); err != nil {
71457142
return nil, nil, err
@@ -8441,11 +8438,13 @@ func (ms *MutableStateImpl) closeTransactionHandlerTimeSkipping(
84418438
ctx context.Context,
84428439
transactionPolicy historyi.TransactionPolicy,
84438440
) (regenTimerTasksForTimeSkipping bool) {
8441+
// TODO@time-skipping: need to check if passive cluster need this function
84448442
switch transactionPolicy {
84458443
case historyi.TransactionPolicyActive:
84468444
if !ms.IsWorkflowExecutionRunning() {
84478445
return false
84488446
}
8447+
// TODO@time-skipping: need to support start-with-delay
84498448
if transactionPolicy == historyi.TransactionPolicyActive && ms.ShouldExecuteTimeSkipping() {
84508449
if _, err := ms.AddWorkflowExecutionTimeSkippingTransitionedEvent(ctx); err != nil {
84518450
ms.metricsHandler.Counter(metrics.ExecutionTimeSkippingTransitionedErrorCounter.Name()).Record(1)
@@ -8470,6 +8469,7 @@ func (ms *MutableStateImpl) closeTransactionHandlerTimeSkipping(
84708469
func (ms *MutableStateImpl) closeTransactionRegenerateTimerTasksForTimeSkipping(
84718470
transactionPolicy historyi.TransactionPolicy,
84728471
) error {
8472+
// TODO@time-skipping: need to check if passive cluster need this function
84738473
switch transactionPolicy {
84748474
case historyi.TransactionPolicyActive:
84758475
if !ms.IsWorkflowExecutionRunning() {

service/history/workflow/mutable_state_impl_test.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6562,3 +6562,187 @@ func (s *mutableStateSuite) TestAddWorkflowExecutionTimeSkippingTransitionedEven
65626562
s.False(s.mutableState.GetExecutionInfo().TimeSkippingInfo.Config.Enabled)
65636563
})
65646564
}
6565+
6566+
// TestCloseTransactionTimeSkipping exercises the time-skipping logic that runs inside
6567+
// closeTransaction:
6568+
//
6569+
// - closeTransactionHandlerTimeSkipping: decides whether to fire time skipping and, if so,
6570+
// calls AddWorkflowExecutionTimeSkippingTransitionedEvent to write the history event and
6571+
// accumulate the skipped duration.
6572+
// - closeTransactionRegenerateTimerTasksForTimeSkipping: regenerates UserTimerTask entries
6573+
// with timestamps shifted back by AccumulatedSkippedDuration.
6574+
func (s *mutableStateSuite) TestCloseTransactionTimeSkipping() {
6575+
failoverVersion := s.namespaceEntry.FailoverVersion(tests.WorkflowID)
6576+
6577+
// buildEligibleState returns a minimal running workflow state that satisfies
6578+
// ShouldExecuteTimeSkipping(): time-skipping enabled, one pending timer, no pending
6579+
// workflow task / activities / child executions.
6580+
//
6581+
// The timer's TaskStatus is pre-set to TimerTaskStatusCreated so that
6582+
// closeTransactionHandleActivityUserTimerTasks does NOT add a second UserTimerTask for it.
6583+
// That way the only timer task in the mutation is the regenerated (time-shifted) one.
6584+
buildEligibleState := func(timerExpiry time.Time) *persistencespb.WorkflowMutableState {
6585+
return &persistencespb.WorkflowMutableState{
6586+
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
6587+
NamespaceId: s.namespaceEntry.ID().String(),
6588+
WorkflowId: tests.WorkflowID,
6589+
TaskQueue: "testTaskQueue",
6590+
WorkflowTypeName: "testWorkflowType",
6591+
WorkflowExecutionTimerTaskStatus: TimerTaskStatusCreated,
6592+
TimeSkippingInfo: &persistencespb.TimeSkippingInfo{
6593+
Config: &workflowpb.TimeSkippingConfig{Enabled: true},
6594+
},
6595+
VersionHistories: &historyspb.VersionHistories{
6596+
Histories: []*historyspb.VersionHistory{
6597+
{
6598+
BranchToken: []byte("token#1"),
6599+
Items: []*historyspb.VersionHistoryItem{{EventId: 2, Version: failoverVersion}},
6600+
},
6601+
},
6602+
},
6603+
TransitionHistory: []*persistencespb.VersionedTransition{
6604+
{NamespaceFailoverVersion: failoverVersion, TransitionCount: 1},
6605+
},
6606+
},
6607+
ExecutionState: &persistencespb.WorkflowExecutionState{
6608+
RunId: tests.RunID,
6609+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
6610+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
6611+
},
6612+
NextEventId: 3,
6613+
TimerInfos: map[string]*persistencespb.TimerInfo{
6614+
"t1": {
6615+
TimerId: "t1",
6616+
StartedEventId: 1,
6617+
ExpiryTime: timestamppb.New(timerExpiry),
6618+
TaskStatus: TimerTaskStatusCreated,
6619+
},
6620+
},
6621+
}
6622+
}
6623+
6624+
s.Run("Active_Eligible_WritesEventAndRegeneratesTimerTask", func() {
6625+
// Happy path: active cluster, running workflow, time-skipping enabled, one pending
6626+
// timer, no other in-flight work. Expect:
6627+
// 1. A WorkflowExecutionTimeSkippingTransitioned event in the event batch.
6628+
// 2. AccumulatedSkippedDuration set on execution info.
6629+
// 3. A regenerated UserTimerTask at (timerExpiry - accumulatedDuration).
6630+
now := s.mockShard.GetTimeSource().Now()
6631+
timerExpiry := now.Add(2 * time.Hour)
6632+
6633+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, buildEligibleState(timerExpiry), 1)
6634+
s.Require().NoError(err)
6635+
_, err = ms.StartTransaction(s.namespaceEntry)
6636+
s.Require().NoError(err)
6637+
6638+
mutation, workflowEventsSeq, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
6639+
s.Require().NoError(err)
6640+
6641+
// AccumulatedSkippedDuration must be positive.
6642+
accumulated := ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration
6643+
s.Require().NotNil(accumulated)
6644+
s.Greater(accumulated.AsDuration(), time.Duration(0))
6645+
6646+
// A WorkflowExecutionTimeSkippingTransitioned event must appear in the written batches.
6647+
var tsEvent *historypb.HistoryEvent
6648+
for _, we := range workflowEventsSeq {
6649+
for _, ev := range we.Events {
6650+
if ev.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED {
6651+
tsEvent = ev
6652+
}
6653+
}
6654+
}
6655+
s.Require().NotNil(tsEvent, "expected WorkflowExecutionTimeSkippingTransitioned event in workflowEventsSeq")
6656+
s.Equal(timerExpiry, tsEvent.GetWorkflowExecutionTimeSkippingTransitionedEventAttributes().GetTargetTime().AsTime())
6657+
6658+
// The mutation must contain a regenerated UserTimerTask shifted back by the accumulated duration.
6659+
expectedTS := timerExpiry.Add(-accumulated.AsDuration())
6660+
var regenerated *tasks.UserTimerTask
6661+
for _, task := range mutation.Tasks[tasks.CategoryTimer] {
6662+
if ut, ok := task.(*tasks.UserTimerTask); ok && ut.EventID == 1 {
6663+
regenerated = ut
6664+
}
6665+
}
6666+
s.Require().NotNil(regenerated, "expected regenerated UserTimerTask for EventID=1")
6667+
s.Equal(expectedTS, regenerated.VisibilityTimestamp)
6668+
s.Equal(int64(0), regenerated.TaskID, "TaskID must be zero — assigned by shard, not the generator")
6669+
})
6670+
6671+
s.Run("Active_TimeSkippingDisabled_NoEvent", func() {
6672+
// When TimeSkippingInfo is nil, ShouldExecuteTimeSkipping returns false immediately.
6673+
// No time-skipping event should be emitted and AccumulatedSkippedDuration stays nil.
6674+
now := s.mockShard.GetTimeSource().Now()
6675+
timerExpiry := now.Add(2 * time.Hour)
6676+
6677+
dbState := buildEligibleState(timerExpiry)
6678+
dbState.ExecutionInfo.TimeSkippingInfo = nil // time skipping not configured
6679+
6680+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
6681+
s.Require().NoError(err)
6682+
_, err = ms.StartTransaction(s.namespaceEntry)
6683+
s.Require().NoError(err)
6684+
6685+
_, workflowEventsSeq, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
6686+
s.Require().NoError(err)
6687+
6688+
for _, we := range workflowEventsSeq {
6689+
for _, ev := range we.Events {
6690+
s.NotEqual(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED, ev.GetEventType(),
6691+
"unexpected WorkflowExecutionTimeSkippingTransitioned event when time skipping is disabled")
6692+
}
6693+
}
6694+
})
6695+
6696+
s.Run("Active_NoPendingTimers_NoEvent", func() {
6697+
// ShouldExecuteTimeSkipping returns false when there are no pending timers and no
6698+
// time bound configured. No event should be emitted.
6699+
now := s.mockShard.GetTimeSource().Now()
6700+
6701+
dbState := buildEligibleState(now.Add(time.Hour))
6702+
dbState.TimerInfos = nil // no pending timers → nothing to skip to
6703+
6704+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1)
6705+
s.Require().NoError(err)
6706+
_, err = ms.StartTransaction(s.namespaceEntry)
6707+
s.Require().NoError(err)
6708+
6709+
_, workflowEventsSeq, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
6710+
s.Require().NoError(err)
6711+
6712+
for _, we := range workflowEventsSeq {
6713+
for _, ev := range we.Events {
6714+
s.NotEqual(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED, ev.GetEventType(),
6715+
"unexpected WorkflowExecutionTimeSkippingTransitioned event when there are no pending timers")
6716+
}
6717+
}
6718+
s.Nil(ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration)
6719+
})
6720+
6721+
s.Run("Passive_EligibleState_NoEvent", func() {
6722+
// Passive policy always short-circuits time skipping.
6723+
// Even with an otherwise eligible state, no event or regenerated timer task is produced.
6724+
now := s.mockShard.GetTimeSource().Now()
6725+
timerExpiry := now.Add(2 * time.Hour)
6726+
6727+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, buildEligibleState(timerExpiry), 1)
6728+
s.Require().NoError(err)
6729+
6730+
mutation, workflowEventsSeq, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyPassive)
6731+
s.Require().NoError(err)
6732+
6733+
for _, we := range workflowEventsSeq {
6734+
for _, ev := range we.Events {
6735+
s.NotEqual(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED, ev.GetEventType(),
6736+
"passive cluster must not emit WorkflowExecutionTimeSkippingTransitioned event")
6737+
}
6738+
}
6739+
s.Nil(ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration)
6740+
6741+
// No regenerated UserTimerTask: the passive path skips both
6742+
// closeTransactionHandlerTimeSkipping and closeTransactionRegenerateTimerTasksForTimeSkipping.
6743+
for _, task := range mutation.Tasks[tasks.CategoryTimer] {
6744+
_, ok := task.(*tasks.UserTimerTask)
6745+
s.False(ok, "passive cluster must not produce regenerated UserTimerTask")
6746+
}
6747+
})
6748+
}

0 commit comments

Comments
 (0)