Skip to content

Commit

Permalink
seems this is necessary to respond to cancelation
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 23, 2024
1 parent 18b1ab3 commit 3104007
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ func SyncFlowWorkflow(
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options)
}

var waitChan model.TypedReceiveChannel[struct{}]
var waitSelector workflow.Selector
if !peerdbenv.PeerDBEnableParallelSyncNormalize() {
waitChan = model.NormalizeDoneSignal.GetSignalChannel(ctx)
waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait")
waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {})
}

var stop bool
Expand All @@ -96,7 +99,7 @@ func SyncFlowWorkflow(

for !stop && ctx.Err() == nil {
var syncDone, syncErr bool
mustWait := waitChan.Chan != nil
mustWait := waitSelector != nil

// execute the sync flow
currentSyncFlowNum += 1
Expand Down Expand Up @@ -203,7 +206,7 @@ func SyncFlowWorkflow(

restart := currentSyncFlowNum >= maxSyncsPerSyncFlow || syncErr
if mustWait {
waitChan.Receive(ctx)
waitSelector.Select(ctx)
if restart {
// must flush selector for signals received while waiting
for ctx.Err() == nil && selector.HasPending() {
Expand Down

0 comments on commit 3104007

Please sign in to comment.