Skip to content

Commit

Permalink
move waiting on sync/norm shutdown into main loop selector
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 24, 2024
1 parent 8e484c0 commit d33a4aa
Showing 1 changed file with 34 additions and 21 deletions.
55 changes: 34 additions & 21 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func CDCFlowWorkflow(
syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, originalRunID)
normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID)

var restart bool
var restart, finished bool
syncCount := 0

syncFlowOpts := workflow.ChildWorkflowOptions{
Expand Down Expand Up @@ -402,23 +402,22 @@ func CDCFlowWorkflow(
)
}
handleError := func(name string, err error) {
w.logger.Error("error finishing sync flow", slog.Any("error", err))
var panicErr *temporal.PanicError
if errors.As(err, &panicErr) {
w.logger.Error("PANIC", panicErr.Error(), panicErr.StackTrace())
w.logger.Error(
"panic in flow",
slog.String("name", name),
slog.Any("error", panicErr.Error()),
slog.String("stack", panicErr.StackTrace()),
)
} else {
w.logger.Error("error in flow", slog.String("name", name), slog.Any("error", err))
}
}

finishSyncNormalize := func() {
restart = true
model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil)

model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{
Done: true,
SyncBatchID: -1,
}).Get(ctx, nil)

state.TruncateProgress(w.logger)
_ = model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil)
}

mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
Expand All @@ -432,10 +431,18 @@ func CDCFlowWorkflow(
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
}

w.logger.Warn("sync flow ended, restarting", slog.Any("error", err))
state.TruncateProgress(w.logger)
startSyncFlow()
mainLoopSelector.AddFuture(syncFlowFuture, handleSyncFlow)
if restart {
w.logger.Info("sync finished, finishing normalize")
_ = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{
Done: true,
SyncBatchID: -1,
}).Get(ctx, nil)
} else {
w.logger.Warn("sync flow ended, restarting", slog.Any("error", err))
state.TruncateProgress(w.logger)
startSyncFlow()
mainLoopSelector.AddFuture(syncFlowFuture, handleSyncFlow)
}
}
handleNormFlow = func(f workflow.Future) {
err := f.Get(ctx, nil)
Expand All @@ -444,10 +451,15 @@ func CDCFlowWorkflow(
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
}

w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err))
state.TruncateProgress(w.logger)
startNormFlow()
mainLoopSelector.AddFuture(normFlowFuture, handleNormFlow)
if restart {
w.logger.Info("normalize finished")
finished = true
} else {
w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err))
state.TruncateProgress(w.logger)
startNormFlow()
mainLoopSelector.AddFuture(normFlowFuture, handleNormFlow)
}
}

startSyncFlow()
Expand Down Expand Up @@ -489,7 +501,7 @@ func CDCFlowWorkflow(

normChan := model.NormalizeSignal.GetSignalChannel(ctx)
normChan.AddToSelector(mainLoopSelector, func(payload model.NormalizePayload, _ bool) {
model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, payload).Get(ctx, nil)
_ = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, payload).Get(ctx, nil)
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, payload.TableNameSchemaMapping)
})

Expand Down Expand Up @@ -529,6 +541,7 @@ func CDCFlowWorkflow(
mainLoopSelector.Select(ctx)
}
if err := ctx.Err(); err != nil {
w.logger.Info("mirror canceled: %v", err)
return state, err
}

Expand Down Expand Up @@ -561,7 +574,7 @@ func CDCFlowWorkflow(
}

if restart {
for ctx.Err() == nil && mainLoopSelector.HasPending() {
for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) {
mainLoopSelector.Select(ctx)
}
if err := ctx.Err(); err != nil {
Expand Down

0 comments on commit d33a4aa

Please sign in to comment.