Skip to content

Commit

Permalink
Fix ErrStateMachineNotFound handling in HSM state replication (#7032)
Browse files Browse the repository at this point in the history
## What changed?
- Modified `HSMStateReplicatorImpl.syncHSMNode()` to handle
`ErrStateMachineNotFound` gracefully
- Added debug logging with correct field reference to
`OriginalExecutionRunId`
- Added unit test `TestSyncHSM_StateMachineNotFound` to verify behavior

## Why?
After adding support for state deletion in terminal states in Nexus,
nightly tests started failing when sync HSM tasks tried to replicate
state machines that had been legitimately deleted. Since the deletion is
intentional for terminal states, we should gracefully handle these cases
by logging and continuing replication of other state machines.

## How did you test it?
- Added unit test verifying graceful handling of
`ErrStateMachineNotFound`
- Existing nightly test failures should be resolved by this change

## Potential risks
- If there are cases where a state machine is temporarily unavailable
(rather than legitimately deleted), we might incorrectly continue
processing
- However, based on the HSM implementation, state machines are either
present in persistence or not - there is no transient state
- Suppressing `ErrStateMachineNotFound` could potentially mask other
issues if the error occurs for unexpected reasons

## Documentation
No documentation changes required as this is an internal implementation
detail handling error cases in the replication path.

## Is hotfix candidate?
No - while this fixes test failures, it's not causing production issues
that would warrant a hotfix.

---------

Co-authored-by: Roey Berman <[email protected]>
  • Loading branch information
justinp-tt and bergundy authored Jan 24, 2025
1 parent 087f4df commit 536b556
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 4 deletions.
39 changes: 35 additions & 4 deletions service/history/ndc/hsm_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,41 @@ func (r *HSMStateReplicatorImpl) syncHSMNode(
incomingNodePath := incomingNode.Path()
currentNode, err := currentHSM.Child(incomingNodePath)
if err != nil {
// 1. Already done history resend if needed before,
// and node creation today always associated with an event
// 2. Node deletion is not supported right now.
// Based on 1 and 2, node should always be found here.
// The node may not be found if:
// State machine was deleted in terminal state.
// Both state machine creation and deletion are always associated with an event, so any missing state
// machine must have a corresponding event in history.
if errors.Is(err, hsm.ErrStateMachineNotFound) {
notFoundErr := err
// Get the last items from both version histories
currentVersionHistory, err := versionhistory.GetCurrentVersionHistory(
mutableState.GetExecutionInfo().GetVersionHistories(),
)
if err != nil {
return err
}
lastLocalItem, err := versionhistory.GetLastVersionHistoryItem(currentVersionHistory)
if err != nil {
return err
}
lastIncomingItem, err := versionhistory.GetLastVersionHistoryItem(request.EventVersionHistory)
if err != nil {
return err
}

// Only accept "not found" if our version history is ahead
if versionhistory.CompareVersionHistoryItem(lastLocalItem, lastIncomingItem) > 0 {
r.logger.Debug("State machine not found - likely deleted in terminal state",
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId),
tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId),
tag.WorkflowRunID(mutableState.GetExecutionInfo().OriginalExecutionRunId),
)
return nil
}

// Otherwise, we might be missing events
return notFoundErr
}
return err
}

Expand Down
88 changes: 88 additions & 0 deletions service/history/ndc/hsm_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package ndc

import (
"context"
"errors"
"testing"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -702,6 +703,93 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateNewer_WorkflowClosed(
s.NoError(err)
}

func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() {
const (
deletedMachineID = "child1"
initialCount = 50
)

baseVersion := s.namespaceEntry.FailoverVersion()
persistedState := s.buildWorkflowMutableState()

// Remove the state machine to simulate deletion
delete(persistedState.ExecutionInfo.SubStateMachinesByType[s.stateMachineDef.Type()].MachinesById, deletedMachineID)

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), &persistence.GetWorkflowExecutionRequest{
ShardID: s.mockShard.GetShardID(),
NamespaceID: s.workflowKey.NamespaceID,
WorkflowID: s.workflowKey.WorkflowID,
RunID: s.workflowKey.RunID,
}).Return(&persistence.GetWorkflowExecutionResponse{
State: persistedState,
DBRecordVersion: 777,
}, nil).AnyTimes()

testCases := []struct {
name string
versionHistory *historyspb.VersionHistory
expectError bool
}{
{
name: "local version higher - ignore missing state machine",
versionHistory: &historyspb.VersionHistory{
Items: []*historyspb.VersionHistoryItem{
{EventId: 50, Version: baseVersion - 100},
{EventId: 102, Version: baseVersion - 50},
},
},
expectError: false,
},
{
name: "incoming version higher - return notFoundErr",
versionHistory: &historyspb.VersionHistory{
Items: []*historyspb.VersionHistoryItem{
{EventId: 50, Version: baseVersion - 100},
{EventId: 102, Version: baseVersion},
},
},
expectError: true,
},
}

for _, tc := range testCases {
tc := tc
s.Run(tc.name, func() {
lastVersion := tc.versionHistory.Items[len(tc.versionHistory.Items)-1].Version

err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{
WorkflowKey: s.workflowKey,
EventVersionHistory: tc.versionHistory,
StateMachineNode: &persistencespb.StateMachineNode{
Children: map[string]*persistencespb.StateMachineMap{
s.stateMachineDef.Type(): {
MachinesById: map[string]*persistencespb.StateMachineNode{
deletedMachineID: {
Data: []byte(hsmtest.State3),
InitialVersionedTransition: &persistencespb.VersionedTransition{
NamespaceFailoverVersion: lastVersion,
},
LastUpdateVersionedTransition: &persistencespb.VersionedTransition{
NamespaceFailoverVersion: lastVersion,
},
TransitionCount: initialCount,
},
},
},
},
},
})

if tc.expectError {
s.Error(err)
s.True(errors.Is(err, hsm.ErrStateMachineNotFound), "expected ErrStateMachineNotFound error")
} else {
s.NoError(err)
}
})
}
}

func (s *hsmStateReplicatorSuite) buildWorkflowMutableState() *persistencespb.WorkflowMutableState {

info := &persistencespb.WorkflowExecutionInfo{
Expand Down

0 comments on commit 536b556

Please sign in to comment.