Skip to content

Commit

Permalink
Do not send backfill task when event is empty (#7130)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Do not send backfill task when event is empty
## Why?
<!-- Tell your future self why have you made these changes -->
Backfill tasks should always have associated events. If there is no
event(i.e. for a state only transition), then no backfill to perform and
we should skip the task
## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
n/a
## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
n/a
## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
n/a
## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
no
  • Loading branch information
xwduan authored Jan 22, 2025
1 parent 767cc29 commit b8b3cda
Show file tree
Hide file tree
Showing 7 changed files with 1,189 additions and 1,160 deletions.
2,253 changes: 1,135 additions & 1,118 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

42 changes: 22 additions & 20 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,18 +1265,19 @@ func (s *TaskSerializer) replicationSyncVersionedTransitionTaskToProto(
}

return &persistencespb.ReplicationTaskInfo{
NamespaceId: syncVersionedTransitionTask.WorkflowKey.NamespaceID,
WorkflowId: syncVersionedTransitionTask.WorkflowKey.WorkflowID,
RunId: syncVersionedTransitionTask.WorkflowKey.RunID,
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_VERSIONED_TRANSITION,
TaskId: syncVersionedTransitionTask.TaskID,
VisibilityTime: timestamppb.New(syncVersionedTransitionTask.VisibilityTimestamp),
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
FirstEventId: syncVersionedTransitionTask.FirstEventID,
Version: syncVersionedTransitionTask.FirstEventVersion,
NextEventId: syncVersionedTransitionTask.NextEventID,
NewRunId: syncVersionedTransitionTask.NewRunID,
TaskEquivalents: taskInfoEquivalents,
NamespaceId: syncVersionedTransitionTask.WorkflowKey.NamespaceID,
WorkflowId: syncVersionedTransitionTask.WorkflowKey.WorkflowID,
RunId: syncVersionedTransitionTask.WorkflowKey.RunID,
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_VERSIONED_TRANSITION,
TaskId: syncVersionedTransitionTask.TaskID,
VisibilityTime: timestamppb.New(syncVersionedTransitionTask.VisibilityTimestamp),
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
FirstEventId: syncVersionedTransitionTask.FirstEventID,
Version: syncVersionedTransitionTask.FirstEventVersion,
NextEventId: syncVersionedTransitionTask.NextEventID,
NewRunId: syncVersionedTransitionTask.NewRunID,
LastVersionHistoryItem: syncVersionedTransitionTask.LastVersionHistoryItem,
TaskEquivalents: taskInfoEquivalents,
}, nil
}

Expand All @@ -1303,14 +1304,15 @@ func (s *TaskSerializer) replicationSyncVersionedTransitionTaskFromProto(
syncVersionedTransitionTask.WorkflowId,
syncVersionedTransitionTask.RunId,
),
VisibilityTimestamp: visibilityTimestamp,
TaskID: syncVersionedTransitionTask.TaskId,
FirstEventID: syncVersionedTransitionTask.FirstEventId,
FirstEventVersion: syncVersionedTransitionTask.Version,
NextEventID: syncVersionedTransitionTask.NextEventId,
NewRunID: syncVersionedTransitionTask.NewRunId,
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
TaskEquivalents: taskEquivalents,
VisibilityTimestamp: visibilityTimestamp,
TaskID: syncVersionedTransitionTask.TaskId,
FirstEventID: syncVersionedTransitionTask.FirstEventId,
FirstEventVersion: syncVersionedTransitionTask.Version,
NextEventID: syncVersionedTransitionTask.NextEventId,
NewRunID: syncVersionedTransitionTask.NewRunId,
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
LastVersionHistoryItem: syncVersionedTransitionTask.LastVersionHistoryItem,
TaskEquivalents: taskEquivalents,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ message ReplicationTaskInfo {
// TODO: Remove this field when state-based replication is stable and
// doesn't need to be disabled.
repeated ReplicationTaskInfo task_equivalents = 20;
history.v1.VersionHistoryItem last_version_history_item = 21;
}

// visibility_task_data column
Expand Down
6 changes: 3 additions & 3 deletions service/history/ndc/workflow_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (r *WorkflowStateReplicatorImpl) ReplicateVersionedTransition(
// TODO: Revisit these logic when working on roll out/back plan
if snapshot == nil {
return serviceerrors.NewSyncState(
"failed to apply mutation due to missing mutable state",
"failed to apply versioned transition due to missing snapshot",
namespaceID.String(),
wid,
rid,
Expand Down Expand Up @@ -350,7 +350,7 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
}
if localMutableState == nil {
return serviceerrors.NewSyncState(
"failed to apply mutation due to missing mutable state",
"failed to apply mutation due to missing local mutable state",
namespaceID.String(),
workflowID,
runID,
Expand Down Expand Up @@ -438,7 +438,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
versionHistories = localMutableState.GetExecutionInfo().VersionHistories
}
return serviceerrors.NewSyncState(
"failed to apply mutation due to missing mutable state",
"failed to apply mutation due to missing task snapshot",
namespaceID.String(),
workflowID,
runID,
Expand Down
10 changes: 7 additions & 3 deletions service/history/replication/raw_task_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,16 @@ func (c *syncVersionedTransitionTaskConverter) generateVerifyVersionedTransition
if err != nil {
return nil, err
}
lastEventVersion, err := versionhistory.GetVersionHistoryEventVersion(currentHistory, taskInfo.NextEventID-1)
var nextEventId = taskInfo.NextEventID
if nextEventId == common.EmptyEventID {
nextEventId = taskInfo.LastVersionHistoryItem.GetEventId() + 1
}
lastEventVersion, err := versionhistory.GetVersionHistoryEventVersion(currentHistory, nextEventId-1)
if err != nil {
return nil, err
}
capItems, err := versionhistory.CopyVersionHistoryUntilLCAVersionHistoryItem(currentHistory, &historyspb.VersionHistoryItem{
EventId: taskInfo.NextEventID - 1,
EventId: nextEventId - 1,
Version: lastEventVersion,
})
if err != nil {
Expand All @@ -751,7 +755,7 @@ func (c *syncVersionedTransitionTaskConverter) generateVerifyVersionedTransition
RunId: taskInfo.RunID,
NewRunId: taskInfo.NewRunID,
EventVersionHistory: capItems.Items,
NextEventId: taskInfo.NextEventID,
NextEventId: nextEventId,
},
},
VersionedTransition: taskInfo.VersionedTransition,
Expand Down
12 changes: 7 additions & 5 deletions service/history/tasks/sync_versioned_transition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/definition"
)
Expand All @@ -40,11 +41,12 @@ type (
TaskID int64
Priority enumsspb.TaskPriority

VersionedTransition *persistencespb.VersionedTransition
FirstEventVersion int64
FirstEventID int64
NextEventID int64
NewRunID string
VersionedTransition *persistencespb.VersionedTransition
FirstEventVersion int64
FirstEventID int64 // First event ID of version transition
NextEventID int64 // Next event ID after version transition
LastVersionHistoryItem *historyspb.VersionHistoryItem // Last version history item of version transition when version transition does not have associated events
NewRunID string

TaskEquivalents []Task
}
Expand Down
25 changes: 14 additions & 11 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6096,6 +6096,7 @@ func (ms *MutableStateImpl) closeTransactionPrepareReplicationTasks(
ms.executionState.RunId,
)
var firstEventID, firstEventVersion, nextEventID int64
var lastVersionHistoryItem *historyspb.VersionHistoryItem
if len(eventBatches) > 0 {
firstEventID = eventBatches[0][0].EventId
firstEventVersion = eventBatches[0][0].Version
Expand All @@ -6110,24 +6111,26 @@ func (ms *MutableStateImpl) closeTransactionPrepareReplicationTasks(
if err != nil {
return err
}
firstEventID = item.EventId
firstEventVersion = item.Version
nextEventID = item.EventId + 1
firstEventID = common.EmptyEventID
firstEventVersion = common.EmptyVersion
nextEventID = common.EmptyEventID
lastVersionHistoryItem = versionhistory.CopyVersionHistoryItem(item)
}
transitionHistory := ms.executionInfo.TransitionHistory
if len(transitionHistory) > 0 && CompareVersionedTransition(
ms.versionedTransitionInDB,
ms.executionInfo.TransitionHistory[len(ms.executionInfo.TransitionHistory)-1],
) != 0 {
syncVersionedTransitionTask := &tasks.SyncVersionedTransitionTask{
WorkflowKey: workflowKey,
VisibilityTimestamp: now,
Priority: enumsspb.TASK_PRIORITY_HIGH,
VersionedTransition: transitionHistory[len(transitionHistory)-1],
FirstEventID: firstEventID,
FirstEventVersion: firstEventVersion,
NextEventID: nextEventID,
TaskEquivalents: replicationTasks,
WorkflowKey: workflowKey,
VisibilityTimestamp: now,
Priority: enumsspb.TASK_PRIORITY_HIGH,
VersionedTransition: transitionHistory[len(transitionHistory)-1],
FirstEventID: firstEventID,
FirstEventVersion: firstEventVersion,
NextEventID: nextEventID,
TaskEquivalents: replicationTasks,
LastVersionHistoryItem: lastVersionHistoryItem,
}

// versioned transition updated in the transaction
Expand Down

0 comments on commit b8b3cda

Please sign in to comment.