diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index b3776fd2fc..16a5e37892 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -521,6 +521,7 @@ func CDCFlowWorkflow( addCdcPropertiesSignalListener(ctx, logger, mainLoopSelector, state) state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING + maxSyncPerCDCFlow := int(getMaxSyncsPerCDCFlow(ctx, logger)) for { mainLoopSelector.Select(ctx) for ctx.Err() == nil && mainLoopSelector.HasPending() { @@ -531,7 +532,7 @@ func CDCFlowWorkflow( return state, err } - if state.ActiveSignal == model.PauseSignal || syncCount >= int(getMaxSyncsPerCDCFlow(ctx, logger)) { + if state.ActiveSignal == model.PauseSignal || syncCount >= maxSyncPerCDCFlow { restart = true if syncFlowFuture != nil { err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil)