diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 6e2232b905..5907101954 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -326,10 +326,7 @@ func (a *FlowableActivity) MaintainPull(
             if !ok {
                 break loop
             }
-            res, err := a.StartNormalize(ctx, &protos.StartNormalizeInput{
-                FlowConnectionConfigs: config,
-                SyncBatchID:           req.BatchID,
-            })
+            res, err := a.StartNormalize(ctx, config, req.BatchID)
             if err != nil {
                 a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
             } else if req.Done != nil {
@@ -437,21 +434,21 @@ func (a *FlowableActivity) SyncPg(
 
 func (a *FlowableActivity) StartNormalize(
     ctx context.Context,
-    input *protos.StartNormalizeInput,
+    config *protos.FlowConnectionConfigs,
+    batchID int64,
 ) (model.NormalizeResponse, error) {
-    conn := input.FlowConnectionConfigs
-    ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
+    ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
     logger := activity.GetLogger(ctx)
 
     dstConn, err := connectors.GetByNameAs[connectors.CDCNormalizeConnector](
         ctx,
-        input.FlowConnectionConfigs.Env,
+        config.Env,
         a.CatalogPool,
-        conn.DestinationName,
+        config.DestinationName,
     )
     if errors.Is(err, errors.ErrUnsupported) {
         return model.NormalizeResponse{},
-            monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
+            monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
     } else if err != nil {
         return model.NormalizeResponse{}, fmt.Errorf("failed to get normalize connector: %w", err)
     }
@@ -462,32 +459,27 @@ func (a *FlowableActivity) StartNormalize(
     })
     defer shutdown()
 
-    tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName)
+    tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
     if err != nil {
         return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err)
     }
 
-    logger.Info("Normalizing batch", slog.Int64("SyncBatchID", input.SyncBatchID))
+    logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID))
     res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
-        FlowJobName:            input.FlowConnectionConfigs.FlowJobName,
-        Env:                    input.FlowConnectionConfigs.Env,
+        FlowJobName:            config.FlowJobName,
+        Env:                    config.Env,
         TableNameSchemaMapping: tableNameSchemaMapping,
-        TableMappings:          input.FlowConnectionConfigs.TableMappings,
-        SyncBatchID:            input.SyncBatchID,
-        SoftDeleteColName:      input.FlowConnectionConfigs.SoftDeleteColName,
-        SyncedAtColName:        input.FlowConnectionConfigs.SyncedAtColName,
+        TableMappings:          config.TableMappings,
+        SoftDeleteColName:      config.SoftDeleteColName,
+        SyncedAtColName:        config.SyncedAtColName,
+        SyncBatchID:            batchID,
     })
     if err != nil {
-        a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
+        a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
         return model.NormalizeResponse{}, fmt.Errorf("failed to normalized records: %w", err)
     }
     if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
-        if err := monitoring.UpdateEndTimeForCDCBatch(
-            ctx,
-            a.CatalogPool,
-            input.FlowConnectionConfigs.FlowJobName,
-            input.SyncBatchID,
-        ); err != nil {
+        if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil {
             return model.NormalizeResponse{}, fmt.Errorf("failed to update end time for cdc batch: %w", err)
         }
     }
diff --git a/protos/flow.proto b/protos/flow.proto
index 42170a5630..b917b6dae5 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -117,11 +117,6 @@ message SyncFlowOptions {
   int32 number_of_syncs = 7;
 }
 
-message StartNormalizeInput {
-  FlowConnectionConfigs flow_connection_configs = 1;
-  int64 SyncBatchID = 3;
-}
-
 message EnsurePullabilityBatchInput {
   string flow_job_name = 2;
   repeated string source_table_identifiers = 3;