Skip to content

Commit

Permalink
Remove testify/suite from connectors tests
Browse files Browse the repository at this point in the history
Splitting up changes from #871
  • Loading branch information
serprex committed Dec 27, 2023
1 parent a8f8061 commit 00c0e19
Show file tree
Hide file tree
Showing 14 changed files with 334 additions and 335 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 1200s
gotestsum --format testname -- -p 16 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
85 changes: 50 additions & 35 deletions flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,106 +7,121 @@ import (
"testing"
"time"

"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/ysmood/got"
)

type PostgresReplicationSnapshotTestSuite struct {
suite.Suite
t *testing.T

connector *PostgresConnector
schema string
}

func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() {
var err error
suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{
func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
}, true)
require.NoError(suite.T(), err)
require.NoError(t, err)

setupTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.T(), err)
setupTx, err := connector.pool.Begin(context.Background())
require.NoError(t, err)
defer func() {
err := setupTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(suite.T(), err)
require.NoError(t, err)
}
}()

_, err = setupTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_repl_test CASCADE")
require.NoError(suite.T(), err)
schema := "repltest_" + shared.RandomString(8)

_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema))
require.NoError(t, err)

_, err = setupTx.Exec(context.Background(), "CREATE SCHEMA pgpeer_repl_test")
require.NoError(suite.T(), err)
_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("CREATE SCHEMA %s", schema))
require.NoError(t, err)

// setup 3 tables in pgpeer_repl_test schema
// test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5
// setup 3 tables test_1, test_2, test_3
// all have 5 text columns c1, c2, c3, c4, c5
tables := []string{"test_1", "test_2", "test_3"}
for _, table := range tables {
_, err = setupTx.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table))
require.NoError(suite.T(), err)
fmt.Sprintf("CREATE TABLE %s.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", schema, table))
require.NoError(t, err)
}

err = setupTx.Commit(context.Background())
require.NoError(suite.T(), err)
require.NoError(t, err)

return PostgresReplicationSnapshotTestSuite{
t: t,
connector: connector,
schema: schema,
}
}

func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() {
func (suite PostgresReplicationSnapshotTestSuite) TearDownSuite() {
teardownTx, err := suite.connector.pool.Begin(context.Background())
require.NoError(suite.T(), err)
require.NoError(suite.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
require.NoError(suite.T(), err)
require.NoError(suite.t, err)
}
}()

_, err = teardownTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_test CASCADE")
require.NoError(suite.T(), err)
_, err = teardownTx.Exec(context.Background(),
fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema))
require.NoError(suite.t, err)

// Fetch all the publications
rows, err := teardownTx.Query(context.Background(),
"SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.T(), err)
require.NoError(suite.t, err)

// Iterate over the publications and drop them
for rows.Next() {
var pubname pgtype.Text
err := rows.Scan(&pubname)
require.NoError(suite.T(), err)
require.NoError(suite.t, err)

// Drop the publication in a new transaction
_, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String))
require.NoError(suite.T(), err)
require.NoError(suite.t, err)
}

_, err = teardownTx.Exec(context.Background(),
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1",
fmt.Sprintf("%%%s", "test_simple_slot_creation"))
require.NoError(suite.T(), err)
require.NoError(suite.t, err)

err = teardownTx.Commit(context.Background())
require.NoError(suite.T(), err)
require.NoError(suite.t, err)

suite.True(suite.connector.ConnectionActive() == nil)
require.True(suite.t, suite.connector.ConnectionActive() == nil)

err = suite.connector.Close()
require.NoError(suite.T(), err)
require.NoError(suite.t, err)

suite.False(suite.connector.ConnectionActive() == nil)
require.False(suite.t, suite.connector.ConnectionActive() == nil)
}

func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
tables := map[string]string{
"pgpeer_repl_test.test_1": "test_1_dst",
suite.schema + ".test_1": "test_1_dst",
}

flowJobName := "test_simple_slot_creation"
Expand All @@ -121,7 +136,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
// Moved to a go routine
go func() {
err := suite.connector.SetupReplication(signal, setupReplicationInput)
require.NoError(suite.T(), err)
require.NoError(suite.t, err)
}()

slog.Info("waiting for slot creation to complete", flowLog)
Expand All @@ -136,5 +151,5 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
}

func TestPostgresReplTestSuite(t *testing.T) {
suite.Run(t, new(PostgresReplicationSnapshotTestSuite))
got.Each(t, e2eshared.GotSuite(setupSuite))
}
94 changes: 48 additions & 46 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,79 +5,81 @@ import (
"fmt"
"testing"

"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/suite"
"github.com/stretchr/testify/require"
"github.com/ysmood/got"
)

type PostgresSchemaDeltaTestSuite struct {
suite.Suite
t *testing.T

connector *PostgresConnector
}

const schemaDeltaTestSchemaName = "pgschema_delta_test"

func (suite *PostgresSchemaDeltaTestSuite) failTestError(err error) {
if err != nil {
suite.FailNow(err.Error())
}
}
func setupSchemaDeltaSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
t.Helper()

func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() {
var err error
suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{
connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
}, false)
suite.failTestError(err)
require.NoError(t, err)

setupTx, err := suite.connector.pool.Begin(context.Background())
suite.failTestError(err)
setupTx, err := connector.pool.Begin(context.Background())
require.NoError(t, err)
defer func() {
err := setupTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
suite.failTestError(err)
require.NoError(t, err)
}
}()
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
schemaDeltaTestSchemaName))
suite.failTestError(err)
require.NoError(t, err)
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schemaDeltaTestSchemaName))
suite.failTestError(err)
require.NoError(t, err)
err = setupTx.Commit(context.Background())
suite.failTestError(err)
require.NoError(t, err)
return PostgresSchemaDeltaTestSuite{
t: t,
connector: connector,
}
}

func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() {
func (suite PostgresSchemaDeltaTestSuite) TearDownSuite() {
teardownTx, err := suite.connector.pool.Begin(context.Background())
suite.failTestError(err)
require.NoError(suite.t, err)
defer func() {
err := teardownTx.Rollback(context.Background())
if err != pgx.ErrTxClosed {
suite.failTestError(err)
require.NoError(suite.t, err)
}
}()
_, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
schemaDeltaTestSchemaName))
suite.failTestError(err)
require.NoError(suite.t, err)
err = teardownTx.Commit(context.Background())
suite.failTestError(err)
require.NoError(suite.t, err)

suite.True(suite.connector.ConnectionActive() == nil)
require.True(suite.t, suite.connector.ConnectionActive() == nil)
err = suite.connector.Close()
suite.failTestError(err)
suite.False(suite.connector.ConnectionActive() == nil)
require.NoError(suite.t, err)
require.False(suite.t, suite.connector.ConnectionActive() == nil)
}

func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
suite.failTestError(err)
require.NoError(suite.t, err)

err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
Expand All @@ -87,13 +89,13 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
ColumnType: string(qvalue.QValueKindInt64),
}},
}})
suite.failTestError(err)
require.NoError(suite.t, err)

output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{tableName},
})
suite.failTestError(err)
suite.Equal(&protos.TableSchema{
require.NoError(suite.t, err)
require.Equal(suite.t, &protos.TableSchema{
TableIdentifier: tableName,
Columns: map[string]string{
"id": string(qvalue.QValueKindInt32),
Expand All @@ -103,11 +105,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
}, output.TableNameSchemaMapping[tableName])
}

func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
func (suite PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
tableName := fmt.Sprintf("%s.add_drop_all_column_types", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
suite.failTestError(err)
require.NoError(suite.t, err)

expectedTableSchema := &protos.TableSchema{
TableIdentifier: tableName,
Expand Down Expand Up @@ -148,20 +150,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
DstTableName: tableName,
AddedColumns: addedColumns,
}})
suite.failTestError(err)
require.NoError(suite.t, err)

output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{tableName},
})
suite.failTestError(err)
suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
require.NoError(suite.t, err)
require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
}

func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
func (suite PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
suite.failTestError(err)
require.NoError(suite.t, err)

expectedTableSchema := &protos.TableSchema{
TableIdentifier: tableName,
Expand Down Expand Up @@ -194,20 +196,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
DstTableName: tableName,
AddedColumns: addedColumns,
}})
suite.failTestError(err)
require.NoError(suite.t, err)

output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{tableName},
})
suite.failTestError(err)
suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
require.NoError(suite.t, err)
require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
}

func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", schemaDeltaTestSchemaName)
_, err := suite.connector.pool.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName))
suite.failTestError(err)
require.NoError(suite.t, err)

expectedTableSchema := &protos.TableSchema{
TableIdentifier: tableName,
Expand All @@ -234,15 +236,15 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
DstTableName: tableName,
AddedColumns: addedColumns,
}})
suite.failTestError(err)
require.NoError(suite.t, err)

output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{tableName},
})
suite.failTestError(err)
suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
require.NoError(suite.t, err)
require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
}

func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
suite.Run(t, new(PostgresSchemaDeltaTestSuite))
got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite))
}
Loading

0 comments on commit 00c0e19

Please sign in to comment.