Skip to content

Commit

Permalink
apply schema deltas on empty batches, reduce response size of sync ba…
Browse files Browse the repository at this point in the history
…tch to a single int64
  • Loading branch information
serprex committed Dec 7, 2024
1 parent 174911e commit d64ebdc
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 48 deletions.
4 changes: 2 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (a *FlowableActivity) SyncRecords(
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncResponse, error) {
) (int64, error) {
var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
if config.Script != "" {
var onErr context.CancelCauseFunc
Expand Down Expand Up @@ -416,7 +416,7 @@ func (a *FlowableActivity) SyncPg(
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncResponse, error) {
) (int64, error) {
return syncCore(ctx, a, config, options, sessionID, nil,
connectors.CDCPullPgConnector.PullPg,
connectors.CDCSyncPgConnector.SyncPg)
Expand Down
91 changes: 50 additions & 41 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,34 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa
return tableNameSchemaMapping, nil
}

func (a *FlowableActivity) applySchemaDeltas(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
schemaDeltas []*protos.TableSchemaDelta,
) error {
tableSchemaDeltasCount := len(schemaDeltas)
if tableSchemaDeltasCount > 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
for _, tableSchemaDelta := range schemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
}

if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{
PeerName: config.SourceName,
TableIdentifiers: modifiedSrcTables,
TableMappings: options.TableMappings,
FlowName: config.FlowJobName,
System: config.System,
}); err != nil {
err = fmt.Errorf("failed to execute schema update at source: %w", err)
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
}
return nil
}

func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
ctx context.Context,
a *FlowableActivity,
Expand All @@ -119,7 +147,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
) (*model.SyncResponse, error) {
) (int64, error) {
flowName := config.FlowJobName
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := activity.GetLogger(ctx)
Expand All @@ -135,10 +163,10 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon

srcConn, normChan, err := waitForCdcCache[TPull](ctx, a, sessionID)
if err != nil {
return nil, err
return 0, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
return 0, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

batchSize := options.BatchSize
Expand All @@ -157,7 +185,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
return 0, err
}

logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
Expand All @@ -166,20 +194,20 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon

channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx, config.Env)
if err != nil {
return nil, fmt.Errorf("failed to get CDC channel buffer size: %w", err)
return 0, fmt.Errorf("failed to get CDC channel buffer size: %w", err)
}
recordBatchPull := model.NewCDCStream[Items](int(channelBufferSize))
recordBatchSync := recordBatchPull
if adaptStream != nil {
var err error
if recordBatchSync, err = adaptStream(recordBatchPull); err != nil {
return nil, err
return 0, err
}
}

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, flowName)
if err != nil {
return nil, err
return 0, err
}

startTime := time.Now()
Expand Down Expand Up @@ -215,27 +243,24 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
a.Alerter.LogFlowError(ctx, flowName, err)
}
if temporal.IsApplicationError(err) {
return nil, err
return 0, err
} else {
return nil, fmt.Errorf("failed in pull records when: %w", err)
return 0, fmt.Errorf("failed in pull records when: %w", err)
}
}
logger.Info("no records to push")

dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
return 0, fmt.Errorf("failed to recreate destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
return 0, fmt.Errorf("failed to sync schema: %w", err)
}

return &model.SyncResponse{
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatchSync.SchemaDeltas,
}, nil
return -1, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
}

var syncStartTime time.Time
Expand Down Expand Up @@ -289,9 +314,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
a.Alerter.LogFlowError(ctx, flowName, err)
}
if temporal.IsApplicationError(err) {
return nil, err
return 0, err
} else {
return nil, fmt.Errorf("failed to pull records: %w", err)
return 0, fmt.Errorf("failed to pull records: %w", err)
}
}

Expand All @@ -307,49 +332,33 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(numRecords), lastCheckpoint,
); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
return 0, err
}

if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
return 0, err
}
if res.TableNameRowsMapping != nil {
if err := monitoring.AddCDCBatchTablesForFlow(
ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, res.TableNameRowsMapping,
); err != nil {
return nil, err
return 0, err
}
}

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

tableSchemaDeltasCount := len(res.TableSchemaDeltas)
if tableSchemaDeltasCount > 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
for _, tableSchemaDelta := range res.TableSchemaDeltas {
modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
}

if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{
PeerName: config.SourceName,
TableIdentifiers: modifiedSrcTables,
TableMappings: options.TableMappings,
FlowName: config.FlowJobName,
System: config.System,
}); err != nil {
err = fmt.Errorf("failed to execute schema update at source: %w", err)
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
return 0, err
}

if recordBatchSync.NeedsNormalize() {
parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env)
if err != nil {
return nil, err
return 0, err
}
var done chan model.NormalizeResponse
if !parallel {
Expand All @@ -361,14 +370,14 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
if done != nil {
if normRes, ok := <-done; !ok {
return nil, errors.New("failed to normalize")
return 0, errors.New("failed to normalize")
} else {
a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("normalized from %d to %d", normRes.StartBatchID, normRes.EndBatchID))
}
}
}

return res, nil
return res.NumRecordsSynced, nil
}

func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
Expand Down
11 changes: 6 additions & 5 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ func SyncFlowWorkflow(
selector.AddFuture(syncFlowFuture, func(f workflow.Future) {
syncDone = true

var childSyncFlowRes *model.SyncResponse
if err := f.Get(ctx, &childSyncFlowRes); err != nil {
var numRecordsSynced int64
if err := f.Get(ctx, &numRecordsSynced); err != nil {
logger.Error("failed to execute sync flow", slog.Any("error", err))
syncErr = true
} else if childSyncFlowRes != nil {
totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
logger.Info("Total records synced", slog.Int64("totalRecordsSynced", totalRecordsSynced))
} else {
totalRecordsSynced += numRecordsSynced
logger.Info("Total records synced",
slog.Int64("numRecordsSynced", numRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced))
}
})

Expand Down

0 comments on commit d64ebdc

Please sign in to comment.