Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 17, 2024
1 parent 55e967a commit 998e2a6
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 41 deletions.
50 changes: 9 additions & 41 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type NormalizeBatchRequest struct {

type CdcCacheEntry struct {
connector connectors.CDCPullConnectorCore
done chan struct{}
syncDone chan struct{}
normalize chan NormalizeBatchRequest
normalizeDone chan struct{}
}
Expand Down Expand Up @@ -303,13 +303,16 @@ func (a *FlowableActivity) MaintainPull(
return err
}

done := make(chan struct{})
// syncDone & normalize will be closed by UnmaintainPull,
// whereas normalizeDone will be closed by the normalize goroutine
// Wait on normalizeDone at end to not interrupt final normalize
syncDone := make(chan struct{})
normalize := make(chan NormalizeBatchRequest)
normalizeDone := make(chan struct{})
a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = CdcCacheEntry{
connector: srcConn,
done: done,
syncDone: syncDone,
normalize: normalize,
normalizeDone: normalizeDone,
}
Expand All @@ -318,41 +321,7 @@ func (a *FlowableActivity) MaintainPull(
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

go func() {
loop:
for {
select {
case req, ok := <-normalize:
if !ok {
break loop
}
retry:
if err := a.StartNormalize(ctx, config, req.BatchID); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
for {
// update req to latest normalize request & retry
select {
case req = <-normalize:
case <-done:
break loop
case <-ctx.Done():
break loop
default:
time.Sleep(time.Second)
goto retry
}
}
} else if req.Done != nil {
close(req.Done)
}
case <-done:
break loop
case <-ctx.Done():
break loop
}
}
close(normalizeDone)
}()
go a.normalizeLoop(ctx, config, syncDone, normalize, normalizeDone)

for {
select {
Expand All @@ -365,7 +334,7 @@ func (a *FlowableActivity) MaintainPull(
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err)
}
case <-done:
case <-syncDone:
return nil
case <-ctx.Done():
a.CdcCacheRw.Lock()
Expand All @@ -380,8 +349,7 @@ func (a *FlowableActivity) UnmaintainPull(ctx context.Context, sessionID string)
var normalizeDone chan struct{}
a.CdcCacheRw.Lock()
if entry, ok := a.CdcCache[sessionID]; ok {
close(entry.done)
close(entry.normalize)
close(entry.syncDone)
delete(a.CdcCache, sessionID)
normalizeDone = entry.normalizeDone
}
Expand Down
48 changes: 48 additions & 0 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,51 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn

return currentSnapshotXmin, nil
}

// Suitable to be run as goroutine
func (a *FlowableActivity) normalizeLoop(
ctx context.Context,
config *protos.FlowConnectionConfigs,
syncDone <-chan struct{},
normalize <-chan NormalizeBatchRequest,
normalizeDone chan struct{},
) {
defer close(normalizeDone)
logger := activity.GetLogger(ctx)

for {
select {
case req := <-normalize:
retryLoop:
for {
if err := a.StartNormalize(ctx, config, req.BatchID); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
for {
// update req to latest normalize request & retry
select {
case req = <-normalize:
case <-syncDone:
logger.Info("[normalize-loop] syncDone closed before retry")
return
case <-ctx.Done():
logger.Info("[normalize-loop] context closed before retry")
return
default:
time.Sleep(30 * time.Second)
continue retryLoop
}
}
} else if req.Done != nil {
close(req.Done)
}
break
}
case <-syncDone:
logger.Info("[normalize-loop] syncDone closed")
return
case <-ctx.Done():
logger.Info("[normalize-loop] context closed")
return
}
}
}

0 comments on commit 998e2a6

Please sign in to comment.