Skip to content

Commit 5a229af

Browse files
committed
add tests for closeTransaction and fix bugs
1 parent 79f7257 commit 5a229af

8 files changed

Lines changed: 305 additions & 28 deletions

service/history/historybuilder/event_factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,15 +1081,15 @@ func (b *EventFactory) createHistoryEvent(
10811081
}
10821082

10831083
func (b *EventFactory) CreateWorkflowExecutionTimeSkippingTransitionedEvent(
1084-
targetTime *time.Time,
1084+
targetTime time.Time,
10851085
triggeredDisable bool,
10861086
) *historypb.HistoryEvent {
10871087
event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED, b.timeSource.Now())
10881088
transitionedAttr := &historypb.WorkflowExecutionTimeSkippingTransitionedEventAttributes{
10891089
WallClockTime: timestamppb.New(b.timeSource.Now()),
10901090
DisabledAfterBound: triggeredDisable,
10911091
}
1092-
if targetTime != nil {
1092+
if !targetTime.IsZero() {
10931093
transitionedAttr.TargetTime = timestamppb.New(targetTime.UTC())
10941094
}
10951095
event.Attributes = &historypb.HistoryEvent_WorkflowExecutionTimeSkippingTransitionedEventAttributes{

service/history/historybuilder/history_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ func (b *HistoryBuilder) AddActivityTaskScheduledEvent(
323323
}
324324

325325
func (b *HistoryBuilder) AddWorkflowExecutionTimeSkippingTransitionedEvent(
326-
targetTime *time.Time,
326+
targetTime time.Time,
327327
triggeredDisable bool,
328328
) *historypb.HistoryEvent {
329329
event := b.CreateWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, triggeredDisable)

service/history/historybuilder/history_builder_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2580,7 +2580,7 @@ func (s *historyBuilderSuite) TestAddWorkflowExecutionTimeSkippingTransitionedEv
25802580
targetTime := s.now.Add(2 * time.Hour)
25812581

25822582
s.Run("WorkerMayIgnoreAndAttributesPopulated", func() {
2583-
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(&targetTime, false)
2583+
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, false)
25842584

25852585
s.NotNil(event)
25862586
s.True(event.WorkerMayIgnore)
@@ -2595,7 +2595,7 @@ func (s *historyBuilderSuite) TestAddWorkflowExecutionTimeSkippingTransitionedEv
25952595
})
25962596

25972597
s.Run("DisabledAfterBoundPropagated", func() {
2598-
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(&targetTime, true)
2598+
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, true)
25992599

26002600
attrs := event.GetWorkflowExecutionTimeSkippingTransitionedEventAttributes()
26012601
s.NotNil(attrs)
@@ -2605,7 +2605,7 @@ func (s *historyBuilderSuite) TestAddWorkflowExecutionTimeSkippingTransitionedEv
26052605

26062606
// no targetTime and only disabledAfterBound is true
26072607
s.Run("NoTargetTimeAndOnlyDisabledAfterBoundIsTrue", func() {
2608-
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(nil, true)
2608+
event := s.historyBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(time.Time{}, true)
26092609

26102610
attrs := event.GetWorkflowExecutionTimeSkippingTransitionedEventAttributes()
26112611
s.NotNil(attrs)

service/history/timer_queue_active_task_executor.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask(
158158

159159
timerSequence := t.getTimerSequence(mutableState)
160160
referenceTime := t.Now()
161-
if tsInfo := mutableState.GetExecutionInfo().GetTimeSkippingInfo(); tsInfo != nil && tsInfo.GetAccumulatedSkippedDuration() != nil {
162-
referenceTime = mutableState.GetTimeSkippingVirtualTime()
163-
}
164161
timerFired := false
165162
Loop:
166163
for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() {

service/history/workflow/mutable_state_impl.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4032,19 +4032,11 @@ func (ms *MutableStateImpl) AddWorkflowExecutionTimeSkippingTransitionedEvent(ct
40324032
return nil, serviceerror.NewInternal("time skipping is triggered when conditions are not met")
40334033
}
40344034

4035-
if nextTimePointToSkip == nil {
4036-
event := ms.hBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(
4037-
nil,
4038-
disabledAfterBound,
4039-
)
4040-
return event, ms.ApplyWorkflowExecutionTimeSkippingTransitionedEvent(ctx, event)
4035+
var targetTime time.Time
4036+
if nextTimePointToSkip != nil {
4037+
targetTime = nextTimePointToSkip.ExpiryTime.AsTime()
40414038
}
4042-
4043-
targetTime := nextTimePointToSkip.ExpiryTime.AsTime()
4044-
event := ms.hBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(
4045-
&targetTime,
4046-
disabledAfterBound,
4047-
)
4039+
event := ms.hBuilder.AddWorkflowExecutionTimeSkippingTransitionedEvent(targetTime, disabledAfterBound)
40484040
return event, ms.ApplyWorkflowExecutionTimeSkippingTransitionedEvent(ctx, event)
40494041
}
40504042

@@ -7256,7 +7248,7 @@ func (ms *MutableStateImpl) closeTransaction(
72567248
return closeTransactionResult{}, err
72577249
}
72587250

7259-
regenTimerTasksForTimeSkipping := ms.closeTransactionHandlerTimeSkipping(ctx, transactionPolicy)
7251+
regenTimerTasksForTimeSkipping := ms.closeTransactionHandleTimeSkipping(ctx, transactionPolicy)
72607252

72617253
// Save if the state is dirty before closeTransactionPrepareEvents since it flushes the buffer
72627254
// events, and therefore change the dirty state.
@@ -8458,7 +8450,7 @@ func (ms *MutableStateImpl) closeTransactionHandleActivityUserTimerTasks(
84588450
}
84598451
}
84608452

8461-
func (ms *MutableStateImpl) closeTransactionHandlerTimeSkipping(
8453+
func (ms *MutableStateImpl) closeTransactionHandleTimeSkipping(
84628454
ctx context.Context,
84638455
transactionPolicy historyi.TransactionPolicy,
84648456
) (regenTimerTasksForTimeSkipping bool) {
@@ -8485,7 +8477,7 @@ func (ms *MutableStateImpl) closeTransactionHandlerTimeSkipping(
84858477
case historyi.TransactionPolicyPassive:
84868478
return false
84878479
default:
8488-
ms.logger.Error(fmt.Sprintf("closeTransactionHandlerTimeSkipping: unknown transaction policy: %v", transactionPolicy),
8480+
ms.logger.Error(fmt.Sprintf("closeTransactionHandleTimeSkipping: unknown transaction policy: %v", transactionPolicy),
84898481
tag.WorkflowID(ms.GetExecutionInfo().WorkflowId),
84908482
tag.WorkflowRunID(ms.GetExecutionState().RunId),
84918483
)

0 commit comments

Comments
 (0)