From 55a7dc6c0944fc4529fdab8e60335dde4fdf188e Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 17 Dec 2024 23:19:02 +0530 Subject: [PATCH] [cdc] treat all pkms as reply_requested (#2353) Looking at walsender code, `PrimaryKeepaliveMessages` are _mostly_ only sent with `reply_requested` when `wal_sender_timeout` is non-zero. We recommend `wal_sender_timeout` be set to 0. We currently only send a `StandbyStatusMessage` when we see a pkm that has `reply_requested` so we almost never send them. Treat all pkms as `reply_requested` for hopefully more aggressive slot flushes. --- flow/connectors/external_metadata/store.go | 3 ++- flow/connectors/postgres/cdc.go | 10 ++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index f253bf228..a87f1af48 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -6,6 +6,7 @@ import ( "log/slog" "time" + "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" @@ -135,7 +136,7 @@ func (p *PostgresMetadata) GetLastNormalizeBatchID(ctx context.Context, jobName } func (p *PostgresMetadata) SetLastOffset(ctx context.Context, jobName string, offset int64) error { - p.logger.Info("updating last offset", "offset", offset) + p.logger.Debug("updating last offset", slog.String("offset", pglogrepl.LSN(offset).String())) _, err := p.pool.Exec(ctx, ` INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 65924c13f..be93ecaa2 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -495,15 +495,13 @@ func PullCdcRecords[Items model.Items]( return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err) } - logger.Debug("Primary Keepalive Message", slog.Any("data", pkm)) + logger.Debug("Primary Keepalive Message", slog.Bool("replyRequested", pkm.ReplyRequested), + slog.String("ServerWALEnd", pkm.ServerWALEnd.String()), slog.String("ServerTime", pkm.ServerTime.String())) if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - - if pkm.ReplyRequested { - pkmRequiresResponse = true - } + pkmRequiresResponse = true case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:]) @@ -704,7 +702,7 @@ func processMessage[Items model.Items]( logger.Info("LogicalDecodingMessage", slog.Bool("Transactional", msg.Transactional), slog.String("Prefix", msg.Prefix), - slog.Int64("LSN", int64(msg.LSN))) + slog.String("LSN", msg.LSN.String())) if !msg.Transactional { batch.UpdateLatestCheckpoint(int64(msg.LSN)) }