Skip to content

Commit

Permalink
correct lsn
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 14, 2024
1 parent bd65c7a commit 8cb1209
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

type MessageLSN struct {
msg pglogrepl.Message
lsn pglogrepl.LSN
}

type PostgresCDCSource struct {
*PostgresConnector
*PostgresCDCConfig
typeMap *pgtype.Map
commitLock *pglogrepl.BeginMessage
txBuffer map[uint32][]pglogrepl.Message
txBuffer map[uint32][]MessageLSN
inStream bool
}

Expand All @@ -55,9 +60,9 @@ type startReplicationOpts struct {
}

func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
var txBuffer map[uint32][]pglogrepl.Message
var txBuffer map[uint32][]MessageLSN
if cdcConfig.Version >= 2 {
txBuffer = make(map[uint32][]pglogrepl.Message)
txBuffer = make(map[uint32][]MessageLSN)
}
return &PostgresCDCSource{
PostgresConnector: c,
Expand Down Expand Up @@ -279,9 +284,9 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
type cdcRecordProcessor[Items model.Items] struct {
recordStore *utils.CdcStore[Items]
records *model.CDCStream[Items]
nextStandbyMessageDeadline time.Time
pullRequest *model.PullRecordsRequest[Items]
processor replProcessor[Items]
nextStandbyMessageDeadline time.Time
}

func (rp *cdcRecordProcessor[Items]) addRecordWithKey(logger log.Logger, key model.TableWithPkey, rec model.Record[Items]) error {
Expand Down Expand Up @@ -509,13 +514,13 @@ func (rp *cdcRecordProcessor[Items]) processXLogData(
if err != nil {
return fmt.Errorf("error parsing logical message: %w", err)
}
return rp.processMessage(ctx, p, xld, logicalMsg, currentClientXlogPos)
return rp.processMessage(ctx, p, xld.WALStart, logicalMsg, currentClientXlogPos)
}

func (rp *cdcRecordProcessor[Items]) processMessage(
ctx context.Context,
p *PostgresCDCSource,
xld pglogrepl.XLogData,
lsn pglogrepl.LSN,
logicalMsg pglogrepl.Message,
currentClientXlogPos pglogrepl.LSN,
) error {
Expand All @@ -527,17 +532,17 @@ func (rp *cdcRecordProcessor[Items]) processMessage(
logger.Debug("Locking PullRecords at BeginMessage, awaiting CommitMessage")
p.commitLock = msg
case *pglogrepl.InsertMessage:
return rp.processInsertMessage(p, xld.WALStart, msg)
return rp.processInsertMessage(p, lsn, msg)
case *pglogrepl.InsertMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], &msg.InsertMessage)
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.InsertMessage, lsn: lsn})
case *pglogrepl.UpdateMessage:
return rp.processUpdateMessage(p, xld.WALStart, msg)
return rp.processUpdateMessage(p, lsn, msg)
case *pglogrepl.UpdateMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], &msg.UpdateMessage)
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.UpdateMessage, lsn: lsn})
case *pglogrepl.DeleteMessage:
return rp.processDeleteMessage(p, xld.WALStart, msg)
return rp.processDeleteMessage(p, lsn, msg)
case *pglogrepl.DeleteMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], &msg.DeleteMessage)
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.DeleteMessage, lsn: lsn})
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
Expand All @@ -546,7 +551,7 @@ func (rp *cdcRecordProcessor[Items]) processMessage(
p.commitLock = nil
case *pglogrepl.StreamCommitMessageV2:
for _, m := range p.txBuffer[msg.Xid] {
if err := rp.processMessage(ctx, p, xld, m, currentClientXlogPos); err != nil {
if err := rp.processMessage(ctx, p, m.lsn, m.msg, currentClientXlogPos); err != nil {
return err
}
}
Expand All @@ -567,7 +572,7 @@ func (rp *cdcRecordProcessor[Items]) processMessage(

return rp.processRelationMessage(ctx, p, currentClientXlogPos, msg)
case *pglogrepl.RelationMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], &msg.RelationMessage)
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.RelationMessage, lsn: lsn})
case *pglogrepl.StreamStartMessageV2:
p.inStream = true
case *pglogrepl.StreamStopMessageV2:
Expand Down

0 comments on commit 8cb1209

Please sign in to comment.