Skip to content

Commit

Permalink
e2eshared & remove testify/suite from cdc_records_storage_test.go
Browse files Browse the repository at this point in the history
Splitting up changes from #871
  • Loading branch information
serprex committed Dec 27, 2023
1 parent a8f8061 commit 5fa9717
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 1200s
gotestsum --format testname -- -p 16 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
61 changes: 30 additions & 31 deletions flow/connectors/utils/cdc_records/cdc_records_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@ package cdc_records

import (
"crypto/rand"
"testing"

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/testsuite"
"github.com/stretchr/testify/require"
)

type CDCRecordStorageTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
}
func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) {
t.Helper()

func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.Record) {
pkeyColVal := make([]byte, 0, 32)
pkeyColVal := make([]byte, 32)
_, err := rand.Read(pkeyColVal)
s.NoError(err)
require.NoError(t, err)

key := model.TableWithPkey{
TableName: "test_src_tbl",
Expand All @@ -39,50 +36,52 @@ func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.R
return key, rec
}

func (s *CDCRecordStorageTestSuite) TestSingleRecord() {
func TestSingleRecord(t *testing.T) {
t.Parallel()
cdcRecordsStore := NewCDCRecordsStore("test_single_record")
cdcRecordsStore.numRecordsSwitchThreshold = 10

key, rec := s.genKeyAndRec()
key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
s.NoError(err)
require.NoError(t, err)
// should not spill into DB
s.Equal(1, len(cdcRecordsStore.inMemoryRecords))
s.Nil(cdcRecordsStore.pebbleDB)
require.Equal(t, 1, len(cdcRecordsStore.inMemoryRecords))
require.Nil(t, cdcRecordsStore.pebbleDB)

reck, ok, err := cdcRecordsStore.Get(key)
s.NoError(err)
s.True(ok)
s.Equal(rec, reck)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, rec, reck)

s.NoError(cdcRecordsStore.Close())
require.NoError(t, cdcRecordsStore.Close())
}

func (s *CDCRecordStorageTestSuite) TestRecordsTillSpill() {
func TestRecordsTillSpill(t *testing.T) {
t.Parallel()
cdcRecordsStore := NewCDCRecordsStore("test_records_till_spill")
cdcRecordsStore.numRecordsSwitchThreshold = 10

// add records upto set limit
for i := 0; i < 10; i++ {
key, rec := s.genKeyAndRec()
key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
s.NoError(err)
s.Equal(i+1, len(cdcRecordsStore.inMemoryRecords))
s.Nil(cdcRecordsStore.pebbleDB)
require.NoError(t, err)
require.Equal(t, i+1, len(cdcRecordsStore.inMemoryRecords))
require.Nil(t, cdcRecordsStore.pebbleDB)
}

// this record should be spilled to DB
key, rec := s.genKeyAndRec()
key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
s.NoError(err)
require.NoError(t, err)
_, ok := cdcRecordsStore.inMemoryRecords[key]
s.False(ok)
s.NotNil(cdcRecordsStore.pebbleDB)
require.False(t, ok)
require.NotNil(t, cdcRecordsStore.pebbleDB)

reck, ok, err := cdcRecordsStore.Get(key)
s.NoError(err)
s.True(ok)
s.Equal(rec, reck)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, rec, reck)

s.NoError(cdcRecordsStore.Close())
require.NoError(t, cdcRecordsStore.Close())
}
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
peer_bq "github.com/PeerDB-io/peer-flow/connectors/bigquery"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -46,7 +46,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
return nil, fmt.Errorf("TEST_BQ_CREDS env var not set")
}

content, err := e2e.ReadFileToBytes(jsonPath)
content, err := e2eshared.ReadFileToBytes(jsonPath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
Expand Down
19 changes: 3 additions & 16 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -30,20 +31,7 @@ type PeerFlowE2ETestSuiteBQ struct {
}

func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ {
t.Helper()

g := got.New(t)
g.Parallel()

suite := setupSuite(t, g)

g.Cleanup(func() {
suite.tearDownSuite()
})

return suite
})
got.Each(t, e2eshared.GotSuite(setupSuite))
}

func (s PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string {
Expand Down Expand Up @@ -148,8 +136,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ {
}
}

// Implement TearDownAllSuite interface to tear down the test suite
func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() {
func (s PeerFlowE2ETestSuiteBQ) TearDownSuite() {
err := e2e.TearDownPostgres(s.pool, s.bqSuffix)
if err != nil {
slog.Error("failed to tear down postgres", slog.Any("error", err))
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/s3/s3_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
Expand All @@ -34,7 +34,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
bucketName = "peerdb_staging"
}

content, err := e2e.ReadFileToBytes(credsPath)
content, err := e2eshared.ReadFileToBytes(credsPath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
Expand Down
19 changes: 3 additions & 16 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -33,20 +34,7 @@ type PeerFlowE2ETestSuiteSF struct {
}

func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteSF {
t.Helper()

g := got.New(t)
g.Parallel()

suite := SetupSuite(t, g)

g.Cleanup(func() {
suite.tearDownSuite()
})

return suite
})
got.Each(t, e2eshared.GotSuite(SetupSuite))
}

func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string {
Expand Down Expand Up @@ -101,8 +89,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF {
return suite
}

// Implement TearDownAllSuite interface to tear down the test suite
func (s PeerFlowE2ETestSuiteSF) tearDownSuite() {
func (s PeerFlowE2ETestSuiteSF) TearDownSuite() {
err := e2e.TearDownPostgres(s.pool, s.pgSuffix)
if err != nil {
slog.Error("failed to tear down Postgres", slog.Any("error", err))
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -37,7 +37,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) {
return nil, fmt.Errorf("TEST_SF_CREDS env var not set")
}

content, err := e2e.ReadFileToBytes(jsonPath)
content, err := e2eshared.ReadFileToBytes(jsonPath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
Expand Down
18 changes: 3 additions & 15 deletions flow/e2e/snowflake/snowflake_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/ysmood/got"
Expand Down Expand Up @@ -58,7 +59,7 @@ func setupSchemaDeltaSuite(
}
}

func (suite SnowflakeSchemaDeltaTestSuite) tearDownSuite() {
func (suite SnowflakeSchemaDeltaTestSuite) TearDownSuite() {
err := suite.sfTestHelper.Cleanup()
suite.failTestError(err)
err = suite.connector.Close()
Expand Down Expand Up @@ -222,18 +223,5 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
}

func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) {
got.Each(t, func(t *testing.T) SnowflakeSchemaDeltaTestSuite {
t.Helper()

g := got.New(t)
g.Parallel()

suite := setupSchemaDeltaSuite(t, g)

g.Cleanup(func() {
suite.tearDownSuite()
})

return suite
})
got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite))
}
20 changes: 0 additions & 20 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"strings"
Expand All @@ -24,25 +23,6 @@ import (
"go.temporal.io/sdk/testsuite"
)

// ReadFileToBytes reads a file to a byte array.
func ReadFileToBytes(path string) ([]byte, error) {
var ret []byte

f, err := os.Open(path)
if err != nil {
return ret, fmt.Errorf("failed to open file: %w", err)
}

defer f.Close()

ret, err = io.ReadAll(f)
if err != nil {
return ret, fmt.Errorf("failed to read file: %w", err)
}

return ret, nil
}

func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) {
t.Helper()

Expand Down
42 changes: 42 additions & 0 deletions flow/e2eshared/e2eshared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package e2eshared

import (
"fmt"
"io"
"os"
"testing"

"github.com/ysmood/got"
)

func GotSuite[T interface{ TearDownSuite() }](setup func(t *testing.T, g got.G) T) func(t *testing.T) T {
return func(t *testing.T) T {
t.Helper()
g := got.New(t)
g.Parallel()
suite := setup(t, g)
g.Cleanup(func() {
suite.TearDownSuite()
})
return suite
}
}

// ReadFileToBytes reads a file to a byte array.
func ReadFileToBytes(path string) ([]byte, error) {
var ret []byte

f, err := os.Open(path)
if err != nil {
return ret, fmt.Errorf("failed to open file: %w", err)
}

defer f.Close()

ret, err = io.ReadAll(f)
if err != nil {
return ret, fmt.Errorf("failed to read file: %w", err)
}

return ret, nil
}

0 comments on commit 5fa9717

Please sign in to comment.