Skip to content

Commit

Permalink
retries, also avoid locking up on context cancelation
Browse files: browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 17, 2024
1 parent 7a75906 commit fb23122
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 37 deletions.
37 changes: 23 additions & 14 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type CheckConnectionResult struct {
}

type NormalizeBatchRequest struct {
Done chan model.NormalizeResponse
Done chan struct{}
BatchID int64
}

Expand Down Expand Up @@ -326,13 +326,23 @@ func (a *FlowableActivity) MaintainPull(
if !ok {
break loop
}
res, err := a.StartNormalize(ctx, config, req.BatchID)
if err != nil {
retry:
if err := a.StartNormalize(ctx, config, req.BatchID); err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
for {
// update req to latest normalize request & retry
select {
case req = <-normalize:
case <-done:
break loop
case <-ctx.Done():
break loop
default:
time.Sleep(time.Second)
goto retry
}
}
} else if req.Done != nil {
req.Done <- res
}
if req.Done != nil {
close(req.Done)
}
case <-done:
Expand Down Expand Up @@ -436,7 +446,7 @@ func (a *FlowableActivity) StartNormalize(
ctx context.Context,
config *protos.FlowConnectionConfigs,
batchID int64,
) (model.NormalizeResponse, error) {
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

Expand All @@ -447,10 +457,9 @@ func (a *FlowableActivity) StartNormalize(
config.DestinationName,
)
if errors.Is(err, errors.ErrUnsupported) {
return model.NormalizeResponse{},
monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
} else if err != nil {
return model.NormalizeResponse{}, fmt.Errorf("failed to get normalize connector: %w", err)
return fmt.Errorf("failed to get normalize connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

Expand All @@ -461,7 +470,7 @@ func (a *FlowableActivity) StartNormalize(

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
if err != nil {
return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err)
return fmt.Errorf("failed to get table name schema mapping: %w", err)
}

logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID))
Expand All @@ -476,17 +485,17 @@ func (a *FlowableActivity) StartNormalize(
})
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return model.NormalizeResponse{}, fmt.Errorf("failed to normalized records: %w", err)
return fmt.Errorf("failed to normalized records: %w", err)
}
if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil {
return model.NormalizeResponse{}, fmt.Errorf("failed to update end time for cdc batch: %w", err)
return fmt.Errorf("failed to update end time for cdc batch: %w", err)
}
}

logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID))

return res, nil
return nil
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
Expand Down
20 changes: 10 additions & 10 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package activities

import (
"context"
"errors"
"fmt"
"log/slog"
"reflect"
Expand Down Expand Up @@ -360,19 +359,20 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
if err != nil {
return 0, err
}
var done chan model.NormalizeResponse
var done chan struct{}
if !parallel {
done = make(chan model.NormalizeResponse)
done = make(chan struct{})
}
normChan <- NormalizeBatchRequest{
BatchID: res.CurrentSyncBatchID,
Done: done,
select {
case normChan <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}:
case <-ctx.Done():
return 0, nil
}
if done != nil {
if normRes, ok := <-done; !ok {
return 0, errors.New("failed to normalize")
} else {
a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("normalized from %d to %d", normRes.StartBatchID, normRes.EndBatchID))
select {
case <-done:
case <-ctx.Done():
return 0, nil
}
}
}
Expand Down
13 changes: 0 additions & 13 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,6 @@ type SyncResponse struct {
CurrentSyncBatchID int64
}

// NormalizePayload groups a table-schema mapping, a done flag, and a sync batch ID.
// NOTE(review): no producer or consumer of this type is visible here; the
// field notes below are inferred from names and types only — confirm.
type NormalizePayload struct {
// TableNameSchemaMapping maps a string key (presumably a table name) to its
// *protos.TableSchema.
TableNameSchemaMapping map[string]*protos.TableSchema
// Done is a boolean flag; presumably set once normalization has finished — TODO confirm.
Done bool
// SyncBatchID is an int64 batch identifier; presumably the sync batch this
// payload refers to — TODO confirm.
SyncBatchID int64
}

// NormalizeResponse carries a done flag together with a start/end batch ID pair.
// NOTE(review): presumably describes the outcome of a normalize run — confirm
// against whatever produces it.
type NormalizeResponse struct {
// Done indicates whether normalization is done.
Done bool
// StartBatchID is the batch ID at the start of the normalized range;
// whether the bound is inclusive is not visible here — TODO confirm.
StartBatchID int64
// EndBatchID is the batch ID at the end of the normalized range;
// whether the bound is inclusive is not visible here — TODO confirm.
EndBatchID int64
}

// RelationMessageMapping maps a uint32 key to a *pglogrepl.RelationMessage.
// NOTE(review): the key is presumably the PostgreSQL relation ID — confirm against callers.
type RelationMessageMapping map[uint32]*pglogrepl.RelationMessage

type SyncCompositeResponse struct {
Expand Down

0 comments on commit fb23122

Please sign in to comment.