Skip to content

Commit 4128a4a

Browse files
committed
add more unittests and functional tests
1 parent 03917b1 commit 4128a4a

7 files changed

Lines changed: 556 additions & 15 deletions

File tree

service/frontend/workflow_handler.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2455,6 +2455,17 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, request *
24552455
return nil, serviceerror.NewInternalf("unknown reset reapply type: %v", request.GetResetReapplyType())
24562456
}
24572457

2458+
for _, postOp := range request.GetPostResetOperations() {
2459+
if updateOpts := postOp.GetUpdateWorkflowOptions(); updateOpts != nil {
2460+
if err := wh.validateTimeSkippingConfig(
2461+
updateOpts.GetWorkflowExecutionOptions().GetTimeSkippingConfig(),
2462+
namespace.Name(request.GetNamespace()),
2463+
); err != nil {
2464+
return nil, err
2465+
}
2466+
}
2467+
}
2468+
24582469
namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
24592470
if err != nil {
24602471
return nil, err
@@ -5668,9 +5679,25 @@ func (wh *WorkflowHandler) StartBatchOperation(
56685679
case *workflowservice.StartBatchOperationRequest_ResetOperation:
56695680
input.BatchType = enumspb.BATCH_OPERATION_TYPE_RESET
56705681
identity = op.ResetOperation.GetIdentity()
5682+
for _, postOp := range op.ResetOperation.GetPostResetOperations() {
5683+
if updateOpts := postOp.GetUpdateWorkflowOptions(); updateOpts != nil {
5684+
if err := wh.validateTimeSkippingConfig(
5685+
updateOpts.GetWorkflowExecutionOptions().GetTimeSkippingConfig(),
5686+
namespace.Name(request.GetNamespace()),
5687+
); err != nil {
5688+
return nil, err
5689+
}
5690+
}
5691+
}
56715692
case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation:
56725693
input.BatchType = enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS
56735694
identity = op.UpdateWorkflowOptionsOperation.GetIdentity()
5695+
if err := wh.validateTimeSkippingConfig(
5696+
op.UpdateWorkflowOptionsOperation.GetWorkflowExecutionOptions().GetTimeSkippingConfig(),
5697+
namespace.Name(request.GetNamespace()),
5698+
); err != nil {
5699+
return nil, err
5700+
}
56745701
case *workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation:
56755702
input.BatchType = enumspb.BATCH_OPERATION_TYPE_UNPAUSE_ACTIVITY
56765703
identity = op.UnpauseActivitiesOperation.GetIdentity()

service/frontend/workflow_handler_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3486,6 +3486,107 @@ func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_TimeSkipping
34863486
s.True(resp.Started)
34873487
}
34883488

3489+
// TestResetWorkflowExecution_TimeSkipping_DCDisabled verifies that when the DC gate is off,
3490+
// a ResetWorkflowExecution request with a TimeSkippingConfig inside PostResetOperations is rejected.
3491+
func (s *WorkflowHandlerSuite) TestResetWorkflowExecution_TimeSkipping_DCDisabled() {
3492+
config := s.newConfig()
3493+
config.TimeSkippingEnabled = dc.GetBoolPropertyFnFilteredByNamespace(false)
3494+
wh := s.getWorkflowHandler(config)
3495+
3496+
_, err := wh.ResetWorkflowExecution(context.Background(), &workflowservice.ResetWorkflowExecutionRequest{
3497+
Namespace: s.testNamespace.String(),
3498+
RequestId: uuid.NewString(),
3499+
WorkflowExecution: &commonpb.WorkflowExecution{
3500+
WorkflowId: "workflow-id",
3501+
RunId: uuid.NewString(),
3502+
},
3503+
PostResetOperations: []*workflowpb.PostResetOperation{
3504+
{
3505+
Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{
3506+
UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{
3507+
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
3508+
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
3509+
},
3510+
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}},
3511+
},
3512+
},
3513+
},
3514+
},
3515+
})
3516+
var unimplemented *serviceerror.Unimplemented
3517+
s.ErrorAs(err, &unimplemented)
3518+
s.ErrorContains(err, "The Time-Skipping feature is not enabled for namespace")
3519+
}
3520+
3521+
// TestStartBatchOperation_ResetOperation_TimeSkipping_DCDisabled verifies that when the DC gate
3522+
// is off, a batch reset with a TimeSkippingConfig inside PostResetOperations is rejected.
3523+
func (s *WorkflowHandlerSuite) TestStartBatchOperation_ResetOperation_TimeSkipping_DCDisabled() {
3524+
config := s.newConfig()
3525+
config.TimeSkippingEnabled = dc.GetBoolPropertyFnFilteredByNamespace(false)
3526+
wh := s.getWorkflowHandler(config)
3527+
namespaceID := namespace.ID(uuid.NewString())
3528+
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes()
3529+
s.mockVisibilityMgr.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.CountWorkflowExecutionsResponse{Count: 0}, nil)
3530+
3531+
_, err := wh.StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
3532+
Namespace: s.testNamespace.String(),
3533+
JobId: uuid.NewString(),
3534+
Reason: "test",
3535+
VisibilityQuery: "WorkflowId='test'",
3536+
Operation: &workflowservice.StartBatchOperationRequest_ResetOperation{
3537+
ResetOperation: &batchpb.BatchOperationReset{
3538+
Options: &commonpb.ResetOptions{
3539+
Target: &commonpb.ResetOptions_WorkflowTaskId{WorkflowTaskId: 1},
3540+
},
3541+
PostResetOperations: []*workflowpb.PostResetOperation{
3542+
{
3543+
Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{
3544+
UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{
3545+
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
3546+
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
3547+
},
3548+
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}},
3549+
},
3550+
},
3551+
},
3552+
},
3553+
},
3554+
},
3555+
})
3556+
var unimplemented *serviceerror.Unimplemented
3557+
s.ErrorAs(err, &unimplemented)
3558+
s.ErrorContains(err, "The Time-Skipping feature is not enabled for namespace")
3559+
}
3560+
3561+
// TestStartBatchOperation_UpdateWorkflowOptionsOperation_TimeSkipping_DCDisabled verifies that
3562+
// when the DC gate is off, a batch UpdateWorkflowOptions with a TimeSkippingConfig is rejected.
3563+
func (s *WorkflowHandlerSuite) TestStartBatchOperation_UpdateWorkflowOptionsOperation_TimeSkipping_DCDisabled() {
3564+
config := s.newConfig()
3565+
config.TimeSkippingEnabled = dc.GetBoolPropertyFnFilteredByNamespace(false)
3566+
wh := s.getWorkflowHandler(config)
3567+
namespaceID := namespace.ID(uuid.NewString())
3568+
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes()
3569+
s.mockVisibilityMgr.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.CountWorkflowExecutionsResponse{Count: 0}, nil)
3570+
3571+
_, err := wh.StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
3572+
Namespace: s.testNamespace.String(),
3573+
JobId: uuid.NewString(),
3574+
Reason: "test",
3575+
VisibilityQuery: "WorkflowId='test'",
3576+
Operation: &workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation{
3577+
UpdateWorkflowOptionsOperation: &batchpb.BatchOperationUpdateWorkflowExecutionOptions{
3578+
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
3579+
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
3580+
},
3581+
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}},
3582+
},
3583+
},
3584+
})
3585+
var unimplemented *serviceerror.Unimplemented
3586+
s.ErrorAs(err, &unimplemented)
3587+
s.ErrorContains(err, "The Time-Skipping feature is not enabled for namespace")
3588+
}
3589+
34893590
func (s *WorkflowHandlerSuite) newConfig() *Config {
34903591
return NewConfig(dc.NewNoopCollection(), numHistoryShards)
34913592
}
@@ -4475,6 +4576,29 @@ func (s *WorkflowHandlerSuite) TestUpdateWorkflowExecutionOptions_Priority() {
44754576
// NOTE: only testing a single validation scenario here; the priority validation has its own unit tests
44764577
}
44774578

4579+
// TestUpdateWorkflowExecutionOptions_TimeSkipping_DCDisabled verifies that when the DC gate is
4580+
// off, UpdateWorkflowExecutionOptions rejects a request containing a TimeSkippingConfig.
4581+
func (s *WorkflowHandlerSuite) TestUpdateWorkflowExecutionOptions_TimeSkipping_DCDisabled() {
4582+
config := s.newConfig()
4583+
config.TimeSkippingEnabled = dc.GetBoolPropertyFnFilteredByNamespace(false)
4584+
wh := s.getWorkflowHandler(config)
4585+
4586+
_, err := wh.UpdateWorkflowExecutionOptions(context.Background(), &workflowservice.UpdateWorkflowExecutionOptionsRequest{
4587+
Namespace: s.testNamespace.String(),
4588+
WorkflowExecution: &commonpb.WorkflowExecution{
4589+
WorkflowId: "workflow-id",
4590+
RunId: "run-id",
4591+
},
4592+
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
4593+
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
4594+
},
4595+
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}},
4596+
})
4597+
var unimplemented *serviceerror.Unimplemented
4598+
s.ErrorAs(err, &unimplemented)
4599+
s.ErrorContains(err, "The Time-Skipping feature is not enabled for namespace")
4600+
}
4601+
44784602
func (s *WorkflowHandlerSuite) TestUpdateActivityOptions_Priority() {
44794603
config := s.newConfig()
44804604
wh := s.getWorkflowHandler(config)

service/history/api/updateworkflowoptions/api_test.go

Lines changed: 104 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package updateworkflowoptions
33
import (
44
"context"
55
"testing"
6+
"time"
67

7-
"github.com/stretchr/testify/assert"
88
"github.com/stretchr/testify/require"
99
"github.com/stretchr/testify/suite"
1010
commonpb "go.temporal.io/api/common/v1"
@@ -28,6 +28,7 @@ import (
2828
wcache "go.temporal.io/server/service/history/workflow/cache"
2929
"go.uber.org/mock/gomock"
3030
"google.golang.org/protobuf/proto"
31+
"google.golang.org/protobuf/types/known/durationpb"
3132
"google.golang.org/protobuf/types/known/fieldmaskpb"
3233
)
3334

@@ -94,28 +95,28 @@ func TestMergeOptions_VersionOverrideMask(t *testing.T) {
9495
if err != nil {
9596
t.Error(err)
9697
}
97-
assert.EqualExportedValues(t, unpinnedOverrideOptions, merged)
98+
require.EqualExportedValues(t, unpinnedOverrideOptions, merged)
9899

99100
// Merge pinned_A into unpinned options
100101
merged, err = mergeWorkflowExecutionOptions(input, pinnedOverrideOptionsA, updateMask)
101102
if err != nil {
102103
t.Error(err)
103104
}
104-
assert.EqualExportedValues(t, pinnedOverrideOptionsA, merged)
105+
require.EqualExportedValues(t, pinnedOverrideOptionsA, merged)
105106

106107
// Merge pinned_B into pinned_A options
107108
merged, err = mergeWorkflowExecutionOptions(input, pinnedOverrideOptionsB, updateMask)
108109
if err != nil {
109110
t.Error(err)
110111
}
111-
assert.EqualExportedValues(t, pinnedOverrideOptionsB, merged)
112+
require.EqualExportedValues(t, pinnedOverrideOptionsB, merged)
112113

113114
// Unset versioning override
114115
merged, err = mergeWorkflowExecutionOptions(input, emptyOptions, updateMask)
115116
if err != nil {
116117
t.Error(err)
117118
}
118-
assert.EqualExportedValues(t, emptyOptions, merged)
119+
require.EqualExportedValues(t, emptyOptions, merged)
119120
}
120121

121122
func TestMergeOptions_PartialMask(t *testing.T) {
@@ -124,14 +125,14 @@ func TestMergeOptions_PartialMask(t *testing.T) {
124125
deploymentOnlyUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.deployment"}}
125126

126127
_, err := mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, behaviorOnlyUpdateMask)
127-
assert.Error(t, err)
128+
require.Error(t, err)
128129

129130
_, err = mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, deploymentOnlyUpdateMask)
130-
assert.Error(t, err)
131+
require.Error(t, err)
131132

132133
merged, err := mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, bothUpdateMask)
133-
assert.NoError(t, err)
134-
assert.EqualExportedValues(t, unpinnedOverrideOptions, merged)
134+
require.NoError(t, err)
135+
require.EqualExportedValues(t, unpinnedOverrideOptions, merged)
135136
}
136137

137138
func TestMergeOptions_EmptyMask(t *testing.T) {
@@ -140,25 +141,113 @@ func TestMergeOptions_EmptyMask(t *testing.T) {
140141

141142
// Don't merge anything
142143
merged, err := mergeWorkflowExecutionOptions(input, pinnedOverrideOptionsA, emptyUpdateMask)
143-
assert.NoError(t, err)
144-
assert.EqualExportedValues(t, input, merged)
144+
require.NoError(t, err)
145+
require.EqualExportedValues(t, input, merged)
145146

146147
// Don't merge anything
147148
merged, err = mergeWorkflowExecutionOptions(input, nil, emptyUpdateMask)
148-
assert.NoError(t, err)
149-
assert.EqualExportedValues(t, input, merged)
149+
require.NoError(t, err)
150+
require.EqualExportedValues(t, input, merged)
150151
}
151152

152153
func TestMergeOptions_AsteriskMask(t *testing.T) {
153154
asteriskUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{"*"}}
154155
_, err := mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, asteriskUpdateMask)
155-
assert.Error(t, err)
156+
require.Error(t, err)
156157
}
157158

158159
func TestMergeOptions_FooMask(t *testing.T) {
159160
fooUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{"foo"}}
160161
_, err := mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, fooUpdateMask)
161-
assert.Error(t, err)
162+
require.Error(t, err)
163+
}
164+
165+
func TestMergeOptions_TimeSkippingConfig(t *testing.T) {
166+
tscMask := &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}}
167+
cfgA := &workflowpb.TimeSkippingConfig{Enabled: true}
168+
cfgB := &workflowpb.TimeSkippingConfig{
169+
Enabled: true,
170+
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{MaxSkippedDuration: durationpb.New(time.Hour)},
171+
}
172+
173+
tcs := []struct {
174+
name string
175+
current *workflowpb.WorkflowExecutionOptions
176+
update *workflowpb.WorkflowExecutionOptions
177+
wantChanged bool
178+
wantConfig *workflowpb.TimeSkippingConfig
179+
}{
180+
// nil update means "don't touch" even when mask is present
181+
{
182+
name: "nil update - existing config preserved",
183+
current: &workflowpb.WorkflowExecutionOptions{TimeSkippingConfig: cfgA},
184+
update: &workflowpb.WorkflowExecutionOptions{},
185+
wantChanged: false,
186+
wantConfig: cfgA,
187+
},
188+
// non-nil update replaces and is detected as a change
189+
{
190+
name: "new config - changed",
191+
current: &workflowpb.WorkflowExecutionOptions{},
192+
update: &workflowpb.WorkflowExecutionOptions{TimeSkippingConfig: cfgB},
193+
wantChanged: true,
194+
wantConfig: cfgB,
195+
},
196+
// identical config is not detected as a change
197+
{
198+
name: "same config - no change",
199+
current: &workflowpb.WorkflowExecutionOptions{TimeSkippingConfig: cfgB},
200+
update: &workflowpb.WorkflowExecutionOptions{TimeSkippingConfig: cfgB},
201+
wantChanged: false,
202+
wantConfig: cfgB,
203+
},
204+
}
205+
206+
for _, tc := range tcs {
207+
t.Run(tc.name, func(t *testing.T) {
208+
original := proto.Clone(tc.current).(*workflowpb.WorkflowExecutionOptions)
209+
merged, err := mergeWorkflowExecutionOptions(tc.current, tc.update, tscMask)
210+
require.NoError(t, err)
211+
require.True(t, proto.Equal(tc.wantConfig, merged.GetTimeSkippingConfig()),
212+
"config mismatch: want %v, got %v", tc.wantConfig, merged.GetTimeSkippingConfig())
213+
require.Equal(t, tc.wantChanged, !proto.Equal(merged, original))
214+
})
215+
}
216+
}
217+
218+
func TestMergeAndApply_TimeSkippingConfig(t *testing.T) {
219+
tscMask := &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}}
220+
cfg := &workflowpb.TimeSkippingConfig{
221+
Enabled: true,
222+
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{MaxSkippedDuration: durationpb.New(time.Hour)},
223+
}
224+
225+
t.Run("same config - no event written", func(t *testing.T) {
226+
ctrl := gomock.NewController(t)
227+
ms := historyi.NewMockMutableState(ctrl)
228+
ms.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
229+
TimeSkippingInfo: &persistencespb.TimeSkippingInfo{Config: cfg},
230+
}).AnyTimes()
231+
232+
merged, hasChanges, err := MergeAndApply(ms, &workflowpb.WorkflowExecutionOptions{TimeSkippingConfig: cfg}, tscMask, "")
233+
require.NoError(t, err)
234+
require.False(t, hasChanges)
235+
require.True(t, proto.Equal(cfg, merged.GetTimeSkippingConfig()))
236+
})
237+
238+
t.Run("new config - event written with config", func(t *testing.T) {
239+
ctrl := gomock.NewController(t)
240+
ms := historyi.NewMockMutableState(ctrl)
241+
ms.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{}).AnyTimes()
242+
ms.EXPECT().AddWorkflowExecutionOptionsUpdatedEvent(
243+
nil, true, "", nil, nil, "", nil, cfg,
244+
).Return(&historypb.HistoryEvent{}, nil)
245+
246+
merged, hasChanges, err := MergeAndApply(ms, &workflowpb.WorkflowExecutionOptions{TimeSkippingConfig: cfg}, tscMask, "")
247+
require.NoError(t, err)
248+
require.True(t, hasChanges)
249+
require.True(t, proto.Equal(cfg, merged.GetTimeSkippingConfig()))
250+
})
162251
}
163252

164253
type (

0 commit comments

Comments
 (0)