diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 405e7b858e..95d4d34584 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -22,6 +22,7 @@ type NormalizeState struct { } // returns reason string when workflow should exit +// signals are flushed when ProcessLoop returns func ProcessLoop(ctx workflow.Context, logger log.Logger, selector workflow.Selector, state *NormalizeState) string { canceled := ctx.Err() != nil for !canceled && selector.HasPending() { @@ -62,7 +63,7 @@ func NormalizeFlowWorkflow( syncChan := model.NormalizeSignal.GetSignalChannel(ctx) - selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-normalize") + selector := workflow.NewNamedSelector(ctx, "NormalizeLoop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) syncChan.AddToSelector(selector, func(s model.NormalizePayload, _ bool) { if s.Done { @@ -75,12 +76,12 @@ func NormalizeFlowWorkflow( state.Wait = false }) - for state.Wait { + for state.Wait && ctx.Err() == nil { selector.Select(ctx) } if exit := ProcessLoop(ctx, logger, selector, state); exit != "" { logger.Info(exit) - return nil + return ctx.Err() } if state.LastSyncBatchID != state.SyncBatchID { @@ -128,7 +129,7 @@ func NormalizeFlowWorkflow( state.Wait = true if exit := ProcessLoop(ctx, logger, selector, state); exit != "" { logger.Info(exit) - return nil + return ctx.Err() } return workflow.NewContinueAsNewError(ctx, NormalizeFlowWorkflow, config, state) }