Skip to content

Commit

Permalink
avoid races
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 10, 2024
1 parent 286d64f commit 8dd52a3
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ type NormalizeBatchRequest struct {
}

type CdcCacheEntry struct {
connector connectors.CDCPullConnectorCore
done chan struct{}
normalize chan NormalizeBatchRequest
connector connectors.CDCPullConnectorCore
done chan struct{}
normalize chan NormalizeBatchRequest
normalizeDone chan struct{}
}

type FlowableActivity struct {
Expand Down Expand Up @@ -304,38 +305,46 @@ func (a *FlowableActivity) MaintainPull(

done := make(chan struct{})
normalize := make(chan NormalizeBatchRequest)
normalizeDone := make(chan struct{})
a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = CdcCacheEntry{
connector: srcConn,
done: done,
normalize: normalize,
connector: srcConn,
done: done,
normalize: normalize,
normalizeDone: normalizeDone,
}
a.CdcCacheRw.Unlock()

ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

normDone := make(chan struct{})
go func() {
for req := range normalize {
res, err := a.StartNormalize(ctx, &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
SyncBatchID: req.BatchID,
})
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
} else if req.Done != nil {
req.Done <- res
}
if req.Done != nil {
close(req.Done)
loop:
for {
select {
case req, ok := <-normalize:
if !ok {
break loop
}
res, err := a.StartNormalize(ctx, &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
SyncBatchID: req.BatchID,
})
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
} else if req.Done != nil {
req.Done <- res
}
if req.Done != nil {
close(req.Done)
}
case <-done:
break loop
case <-ctx.Done():
break loop
}
}
close(normDone)
}()
defer func() {
close(normalize) // TODO race, this will cause sync to panic if it tries to send to normalize after maintainpull ends
<-normDone
close(normalizeDone)
}()

for {
Expand All @@ -361,12 +370,16 @@ func (a *FlowableActivity) MaintainPull(
}

func (a *FlowableActivity) UnmaintainPull(ctx context.Context, sessionID string) error {
var normalizeDone chan struct{}
a.CdcCacheRw.Lock()
if entry, ok := a.CdcCache[sessionID]; ok {
close(entry.done)
close(entry.normalize)
delete(a.CdcCache, sessionID)
normalizeDone = entry.normalizeDone
}
a.CdcCacheRw.Unlock()
<-normalizeDone
return nil
}

Expand Down

0 comments on commit 8dd52a3

Please sign in to comment.