diff --git a/pkg/source/postgres.go b/pkg/source/postgres.go index 7091ca6..27c7307 100644 --- a/pkg/source/postgres.go +++ b/pkg/source/postgres.go @@ -164,17 +164,20 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro } func (p *PGXSource) fetching(ctx context.Context) (change Change, err error) { - if time.Now().After(p.nextReportTime) { - if err = p.reportLSN(ctx); err != nil { - return change, err + defer func() { + needReply := isTimeout(err) + if time.Now().After(p.nextReportTime) || needReply { + if err = p.reportLSN(ctx, needReply); err != nil { + p.log.WithFields(logrus.Fields{"Error": err}).Error("failed to report LSN") + } + p.nextReportTime = time.Now().Add(5 * time.Second) } - p.nextReportTime = time.Now().Add(5 * time.Second) - } + }() + msg, err := p.replConn.ReceiveMessage(ctx) if err != nil { return change, err } - switch msg := msg.(type) { case *pgproto3.CopyData: switch msg.Data[0] { @@ -259,9 +262,15 @@ func (p *PGXSource) committedLSN() (lsn pglogrepl.LSN) { return pglogrepl.LSN(atomic.LoadUint64(&p.ackLsn)) } -func (p *PGXSource) reportLSN(ctx context.Context) error { +func (p *PGXSource) reportLSN(ctx context.Context, replyRequested bool) error { if committed := p.committedLSN(); committed != 0 { - return pglogrepl.SendStandbyStatusUpdate(ctx, p.replConn, pglogrepl.StandbyStatusUpdate{WALWritePosition: committed}) + return pglogrepl.SendStandbyStatusUpdate(ctx, + p.replConn, + pglogrepl.StandbyStatusUpdate{ + WALWritePosition: committed, + ReplyRequested: replyRequested, + }, + ) } return nil } @@ -272,7 +281,7 @@ func (p *PGXSource) cleanup() { p.setupConn.Close(ctx) } if p.replConn != nil { - p.reportLSN(ctx) + p.reportLSN(ctx, false) p.replConn.Close(ctx) } }