
postgres cdc: update mirror lsn_offset when wal processing raises consumedXLogPos #823

Merged
merged 13 commits on Dec 20, 2023
3 changes: 3 additions & 0 deletions flow/activities/flowable.go
@@ -264,6 +264,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
SetLastOffset: func(lastOffset int64) error {
return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
},
})
})

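Note: the new SetLastOffset field is just a closure over the destination connector, so the pull side can forward offsets without importing any destination package. A minimal sketch of that wiring, with hypothetical names (Destination, makeSetLastOffset, noopDest are illustrations, not part of this PR):

package main

import "log"

// Destination stands in for any connector that can persist a per-job offset.
type Destination interface {
	SetLastOffset(jobName string, lastOffset int64) error
}

// makeSetLastOffset builds the func(int64) error callback handed to the CDC
// source; it closes over the destination connector and the flow job name.
func makeSetLastOffset(dst Destination, jobName string) func(int64) error {
	return func(lastOffset int64) error {
		return dst.SetLastOffset(jobName, lastOffset)
	}
}

// noopDest is a stand-in destination used only for this sketch.
type noopDest struct{}

func (noopDest) SetLastOffset(jobName string, lastOffset int64) error {
	log.Printf("job %s: offset now %d", jobName, lastOffset)
	return nil
}

func main() {
	setLastOffset := makeSetLastOffset(noopDest{}, "example_job")
	_ = setLastOffset(42) // the CDC source invokes this on keepalive heartbeats
}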
17 changes: 17 additions & 0 deletions flow/connectors/bigquery/bigquery.go
@@ -340,6 +340,23 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
}
}

func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
query := fmt.Sprintf(
"UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'",
c.datasetID,
MirrorJobsTable,
lastOffset,
jobName,
)
q := c.client.Query(query)
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
}

return nil
}

func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
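Note: GREATEST(offset, %d) makes SetLastOffset monotonic, so a late or duplicate call with a smaller LSN cannot move the stored offset backwards; the same guard appears in the Postgres and Snowflake connectors below. An in-memory stand-in for those semantics, a hedged sketch rather than anything in this PR:

package main

import (
	"fmt"
	"sync"
)

// memOffsetStore mirrors the GREATEST(offset, new) behaviour of the SQL above.
type memOffsetStore struct {
	mu      sync.Mutex
	offsets map[string]int64
}

func newMemOffsetStore() *memOffsetStore {
	return &memOffsetStore{offsets: make(map[string]int64)}
}

func (m *memOffsetStore) SetLastOffset(jobName string, lastOffset int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if lastOffset > m.offsets[jobName] { // only ever move forward
		m.offsets[jobName] = lastOffset
	}
	return nil
}

func (m *memOffsetStore) GetLastOffset(jobName string) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.offsets[jobName], nil
}

func main() {
	s := newMemOffsetStore()
	_ = s.SetLastOffset("job", 100)
	_ = s.SetLastOffset("job", 90) // stale heartbeat, ignored
	offset, _ := s.GetLastOffset("job")
	fmt.Println(offset) // prints 100
}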
3 changes: 3 additions & 0 deletions flow/connectors/core.go
@@ -62,6 +62,9 @@ type CDCSyncConnector interface {
// GetLastOffset gets the last offset from the metadata table on the destination
GetLastOffset(jobName string) (int64, error)

// SetLastOffset updates the last offset on the metadata table on the destination
SetLastOffset(jobName string, lastOffset int64) error

// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(jobName string) (int64, error)

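Note: with SetLastOffset on the interface, every CDCSyncConnector implementation has to grow the method, which is why the EventHub, S3, Postgres, BigQuery and Snowflake connectors all change in this PR. A compile-time assertion is the usual Go way to catch a connector that misses it; a hedged sketch, since the real package paths may differ:

// In each connector package (import path and type name are assumptions):
//
//	var _ connectors.CDCSyncConnector = (*BigQueryConnector)(nil)
//
// If a connector forgets to implement SetLastOffset, this line fails to
// compile instead of surfacing later as a runtime error in StartFlow.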
6 changes: 3 additions & 3 deletions flow/connectors/eventhub/eventhub.go
@@ -109,7 +109,7 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error(fmt.Sprintf("failed to update last offset: %v", err))
@@ -187,7 +187,7 @@ func (c *EventHubConnector) processBatch(
}

if lastSeenLSN > lastUpdatedOffset {
err = c.updateLastOffset(flowJobName, lastSeenLSN)
err = c.SetLastOffset(flowJobName, lastSeenLSN)
lastUpdatedOffset = lastSeenLSN
c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
if err != nil {
@@ -233,7 +233,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
if err != nil {
c.logger.Error("failed to update last offset", slog.Any("error", err))
return nil, err
4 changes: 2 additions & 2 deletions flow/connectors/external_metadata/store.go
@@ -146,7 +146,6 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
var offset pgtype.Int8
err := rows.Scan(&offset)
if err != nil {
// if the job doesn't exist, return 0
if err.Error() == "no rows in result set" {
return 0, nil
}
@@ -198,7 +197,8 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET last_offset = $2, updated_at = NOW()
DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
updated_at = NOW()
`, jobName, offset, 0)

if err != nil {
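Note: the upsert now keeps the larger of the stored and incoming offsets, so a heartbeat that lands after a newer SyncRecords write cannot rewind last_offset. A hedged sketch of the same statement via pgx (schema and table names here are placeholders, not necessarily the real ones):

package metadata

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

// upsertLastOffset inserts or advances the per-job offset; it never lowers it.
func upsertLastOffset(ctx context.Context, pool *pgxpool.Pool, jobName string, offset int64) error {
	_, err := pool.Exec(ctx, `
		INSERT INTO metadata.last_sync_state (job_name, last_offset, sync_batch_id)
		VALUES ($1, $2, $3)
		ON CONFLICT (job_name)
		DO UPDATE SET last_offset = GREATEST(last_sync_state.last_offset, excluded.last_offset),
		              updated_at = NOW()`,
		jobName, offset, 0)
	return err
}

// Calling upsertLastOffset(ctx, pool, "job", 100) and then (..., "job", 90)
// leaves last_offset at 100: late or duplicate updates are harmless.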
38 changes: 25 additions & 13 deletions flow/connectors/postgres/cdc.go
@@ -30,10 +30,10 @@ type PostgresCDCSource struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
slot string
SetLastOffset func(int64) error
publication string
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
startLSN pglogrepl.LSN
commitLock bool
customTypeMapping map[uint32]string

@@ -56,6 +56,7 @@ type PostgresCDCConfig struct {
RelationMessageMapping model.RelationMessageMapping
CatalogPool *pgxpool.Pool
FlowJobName string
SetLastOffset func(int64) error
}

// Create a new PostgresCDCSource
@@ -72,6 +73,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
SetLastOffset: cdcConfig.SetLastOffset,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
@@ -152,19 +154,20 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
sysident.SystemID, sysident.Timeline, sysident.XLogPos, sysident.DBName))

// start replication
p.startLSN = 0
var clientXLogPos, startLSN pglogrepl.LSN
if req.LastOffset > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset))
p.startLSN = pglogrepl.LSN(req.LastOffset + 1)
clientXLogPos = pglogrepl.LSN(req.LastOffset)
startLSN = clientXLogPos + 1
}

err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts)
err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, startLSN, replicationOpts)
if err != nil {
return fmt.Errorf("error starting replication at startLsn - %d: %w", p.startLSN, err)
return fmt.Errorf("error starting replication at startLsn - %d: %w", startLSN, err)
}
p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, p.startLSN))
p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN))

return p.consumeStream(pgConn, req, p.startLSN, req.RecordStream)
return p.consumeStream(pgConn, req, clientXLogPos, req.RecordStream)
}

// start consuming the cdc stream
Expand All @@ -181,19 +184,20 @@ func (p *PostgresCDCSource) consumeStream(
}
}()

// clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed
// until clientXLogPos - 1 each time we send a standby status update.
// clientXLogPos is the last checkpoint id, we need to ack that we have processed
// until clientXLogPos each time we send a standby status update.
// consumedXLogPos is the lsn that has been committed on the destination.
consumedXLogPos := pglogrepl.LSN(0)
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos - 1
consumedXLogPos = clientXLogPos

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
}
}
proposedConsumedXLogPos := consumedXLogPos

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
@@ -252,19 +256,27 @@ func (p *PostgresCDCSource) consumeStream(
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
if proposedConsumedXLogPos > consumedXLogPos {
p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos))
consumedXLogPos = proposedConsumedXLogPos
err := p.SetLastOffset(int64(consumedXLogPos))
if err != nil {
return fmt.Errorf("storing updated LSN failed: %w", err)
}
}

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}
pkmRequiresResponse = false

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
standByLastLogged = time.Now()
}

pkmRequiresResponse = false
}

if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock {
@@ -469,7 +481,7 @@ func (p *PostgresCDCSource) consumeStream(
if cdcRecordsStorage.IsEmpty() {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
consumedXLogPos = clientXLogPos
proposedConsumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
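Note: pulling the consumeStream pieces together: proposedConsumedXLogPos only advances while the in-memory record store is empty (everything pulled so far has already been flushed), and on a keepalive that requests a reply the new position is persisted on the destination before it is acked to Postgres, so WAL is never released for data the mirror has not durably recorded. A condensed, hedged sketch of that heartbeat branch (not the literal code above):

package cdc

import (
	"context"
	"fmt"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// ackKeepalive persists and then acknowledges the highest safe LSN.
// conn is the replication connection; setLastOffset is the callback this PR adds.
func ackKeepalive(ctx context.Context, conn *pgconn.PgConn,
	consumed, proposed pglogrepl.LSN, setLastOffset func(int64) error,
) (pglogrepl.LSN, error) {
	if proposed > consumed {
		// Store the new position on the destination first, so the mirror's
		// lsn_offset keeps pace with what we are about to ack.
		if err := setLastOffset(int64(proposed)); err != nil {
			return consumed, fmt.Errorf("storing updated LSN failed: %w", err)
		}
		consumed = proposed
	}
	err := pglogrepl.SendStandbyStatusUpdate(ctx, conn,
		pglogrepl.StandbyStatusUpdate{WALWritePosition: consumed})
	if err != nil {
		return consumed, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
	}
	return consumed, nil
}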
4 changes: 3 additions & 1 deletion flow/connectors/postgres/client.go
@@ -34,13 +34,14 @@ const (
createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON %s.%s(_peerdb_destination_table_name)"

getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1"
setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1"
getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"

insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)"
checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1"
updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=$1, sync_batch_id=$2 WHERE mirror_job_name=$3"
updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3"
updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2"

getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name,
@@ -486,6 +487,7 @@ func (c *PostgresConnector) jobMetadataExistsTx(tx pgx.Tx, jobName string) (bool
if err != nil {
return false, fmt.Errorf("error reading result row: %w", err)
}

return result.Bool, nil
}

14 changes: 13 additions & 1 deletion flow/connectors/postgres/postgres.go
@@ -185,13 +185,24 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) {
if err != nil {
return 0, fmt.Errorf("error while reading result row: %w", err)
}

if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened")
}

return result.Int64, nil
}

// SetLastOffset updates the last synced offset for a job.
func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) error {
_, err := c.pool.
Exec(c.ctx, fmt.Sprintf(setLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
if err != nil {
return fmt.Errorf("error setting last offset for job %s: %w", jobName, err)
}

return nil
}

// PullRecords pulls records from the source.
func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
defer func() {
@@ -238,6 +249,7 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu
RelationMessageMapping: req.RelationMessageMapping,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
SetLastOffset: req.SetLastOffset,
}, c.customTypesMapping)
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
4 changes: 2 additions & 2 deletions flow/connectors/s3/s3.go
@@ -176,7 +176,7 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
}

// update offset for a job
func (c *S3Connector) updateLastOffset(jobName string, offset int64) error {
func (c *S3Connector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error("failed to update last offset: ", slog.Any("error", err))
@@ -218,7 +218,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
if err != nil {
c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err))
return nil, err
13 changes: 12 additions & 1 deletion flow/connectors/snowflake/snowflake.go
@@ -73,6 +73,7 @@ const (
WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
@@ -301,7 +302,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
}()

if !rows.Next() {
c.logger.Warn("No row found ,returning nil")
c.logger.Warn("No row found, returning 0")
return 0, nil
}
var result pgtype.Int8
@@ -311,10 +312,20 @@
}
if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened")
return 0, nil
}
return result.Int64, nil
}

func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) error {
_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL,
c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
if err != nil {
return fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
}
return nil
}

func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema,
mirrorJobsTableIdentifier), jobName)
2 changes: 2 additions & 0 deletions flow/model/model.go
@@ -50,6 +50,8 @@ type PullRecordsRequest struct {
RelationMessageMapping RelationMessageMapping
// record batch for pushing changes into
RecordStream *CDCRecordStream
// last offset may be forwarded while processing records
SetLastOffset func(int64) error
}

type Record interface {