[Reset] Add AllowChildReconnect flag in task generator (#7157)
## What changed?
Added a `StartChildExecutionTask.AllowChildReconnect` bool flag to mark
the task at the time of reset. This flag is used to determine whether to
attempt to reconnect to the child (instead of just checking whether
the workflow was reset).

## Why?
In `transferQueueActiveTaskExecutor.processStartChildExecution()` I was
checking the condition `mutableState.IsResetRun()` before attempting to
reconnect to children. The problem is that `mutableState.IsResetRun()`
stays true for the rest of the workflow's lifetime. So if the workflow
starts another instance of the same child somewhere down the line, we
would reconnect to the previously completed instance of the child
instead of starting a new one. So I added an explicit flag in
`StartChildExecutionTask` to determine whether we should reconnect to
the child or start a new instance.
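
To illustrate the difference, here is a minimal, self-contained sketch. The `startChildTask` and `workflowState` types and both helper functions are hypothetical stand-ins invented for this example; they are not the real `tasks.StartChildExecutionTask` or mutable state types, and the sketch assumes the flag is set only on tasks generated at reset time.

```go
package main

import "fmt"

// startChildTask is a hypothetical stand-in for tasks.StartChildExecutionTask.
type startChildTask struct {
	InitiatedEventID int64
	// AllowChildReconnect is assumed to be set only on tasks generated at reset time.
	AllowChildReconnect bool
}

// workflowState is a hypothetical stand-in for the parent's mutable state.
type workflowState struct {
	resetRun bool
}

// IsResetRun stays true for the whole lifetime of a reset run.
func (w workflowState) IsResetRun() bool { return w.resetRun }

// shouldReconnectOld mirrors the previous check: it depends only on the run,
// so every start-child task in a reset run tries to reconnect.
func shouldReconnectOld(ms workflowState, _ startChildTask) bool {
	return ms.IsResetRun()
}

// shouldReconnectNew mirrors the per-task check: only tasks marked at reset
// time try to reconnect.
func shouldReconnectNew(_ workflowState, task startChildTask) bool {
	return task.AllowChildReconnect
}

func main() {
	ms := workflowState{resetRun: true}
	atReset := startChildTask{InitiatedEventID: 5, AllowChildReconnect: true}
	later := startChildTask{InitiatedEventID: 42, AllowChildReconnect: false}

	fmt.Println(shouldReconnectOld(ms, atReset), shouldReconnectOld(ms, later)) // true true
	fmt.Println(shouldReconnectNew(ms, atReset), shouldReconnectNew(ms, later)) // true false
}
```

In this sketch the old check also reconnects for the `later` task, which is the case described above where a new instance of the child should have been started.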

## How did you test it?
Existing unit test + manual testing.

## Potential risks
N/A. The feature is gated behind a feature flag.

## Documentation
N/A

## Is hotfix candidate?
No
gow authored Jan 25, 2025
1 parent 536b556 commit 8b5cd4d
Showing 2 changed files with 8 additions and 4 deletions.
4 changes: 3 additions & 1 deletion service/history/transfer_queue_active_task_executor.go
@@ -842,7 +842,9 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
// Note: childStarted flag above is computed from the parent's history. When this is TRUE it's guaranteed that the child was successfully started.
// But if it's FALSE then the child *may or may not* be started (ex: we failed to record ChildExecutionStarted event previously.)
// Hence we need to check the child workflow ID and attempt to reconnect before proceeding to start a new instance of the child.
-if mutableState.IsResetRun() {
+// This path is usually taken when the parent is being reset and the reset point (i.e baseWorkflowInfo.LowestCommonAncestorEventId) is after the child was initiated.
+baseWorkflowInfo := mutableState.GetBaseWorkflowInfo()
+if mutableState.IsResetRun() && baseWorkflowInfo != nil && baseWorkflowInfo.LowestCommonAncestorEventId >= childInfo.InitiatedEventId {
childRunID, err := t.verifyChildWorkflow(ctx, mutableState, targetNamespaceEntry, attributes.WorkflowId)
if err != nil {
return err
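
The condition in this hunk can be read as a small predicate over the reset point and the child's initiated event ID. The sketch below is only an illustration under assumptions: `baseWorkflowInfo` here is a hypothetical local struct standing in for the value returned by `mutableState.GetBaseWorkflowInfo()`, and `shouldAttemptReconnect` is a made-up helper name, not a function in the codebase.

```go
package main

import "fmt"

// baseWorkflowInfo is a hypothetical stand-in for the value returned by
// mutableState.GetBaseWorkflowInfo(); only the field used in the diff is modeled.
type baseWorkflowInfo struct {
	LowestCommonAncestorEventId int64
}

// shouldAttemptReconnect (hypothetical helper) restates the condition from the diff:
// reconnect only in a reset run whose reset point is at or after the event
// that initiated the child.
func shouldAttemptReconnect(isResetRun bool, base *baseWorkflowInfo, childInitiatedEventID int64) bool {
	return isResetRun && base != nil && base.LowestCommonAncestorEventId >= childInitiatedEventID
}

func main() {
	base := &baseWorkflowInfo{LowestCommonAncestorEventId: 10}

	fmt.Println(shouldAttemptReconnect(true, base, 7))  // true: child initiated before the reset point
	fmt.Println(shouldAttemptReconnect(true, base, 25)) // false: child initiated after the reset point
	fmt.Println(shouldAttemptReconnect(true, nil, 7))   // false: no base workflow info
	fmt.Println(shouldAttemptReconnect(false, base, 7)) // false: not a reset run
}
```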
8 changes: 5 additions & 3 deletions service/history/transfer_queue_active_task_executor_test.go
@@ -2066,7 +2066,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Re
s.Nil(err)
mutableState.GetExecutionInfo().OriginalExecutionRunId = originalExecutionRunID

-event, _ := addStartChildWorkflowExecutionInitiatedEvent(
+childInitEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(
mutableState,
1111,
uuid.New(),
@@ -2081,6 +2081,8 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Re
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)
+// Set the base workflow for the reset run and simulate a reset point that is after the childInitEvent.EventId
+mutableState.SetBaseWorkflow("baseRunID", childInitEvent.EventId+1, 123)

taskID := s.mustGenerateTaskID()
transferTask := &tasks.StartChildExecutionTask{
@@ -2093,11 +2095,11 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Re
TargetNamespaceID: tests.ChildNamespaceID.String(),
TargetWorkflowID: childWorkflowID,
TaskID: taskID,
-InitiatedEventID: event.GetEventId(),
+InitiatedEventID: childInitEvent.GetEventId(),
VisibilityTimestamp: time.Now().UTC(),
}

-persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
+persistenceMutableState := s.createPersistenceMutableState(mutableState, childInitEvent.GetEventId(), childInitEvent.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
// Assert that child workflow describe is called.
// The child describe returns a mock parent whose originalExecutionRunID points to the same as the current reset run's originalExecutionRunID