diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 12bf02ad1a..ca8f6e9518 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -359,7 +359,7 @@ func CDCFlowWorkflow( syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, originalRunID) normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID) - var restart bool + var restart, finished bool syncCount := 0 syncFlowOpts := workflow.ChildWorkflowOptions{ @@ -402,23 +402,22 @@ func CDCFlowWorkflow( ) } handleError := func(name string, err error) { - w.logger.Error("error finishing sync flow", slog.Any("error", err)) var panicErr *temporal.PanicError if errors.As(err, &panicErr) { - w.logger.Error("PANIC", panicErr.Error(), panicErr.StackTrace()) + w.logger.Error( + "panic in flow", + slog.String("name", name), + slog.Any("error", panicErr.Error()), + slog.String("stack", panicErr.StackTrace()), + ) + } else { + w.logger.Error("error in flow", slog.String("name", name), slog.Any("error", err)) } } finishSyncNormalize := func() { restart = true - model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil) - - model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{ - Done: true, - SyncBatchID: -1, - }).Get(ctx, nil) - - state.TruncateProgress(w.logger) + _ = model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil) } mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") @@ -432,10 +431,18 @@ func CDCFlowWorkflow( state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } - w.logger.Warn("sync flow ended, restarting", slog.Any("error", err)) - state.TruncateProgress(w.logger) - startSyncFlow() - mainLoopSelector.AddFuture(syncFlowFuture, handleSyncFlow) + if restart { + w.logger.Info("sync finished, finishing normalize") + _ = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{ + Done: true, + SyncBatchID: -1, + }).Get(ctx, nil) + } else { + w.logger.Warn("sync flow ended, restarting", slog.Any("error", err)) + state.TruncateProgress(w.logger) + startSyncFlow() + mainLoopSelector.AddFuture(syncFlowFuture, handleSyncFlow) + } } handleNormFlow = func(f workflow.Future) { err := f.Get(ctx, nil) @@ -444,10 +451,15 @@ func CDCFlowWorkflow( state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) } - w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err)) - state.TruncateProgress(w.logger) - startNormFlow() - mainLoopSelector.AddFuture(normFlowFuture, handleNormFlow) + if restart { + w.logger.Info("normalize finished") + finished = true + } else { + w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err)) + state.TruncateProgress(w.logger) + startNormFlow() + mainLoopSelector.AddFuture(normFlowFuture, handleNormFlow) + } } startSyncFlow() @@ -489,7 +501,7 @@ func CDCFlowWorkflow( normChan := model.NormalizeSignal.GetSignalChannel(ctx) normChan.AddToSelector(mainLoopSelector, func(payload model.NormalizePayload, _ bool) { - model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, payload).Get(ctx, nil) + _ = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, payload).Get(ctx, nil) maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, payload.TableNameSchemaMapping) }) @@ -529,6 +541,7 @@ func CDCFlowWorkflow( mainLoopSelector.Select(ctx) } if err := ctx.Err(); err != nil { + w.logger.Info("mirror canceled: %v", err) return state, err } @@ -561,7 +574,7 @@ func CDCFlowWorkflow( } if restart { - for ctx.Err() == nil && mainLoopSelector.HasPending() { + for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) { mainLoopSelector.Select(ctx) } if err := ctx.Err(); err != nil {