diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 92f6827182..acdeb105b0 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -580,8 +580,34 @@ func (rp *cdcRecordProcessor[Items]) processMessage( rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN)) p.commitLock = nil case *pglogrepl.StreamCommitMessageV2: - // TODO first replay streams from catalog - // TODO select streams from v2cdc where flow_name = $1 and xid = $2 order by lsn + // TODO track first stream bit so we can skip reading catalog when transaction is in single batch + rows, err := p.CatalogPool.Query(ctx, + "select stream from v2cdc where flow_name = $1 and xid = $2 order by lsn", + p.FlowJobName, msg.Xid) + if err != nil { + return err + } + for rows.Next() { + var stream [][]byte + if err := rows.Scan(&stream); err != nil { + return err + } + + for _, m := range stream { + mxld, err := pglogrepl.ParseXLogData(m) + if err != nil { + return err + } + logicalMsg, err = pglogrepl.ParseV2(mxld.WALData, p.inStream) + if err != nil { + return err + } + if err := rp.processMessage(ctx, p, mxld.WALStart, logicalMsg, currentClientXlogPos); err != nil { + return err + } + } + } + txbufs := p.txBuffer[msg.Xid] for _, txbuf := range txbufs { for _, m := range txbuf.Streams {