From 536b5565c3f76e7b26f61cdc1b0bd6a1eadfc08f Mon Sep 17 00:00:00 2001 From: justinp-tt <174377431+justinp-tt@users.noreply.github.com> Date: Fri, 24 Jan 2025 17:55:51 -0600 Subject: [PATCH] Fix ErrStateMachineNotFound handling in HSM state replication (#7032) ## What changed? - Modified `HSMStateReplicatorImpl.syncHSMNode()` to handle `ErrStateMachineNotFound` gracefully - Added debug logging with correct field reference to `OriginalExecutionRunId` - Added unit test `TestSyncHSM_StateMachineNotFound` to verify behavior ## Why? After adding support for state deletion in terminal states in Nexus, nightly tests started failing when sync HSM tasks tried to replicate state machines that had been legitimately deleted. Since the deletion is intentional for terminal states, we should gracefully handle these cases by logging and continuing replication of other state machines. ## How did you test it? - Added unit test verifying graceful handling of `ErrStateMachineNotFound` - Existing nightly test failures should be resolved by this change ## Potential risks - If there are cases where a state machine is temporarily unavailable (rather than legitimately deleted), we might incorrectly continue processing - However, based on the HSM implementation, state machines are either present in persistence or not - there is no transient state - Suppressing `ErrStateMachineNotFound` could potentially mask other issues if the error occurs for unexpected reasons ## Documentation No documentation changes required as this is an internal implementation detail handling error cases in the replication path. ## Is hotfix candidate? No - while this fixes test failures, it's not causing production issues that would warrant a hotfix. --------- Co-authored-by: Roey Berman --- service/history/ndc/hsm_state_replicator.go | 39 +++++++- .../history/ndc/hsm_state_replicator_test.go | 88 +++++++++++++++++++ 2 files changed, 123 insertions(+), 4 deletions(-) diff --git a/service/history/ndc/hsm_state_replicator.go b/service/history/ndc/hsm_state_replicator.go index 722a9c2ce9d..e7da5b95443 100644 --- a/service/history/ndc/hsm_state_replicator.go +++ b/service/history/ndc/hsm_state_replicator.go @@ -183,10 +183,41 @@ func (r *HSMStateReplicatorImpl) syncHSMNode( incomingNodePath := incomingNode.Path() currentNode, err := currentHSM.Child(incomingNodePath) if err != nil { - // 1. Already done history resend if needed before, - // and node creation today always associated with an event - // 2. Node deletion is not supported right now. - // Based on 1 and 2, node should always be found here. + // The node may not be found if: + // State machine was deleted in terminal state. + // Both state machine creation and deletion are always associated with an event, so any missing state + // machine must have a corresponding event in history. + if errors.Is(err, hsm.ErrStateMachineNotFound) { + notFoundErr := err + // Get the last items from both version histories + currentVersionHistory, err := versionhistory.GetCurrentVersionHistory( + mutableState.GetExecutionInfo().GetVersionHistories(), + ) + if err != nil { + return err + } + lastLocalItem, err := versionhistory.GetLastVersionHistoryItem(currentVersionHistory) + if err != nil { + return err + } + lastIncomingItem, err := versionhistory.GetLastVersionHistoryItem(request.EventVersionHistory) + if err != nil { + return err + } + + // Only accept "not found" if our version history is ahead + if versionhistory.CompareVersionHistoryItem(lastLocalItem, lastIncomingItem) > 0 { + r.logger.Debug("State machine not found - likely deleted in terminal state", + tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId), + tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(mutableState.GetExecutionInfo().OriginalExecutionRunId), + ) + return nil + } + + // Otherwise, we might be missing events + return notFoundErr + } return err } diff --git a/service/history/ndc/hsm_state_replicator_test.go b/service/history/ndc/hsm_state_replicator_test.go index c623627c448..a126d8d05ac 100644 --- a/service/history/ndc/hsm_state_replicator_test.go +++ b/service/history/ndc/hsm_state_replicator_test.go @@ -24,6 +24,7 @@ package ndc import ( "context" + "errors" "testing" "github.com/pborman/uuid" @@ -702,6 +703,93 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateNewer_WorkflowClosed( s.NoError(err) } +func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() { + const ( + deletedMachineID = "child1" + initialCount = 50 + ) + + baseVersion := s.namespaceEntry.FailoverVersion() + persistedState := s.buildWorkflowMutableState() + + // Remove the state machine to simulate deletion + delete(persistedState.ExecutionInfo.SubStateMachinesByType[s.stateMachineDef.Type()].MachinesById, deletedMachineID) + + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), &persistence.GetWorkflowExecutionRequest{ + ShardID: s.mockShard.GetShardID(), + NamespaceID: s.workflowKey.NamespaceID, + WorkflowID: s.workflowKey.WorkflowID, + RunID: s.workflowKey.RunID, + }).Return(&persistence.GetWorkflowExecutionResponse{ + State: persistedState, + DBRecordVersion: 777, + }, nil).AnyTimes() + + testCases := []struct { + name string + versionHistory *historyspb.VersionHistory + expectError bool + }{ + { + name: "local version higher - ignore missing state machine", + versionHistory: &historyspb.VersionHistory{ + Items: []*historyspb.VersionHistoryItem{ + {EventId: 50, Version: baseVersion - 100}, + {EventId: 102, Version: baseVersion - 50}, + }, + }, + expectError: false, + }, + { + name: "incoming version higher - return notFoundErr", + versionHistory: &historyspb.VersionHistory{ + Items: []*historyspb.VersionHistoryItem{ + {EventId: 50, Version: baseVersion - 100}, + {EventId: 102, Version: baseVersion}, + }, + }, + expectError: true, + }, + } + + for _, tc := range testCases { + tc := tc + s.Run(tc.name, func() { + lastVersion := tc.versionHistory.Items[len(tc.versionHistory.Items)-1].Version + + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + WorkflowKey: s.workflowKey, + EventVersionHistory: tc.versionHistory, + StateMachineNode: &persistencespb.StateMachineNode{ + Children: map[string]*persistencespb.StateMachineMap{ + s.stateMachineDef.Type(): { + MachinesById: map[string]*persistencespb.StateMachineNode{ + deletedMachineID: { + Data: []byte(hsmtest.State3), + InitialVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: lastVersion, + }, + LastUpdateVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: lastVersion, + }, + TransitionCount: initialCount, + }, + }, + }, + }, + }, + }) + + if tc.expectError { + s.Error(err) + s.True(errors.Is(err, hsm.ErrStateMachineNotFound), "expected ErrStateMachineNotFound error") + } else { + s.NoError(err) + } + }) + } +} + func (s *hsmStateReplicatorSuite) buildWorkflowMutableState() *persistencespb.WorkflowMutableState { info := &persistencespb.WorkflowExecutionInfo{