Skip to content

Commit

Permalink
Normalize: avoid deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 23, 2024
1 parent c27c9b6 commit 18b1ab3
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type NormalizeState struct {
}

// returns reason string when workflow should exit
// signals are flushed when ProcessLoop returns
func ProcessLoop(ctx workflow.Context, logger log.Logger, selector workflow.Selector, state *NormalizeState) string {
canceled := ctx.Err() != nil
for !canceled && selector.HasPending() {
Expand Down Expand Up @@ -62,7 +63,7 @@ func NormalizeFlowWorkflow(

syncChan := model.NormalizeSignal.GetSignalChannel(ctx)

selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-normalize")
selector := workflow.NewNamedSelector(ctx, "NormalizeLoop")
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
syncChan.AddToSelector(selector, func(s model.NormalizePayload, _ bool) {
if s.Done {
Expand All @@ -75,12 +76,12 @@ func NormalizeFlowWorkflow(
state.Wait = false
})

for state.Wait {
for state.Wait && ctx.Err() == nil {
selector.Select(ctx)
}
if exit := ProcessLoop(ctx, logger, selector, state); exit != "" {
logger.Info(exit)
return nil
return ctx.Err()
}

if state.LastSyncBatchID != state.SyncBatchID {
Expand Down Expand Up @@ -128,7 +129,7 @@ func NormalizeFlowWorkflow(
state.Wait = true
if exit := ProcessLoop(ctx, logger, selector, state); exit != "" {
logger.Info(exit)
return nil
return ctx.Err()
}
return workflow.NewContinueAsNewError(ctx, NormalizeFlowWorkflow, config, state)
}

0 comments on commit 18b1ab3

Please sign in to comment.