Skip to content

Commit

Permalink
post merge fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jun 12, 2024
1 parent 8ebc32b commit 1f31f25
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 140 deletions.
181 changes: 42 additions & 139 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,116 +480,6 @@ func PullCdcRecords[Items model.Items](
return fmt.Errorf("error processing message: %w", err)
}

if rec != nil {
tableName := rec.GetDestinationTableName()
switch r := rec.(type) {
case *model.UpdateRecord[Items]:
// tableName here is destination tableName.
// should be ideally sourceTableName as we are in PullRecords.
// will change in future
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err := addRecordWithKey(model.TableWithPkey{}, rec)
if err != nil {
return err
}
} else {
tablePkeyVal, err := model.RecToTablePKey[Items](req.TableNameSchemaMapping, rec)
if err != nil {
return err
}

latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal)
if err != nil {
return err
}
if !ok {
err = addRecordWithKey(tablePkeyVal, rec)
} else {
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
}
err = addRecordWithKey(tablePkeyVal, rec)
}
if err != nil {
return err
}
}

case *model.InsertRecord[Items]:
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err := addRecordWithKey(model.TableWithPkey{}, rec)
if err != nil {
return err
}
} else {
tablePkeyVal, err := model.RecToTablePKey[Items](req.TableNameSchemaMapping, rec)
if err != nil {
return err
}

err = addRecordWithKey(tablePkeyVal, rec)
if err != nil {
return err
}
}
case *model.DeleteRecord[Items]:
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err := addRecordWithKey(model.TableWithPkey{}, rec)
if err != nil {
return err
}
} else {
tablePkeyVal, err := model.RecToTablePKey[Items](req.TableNameSchemaMapping, rec)
if err != nil {
return err
}

latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal)
if err != nil {
return err
}
if ok {
r.Items = latestRecord.GetItems()
if updateRecord, ok := latestRecord.(*model.UpdateRecord[Items]); ok {
r.UnchangedToastColumns = updateRecord.UnchangedToastColumns
}
} else {
// there is nothing to backfill the items in the delete record with,
// so don't update the row with this record
// add sentinel value to prevent update statements from selecting
r.UnchangedToastColumns = map[string]struct{}{
"_peerdb_not_backfilled_delete": {},
}
}

// A delete can only be followed by an INSERT, which does not need backfilling
// No need to store DeleteRecords in memory or disk.
err = addRecordWithKey(model.TableWithPkey{}, rec)
if err != nil {
return err
}
}

case *model.RelationRecord[Items]:
tableSchemaDelta := r.TableSchemaDelta
if len(tableSchemaDelta.AddedColumns) > 0 {
logger.Info(fmt.Sprintf("Detected schema change for table %s, addedColumns: %v",
tableSchemaDelta.SrcTableName, tableSchemaDelta.AddedColumns))
records.AddSchemaDelta(req.TableNameMapping, tableSchemaDelta)
}

case *model.MessageRecord[Items]:
if err := addRecordWithKey(model.TableWithPkey{}, rec); err != nil {
return err
}
}
}

if xld.WALStart > clientXLogPos {
clientXLogPos = xld.WALStart
}
Expand Down Expand Up @@ -678,10 +568,18 @@ func (rp *cdcRecordProcessor[Items]) processMessage(
return rp.processDeleteMessage(p, lsn, msg)
case *pglogrepl.DeleteMessageV2:
return rp.processDeleteMessage(p, lsn, &msg.DeleteMessage)
case *pglogrepl.RelationMessage:
return rp.processRelationMessage(ctx, p, currentClientXlogPos, msg)
case *pglogrepl.RelationMessageV2:
return rp.processRelationMessage(ctx, p, currentClientXlogPos, &msg.RelationMessage)
case *pglogrepl.LogicalDecodingMessage:
return rp.processLogicalDecodingMessage(p, lsn, msg)
case *pglogrepl.LogicalDecodingMessageV2:
return rp.processLogicalDecodingMessage(p, lsn, &msg.LogicalDecodingMessage)
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
logger.Debug("CommitMessage", slog.Any("CommitLSN", msg.CommitLSN), slog.Any("TransactionEndLSN", msg.TransactionEndLSN))
rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN))
logger.Debug("CommitMessage", slog.Any("CommitLSN", msg.CommitLSN), slog.Any("TransactionEndLSN", msg.TransactionEndLSN))
rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = nil
case *pglogrepl.StreamCommitMessageV2:
txbuf := p.txBuffer[msg.Xid]
Expand Down Expand Up @@ -733,39 +631,22 @@ func (rp *cdcRecordProcessor[Items]) processMessage(
rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN))
delete(p.txBuffer, msg.Xid)
case *pglogrepl.StreamAbortMessageV2:
if txbuf, ok := p.txBuffer[msg.Xid]; ok && !txbuf.FirstSegment {
if _, err := p.CatalogPool.Exec(ctx,
"delete from v2cdc where flow_name = $1 and xid = $2",
p.FlowJobName, msg.Xid,
); err != nil {
return err
}
}
delete(p.txBuffer, msg.Xid)
case *pglogrepl.RelationMessage:
return rp.processRelationMessage(ctx, p, currentClientXlogPos, msg)
case *pglogrepl.RelationMessageV2:
return rp.processRelationMessage(ctx, p, currentClientXlogPos, &msg.RelationMessage)
case *pglogrepl.StreamStartMessageV2:
if _, ok := p.txBuffer[msg.Xid]; !ok {
p.txBuffer[msg.Xid] = &TxBuffer{Lsn: lsn, FirstSegment: msg.FirstSegment != 0}
}
p.inStream = true
case *pglogrepl.StreamStopMessageV2:
p.inStream = false
case *pglogrepl.RelationMessage:
logger.Debug("RelationMessage",
slog.Any("RelationID", msg.RelationID),
slog.String("Namespace", msg.Namespace),
slog.String("RelationName", msg.RelationName),
slog.Any("Columns", msg.Columns))

return processRelationMessage[Items](ctx, p, currentClientXlogPos, msg)
case *pglogrepl.LogicalDecodingMessage:
logger.Info("LogicalDecodingMessage",
slog.Bool("Transactional", msg.Transactional),
slog.String("Prefix", msg.Prefix),
slog.Int64("LSN", int64(msg.LSN)))
if !msg.Transactional {
batch.UpdateLatestCheckpoint(int64(msg.LSN))
}
return &model.MessageRecord[Items]{
BaseRecord: p.baseRecord(msg.LSN),
Prefix: msg.Prefix,
Content: msg.Content,
}, nil

default:
logger.Warn(fmt.Sprintf("%T not supported", msg))
Expand Down Expand Up @@ -1002,8 +883,11 @@ func (rp *cdcRecordProcessor[Items]) processRelationMessage(
return nil
}

p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))
p.logger.Debug("RelationMessage",
slog.Any("RelationID", msg.RelationID),
slog.String("Namespace", msg.Namespace),
slog.String("RelationName", msg.RelationName),
slog.Any("Columns", msg.Columns))

// not present in tables to sync, return immediately
if _, ok := p.SrcTableIDNameMapping[msg.RelationID]; !ok {
Expand Down Expand Up @@ -1090,6 +974,25 @@ func (rp *cdcRecordProcessor[Items]) processRelationMessage(
return nil
}

func (rp *cdcRecordProcessor[Items]) processLogicalDecodingMessage(
p *PostgresCDCSource,
lsn pglogrepl.LSN,
msg *pglogrepl.LogicalDecodingMessage,
) error {
p.logger.Info("LogicalDecodingMessage",
slog.Bool("Transactional", msg.Transactional),
slog.String("Prefix", msg.Prefix),
slog.Int64("LSN", int64(msg.LSN)))
if !msg.Transactional {
rp.records.UpdateLatestCheckpoint(int64(msg.LSN))
}
return rp.addRecordWithKey(p.logger, model.TableWithPkey{}, &model.MessageRecord[Items]{
BaseRecord: p.baseRecord(lsn),
Prefix: msg.Prefix,
Content: msg.Content,
})
}

func (p *PostgresCDCSource) getParentRelIDIfPartitioned(relID uint32) uint32 {
if parentRelID, ok := p.ChildToParentRelIDMap[relID]; ok {
return parentRelID
Expand Down
6 changes: 5 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ func (c *PostgresConnector) MaybeStartReplication(
return nil
}

func (c *PostgresConnector) replicationOptions(ctx context.Context, version int32, publicationName string) (pglogrepl.StartReplicationOptions, error) {
func (c *PostgresConnector) replicationOptions(
ctx context.Context,
version int32,
publicationName string,
) (pglogrepl.StartReplicationOptions, error) {
var pluginArguments []string
switch version {
case 1:
Expand Down

0 comments on commit 1f31f25

Please sign in to comment.