Skip to content

Commit

Permalink
cdc replay from catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jun 4, 2024
1 parent c1f3f71 commit ba64ecc
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,34 @@ func (rp *cdcRecordProcessor[Items]) processMessage(
rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = nil
case *pglogrepl.StreamCommitMessageV2:
// TODO first replay streams from catalog
// TODO select streams from v2cdc where flow_name = $1 and xid = $2 order by lsn
// TODO track first stream bit so we can skip reading catalog when transaction is in single batch
rows, err := p.CatalogPool.Query(ctx,
"select stream from v2cdc where flow_name = $1 and xid = $2 order by lsn",
p.FlowJobName, msg.Xid)
if err != nil {
return err
}
for rows.Next() {
var stream [][]byte
if err := rows.Scan(&stream); err != nil {
return err
}

for _, m := range stream {
mxld, err := pglogrepl.ParseXLogData(m)
if err != nil {
return err
}
logicalMsg, err = pglogrepl.ParseV2(mxld.WALData, p.inStream)
if err != nil {
return err
}
if err := rp.processMessage(ctx, p, mxld.WALStart, logicalMsg, currentClientXlogPos); err != nil {
return err
}
}
}

txbufs := p.txBuffer[msg.Xid]
for _, txbuf := range txbufs {
for _, m := range txbuf.Streams {
Expand Down

0 comments on commit ba64ecc

Please sign in to comment.