Skip to content

Commit

Permalink
avoid getting stuck on pause waiting for finished=true which was remo…
Browse files Browse the repository at this point in the history
…ved when removing waiting on normalize
  • Loading branch information
serprex committed Dec 17, 2024
1 parent fd0456c commit cb55369
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,9 @@ func CDCFlowWorkflow(
syncFlowFuture := workflow.ExecuteChildWorkflow(syncCtx, SyncFlowWorkflow, cfg, state.SyncFlowOptions)

mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
finished = true
})
mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) {
if err := f.Get(ctx, nil); err != nil {
var panicErr *temporal.PanicError
Expand All @@ -519,18 +521,22 @@ func CDCFlowWorkflow(
} else {
logger.Error("error in sync flow", slog.Any("error", err))
}
} else {
logger.Info("sync finished")
}

logger.Info("sync finished")
syncFlowFuture = nil
restart = true
finished = true
if state.SyncFlowOptions.NumberOfSyncs > 0 {
state.ActiveSignal = model.PauseSignal
}
})

flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
if state.ActiveSignal == model.PauseSignal {
finished = true
}
})

addCdcPropertiesSignalListener(ctx, logger, mainLoopSelector, state)
Expand All @@ -546,7 +552,7 @@ func CDCFlowWorkflow(
return state, err
}

if state.ActiveSignal == model.PauseSignal || workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
restart = true
if syncFlowFuture != nil {
if err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil); err != nil {
Expand All @@ -556,11 +562,7 @@ func CDCFlowWorkflow(
}
}

if restart {
if state.ActiveSignal == model.PauseSignal {
finished = true
}

if restart || finished {
for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) {
mainLoopSelector.Select(ctx)
}
Expand Down

0 comments on commit cb55369

Please sign in to comment.