time-skipping runtime foundation#9965
Conversation
4ae2cfe to
c2a602f
Compare
| @@ -7110,6 +7232,8 @@ func (ms *MutableStateImpl) closeTransaction( | |||
| return closeTransactionResult{}, err | |||
| } | |||
|
|
|||
There was a problem hiding this comment.
@yux0 it seems both active and passive cluster may both call closeTransaction, is it correct that if we want to add mutations of ms and generation of timer tasks in closeTransaction, we need to filter the policy for only active cluster
There was a problem hiding this comment.
Correct, the passive will handle this by seeing the event in the history that the active added.
|
|
||
| // time-skipping related methods | ||
| AddWorkflowExecutionTimeSkippingTransitionedEvent(ctx context.Context) (*historypb.HistoryEvent, error) | ||
| ApplyWorkflowExecutionTimeSkippingTransitionedEvent(ctx context.Context, event *historypb.HistoryEvent) error |
There was a problem hiding this comment.
@yux0 is it correct that we still need to add ApplyXXXEvent to support event-based replication?
There was a problem hiding this comment.
why do we need to support event-based replication for this feature?
There was a problem hiding this comment.
emm ~ Not sure if this is a question for me. Do we have standards to exclude features out of event-base replication or other replication method? I am assuming for completeness I need to support this so that replication center will catchup.
| } | ||
| return true | ||
| } | ||
|
|
There was a problem hiding this comment.
I haven't added this RegenerateTimerTasksForTimeSkipping in Refresh/PartialRefresh, and my understanding that the passive cluster will need it to generate timerTasks? @yux0
There was a problem hiding this comment.
RegenerateTimerTasksForTimeSkipping would not be safe for use in partialRefresh as it adds new tasks rather than the idempotent adjustment that refreshTasksForTimer does. I think you would need to adjust refreshTaskForTimer by adding skipping to timer sequence.
There was a problem hiding this comment.
oh I see. I think we may consider several options
(option-1) I can add a timeSkipped field in TimerInfo to obtain idempotentcy
(option-2) refreshTasksForTimer -> I need to judge if I need to regenerate beased on if WorkflowExecutionInfo.TimeSkippingInfo changed
(option-3) in semantics (not in performance speaking), this method is idempotent calling twice will generate another timerTask with the same content -> is this acceptable?
I will leave a todo here and arrange a meeting when I add the replication feature.
| targetTime time.Time, | ||
| triggeredDisable bool, | ||
| ) *historypb.HistoryEvent { | ||
| event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED, b.timeSource.Now()) |
There was a problem hiding this comment.
This b.timeSource.Now() can be called above and share in this line and 1092.
There was a problem hiding this comment.
I guess you wanted to say the VirtualTime()? (this method will be deleted)
|
|
||
| message TimeSkippingInfo { | ||
| // The config for the time skipping for the workflow. | ||
| // Current tconfiguration for the workflow. |
| skippedDuration := attr.GetTargetTime().AsTime().Sub(event.GetEventTime().AsTime()) | ||
| disabledAfterBound := attr.GetDisabledAfterBound() | ||
|
|
||
| if ms.executionInfo.TimeSkippingInfo.AccumulatedSkippedDuration == nil { |
There was a problem hiding this comment.
Better to check if TimeSkippingInfo is nil before accessing the member.
There was a problem hiding this comment.
added an error check
| nextUserTimerInfo = timerInfo | ||
| } | ||
| } | ||
| if !disabledAfterBound && nextUserTimerInfo == nil { |
There was a problem hiding this comment.
if disabledAfterBound is true and nextUserTimerInfo is nil, this check will not return an error. If that happens line 4027 below will panic.
I see that disabledAfterBound is always false now. But that could change when you fix the todo right?
|
|
||
| func (ms *MutableStateImpl) GetTimeSkippingVirtualTime() time.Time { | ||
| // TODO@time-skipping: need to use this to adjust all timestamps for tasks sent to SDK worker. | ||
| offset := ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration |
There was a problem hiding this comment.
Nil check for TimeSkippingInfo?
There was a problem hiding this comment.
method to be deleted.
There was a problem hiding this comment.
nit: This comment "this function must be the last call" could cause confusion later. Maybe clarify that this must be the last call before generating time skipping tasks.
88762a2 to
c2a602f
Compare
| enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT, | ||
| enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED, | ||
| enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW, | ||
| enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED, |
There was a problem hiding this comment.
don't think it should be grouped under workflow state changes
| } | ||
|
|
||
| func (ms *MutableStateImpl) GetTimeSkippingVirtualTime() time.Time { | ||
| // TODO@time-skipping: need to use this to adjust all timestamps for tasks sent to SDK worker. |
There was a problem hiding this comment.
My understanding was that we will use a single TimeSource implementation and all events would use virtual timestamps, so no adjustment would be needed. Is that still the plan?
There was a problem hiding this comment.
you are right. let me change back the design and try to figure out how to make that simple. I will need to put that in another pr as this pr is growing large.
c2a602f to
0a9224f
Compare
| timerSequence := t.getTimerSequence(mutableState) | ||
| referenceTime := t.Now() | ||
| if tsInfo := mutableState.GetExecutionInfo().GetTimeSkippingInfo(); tsInfo != nil && tsInfo.GetAccumulatedSkippedDuration() != nil { | ||
| referenceTime = mutableState.GetTimeSkippingVirtualTime() |
There was a problem hiding this comment.
do we need this for other timer tasks?
I understand for example there's no running activity at time skipping time, but there could be after time is skipped and we need to use the workflow time?
There was a problem hiding this comment.
this is bad design, removed
| // pendingTimerInfoIDs in mutable state is only timers that need regenerated | ||
| // and the new timerTasks should not change the original pendingTimerInfoIDs in mutable state | ||
| for _, timerInfo := range r.mutableState.GetPendingTimerInfos() { | ||
| visibilityTimestamp := timerInfo.ExpiryTime.AsTime().Add(-accumulatedSkippedDuration) |
There was a problem hiding this comment.
maybe I confused myself. I think we need to adjust timestamp for all timer tasks generated if time for a workflow is ever skipped?
| } | ||
| } | ||
|
|
||
| func (ms *MutableStateImpl) closeTransactionHandlerTimeSkipping( |
There was a problem hiding this comment.
| func (ms *MutableStateImpl) closeTransactionHandlerTimeSkipping( | |
| func (ms *MutableStateImpl) closeTransactionHandleTimeSkipping( |
17add5c to
5e70e98
Compare
5e70e98 to
5a229af
Compare
What changed?
taskGenerator:mutableState:IsStateDirty: captures changes of time-skipping related fieldscloseTransaction: add closeTransactionHandlerTimeSkippingWhy?
add the foundational mechanism of how time skipping works in the runtime of a workflow execution
for reviewers:
this is a intergral foundation of t-s runtime, so it works without any granular and extended features as bound, replication, external-transfer tasks, etc. Over 50% of lines are tests/mocks/pb-gen. can focus on code first, then functional tests, and last ut.
How did you test it?