Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap get current LSN in txn as pgx will retry #2284

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 54 additions & 15 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"strings"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pglogrepl"
Expand Down Expand Up @@ -643,35 +644,73 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(
return resultMap, nil
}

func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, error) {
tx, err := c.conn.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted})
func (c *PostgresConnector) executeReadOnlyTxn(ctx context.Context, fn func(tx pgx.Tx) error) error {
tx, err := c.conn.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly})
if err != nil {
return 0, fmt.Errorf("begin transaction failed: %w", err)
return fmt.Errorf("failed to begin transaction: %w", err)
}

defer func() {
if rollbackErr := tx.Rollback(ctx); rollbackErr != nil && rollbackErr != pgx.ErrTxClosed {
c.logger.Error("rollback failed", slog.Any("error", rollbackErr))
}
}()

query := `
SELECT CASE
WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn()
ELSE pg_current_wal_lsn()
END`

var lsnString string
if err := tx.QueryRow(ctx, query).Scan(&lsnString); err != nil {
return 0, fmt.Errorf("query failed: %w", err)
if err := fn(tx); err != nil {
Copy link
Contributor

@serprex serprex Nov 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this accomplishes what you want still, you need to reconnect on net errors. Since retry logic of pgx exists in the pooling code

At that point there's no benefit to starting read only transaction since we aren't dealing with pooling

return fmt.Errorf("transaction function execution failed: %w", err)
}

if err := tx.Commit(ctx); err != nil {
return 0, fmt.Errorf("commit transaction failed: %w", err)
return fmt.Errorf("failed to commit transaction: %w", err)
}

return nil
}

func (c *PostgresConnector) inRetryableReadOnlyTxn(ctx context.Context, fn func(tx pgx.Tx) error) error {
var lastErr error
const maxRetries = 3
const retryDelay = 5 * time.Second

for range maxRetries {
if err := c.executeReadOnlyTxn(ctx, fn); err != nil {
lastErr = err
if ctx.Err() != nil {
return ctx.Err()
}
time.Sleep(retryDelay)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will give an extra sleep on last try

continue
}
return nil
}

lsn, err := pglogrepl.ParseLSN(lsnString)
return fmt.Errorf("transaction failed after %d retries: %w", maxRetries, lastErr)
}

func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, error) {
var lsn pglogrepl.LSN
err := c.inRetryableReadOnlyTxn(ctx, func(tx pgx.Tx) error {
query := `
SELECT CASE
WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn()
ELSE pg_current_wal_lsn()
END`

var lsnString string
if err := tx.QueryRow(ctx, query).Scan(&lsnString); err != nil {
return fmt.Errorf("query failed: %w", err)
}

parsedLSN, err := pglogrepl.ParseLSN(lsnString)
if err != nil {
return fmt.Errorf("invalid LSN format - %s: %w", lsnString, err)
}
lsn = parsedLSN
return nil
})

if err != nil {
return 0, fmt.Errorf("invalid LSN format - %s: %w", lsnString, err)
return 0, err
}

return lsn, nil
Expand Down
Loading