From 8cb12097e376b92df573122d9f73bd76ec4a8dd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 14 May 2024 12:14:39 +0000 Subject: [PATCH] correct lsn --- flow/connectors/postgres/cdc.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 10513cedeb..71d3314a72 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -24,12 +24,17 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) +type MessageLSN struct { + msg pglogrepl.Message + lsn pglogrepl.LSN +} + type PostgresCDCSource struct { *PostgresConnector *PostgresCDCConfig typeMap *pgtype.Map commitLock *pglogrepl.BeginMessage - txBuffer map[uint32][]pglogrepl.Message + txBuffer map[uint32][]MessageLSN inStream bool } @@ -55,9 +60,9 @@ type startReplicationOpts struct { } func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { - var txBuffer map[uint32][]pglogrepl.Message + var txBuffer map[uint32][]MessageLSN if cdcConfig.Version >= 2 { - txBuffer = make(map[uint32][]pglogrepl.Message) + txBuffer = make(map[uint32][]MessageLSN) } return &PostgresCDCSource{ PostgresConnector: c, @@ -279,9 +284,9 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma type cdcRecordProcessor[Items model.Items] struct { recordStore *utils.CdcStore[Items] records *model.CDCStream[Items] - nextStandbyMessageDeadline time.Time pullRequest *model.PullRecordsRequest[Items] processor replProcessor[Items] + nextStandbyMessageDeadline time.Time } func (rp *cdcRecordProcessor[Items]) addRecordWithKey(logger log.Logger, key model.TableWithPkey, rec model.Record[Items]) error { @@ -509,13 +514,13 @@ func (rp *cdcRecordProcessor[Items]) processXLogData( if err != nil { return fmt.Errorf("error parsing logical message: %w", err) } - return rp.processMessage(ctx, p, xld, logicalMsg, currentClientXlogPos) + return rp.processMessage(ctx, p, xld.WALStart, logicalMsg, currentClientXlogPos) } func (rp *cdcRecordProcessor[Items]) processMessage( ctx context.Context, p *PostgresCDCSource, - xld pglogrepl.XLogData, + lsn pglogrepl.LSN, logicalMsg pglogrepl.Message, currentClientXlogPos pglogrepl.LSN, ) error { @@ -527,17 +532,17 @@ func (rp *cdcRecordProcessor[Items]) processMessage( logger.Debug("Locking PullRecords at BeginMessage, awaiting CommitMessage") p.commitLock = msg case *pglogrepl.InsertMessage: - return rp.processInsertMessage(p, xld.WALStart, msg) + return rp.processInsertMessage(p, lsn, msg) case *pglogrepl.InsertMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], &msg.InsertMessage) + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.InsertMessage, lsn: lsn}) case *pglogrepl.UpdateMessage: - return rp.processUpdateMessage(p, xld.WALStart, msg) + return rp.processUpdateMessage(p, lsn, msg) case *pglogrepl.UpdateMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], &msg.UpdateMessage) + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.UpdateMessage, lsn: lsn}) case *pglogrepl.DeleteMessage: - return rp.processDeleteMessage(p, xld.WALStart, msg) + return rp.processDeleteMessage(p, lsn, msg) case *pglogrepl.DeleteMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], &msg.DeleteMessage) + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.DeleteMessage, lsn: lsn}) case *pglogrepl.CommitMessage: // for a commit message, update the last checkpoint id for the record batch. logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v", @@ -546,7 +551,7 @@ func (rp *cdcRecordProcessor[Items]) processMessage( p.commitLock = nil case *pglogrepl.StreamCommitMessageV2: for _, m := range p.txBuffer[msg.Xid] { - if err := rp.processMessage(ctx, p, xld, m, currentClientXlogPos); err != nil { + if err := rp.processMessage(ctx, p, m.lsn, m.msg, currentClientXlogPos); err != nil { return err } } @@ -567,7 +572,7 @@ func (rp *cdcRecordProcessor[Items]) processMessage( return rp.processRelationMessage(ctx, p, currentClientXlogPos, msg) case *pglogrepl.RelationMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], &msg.RelationMessage) + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.RelationMessage, lsn: lsn}) case *pglogrepl.StreamStartMessageV2: p.inStream = true case *pglogrepl.StreamStopMessageV2: