diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index f253bf228..a87f1af48 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -6,6 +6,7 @@ import ( "log/slog" "time" + "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" @@ -135,7 +136,7 @@ func (p *PostgresMetadata) GetLastNormalizeBatchID(ctx context.Context, jobName } func (p *PostgresMetadata) SetLastOffset(ctx context.Context, jobName string, offset int64) error { - p.logger.Info("updating last offset", "offset", offset) + p.logger.Debug("updating last offset", slog.String("offset", pglogrepl.LSN(offset).String())) _, err := p.pool.Exec(ctx, ` INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 65924c13f..be93ecaa2 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -495,15 +495,13 @@ func PullCdcRecords[Items model.Items]( return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err) } - logger.Debug("Primary Keepalive Message", slog.Any("data", pkm)) + logger.Debug("Primary Keepalive Message", slog.Bool("replyRequested", pkm.ReplyRequested), + slog.String("ServerWALEnd", pkm.ServerWALEnd.String()), slog.String("ServerTime", pkm.ServerTime.String())) if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - - if pkm.ReplyRequested { - pkmRequiresResponse = true - } + pkmRequiresResponse = true case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:]) @@ -704,7 +702,7 @@ func processMessage[Items model.Items]( logger.Info("LogicalDecodingMessage", slog.Bool("Transactional", msg.Transactional), slog.String("Prefix", msg.Prefix), - slog.Int64("LSN", int64(msg.LSN))) + slog.String("LSN", msg.LSN.String())) if !msg.Transactional { batch.UpdateLatestCheckpoint(int64(msg.LSN)) }