diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index b1eec8b702..969c61eb65 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -375,7 +375,7 @@ func (a *FlowableActivity) SyncRecords( config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, sessionID string, -) (*model.SyncResponse, error) { +) (int64, error) { var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error) if config.Script != "" { var onErr context.CancelCauseFunc @@ -416,7 +416,7 @@ func (a *FlowableActivity) SyncPg( config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, sessionID string, -) (*model.SyncResponse, error) { +) (int64, error) { return syncCore(ctx, a, config, options, sessionID, nil, connectors.CDCPullPgConnector.PullPg, connectors.CDCSyncPgConnector.SyncPg) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 08af39036e..b6edd72486 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -110,6 +110,34 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa return tableNameSchemaMapping, nil } +func (a *FlowableActivity) applySchemaDeltas( + ctx context.Context, + config *protos.FlowConnectionConfigs, + options *protos.SyncFlowOptions, + schemaDeltas []*protos.TableSchemaDelta, +) error { + tableSchemaDeltasCount := len(schemaDeltas) + if tableSchemaDeltasCount > 0 { + modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) + for _, tableSchemaDelta := range schemaDeltas { + modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) + } + + if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{ + PeerName: config.SourceName, + TableIdentifiers: modifiedSrcTables, + TableMappings: options.TableMappings, + FlowName: config.FlowJobName, + System: config.System, + }); err != nil { + err = fmt.Errorf("failed to execute schema update at source: %w", err) + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err + } + } + return nil +} + func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items]( ctx context.Context, a *FlowableActivity, @@ -119,7 +147,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error), pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error, sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error), -) (*model.SyncResponse, error) { +) (int64, error) { flowName := config.FlowJobName ctx = context.WithValue(ctx, shared.FlowNameKey, flowName) logger := activity.GetLogger(ctx) @@ -135,10 +163,10 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon srcConn, normChan, err := waitForCdcCache[TPull](ctx, a, sessionID) if err != nil { - return nil, err + return 0, err } if err := srcConn.ConnectionActive(ctx); err != nil { - return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) + return 0, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } batchSize := options.BatchSize @@ -157,7 +185,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon }() if err != nil { a.Alerter.LogFlowError(ctx, flowName, err) - return nil, err + return 0, err } logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset)) @@ -166,20 +194,20 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx, config.Env) if err != nil { - return nil, fmt.Errorf("failed to get CDC channel buffer size: %w", err) + return 0, fmt.Errorf("failed to get CDC channel buffer size: %w", err) } recordBatchPull := model.NewCDCStream[Items](int(channelBufferSize)) recordBatchSync := recordBatchPull if adaptStream != nil { var err error if recordBatchSync, err = adaptStream(recordBatchPull); err != nil { - return nil, err + return 0, err } } tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, flowName) if err != nil { - return nil, err + return 0, err } startTime := time.Now() @@ -215,27 +243,24 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon a.Alerter.LogFlowError(ctx, flowName, err) } if temporal.IsApplicationError(err) { - return nil, err + return 0, err } else { - return nil, fmt.Errorf("failed in pull records when: %w", err) + return 0, fmt.Errorf("failed in pull records when: %w", err) } } logger.Info("no records to push") dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName) if err != nil { - return nil, fmt.Errorf("failed to recreate destination connector: %w", err) + return 0, fmt.Errorf("failed to recreate destination connector: %w", err) } defer connectors.CloseConnector(ctx, dstConn) if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil { - return nil, fmt.Errorf("failed to sync schema: %w", err) + return 0, fmt.Errorf("failed to sync schema: %w", err) } - return &model.SyncResponse{ - CurrentSyncBatchID: -1, - TableSchemaDeltas: recordBatchSync.SchemaDeltas, - }, nil + return -1, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas) } var syncStartTime time.Time @@ -289,9 +314,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon a.Alerter.LogFlowError(ctx, flowName, err) } if temporal.IsApplicationError(err) { - return nil, err + return 0, err } else { - return nil, fmt.Errorf("failed to pull records: %w", err) + return 0, fmt.Errorf("failed to pull records: %w", err) } } @@ -307,18 +332,18 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(numRecords), lastCheckpoint, ); err != nil { a.Alerter.LogFlowError(ctx, flowName, err) - return nil, err + return 0, err } if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint); err != nil { a.Alerter.LogFlowError(ctx, flowName, err) - return nil, err + return 0, err } if res.TableNameRowsMapping != nil { if err := monitoring.AddCDCBatchTablesForFlow( ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, res.TableNameRowsMapping, ); err != nil { - return nil, err + return 0, err } } @@ -326,30 +351,14 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon activity.RecordHeartbeat(ctx, pushedRecordsWithCount) a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount) - tableSchemaDeltasCount := len(res.TableSchemaDeltas) - if tableSchemaDeltasCount > 0 { - modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) - for _, tableSchemaDelta := range res.TableSchemaDeltas { - modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) - } - - if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{ - PeerName: config.SourceName, - TableIdentifiers: modifiedSrcTables, - TableMappings: options.TableMappings, - FlowName: config.FlowJobName, - System: config.System, - }); err != nil { - err = fmt.Errorf("failed to execute schema update at source: %w", err) - a.Alerter.LogFlowError(ctx, flowName, err) - return nil, err - } + if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil { + return 0, err } if recordBatchSync.NeedsNormalize() { parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env) if err != nil { - return nil, err + return 0, err } var done chan model.NormalizeResponse if !parallel { @@ -361,14 +370,14 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } if done != nil { if normRes, ok := <-done; !ok { - return nil, errors.New("failed to normalize") + return 0, errors.New("failed to normalize") } else { a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("normalized from %d to %d", normRes.StartBatchID, normRes.EndBatchID)) } } } - return res, nil + return res.NumRecordsSynced, nil } func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index b4bc0f29f7..3da4b3464b 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -84,13 +84,14 @@ func SyncFlowWorkflow( selector.AddFuture(syncFlowFuture, func(f workflow.Future) { syncDone = true - var childSyncFlowRes *model.SyncResponse - if err := f.Get(ctx, &childSyncFlowRes); err != nil { + var numRecordsSynced int64 + if err := f.Get(ctx, &numRecordsSynced); err != nil { logger.Error("failed to execute sync flow", slog.Any("error", err)) syncErr = true - } else if childSyncFlowRes != nil { - totalRecordsSynced += childSyncFlowRes.NumRecordsSynced - logger.Info("Total records synced", slog.Int64("totalRecordsSynced", totalRecordsSynced)) + } else { + totalRecordsSynced += numRecordsSynced + logger.Info("Total records synced", + slog.Int64("numRecordsSynced", numRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced)) } })