From cff08920e289c0b22109968a8bc2c35de1f52db5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 14 May 2024 22:19:49 +0000 Subject: [PATCH] wip persist --- flow/connectors/postgres/cdc.go | 28 +++++++++++----------------- nexus/catalog/migrations/V26__v2.sql | 7 +++++++ 2 files changed, 18 insertions(+), 17 deletions(-) create mode 100644 nexus/catalog/migrations/V26__v2.sql diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 71d3314a72..7216471114 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -24,17 +24,12 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -type MessageLSN struct { - msg pglogrepl.Message - lsn pglogrepl.LSN -} - type PostgresCDCSource struct { *PostgresConnector *PostgresCDCConfig typeMap *pgtype.Map commitLock *pglogrepl.BeginMessage - txBuffer map[uint32][]MessageLSN + txBuffer map[uint32][][]byte inStream bool } @@ -60,9 +55,9 @@ type startReplicationOpts struct { } func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { - var txBuffer map[uint32][]MessageLSN + var txBuffer map[uint32][][]byte if cdcConfig.Version >= 2 { - txBuffer = make(map[uint32][]MessageLSN) + txBuffer = make(map[uint32][][]byte) } return &PostgresCDCSource{ PostgresConnector: c, @@ -476,7 +471,7 @@ func PullCdcRecords[Items model.Items]( logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) - if err := recordProcessor.processXLogData(ctx, p, xld, clientXLogPos); err != nil { + if err := recordProcessor.processXLogData(ctx, p, xld, msg.Data[1:], clientXLogPos); err != nil { return fmt.Errorf("error processing message: %w", err) } @@ -502,6 +497,7 @@ func (rp *cdcRecordProcessor[Items]) processXLogData( ctx context.Context, p *PostgresCDCSource, xld pglogrepl.XLogData, + xldbytes []byte, currentClientXlogPos pglogrepl.LSN, ) error { var logicalMsg pglogrepl.Message @@ -534,15 +530,15 @@ func (rp *cdcRecordProcessor[Items]) processMessage( case *pglogrepl.InsertMessage: return rp.processInsertMessage(p, lsn, msg) case *pglogrepl.InsertMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.InsertMessage, lsn: lsn}) + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], xldbytes) case *pglogrepl.UpdateMessage: return rp.processUpdateMessage(p, lsn, msg) case *pglogrepl.UpdateMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.UpdateMessage, lsn: lsn}) + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], xldbytes) case *pglogrepl.DeleteMessage: return rp.processDeleteMessage(p, lsn, msg) case *pglogrepl.DeleteMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.DeleteMessage, lsn: lsn}) + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], xldbytes) case *pglogrepl.CommitMessage: // for a commit message, update the last checkpoint id for the record batch. logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v", @@ -572,7 +568,7 @@ func (rp *cdcRecordProcessor[Items]) processMessage( return rp.processRelationMessage(ctx, p, currentClientXlogPos, msg) case *pglogrepl.RelationMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.RelationMessage, lsn: lsn}) + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], xldbytes) case *pglogrepl.StreamStartMessageV2: p.inStream = true case *pglogrepl.StreamStopMessageV2: @@ -748,8 +744,7 @@ func (rp *cdcRecordProcessor[Items]) processDeleteMessage( } isFullReplica := rp.pullRequest.TableNameSchemaMapping[tableName].IsReplicaIdentityFull if isFullReplica { - err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec) - if err != nil { + if err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec); err != nil { return err } } else { @@ -778,8 +773,7 @@ func (rp *cdcRecordProcessor[Items]) processDeleteMessage( // A delete can only be followed by an INSERT, which does not need backfilling // No need to store DeleteRecords in memory or disk. - err = rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec) - if err != nil { + if err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec); err != nil { return err } } diff --git a/nexus/catalog/migrations/V26__v2.sql b/nexus/catalog/migrations/V26__v2.sql new file mode 100644 index 0000000000..4dbd34b173 --- /dev/null +++ b/nexus/catalog/migrations/V26__v2.sql @@ -0,0 +1,7 @@ +create table v2cdc ( + flow_name text, + xid xid, + lsn pg_lsn, + stream bytea[], + primary key (flow_name, xid, lsn) +);