diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index dffa4e0840..ab39326b4c 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -95,7 +95,7 @@ jobs: - name: run tests run: | - gotestsum --format testname -- -p 8 ./... -timeout 1200s + gotestsum --format testname -- -p 16 ./... -timeout 1200s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index 7d9b8e429c..13aff7f00c 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -2,22 +2,19 @@ package cdc_records import ( "crypto/rand" + "testing" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/stretchr/testify/require" ) -type CDCRecordStorageTestSuite struct { - suite.Suite - testsuite.WorkflowTestSuite -} +func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { + t.Helper() -func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.Record) { - pkeyColVal := make([]byte, 0, 32) + pkeyColVal := make([]byte, 32) _, err := rand.Read(pkeyColVal) - s.NoError(err) + require.NoError(t, err) key := model.TableWithPkey{ TableName: "test_src_tbl", @@ -39,50 +36,52 @@ func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.R return key, rec } -func (s *CDCRecordStorageTestSuite) TestSingleRecord() { +func TestSingleRecord(t *testing.T) { + t.Parallel() cdcRecordsStore := NewCDCRecordsStore("test_single_record") cdcRecordsStore.numRecordsSwitchThreshold = 10 - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) + require.NoError(t, err) // should not spill into DB - s.Equal(1, len(cdcRecordsStore.inMemoryRecords)) - s.Nil(cdcRecordsStore.pebbleDB) + require.Equal(t, 1, len(cdcRecordsStore.inMemoryRecords)) + require.Nil(t, cdcRecordsStore.pebbleDB) reck, ok, err := cdcRecordsStore.Get(key) - s.NoError(err) - s.True(ok) - s.Equal(rec, reck) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, reck) - s.NoError(cdcRecordsStore.Close()) + require.NoError(t, cdcRecordsStore.Close()) } -func (s *CDCRecordStorageTestSuite) TestRecordsTillSpill() { +func TestRecordsTillSpill(t *testing.T) { + t.Parallel() cdcRecordsStore := NewCDCRecordsStore("test_records_till_spill") cdcRecordsStore.numRecordsSwitchThreshold = 10 // add records upto set limit for i := 0; i < 10; i++ { - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) - s.Equal(i+1, len(cdcRecordsStore.inMemoryRecords)) - s.Nil(cdcRecordsStore.pebbleDB) + require.NoError(t, err) + require.Equal(t, i+1, len(cdcRecordsStore.inMemoryRecords)) + require.Nil(t, cdcRecordsStore.pebbleDB) } // this record should be spilled to DB - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) + require.NoError(t, err) _, ok := cdcRecordsStore.inMemoryRecords[key] - s.False(ok) - s.NotNil(cdcRecordsStore.pebbleDB) + require.False(t, ok) + require.NotNil(t, cdcRecordsStore.pebbleDB) reck, ok, err := cdcRecordsStore.Get(key) - s.NoError(err) - s.True(ok) - s.Equal(rec, reck) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, reck) - s.NoError(cdcRecordsStore.Close()) + require.NoError(t, cdcRecordsStore.Close()) } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 40a42ea64f..0cf697020b 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -12,7 +12,7 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" peer_bq "github.com/PeerDB-io/peer-flow/connectors/bigquery" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -46,7 +46,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { return nil, fmt.Errorf("TEST_BQ_CREDS env var not set") } - content, err := e2e.ReadFileToBytes(jsonPath) + content, err := e2eshared.ReadFileToBytes(jsonPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 4d47d2ff96..83f51879c0 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -30,20 +31,7 @@ type PeerFlowE2ETestSuiteBQ struct { } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ { - t.Helper() - - g := got.New(t) - g.Parallel() - - suite := setupSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSuite)) } func (s PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { @@ -148,8 +136,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { +func (s PeerFlowE2ETestSuiteBQ) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { slog.Error("failed to tear down postgres", slog.Any("error", err)) diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 249745dd3c..7c2117d538 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -9,7 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" @@ -34,7 +34,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { bucketName = "peerdb_staging" } - content, err := e2e.ReadFileToBytes(credsPath) + content, err := e2eshared.ReadFileToBytes(credsPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index f979198635..d1d5e3043e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -13,6 +13,7 @@ import ( connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -33,20 +34,7 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteSF { - t.Helper() - - g := got.New(t) - g.Parallel() - - suite := SetupSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(SetupSuite)) } func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { @@ -101,8 +89,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { return suite } -// Implement TearDownAllSuite interface to tear down the test suite -func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { +func (s PeerFlowE2ETestSuiteSF) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { slog.Error("failed to tear down Postgres", slog.Any("error", err)) diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index 0401d34f58..8dd9bfa60a 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -9,7 +9,7 @@ import ( "time" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -37,7 +37,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { return nil, fmt.Errorf("TEST_SF_CREDS env var not set") } - content, err := e2e.ReadFileToBytes(jsonPath) + content, err := e2eshared.ReadFileToBytes(jsonPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index c69205dfd1..cfe38093d3 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -7,6 +7,7 @@ import ( "testing" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/ysmood/got" @@ -58,7 +59,7 @@ func setupSchemaDeltaSuite( } } -func (suite SnowflakeSchemaDeltaTestSuite) tearDownSuite() { +func (suite SnowflakeSchemaDeltaTestSuite) TearDownSuite() { err := suite.sfTestHelper.Cleanup() suite.failTestError(err) err = suite.connector.Close() @@ -222,18 +223,5 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { } func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { - got.Each(t, func(t *testing.T) SnowflakeSchemaDeltaTestSuite { - t.Helper() - - g := got.New(t) - g.Parallel() - - suite := setupSchemaDeltaSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite)) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 83f4760acd..beb4ae04b9 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log/slog" "os" "strings" @@ -24,25 +23,6 @@ import ( "go.temporal.io/sdk/testsuite" ) -// ReadFileToBytes reads a file to a byte array. -func ReadFileToBytes(path string) ([]byte, error) { - var ret []byte - - f, err := os.Open(path) - if err != nil { - return ret, fmt.Errorf("failed to open file: %w", err) - } - - defer f.Close() - - ret, err = io.ReadAll(f) - if err != nil { - return ret, fmt.Errorf("failed to read file: %w", err) - } - - return ret, nil -} - func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) { t.Helper() diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go new file mode 100644 index 0000000000..bc7d7bb60e --- /dev/null +++ b/flow/e2eshared/e2eshared.go @@ -0,0 +1,42 @@ +package e2eshared + +import ( + "fmt" + "io" + "os" + "testing" + + "github.com/ysmood/got" +) + +func GotSuite[T interface{ TearDownSuite() }](setup func(t *testing.T, g got.G) T) func(t *testing.T) T { + return func(t *testing.T) T { + t.Helper() + g := got.New(t) + g.Parallel() + suite := setup(t, g) + g.Cleanup(func() { + suite.TearDownSuite() + }) + return suite + } +} + +// ReadFileToBytes reads a file to a byte array. +func ReadFileToBytes(path string) ([]byte, error) { + var ret []byte + + f, err := os.Open(path) + if err != nil { + return ret, fmt.Errorf("failed to open file: %w", err) + } + + defer f.Close() + + ret, err = io.ReadAll(f) + if err != nil { + return ret, fmt.Errorf("failed to read file: %w", err) + } + + return ret, nil +}