Skip to content

Commit

Permalink
[cdc] treat all pkms as reply_requested (#2353)
Browse files Browse the repository at this point in the history
Looking at walsender code, `PrimaryKeepaliveMessages` are _mostly_ only
sent with `reply_requested` when `wal_sender_timeout` is non-zero. We
recommend `wal_sender_timeout` be set to 0. We currently only send a
`StandbyStatusMessage` when we see a pkm that has `reply_requested` so
we almost never send them.

Treat all pkms as `reply_requested` for hopefully more aggressive slot
flushes.
  • Loading branch information
heavycrystal authored Dec 17, 2024
1 parent 02da43f commit 55a7dc6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
3 changes: 2 additions & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"time"

"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -135,7 +136,7 @@ func (p *PostgresMetadata) GetLastNormalizeBatchID(ctx context.Context, jobName
}

func (p *PostgresMetadata) SetLastOffset(ctx context.Context, jobName string, offset int64) error {
p.logger.Info("updating last offset", "offset", offset)
p.logger.Debug("updating last offset", slog.String("offset", pglogrepl.LSN(offset).String()))
_, err := p.pool.Exec(ctx, `
INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
Expand Down
10 changes: 4 additions & 6 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,15 +495,13 @@ func PullCdcRecords[Items model.Items](
return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err)
}

logger.Debug("Primary Keepalive Message", slog.Any("data", pkm))
logger.Debug("Primary Keepalive Message", slog.Bool("replyRequested", pkm.ReplyRequested),
slog.String("ServerWALEnd", pkm.ServerWALEnd.String()), slog.String("ServerTime", pkm.ServerTime.String()))

if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
}

if pkm.ReplyRequested {
pkmRequiresResponse = true
}
pkmRequiresResponse = true

case pglogrepl.XLogDataByteID:
xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
Expand Down Expand Up @@ -704,7 +702,7 @@ func processMessage[Items model.Items](
logger.Info("LogicalDecodingMessage",
slog.Bool("Transactional", msg.Transactional),
slog.String("Prefix", msg.Prefix),
slog.Int64("LSN", int64(msg.LSN)))
slog.String("LSN", msg.LSN.String()))
if !msg.Transactional {
batch.UpdateLatestCheckpoint(int64(msg.LSN))
}
Expand Down

0 comments on commit 55a7dc6

Please sign in to comment.