diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 70b0d15d1d..6ab8c32c9a 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "strings" + "time" "github.com/jackc/pgerrcode" "github.com/jackc/pglogrepl" @@ -643,15 +644,76 @@ func (c *PostgresConnector) getTableNametoUnchangedCols( return resultMap, nil } +func (c *PostgresConnector) executeReadOnlyTxn(ctx context.Context, fn func(tx pgx.Tx) error) error { + tx, err := c.conn.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + + defer func() { + if rollbackErr := tx.Rollback(ctx); rollbackErr != nil && rollbackErr != pgx.ErrTxClosed { + c.logger.Error("rollback failed", slog.Any("error", rollbackErr)) + } + }() + + if err := fn(tx); err != nil { + return fmt.Errorf("transaction function execution failed: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + +func (c *PostgresConnector) inRetryableReadOnlyTxn(ctx context.Context, fn func(tx pgx.Tx) error) error { + var lastErr error + const maxRetries = 3 + const retryDelay = 5 * time.Second + + for range maxRetries { + if err := c.executeReadOnlyTxn(ctx, fn); err != nil { + lastErr = err + if ctx.Err() != nil { + return ctx.Err() + } + time.Sleep(retryDelay) + continue + } + return nil + } + + return fmt.Errorf("transaction failed after %d retries: %w", maxRetries, lastErr) +} + func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, error) { - row := c.conn.QueryRow(ctx, - "SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END") - var result pgtype.Text - err := row.Scan(&result) + var lsn pglogrepl.LSN + err := c.inRetryableReadOnlyTxn(ctx, func(tx pgx.Tx) error { + query := ` + SELECT CASE + WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() + ELSE pg_current_wal_lsn() + END` + + var lsnString string + if err := tx.QueryRow(ctx, query).Scan(&lsnString); err != nil { + return fmt.Errorf("query failed: %w", err) + } + + parsedLSN, err := pglogrepl.ParseLSN(lsnString) + if err != nil { + return fmt.Errorf("invalid LSN format - %s: %w", lsnString, err) + } + lsn = parsedLSN + return nil + }) + if err != nil { - return 0, fmt.Errorf("error while running query: %w", err) + return 0, err } - return pglogrepl.ParseLSN(result.String) + + return lsn, nil } func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {