From 328a81bc9a09135942220184a9c6cd5d2db4339b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 9 Dec 2024 21:20:34 +0000 Subject: [PATCH] CustomSync --- flow/workflows/cdc_flow.go | 5 ++++- flow/workflows/sync_flow.go | 8 ++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 60d42e024..9358093b9 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -526,9 +526,12 @@ func CDCFlowWorkflow( handleError("sync", err) } - logger.Info("sync finished, finishing normalize") + logger.Info("sync finished") syncFlowFuture = nil restart = true + if state.SyncFlowOptions.NumberOfSyncs > 0 { + state.ActiveSignal = model.PauseSignal + } }) flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 3da4b3464..5b3d04b3d 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -47,7 +47,7 @@ func SyncFlowWorkflow( ) var stop, syncErr bool - currentSyncFlowNum := 0 + currentSyncFlowNum := int32(0) totalRecordsSynced := int64(0) selector := workflow.NewNamedSelector(ctx, "SyncLoop") @@ -73,7 +73,7 @@ func SyncFlowWorkflow( var syncDone bool currentSyncFlowNum += 1 - logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum)) + logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum))) var syncFlowFuture workflow.Future if config.System == protos.TypeSystem_Q { @@ -99,7 +99,7 @@ func SyncFlowWorkflow( selector.Select(ctx) } - if syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + if currentSyncFlowNum >= options.NumberOfSyncs || syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() { break } } @@ -123,7 +123,7 @@ func SyncFlowWorkflow( logger.Warn("UnmaintainPull failed", slog.Any("error", err)) } - if stop { + if stop || currentSyncFlowNum >= options.NumberOfSyncs { return nil } else if _, stop := stopChan.ReceiveAsync(); stop { // if sync flow erroring may outrace receiving stop