[clickhouse] apply TableSchemaDeltas in normalize #2148

Open · wants to merge 1 commit into base: main
11 changes: 11 additions & 0 deletions flow/activities/flowable_core.go
@@ -19,6 +19,7 @@
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/connectors"
connmetadata "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -134,7 +135,7 @@
if err != nil {
return nil, err
}
if err := srcConn.ConnectionActive(ctx); err != nil {

Check failure on line 138 in flow/activities/flowable_core.go (GitHub Actions / lint): srcConn.ConnectionActive undefined (type TPull has no field or method ConnectionActive) (typecheck)
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

@@ -182,6 +183,15 @@
startTime := time.Now()
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
pgMetadata := connmetadata.NewPostgresMetadataFromCatalog(logger, a.CatalogPool)
Contributor comment:
Further down we have

syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)

Could lift that up to where we get lastOffset. Then use same value between two goroutines. Would make this logic work for connectors (pg:pg) which don't use catalog for metadata too
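
A rough sketch of what that could look like (hypothetical, not code from this PR or the review; it assumes dstConn is already connected at this point and reuses names from the diff):

// Hypothetical sketch of the suggested refactor: resolve the batch ID once,
// before the errgroup goroutines start, and share the value between the
// pull and sync sides instead of reading it from catalog metadata.
syncBatchID, err := dstConn.GetLastSyncBatchID(ctx, flowName)
if err != nil {
    return nil, err
}
syncBatchID += 1 // the batch that is about to be pulled and synced

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
    return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
        FlowJobName: flowName,
        SyncBatchID: syncBatchID,
        // ... remaining fields as in the diff above ...
    })
})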

// for connectors that aren't stored in metadata, returns 0
// CH is stored in metadata and is the only connector that cares about syncBatchID in schema_deltas_audit_log
syncBatchID, err := pgMetadata.GetLastSyncBatchID(ctx, flowName)
if err != nil {
return err
}
syncBatchID += 1

return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
FlowJobName: flowName,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
@@ -197,6 +207,7 @@
OverrideReplicationSlotName: config.ReplicationSlotName,
RecordStream: recordBatchPull,
Env: config.Env,
SyncBatchID: syncBatchID,
})
})

@@ -301,7 +312,7 @@
logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint := recordBatchSync.GetLastCheckpoint()
srcConn.UpdateReplStateLastOffset(lastCheckpoint)

Check failure on line 315 in flow/activities/flowable_core.go (GitHub Actions / lint): srcConn.UpdateReplStateLastOffset undefined (type TPull has no field or method UpdateReplStateLastOffset) (typecheck)

if err := monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(numRecords), lastCheckpoint,
19 changes: 14 additions & 5 deletions flow/connectors/clickhouse/cdc.go
@@ -93,10 +93,6 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
return nil, err
}

if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

return &model.SyncResponse{
LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
@@ -106,7 +102,9 @@
}, nil
}

func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
func (c *ClickHouseConnector) SyncRecords(ctx context.Context,
req *model.SyncRecordsRequest[model.RecordItems],
) (*model.SyncResponse, error) {
res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
if err != nil {
return nil, err
@@ -120,11 +118,22 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return res, nil
}

// For CH specifically, we don't want to replay during sync, but rather during the normalization phase.
// This is a no-op for CH; the private variant below is called during normalize.
func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
ctx context.Context,
env map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
return nil
}

func (c *ClickHouseConnector) replayTableSchemaDeltas(
ctx context.Context,
env map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
if len(schemaDeltas) == 0 {
return nil
11 changes: 11 additions & 0 deletions flow/connectors/clickhouse/normalize.go
@@ -241,6 +241,17 @@ func (c *ClickHouseConnector) NormalizeRecords(
return nil, err
}

schemaDeltas, err := c.GetSchemaDeltasForBatches(ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
if err != nil {
c.logger.Error("[clickhouse] error while getting schema deltas for batches", "error", err)
return nil, err
}

if err = c.replayTableSchemaDeltas(ctx, schemaDeltas); err != nil {
Contributor comment (suggested change):
- if err = c.replayTableSchemaDeltas(ctx, schemaDeltas); err != nil {
+ if err := c.replayTableSchemaDeltas(ctx, schemaDeltas); err != nil {

c.logger.Error("[clickhouse] error while replaying table schema deltas", "error", err)
return nil, err
}

// normalize has caught up with sync, chill until more records are loaded.
if normBatchID >= req.SyncBatchID {
return &model.NormalizeResponse{
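
The body of replayTableSchemaDeltas is not part of this diff. Purely for context, a minimal sketch of what replaying an added-column delta on ClickHouse could look like (the delta field names and the exec helper are assumptions, not taken from this PR):

// Hypothetical sketch only, not the PR's implementation. Assumes each delta
// carries DstTableName and AddedColumns, and that chType has already been
// mapped from the source column kind to a ClickHouse type.
for _, delta := range schemaDeltas {
    for _, addedColumn := range delta.AddedColumns {
        chType := "String" // placeholder; real code would map the source type
        query := fmt.Sprintf(
            "ALTER TABLE `%s` ADD COLUMN IF NOT EXISTS `%s` %s",
            delta.DstTableName, addedColumn.Name, chType)
        if err := c.execWithLogging(ctx, query); err != nil {
            return fmt.Errorf("failed to add column %s to %s: %w",
                addedColumn.Name, delta.DstTableName, err)
        }
    }
}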
34 changes: 34 additions & 0 deletions flow/connectors/external_metadata/store.go
@@ -233,3 +233,37 @@ func (p *PostgresMetadata) SyncFlowCleanup(ctx context.Context, jobName string)

return nil
}

func (p *PostgresMetadata) GetSchemaDeltasForBatches(ctx context.Context,
jobName string, syncBatchID int64, normalizeBatchID int64,
) ([]*protos.TableSchemaDelta, error) {
p.logger.Info("getting schema deltas for batches",
slog.String("jobName", jobName),
slog.Int64("syncBatchID", syncBatchID),
slog.Int64("normalizeBatchID", normalizeBatchID))

rows, err := p.pool.Query(ctx,
`SELECT (delta_info->>'tableSchemaDelta')::JSONB
FROM peerdb_stats.schema_deltas_audit_log
WHERE flow_job_name=$1 AND batch_id BETWEEN $2 AND $3`,
jobName, normalizeBatchID+1, syncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to query for SchemaDeltas: %w", err)
}

deltas, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.TableSchemaDelta, error) {
var schemaDelta protos.TableSchemaDelta
err := rows.Scan(&schemaDelta)
if err != nil {
return nil, fmt.Errorf("failed to scan SchemaDelta: %w", err)
}

return &schemaDelta, nil
})
if err != nil {
return nil, fmt.Errorf("failed to collect SchemaDeltas: %w", err)
}
p.logger.Info("got schema deltas", slog.Any("deltas", deltas))

return deltas, nil
}
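
For illustration, a hedged usage sketch with invented values: if normalize last completed batch 5 and sync has produced batches up to 8, the call returns the deltas audited for batches 6 through 8.

// Hypothetical usage of the new helper. The window is
// normalizeBatchID+1 .. syncBatchID, i.e. batches synced but not yet normalized.
deltas, err := pgMetadata.GetSchemaDeltasForBatches(ctx, "my_flow", 8, 5)
if err != nil {
    return err
}
// deltas now holds every TableSchemaDelta recorded for batches 6, 7 and 8.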
9 changes: 6 additions & 3 deletions flow/connectors/postgres/cdc.go
@@ -49,6 +49,7 @@ type PostgresCDCSource struct {
hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{}
hushWarnUnknownTableDetected map[uint32]struct{}
flowJobName string
syncBatchID int64
}

type PostgresCDCConfig struct {
@@ -62,6 +63,7 @@ type PostgresCDCConfig struct {
FlowJobName string
Slot string
Publication string
SyncBatchID int64
}

// Create a new PostgresCDCSource
@@ -82,6 +84,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}),
hushWarnUnknownTableDetected: make(map[uint32]struct{}),
flowJobName: cdcConfig.FlowJobName,
syncBatchID: cdcConfig.SyncBatchID,
}
}

@@ -861,9 +864,9 @@ func auditSchemaDelta[Items model.Items](ctx context.Context, p *PostgresCDCSour

_, err := p.catalogPool.Exec(ctx,
`INSERT INTO
peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info)
VALUES($1,$2,$3,$4)`,
p.flowJobName, workflowID, runID, rec)
peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info,batch_id)
VALUES($1,$2,$3,$4,$5)`,
p.flowJobName, workflowID, runID, rec, p.syncBatchID)
if err != nil {
return fmt.Errorf("failed to insert row into table: %w", err)
}
1 change: 1 addition & 0 deletions flow/connectors/postgres/postgres.go
@@ -427,6 +427,7 @@ func pullCore[Items model.Items](
FlowJobName: req.FlowJobName,
Slot: slotName,
Publication: publicationName,
SyncBatchID: req.SyncBatchID,
})

if err := PullCdcRecords(ctx, cdc, req, processor, &c.replLock); err != nil {
1 change: 1 addition & 0 deletions flow/model/model.go
@@ -80,6 +80,7 @@ type PullRecordsRequest[T Items] struct {
MaxBatchSize uint32
// IdleTimeout is the timeout to wait for new records.
IdleTimeout time.Duration
SyncBatchID int64
}

type ToJSONOptions struct {
New migration file:
@@ -0,0 +1 @@
ALTER TABLE peerdb_stats.schema_deltas_audit_log ADD COLUMN IF NOT EXISTS batch_id BIGINT;