diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index f83efd8221..5ba56d9545 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -45,7 +45,7 @@ type NormalizeBatchRequest struct { type CdcCacheEntry struct { connector connectors.CDCPullConnectorCore - done chan struct{} + syncDone chan struct{} normalize chan NormalizeBatchRequest normalizeDone chan struct{} } @@ -303,13 +303,16 @@ func (a *FlowableActivity) MaintainPull( return err } - done := make(chan struct{}) + // syncDone will be closed by UnmaintainPull (normalize is no longer closed there), + // whereas normalizeDone will be closed by the normalize goroutine + // Wait on normalizeDone at end to not interrupt final normalize + syncDone := make(chan struct{}) normalize := make(chan NormalizeBatchRequest) normalizeDone := make(chan struct{}) a.CdcCacheRw.Lock() a.CdcCache[sessionID] = CdcCacheEntry{ connector: srcConn, - done: done, + syncDone: syncDone, normalize: normalize, normalizeDone: normalizeDone, } @@ -318,41 +321,7 @@ func (a *FlowableActivity) MaintainPull( ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() - go func() { - loop: - for { - select { - case req, ok := <-normalize: - if !ok { - break loop - } - retry: - if err := a.StartNormalize(ctx, config, req.BatchID); err != nil { - a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - for { - // update req to latest normalize request & retry - select { - case req = <-normalize: - case <-done: - break loop - case <-ctx.Done(): - break loop - default: - time.Sleep(time.Second) - goto retry - } - } - } else if req.Done != nil { - close(req.Done) - } - case <-done: - break loop - case <-ctx.Done(): - break loop - } - } - close(normalizeDone) - }() + go a.normalizeLoop(ctx, config, syncDone, normalize, normalizeDone) for { select { @@ -365,7 +334,7 @@ func (a *FlowableActivity) MaintainPull( a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return temporal.NewNonRetryableApplicationError("connection to source 
down", "disconnect", err) } - case <-done: + case <-syncDone: return nil case <-ctx.Done(): a.CdcCacheRw.Lock() @@ -380,8 +349,7 @@ func (a *FlowableActivity) UnmaintainPull(ctx context.Context, sessionID string) var normalizeDone chan struct{} a.CdcCacheRw.Lock() if entry, ok := a.CdcCache[sessionID]; ok { - close(entry.done) - close(entry.normalize) + close(entry.syncDone) delete(a.CdcCache, sessionID) normalizeDone = entry.normalizeDone } diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index e9a28adbb9..4361f4d5e1 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -629,3 +629,59 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn return currentSnapshotXmin, nil } + +// normalizeLoop runs StartNormalize for every request received on normalize. +// A failed request is retried after a 30 second backoff, using the latest +// request received on normalize in the meantime. +// It returns when it observes syncDone closed or ctx done, +// and closes normalizeDone on return. +// Suitable to be run as goroutine +func (a *FlowableActivity) normalizeLoop( + ctx context.Context, + config *protos.FlowConnectionConfigs, + syncDone <-chan struct{}, + normalize <-chan NormalizeBatchRequest, + normalizeDone chan struct{}, +) { + defer close(normalizeDone) + logger := activity.GetLogger(ctx) + + for { + select { + case req := <-normalize: + retryLoop: + for { + if err := a.StartNormalize(ctx, config, req.BatchID); err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + // back off on a timer rather than time.Sleep so that syncDone & ctx are observed while waiting + retryTimer := time.NewTimer(30 * time.Second) + for { + // update req to latest normalize request & retry + select { + case req = <-normalize: + case <-syncDone: + retryTimer.Stop() + logger.Info("[normalize-loop] syncDone closed before retry") + return + case <-ctx.Done(): + retryTimer.Stop() + logger.Info("[normalize-loop] context closed before retry") + return + case <-retryTimer.C: + continue retryLoop + } + } + } else if req.Done != nil { + close(req.Done) + } + break + } + case <-syncDone: + logger.Info("[normalize-loop] syncDone closed") + return + case <-ctx.Done(): + logger.Info("[normalize-loop] context closed") + return + } + } +}