Skip to content

Commit

Permalink
CustomSync
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 10, 2024
1 parent b4f9a11 commit 328a81b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
5 changes: 4 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,12 @@ func CDCFlowWorkflow(
handleError("sync", err)
}

logger.Info("sync finished, finishing normalize")
logger.Info("sync finished")
syncFlowFuture = nil
restart = true
if state.SyncFlowOptions.NumberOfSyncs > 0 {
state.ActiveSignal = model.PauseSignal
}
})

flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
Expand Down
8 changes: 4 additions & 4 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func SyncFlowWorkflow(
)

var stop, syncErr bool
currentSyncFlowNum := 0
currentSyncFlowNum := int32(0)
totalRecordsSynced := int64(0)

selector := workflow.NewNamedSelector(ctx, "SyncLoop")
Expand All @@ -73,7 +73,7 @@ func SyncFlowWorkflow(
var syncDone bool

currentSyncFlowNum += 1
logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum))
logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum)))

var syncFlowFuture workflow.Future
if config.System == protos.TypeSystem_Q {
Expand All @@ -99,7 +99,7 @@ func SyncFlowWorkflow(
selector.Select(ctx)
}

if syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
if currentSyncFlowNum >= options.NumberOfSyncs || syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
break
}
}
Expand All @@ -123,7 +123,7 @@ func SyncFlowWorkflow(
logger.Warn("UnmaintainPull failed", slog.Any("error", err))
}

if stop {
if stop || currentSyncFlowNum >= options.NumberOfSyncs {
return nil
} else if _, stop := stopChan.ReceiveAsync(); stop {
// if sync flow erroring may outrace receiving stop
Expand Down

0 comments on commit 328a81b

Please sign in to comment.