apply SchemaDeltas in normalize
heavycrystal committed Dec 11, 2024
1 parent 0a8fa90 commit 2c645b1
Showing 8 changed files with 79 additions and 8 deletions.
11 changes: 11 additions & 0 deletions flow/activities/flowable_core.go
@@ -19,6 +19,7 @@ import (
 	"google.golang.org/protobuf/proto"
 
 	"github.com/PeerDB-io/peer-flow/connectors"
+	connmetadata "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -182,6 +183,15 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	startTime := time.Now()
 	errGroup, errCtx := errgroup.WithContext(ctx)
 	errGroup.Go(func() error {
+		pgMetadata := connmetadata.NewPostgresMetadataFromCatalog(logger, a.CatalogPool)
+		// GetLastSyncBatchID returns 0 for connectors whose batch state isn't stored in catalog metadata.
+		// ClickHouse is stored there, and is currently the only connector that consumes the batch_id written to schema_deltas_audit_log.
+		syncBatchID, err := pgMetadata.GetLastSyncBatchID(ctx, flowName)
+		if err != nil {
+			return err
+		}
+		syncBatchID += 1
+
 		return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
 			FlowJobName:           flowName,
 			SrcTableIDNameMapping: options.SrcTableIdNameMapping,
@@ -197,6 +207,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 			OverrideReplicationSlotName: config.ReplicationSlotName,
 			RecordStream:                recordBatchPull,
 			Env:                         config.Env,
+			SyncBatchID:                 syncBatchID,
 		})
 	})
 
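A note on the batch-ID handoff (a sketch of the idea, not code from the diff): GetLastSyncBatchID reports the last batch already recorded in catalog metadata, so the pull launched above always runs one batch past it, and every schema delta audited during that pull is tagged with the new ID.

package main

import "fmt"

// currentSyncBatchID mirrors the arithmetic in the hunk above: the metadata
// store knows only the last recorded batch (0 when nothing is stored yet),
// so the batch being pulled right now is always one past it.
func currentSyncBatchID(lastRecorded int64) int64 {
	return lastRecorded + 1
}

func main() {
	fmt.Println(currentSyncBatchID(0))  // fresh mirror: deltas audit under batch 1
	fmt.Println(currentSyncBatchID(41)) // 41 batches recorded: this pull is batch 42
}
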
19 changes: 14 additions & 5 deletions flow/connectors/clickhouse/cdc.go
@@ -93,10 +93,6 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
 		return nil, err
 	}
 
-	if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
-		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
-	}
-
 	return &model.SyncResponse{
 		LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
 		NumRecordsSynced:       int64(numRecords),
@@ -106,7 +102,9 @@
 	}, nil
 }
 
-func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
+func (c *ClickHouseConnector) SyncRecords(ctx context.Context,
+	req *model.SyncRecordsRequest[model.RecordItems],
+) (*model.SyncResponse, error) {
 	res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
 	if err != nil {
 		return nil, err
@@ -120,11 +118,22 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
 	return res, nil
 }
 
+// For CH specifically, we don't want to replay schema deltas during sync, but rather during the normalization phase.
+// This is therefore a no-op; the private variant below is called during normalize.
 func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
 	ctx context.Context,
 	env map[string]string,
 	flowJobName string,
 	schemaDeltas []*protos.TableSchemaDelta,
 ) error {
+	return nil
+}
+
+func (c *ClickHouseConnector) replayTableSchemaDeltas(
+	ctx context.Context,
+	env map[string]string,
+	flowJobName string,
+	schemaDeltas []*protos.TableSchemaDelta,
+) error {
 	if len(schemaDeltas) == 0 {
 		return nil
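The split above is the heart of this file's change: the exported ReplayTableSchemaDeltas still satisfies the connector interface that the generic sync path calls, but becomes a no-op, while the unexported twin carries the actual replay into the normalize path. A minimal sketch of the pattern, with illustrative stand-in types rather than the repo's real interface and protobuf definitions:

package main

import (
	"context"
	"fmt"
)

// TableSchemaDelta is an illustrative stand-in for protos.TableSchemaDelta.
type TableSchemaDelta struct{ SrcTableName string }

// deltaReplayer stands in for the connector interface the sync path calls;
// the real interface lives in the PeerDB connectors package.
type deltaReplayer interface {
	ReplayTableSchemaDeltas(ctx context.Context, deltas []*TableSchemaDelta) error
}

type clickHouseLike struct{}

// Exported method: keeps satisfying the interface, but intentionally does
// nothing at sync time.
func (clickHouseLike) ReplayTableSchemaDeltas(ctx context.Context, deltas []*TableSchemaDelta) error {
	return nil
}

// Unexported variant: invoked from the normalize path, where the DDL
// actually has to run before rows are inserted.
func (clickHouseLike) replayTableSchemaDeltas(ctx context.Context, deltas []*TableSchemaDelta) error {
	for _, d := range deltas {
		fmt.Println("would apply delta for", d.SrcTableName)
	}
	return nil
}

func main() {
	var r deltaReplayer = clickHouseLike{}
	_ = r.ReplayTableSchemaDeltas(context.Background(), nil) // sync path: no-op
	_ = clickHouseLike{}.replayTableSchemaDeltas(context.Background(),
		[]*TableSchemaDelta{{SrcTableName: "public.users"}}) // normalize path
}
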
11 changes: 11 additions & 0 deletions flow/connectors/clickhouse/normalize.go
@@ -241,6 +241,17 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		return nil, err
 	}
 
+	schemaDeltas, err := c.GetSchemaDeltasForBatches(ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
+	if err != nil {
+		c.logger.Error("[clickhouse] error while getting schema deltas for batches", "error", err)
+		return nil, err
+	}
+
+	if err = c.replayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, schemaDeltas); err != nil {
+		c.logger.Error("[clickhouse] error while replaying table schema deltas", "error", err)
+		return nil, err
+	}
+
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
 		return &model.NormalizeResponse{
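Ordering is the point here: deltas are replayed before any batch rows are written to the destination, because an added column has to exist before inserts can reference it (and when normalize has already caught up, the delta window queried above is simply empty, so the replay is a harmless no-op). A toy model of that constraint — none of these names are PeerDB code:

package main

import "fmt"

// table is a toy stand-in for a destination table's schema.
type table struct{ columns []string }

// replayDeltas stands in for ALTER TABLE ... ADD COLUMN.
func (t *table) replayDeltas(added []string) {
	t.columns = append(t.columns, added...)
}

// normalize stands in for the insert that normalization issues.
func (t *table) normalize(rowColumns []string) error {
	have := make(map[string]bool, len(t.columns))
	for _, c := range t.columns {
		have[c] = true
	}
	for _, c := range rowColumns {
		if !have[c] {
			return fmt.Errorf("column %q missing on destination", c)
		}
	}
	return nil
}

func main() {
	dst := &table{columns: []string{"id", "name"}}
	rows := []string{"id", "age"} // a record referencing a column added at the source

	fmt.Println(dst.normalize(rows)) // without replay first: column "age" missing
	dst.replayDeltas([]string{"age"})
	fmt.Println(dst.normalize(rows)) // after replay: <nil>
}
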
34 changes: 34 additions & 0 deletions flow/connectors/external_metadata/store.go
@@ -233,3 +233,37 @@ func (p *PostgresMetadata) SyncFlowCleanup(ctx context.Context, jobName string)
 
 	return nil
 }
+
+func (p *PostgresMetadata) GetSchemaDeltasForBatches(ctx context.Context,
+	jobName string, syncBatchID int64, normalizeBatchID int64,
+) ([]*protos.TableSchemaDelta, error) {
+	p.logger.Info("getting schema deltas for batches",
+		slog.String("jobName", jobName),
+		slog.Int64("syncBatchID", syncBatchID),
+		slog.Int64("normalizeBatchID", normalizeBatchID))
+
+	rows, err := p.pool.Query(ctx,
+		`SELECT (delta_info->>'tableSchemaDelta')::JSONB
+			FROM peerdb_stats.schema_deltas_audit_log
+			WHERE flow_job_name=$1 AND batch_id BETWEEN $2 AND $3`,
+		jobName, normalizeBatchID+1, syncBatchID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query for SchemaDeltas: %w", err)
+	}
+
+	deltas, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.TableSchemaDelta, error) {
+		var schemaDelta protos.TableSchemaDelta
+		err := row.Scan(&schemaDelta)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scan SchemaDelta: %w", err)
+		}
+
+		return &schemaDelta, nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to collect SchemaDeltas: %w", err)
+	}
+	p.logger.Info("got schema deltas", slog.Any("deltas", deltas))
+
+	return deltas, nil
+}
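
A worked example of the BETWEEN window (hypothetical batch IDs): with the last normalized batch at 4 and the current sync batch at 7, the query collects deltas audited under batches 5, 6, and 7 — exactly the batches this normalize pass folds in. One consequence worth knowing: rows written before the batch_id column existed hold NULL there, and NULL never satisfies BETWEEN, so pre-migration deltas are never returned by this path.

package main

import "fmt"

// batchWindow mirrors `batch_id BETWEEN normalizeBatchID+1 AND syncBatchID`:
// the range of batches that have been synced but not yet normalized.
func batchWindow(normalizeBatchID, syncBatchID int64) []int64 {
	var ids []int64
	for id := normalizeBatchID + 1; id <= syncBatchID; id++ {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	fmt.Println(batchWindow(4, 7)) // [5 6 7]: three batches of deltas to replay
	fmt.Println(batchWindow(7, 7)) // []: normalize has caught up, nothing to do
}
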
9 changes: 6 additions & 3 deletions flow/connectors/postgres/cdc.go
@@ -49,6 +49,7 @@ type PostgresCDCSource struct {
 	hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{}
 	hushWarnUnknownTableDetected map[uint32]struct{}
 	flowJobName                  string
+	syncBatchID                  int64
 }
 
 type PostgresCDCConfig struct {
@@ -62,6 +63,7 @@
 	FlowJobName string
 	Slot        string
 	Publication string
+	SyncBatchID int64
 }
 
 // Create a new PostgresCDCSource
@@ -82,6 +84,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
 		hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}),
 		hushWarnUnknownTableDetected: make(map[uint32]struct{}),
 		flowJobName:                  cdcConfig.FlowJobName,
+		syncBatchID:                  cdcConfig.SyncBatchID,
 	}
 }
 
@@ -861,9 +864,9 @@ func auditSchemaDelta[Items model.Items](ctx context.Context, p *PostgresCDCSour
 
 	_, err := p.catalogPool.Exec(ctx,
 		`INSERT INTO
-			peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info)
-			VALUES($1,$2,$3,$4)`,
-		p.flowJobName, workflowID, runID, rec)
+			peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info,batch_id)
+			VALUES($1,$2,$3,$4,$5)`,
+		p.flowJobName, workflowID, runID, rec, p.syncBatchID)
 	if err != nil {
 		return fmt.Errorf("failed to insert row into table: %w", err)
 	}
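To picture what the new column buys (a hedged illustration — the values and the inner delta layout are invented; in the repo, delta_info holds the serialized audit record whose tableSchemaDelta field the store.go query extracts): every audited delta is now pinned to the sync batch in which it was observed, which is what makes the batch-window lookup possible.

package main

import (
	"encoding/json"
	"fmt"
)

// auditRow is an illustrative shape only, not a type from the repo.
type auditRow struct {
	FlowJobName string          `json:"flow_job_name"`
	BatchID     int64           `json:"batch_id"`
	DeltaInfo   json.RawMessage `json:"delta_info"`
}

func main() {
	row := auditRow{
		FlowJobName: "my_mirror",
		BatchID:     42, // the sync batch this delta was observed in
		DeltaInfo: json.RawMessage(
			`{"tableSchemaDelta":{"srcTableName":"public.users","addedColumns":[{"name":"age"}]}}`),
	}
	out, err := json.MarshalIndent(row, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
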
1 change: 1 addition & 0 deletions flow/connectors/postgres/postgres.go
@@ -427,6 +427,7 @@ func pullCore[Items model.Items](
 		FlowJobName: req.FlowJobName,
 		Slot:        slotName,
 		Publication: publicationName,
+		SyncBatchID: req.SyncBatchID,
 	})
 
 	if err := PullCdcRecords(ctx, cdc, req, processor, &c.replLock); err != nil {
1 change: 1 addition & 0 deletions flow/model/model.go
@@ -80,6 +80,7 @@ type PullRecordsRequest[T Items] struct {
 	MaxBatchSize uint32
 	// IdleTimeout is the timeout to wait for new records.
 	IdleTimeout time.Duration
+	SyncBatchID int64
 }
 
 type ToJSONOptions struct {
1 change: 1 addition & 0 deletions (new catalog migration)
@@ -0,0 +1 @@
+ALTER TABLE peerdb_stats.schema_deltas_audit_log ADD COLUMN IF NOT EXISTS batch_id BIGINT;
