From 5d583c3384ac5e0cd533369f22299fdefa903ced Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 30 Nov 2024 01:53:54 +0530 Subject: [PATCH 1/2] retry slot drop for active pid --- flow/e2e/congen.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 91c5817d40..323c498ab5 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -7,7 +7,9 @@ import ( "testing" "time" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/stretchr/testify/require" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -23,12 +25,26 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error { } // drop all open slots with the given suffix - if _, err := conn.Exec( - context.Background(), - "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", - "%_"+suffix, - ); err != nil { - return fmt.Errorf("failed to drop replication slots: %w", err) + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + deadline := time.Now().Add(2 * time.Minute) + for { + _, err := conn.Exec( + context.Background(), + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", + "%_"+suffix, + ) + if err == nil { + break + } + if time.Now().After(deadline) { + return fmt.Errorf("failed to drop replication slots: %w", err) + } + var pgxErr *pgconn.PgError + if !errors.As(err, &pgxErr) || pgxErr.Code != pgerrcode.ObjectInUse { + return fmt.Errorf("failed to drop replication slots: %w", err) + } + <-ticker.C } // list all publications from pg_publication table From 1b8d2f9766762145187302fad0798bf9b702ad13 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 30 Nov 2024 01:55:47 +0530 Subject: [PATCH 2/2] change to 1 min --- flow/e2e/congen.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 323c498ab5..8a3b5e18e0 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -27,7 +27,7 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error { // drop all open slots with the given suffix ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() - deadline := time.Now().Add(2 * time.Minute) + deadline := time.Now().Add(1 * time.Minute) for { _, err := conn.Exec( context.Background(),