Skip to content

Commit 88762a2

Browse files
committed
fix bugs in mergeAndUpdate
1 parent c2a602f commit 88762a2

2 files changed

Lines changed: 174 additions & 15 deletions

File tree

service/history/api/updateworkflowoptions/api.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ func getOptionsFromMutableState(ms historyi.MutableState) *workflowpb.WorkflowEx
172172
opts.Priority = cloned
173173
}
174174
}
175+
if tsInfo := ms.GetExecutionInfo().GetTimeSkippingInfo(); tsInfo != nil {
176+
if cloned, ok := proto.Clone(tsInfo.GetConfig()).(*workflowpb.TimeSkippingConfig); ok {
177+
opts.TimeSkippingConfig = cloned
178+
}
179+
}
175180
return opts
176181
}
177182

@@ -230,5 +235,52 @@ func mergeWorkflowExecutionOptions(
230235
mergeInto.Priority.FairnessWeight = mergeFrom.Priority.GetFairnessWeight()
231236
}
232237

238+
// ==== TimeSkippingConfig
239+
240+
if _, ok := updateFields["timeSkippingConfig"]; ok {
241+
mergeInto.TimeSkippingConfig = mergeFrom.GetTimeSkippingConfig()
242+
}
243+
244+
if _, ok := updateFields["timeSkippingConfig.enabled"]; ok {
245+
if mergeInto.TimeSkippingConfig == nil {
246+
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
247+
}
248+
mergeInto.TimeSkippingConfig.Enabled = mergeFrom.GetTimeSkippingConfig().GetEnabled()
249+
}
250+
251+
if _, ok := updateFields["timeSkippingConfig.disablePropagation"]; ok {
252+
if mergeInto.TimeSkippingConfig == nil {
253+
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
254+
}
255+
mergeInto.TimeSkippingConfig.DisablePropagation = mergeFrom.GetTimeSkippingConfig().GetDisablePropagation()
256+
}
257+
258+
if _, ok := updateFields["timeSkippingConfig.maxSkippedDuration"]; ok {
259+
if mergeInto.TimeSkippingConfig == nil {
260+
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
261+
}
262+
mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
263+
MaxSkippedDuration: mergeFrom.GetTimeSkippingConfig().GetMaxSkippedDuration(),
264+
}
265+
}
266+
267+
if _, ok := updateFields["timeSkippingConfig.maxElapsedDuration"]; ok {
268+
if mergeInto.TimeSkippingConfig == nil {
269+
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
270+
}
271+
mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxElapsedDuration{
272+
MaxElapsedDuration: mergeFrom.GetTimeSkippingConfig().GetMaxElapsedDuration(),
273+
}
274+
}
275+
276+
if _, ok := updateFields["timeSkippingConfig.maxTargetTime"]; ok {
277+
if mergeInto.TimeSkippingConfig == nil {
278+
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
279+
}
280+
mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxTargetTime{
281+
MaxTargetTime: mergeFrom.GetTimeSkippingConfig().GetMaxTargetTime(),
282+
}
283+
}
284+
233285
return mergeInto, nil
234286
}

service/history/api/updateworkflowoptions/api_test.go

Lines changed: 122 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package updateworkflowoptions
33
import (
44
"context"
55
"testing"
6+
"time"
67

7-
"github.com/stretchr/testify/assert"
88
"github.com/stretchr/testify/require"
99
"github.com/stretchr/testify/suite"
1010
commonpb "go.temporal.io/api/common/v1"
@@ -28,7 +28,9 @@ import (
2828
wcache "go.temporal.io/server/service/history/workflow/cache"
2929
"go.uber.org/mock/gomock"
3030
"google.golang.org/protobuf/proto"
31+
"google.golang.org/protobuf/types/known/durationpb"
3132
"google.golang.org/protobuf/types/known/fieldmaskpb"
33+
"google.golang.org/protobuf/types/known/timestamppb"
3234
)
3335

3436
type noopVersionMembershipCache struct{}
@@ -94,28 +96,28 @@ func TestMergeOptions_VersionOverrideMask(t *testing.T) {
9496
if err != nil {
9597
t.Error(err)
9698
}
97-
assert.EqualExportedValues(t, unpinnedOverrideOptions, merged)
99+
require.EqualExportedValues(t, unpinnedOverrideOptions, merged)
98100

99101
// Merge pinned_A into unpinned options
100102
merged, err = mergeWorkflowExecutionOptions(input, pinnedOverrideOptionsA, updateMask)
101103
if err != nil {
102104
t.Error(err)
103105
}
104-
assert.EqualExportedValues(t, pinnedOverrideOptionsA, merged)
106+
require.EqualExportedValues(t, pinnedOverrideOptionsA, merged)
105107

106108
// Merge pinned_B into pinned_A options
107109
merged, err = mergeWorkflowExecutionOptions(input, pinnedOverrideOptionsB, updateMask)
108110
if err != nil {
109111
t.Error(err)
110112
}
111-
assert.EqualExportedValues(t, pinnedOverrideOptionsB, merged)
113+
require.EqualExportedValues(t, pinnedOverrideOptionsB, merged)
112114

113115
// Unset versioning override
114116
merged, err = mergeWorkflowExecutionOptions(input, emptyOptions, updateMask)
115117
if err != nil {
116118
t.Error(err)
117119
}
118-
assert.EqualExportedValues(t, emptyOptions, merged)
120+
require.EqualExportedValues(t, emptyOptions, merged)
119121
}
120122

121123
func TestMergeOptions_PartialMask(t *testing.T) {
@@ -124,14 +126,14 @@ func TestMergeOptions_PartialMask(t *testing.T) {
124126
deploymentOnlyUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.deployment"}}
125127

126128
_, err := mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, behaviorOnlyUpdateMask)
127-
assert.Error(t, err)
129+
require.Error(t, err)
128130

129131
_, err = mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, deploymentOnlyUpdateMask)
130-
assert.Error(t, err)
132+
require.Error(t, err)
131133

132134
merged, err := mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, bothUpdateMask)
133-
assert.NoError(t, err)
134-
assert.EqualExportedValues(t, unpinnedOverrideOptions, merged)
135+
require.NoError(t, err)
136+
require.EqualExportedValues(t, unpinnedOverrideOptions, merged)
135137
}
136138

137139
func TestMergeOptions_EmptyMask(t *testing.T) {
@@ -140,25 +142,25 @@ func TestMergeOptions_EmptyMask(t *testing.T) {
140142

141143
// Don't merge anything
142144
merged, err := mergeWorkflowExecutionOptions(input, pinnedOverrideOptionsA, emptyUpdateMask)
143-
assert.NoError(t, err)
144-
assert.EqualExportedValues(t, input, merged)
145+
require.NoError(t, err)
146+
require.EqualExportedValues(t, input, merged)
145147

146148
// Don't merge anything
147149
merged, err = mergeWorkflowExecutionOptions(input, nil, emptyUpdateMask)
148-
assert.NoError(t, err)
149-
assert.EqualExportedValues(t, input, merged)
150+
require.NoError(t, err)
151+
require.EqualExportedValues(t, input, merged)
150152
}
151153

152154
func TestMergeOptions_AsteriskMask(t *testing.T) {
153155
asteriskUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{"*"}}
154156
_, err := mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, asteriskUpdateMask)
155-
assert.Error(t, err)
157+
require.Error(t, err)
156158
}
157159

158160
func TestMergeOptions_FooMask(t *testing.T) {
159161
fooUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{"foo"}}
160162
_, err := mergeWorkflowExecutionOptions(emptyOptions, unpinnedOverrideOptions, fooUpdateMask)
161-
assert.Error(t, err)
163+
require.Error(t, err)
162164
}
163165

164166
type (
@@ -283,3 +285,108 @@ func (s *updateWorkflowOptionsSuite) TestInvoke_Success() {
283285
s.NotNil(resp)
284286
proto.Equal(expectedOverrideOptions, resp.GetWorkflowExecutionOptions())
285287
}
288+
289+
func TestMergeAndApply_TimeSkippingConfig(t *testing.T) {
290+
oneHour := durationpb.New(time.Hour)
291+
twoHours := durationpb.New(2 * time.Hour)
292+
thirtyMin := durationpb.New(30 * time.Minute)
293+
targetTime := timestamppb.New(time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC))
294+
295+
testCases := []struct {
296+
name string
297+
initialConfig *workflowpb.TimeSkippingConfig
298+
updateOptions *workflowpb.WorkflowExecutionOptions
299+
updateMask *fieldmaskpb.FieldMask
300+
expectedConfig *workflowpb.TimeSkippingConfig
301+
}{
302+
{
303+
name: "update max_skipped_duration preserves enabled and disable_propagation",
304+
initialConfig: &workflowpb.TimeSkippingConfig{
305+
Enabled: true,
306+
DisablePropagation: true,
307+
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
308+
MaxSkippedDuration: oneHour,
309+
},
310+
},
311+
updateOptions: &workflowpb.WorkflowExecutionOptions{
312+
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{
313+
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
314+
MaxSkippedDuration: twoHours,
315+
},
316+
},
317+
},
318+
updateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config.max_skipped_duration"}},
319+
expectedConfig: &workflowpb.TimeSkippingConfig{
320+
Enabled: true,
321+
DisablePropagation: true,
322+
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
323+
MaxSkippedDuration: twoHours,
324+
},
325+
},
326+
},
327+
{
328+
name: "change bound type to max_elapsed_duration preserves enabled",
329+
initialConfig: &workflowpb.TimeSkippingConfig{
330+
Enabled: true,
331+
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
332+
MaxSkippedDuration: oneHour,
333+
},
334+
},
335+
updateOptions: &workflowpb.WorkflowExecutionOptions{
336+
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{
337+
Bound: &workflowpb.TimeSkippingConfig_MaxElapsedDuration{
338+
MaxElapsedDuration: thirtyMin,
339+
},
340+
},
341+
},
342+
updateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config.max_elapsed_duration"}},
343+
expectedConfig: &workflowpb.TimeSkippingConfig{
344+
Enabled: true,
345+
Bound: &workflowpb.TimeSkippingConfig_MaxElapsedDuration{
346+
MaxElapsedDuration: thirtyMin,
347+
},
348+
},
349+
},
350+
{
351+
name: "change bound type to max_target_time preserves enabled",
352+
initialConfig: &workflowpb.TimeSkippingConfig{
353+
Enabled: true,
354+
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
355+
MaxSkippedDuration: oneHour,
356+
},
357+
},
358+
updateOptions: &workflowpb.WorkflowExecutionOptions{
359+
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{
360+
Bound: &workflowpb.TimeSkippingConfig_MaxTargetTime{
361+
MaxTargetTime: targetTime,
362+
},
363+
},
364+
},
365+
updateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config.max_target_time"}},
366+
expectedConfig: &workflowpb.TimeSkippingConfig{
367+
Enabled: true,
368+
Bound: &workflowpb.TimeSkippingConfig_MaxTargetTime{
369+
MaxTargetTime: targetTime,
370+
},
371+
},
372+
},
373+
}
374+
375+
for _, tc := range testCases {
376+
t.Run(tc.name, func(t *testing.T) {
377+
ctrl := gomock.NewController(t)
378+
ms := historyi.NewMockMutableState(ctrl)
379+
ms.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
380+
TimeSkippingInfo: &persistencespb.TimeSkippingInfo{
381+
Config: tc.initialConfig,
382+
},
383+
}).AnyTimes()
384+
ms.EXPECT().AddWorkflowExecutionOptionsUpdatedEvent(nil, true, "", nil, nil, "", nil).Return(&historypb.HistoryEvent{}, nil)
385+
386+
result, hasChanges, err := MergeAndApply(ms, tc.updateOptions, tc.updateMask, "")
387+
require.NoError(t, err)
388+
require.True(t, hasChanges)
389+
require.True(t, proto.Equal(tc.expectedConfig, result.GetTimeSkippingConfig()))
390+
})
391+
}
392+
}

0 commit comments

Comments
 (0)