diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index a9f58a5f3f..be052f5460 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -19,6 +19,7 @@ import (
 	"google.golang.org/protobuf/proto"
 
 	"github.com/PeerDB-io/peer-flow/connectors"
+	connmetadata "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -182,6 +183,15 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	startTime := time.Now()
 	errGroup, errCtx := errgroup.WithContext(ctx)
 	errGroup.Go(func() error {
+		pgMetadata := connmetadata.NewPostgresMetadataFromCatalog(logger, a.CatalogPool)
+		// For connectors that don't store sync state in the catalog metadata, this returns 0.
+		// ClickHouse does, and is the only connector that uses syncBatchID in schema_deltas_audit_log.
+		syncBatchID, err := pgMetadata.GetLastSyncBatchID(ctx, flowName)
+		if err != nil {
+			return err
+		}
+		syncBatchID += 1
+
 		return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
 			FlowJobName:           flowName,
 			SrcTableIDNameMapping: options.SrcTableIdNameMapping,
@@ -197,6 +207,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 			OverrideReplicationSlotName: config.ReplicationSlotName,
 			RecordStream:                recordBatchPull,
 			Env:                         config.Env,
+			SyncBatchID:                 syncBatchID,
 		})
 	})
 
diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 5dc8a14628..72da6aa8e0 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -93,10 +93,6 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
 		return nil, err
 	}
 
-	if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
-		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
-	}
-
 	return &model.SyncResponse{
 		LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
 		NumRecordsSynced:       int64(numRecords),
@@ -106,7 +102,9 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
 	}, nil
 }
 
-func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
+func (c *ClickHouseConnector) SyncRecords(ctx context.Context,
+	req *model.SyncRecordsRequest[model.RecordItems],
+) (*model.SyncResponse, error) {
 	res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
 	if err != nil {
 		return nil, err
@@ -120,11 +118,22 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
 	return res, nil
 }
 
+// For ClickHouse specifically, we don't want to replay schema deltas during sync, but rather during the normalize phase.
+// ReplayTableSchemaDeltas is therefore a no-op; the private variant below is called during normalize instead.
 func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
 	ctx context.Context,
 	env map[string]string,
 	flowJobName string,
 	schemaDeltas []*protos.TableSchemaDelta,
+) error {
+	return nil
+}
+
+func (c *ClickHouseConnector) replayTableSchemaDeltas(
+	ctx context.Context,
+	env map[string]string,
+	flowJobName string,
+	schemaDeltas []*protos.TableSchemaDelta,
 ) error {
 	if len(schemaDeltas) == 0 {
 		return nil
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index fabe07a35f..df3e740093 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -241,6 +241,17 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		return nil, err
 	}
 
+	schemaDeltas, err := c.GetSchemaDeltasForBatches(ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
+	if err != nil {
+		c.logger.Error("[clickhouse] error while getting schema deltas for batches", "error", err)
+		return nil, err
+	}
+
+	if err = c.replayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, schemaDeltas); err != nil {
+		c.logger.Error("[clickhouse] error while replaying table schema deltas", "error", err)
+		return nil, err
+	}
+
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
 		return &model.NormalizeResponse{
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index f253bf2288..814ce42369 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -233,3 +233,37 @@ func (p *PostgresMetadata) SyncFlowCleanup(ctx context.Context, jobName string)
 
 	return nil
 }
+
+func (p *PostgresMetadata) GetSchemaDeltasForBatches(ctx context.Context,
+	jobName string, syncBatchID int64, normalizeBatchID int64,
+) ([]*protos.TableSchemaDelta, error) {
+	p.logger.Info("getting schema deltas for batches",
+		slog.String("jobName", jobName),
+		slog.Int64("syncBatchID", syncBatchID),
+		slog.Int64("normalizeBatchID", normalizeBatchID))
+
+	rows, err := p.pool.Query(ctx,
+		`SELECT (delta_info->>'tableSchemaDelta')::JSONB
+		FROM peerdb_stats.schema_deltas_audit_log
+		WHERE flow_job_name=$1 AND batch_id BETWEEN $2 AND $3`,
+		jobName, normalizeBatchID+1, syncBatchID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query for SchemaDeltas: %w", err)
+	}
+
+	deltas, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.TableSchemaDelta, error) {
+		var schemaDelta protos.TableSchemaDelta
+		err := row.Scan(&schemaDelta)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scan SchemaDelta: %w", err)
+		}
+
+		return &schemaDelta, nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to collect SchemaDeltas: %w", err)
+	}
+	p.logger.Info("got schema deltas", slog.Any("deltas", deltas))
+
+	return deltas, nil
+}
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 65924c13fc..295273bf86 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -49,6 +49,7 @@ type PostgresCDCSource struct {
 	hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{}
 	hushWarnUnknownTableDetected map[uint32]struct{}
 	flowJobName                  string
+	syncBatchID                  int64
 }
 
 type PostgresCDCConfig struct {
@@ -62,6 +63,7 @@ type PostgresCDCConfig struct {
 	FlowJobName string
 	Slot        string
 	Publication string
+	SyncBatchID int64
 }
 
 // Create a new PostgresCDCSource
@@ -82,6 +84,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
 		hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}),
 		hushWarnUnknownTableDetected: make(map[uint32]struct{}),
 		flowJobName:                  cdcConfig.FlowJobName,
+		syncBatchID:                  cdcConfig.SyncBatchID,
 	}
 }
 
@@ -861,9 +864,9 @@ func auditSchemaDelta[Items model.Items](ctx context.Context, p *PostgresCDCSour
 	_, err := p.catalogPool.Exec(ctx,
 		`INSERT INTO
-			peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info)
-			VALUES($1,$2,$3,$4)`,
-		p.flowJobName, workflowID, runID, rec)
+			peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info,batch_id)
+			VALUES($1,$2,$3,$4,$5)`,
+		p.flowJobName, workflowID, runID, rec, p.syncBatchID)
 	if err != nil {
 		return fmt.Errorf("failed to insert row into table: %w", err)
 	}
 
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 435df65ed4..e70d741fa6 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -427,6 +427,7 @@ func pullCore[Items model.Items](
 		FlowJobName: req.FlowJobName,
 		Slot:        slotName,
 		Publication: publicationName,
+		SyncBatchID: req.SyncBatchID,
 	})
 
 	if err := PullCdcRecords(ctx, cdc, req, processor, &c.replLock); err != nil {
diff --git a/flow/model/model.go b/flow/model/model.go
index d2f66693c0..467308e857 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -80,6 +80,7 @@ type PullRecordsRequest[T Items] struct {
 	MaxBatchSize uint32
 	// IdleTimeout is the timeout to wait for new records.
 	IdleTimeout time.Duration
+	SyncBatchID int64
 }
 
 type ToJSONOptions struct {
diff --git a/nexus/catalog/migrations/V42__schema_deltas_audit_log_sync_batch_id.sql b/nexus/catalog/migrations/V42__schema_deltas_audit_log_sync_batch_id.sql
new file mode 100644
index 0000000000..30c7ecccbc
--- /dev/null
+++ b/nexus/catalog/migrations/V42__schema_deltas_audit_log_sync_batch_id.sql
@@ -0,0 +1 @@
+ALTER TABLE peerdb_stats.schema_deltas_audit_log ADD COLUMN IF NOT EXISTS batch_id BIGINT;
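
Taken together, these changes move ClickHouse schema-delta replay from the sync phase to the normalize phase: the Postgres CDC source now stamps every schema_deltas_audit_log row with the batch ID it was pulled under, and NormalizeRecords replays exactly the deltas recorded for the batches it is about to catch up on, before any of their rows are normalized. A minimal sketch of that contract, assuming only the interfaces shown in this diff (the helper name replayPendingDeltas is illustrative, not part of the change):

package clickhouse

import (
	"context"
	"fmt"
)

// replayPendingDeltas (hypothetical) illustrates the ordering NormalizeRecords
// now enforces: fetch the deltas recorded for batches in (normBatchID, syncBatchID]
// and replay them before normalizing those batches, so target tables gain any
// new columns before rows referencing them arrive. Example: if normalize last
// finished batch 7 and sync is at batch 9, the metadata store is queried with
// batch_id BETWEEN 8 AND 9.
func (c *ClickHouseConnector) replayPendingDeltas(
	ctx context.Context,
	env map[string]string,
	flowJobName string,
	syncBatchID int64,
	normBatchID int64,
) error {
	deltas, err := c.GetSchemaDeltasForBatches(ctx, flowJobName, syncBatchID, normBatchID)
	if err != nil {
		return fmt.Errorf("failed to get schema deltas for batches: %w", err)
	}
	return c.replayTableSchemaDeltas(ctx, env, flowJobName, deltas)
}

Stamping the batch ID at pull time is what keeps this window deterministic: a retried normalize pass recomputes the same (normBatchID, syncBatchID] range and replays the same deltas.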