Skip to content

Commit

Permalink
update low-level replConn test
Browse files Browse the repository at this point in the history
It only sends commit IDs for sequenced transactions now because it
needs to work with concurrent transactions not created by a pg.DB
instance.
  • Loading branch information
jchappelow committed Mar 11, 2024
1 parent a2fa2ae commit 9d4dac6
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions internal/sql/pg/repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ func Test_repl(t *testing.T) {

ctx, cancel := context.WithDeadline(ctx, deadline.Add(-time.Second*5))
defer cancel()
connQ, err := pgx.Connect(ctx, connString(host, port, user, pass, dbName, false))
if err != nil {
t.Fatal(err)
}
_, err = connQ.Exec(ctx, sqlUpdateSentrySeq, 0)
if err != nil {
t.Fatal(err)
}

schemaFilter := func(string) bool { return true } // capture changes from all namespaces

Expand All @@ -56,11 +64,6 @@ func Test_repl(t *testing.T) {

t.Log("replication slot started and listening")

connQ, err := pgx.Connect(ctx, connString(host, port, user, pass, dbName, false))
if err != nil {
t.Fatal(err)
}

_, err = connQ.Exec(ctx, `DROP TABLE IF EXISTS blah`)
if err != nil {
t.Fatal(err)
Expand All @@ -71,7 +74,7 @@ func Test_repl(t *testing.T) {
t.Fatal(err)
}

wantCommitHash, _ := hex.DecodeString("9710a1c3b624c5a929425963c7441b0d8cf7d2bcf98aaaf8bc61519543aed1bc")
wantCommitHash, _ := hex.DecodeString("cb390afbf808256307ee0927999805ee3d5af193772e2c9b71823fbc1fe8867f")

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -91,7 +94,11 @@ func Test_repl(t *testing.T) {
}
cancel()
case err := <-errChan:
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, context.DeadlineExceeded) {
t.Error("timeout")
return
}
if err != nil {
Expand All @@ -112,6 +119,12 @@ func Test_repl(t *testing.T) {
tx.Exec(ctx, `update blah SET stuff = 6, id = '{13}', val=41 where id = '{10}';`)
tx.Exec(ctx, `update blah SET stuff = 33;`)
tx.Exec(ctx, `delete FROM blah where id = '{11}';`)
// sends on commitChan are only expected from sequenced transactions.
// Bump seq in the sentry table!
_, err = tx.Exec(ctx, sqlUpdateSentrySeq, 1)
if err != nil {
t.Fatal(err)
}

err = tx.Commit(ctx) // this triggers the send
if err != nil {
Expand Down

0 comments on commit 9d4dac6

Please sign in to comment.