diff --git a/flow/connectors/core.go b/flow/connectors/core.go index a8e39b67b1..6adb3a2429 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -27,11 +27,16 @@ type Connector interface { ConnectionActive(context.Context) error } -type CDCPullConnector interface { +type GetTableSchemaConnector interface { Connector // GetTableSchema returns the schema of a table. GetTableSchema(ctx context.Context, req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error) +} + +type CDCPullConnector interface { + Connector + GetTableSchemaConnector // EnsurePullability ensures that the connector is pullable. EnsurePullability(ctx context.Context, req *protos.EnsurePullabilityBatchInput) ( @@ -255,6 +260,9 @@ var ( _ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{} _ CDCNormalizeConnector = &connclickhouse.ClickhouseConnector{} + _ GetTableSchemaConnector = &connpostgres.PostgresConnector{} + _ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{} + _ NormalizedTablesConnector = &connpostgres.PostgresConnector{} _ NormalizedTablesConnector = &connbigquery.BigQueryConnector{} _ NormalizedTablesConnector = &connsnowflake.SnowflakeConnector{} diff --git a/flow/connectors/snowflake/get_schema_for_tests.go b/flow/connectors/snowflake/get_schema_for_tests.go index 05631e635f..476f16f165 100644 --- a/flow/connectors/snowflake/get_schema_for_tests.go +++ b/flow/connectors/snowflake/get_schema_for_tests.go @@ -3,7 +3,6 @@ package connsnowflake import ( "context" - "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -47,7 +46,6 @@ func (c *SnowflakeConnector) GetTableSchema( return nil, err } res[tableName] = tableSchema - utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName) } return &protos.GetTableSchemaBatchOutput{ diff --git a/flow/e2e/bigquery/bigquery.go b/flow/e2e/bigquery/bigquery.go index 73f5c38d6e..1e2a3842ee 100644 --- a/flow/e2e/bigquery/bigquery.go +++ b/flow/e2e/bigquery/bigquery.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -35,6 +36,11 @@ func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { return s.conn } +func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector { + // TODO have BQ connector + return nil +} + func (s PeerFlowE2ETestSuiteBQ) Suffix() string { return s.bqSuffix } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index ec28b5f97b..3dff6e310e 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -643,7 +643,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { e2e.RequireEnvCanceled(s.t, env) } -// TODO: not checking schema exactly, add later +// TODO: not checking schema exactly +// write a GetTableSchemaConnector for BQ to enable generic_test func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { tc := e2e.NewTemporalClient(s.t) diff --git a/flow/e2e/generic/generic_test.go b/flow/e2e/generic/generic_test.go new file mode 100644 index 0000000000..97e64f5ed3 --- /dev/null +++ b/flow/e2e/generic/generic_test.go @@ -0,0 +1,304 @@ +package e2e_generic + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/connectors" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/bigquery" + "github.com/PeerDB-io/peer-flow/e2e/postgres" + "github.com/PeerDB-io/peer-flow/e2e/snowflake" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model/qvalue" + peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +func TestGenericPG(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite)) +} + +func TestGenericSF(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite)) +} + +func TestGenericBQ(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite)) +} + +type Generic struct { + e2e.GenericSuite +} + +func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic { + return func(t *testing.T) Generic { + t.Helper() + return Generic{f(t)} + } +} + +func (s Generic) Test_Simple_Flow() { + t := s.T() + srcTable := "test_simple" + dstTable := "test_simple_dst" + srcSchemaTable := e2e.AttachSchema(s, srcTable) + + _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL, + myh HSTORE NOT NULL + ); + `, srcSchemaTable)) + require.NoError(t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "test_simple"), + TableMappings: e2e.TableMappings(s, srcTable, dstTable), + Destination: s.Peer(), + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + + tc := e2e.NewTemporalClient(t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + + e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) + // insert 10 rows into the source table + for i := range 10 { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') + `, srcSchemaTable), testKey, testValue) + e2e.EnvNoError(t, env, err) + } + t.Log("Inserted 10 rows into the source table") + + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,key,value,myh`) + env.Cancel() + e2e.RequireEnvCanceled(t, env) +} + +func (s Generic) Test_Simple_Schema_Changes() { + t := s.T() + + destinationSchemaConnector, ok := s.DestinationConnector().(connectors.GetTableSchemaConnector) + if !ok { + t.SkipNow() + } + + srcTable := "test_simple_schema_changes" + dstTable := "test_simple_schema_changes_dst" + srcTableName := e2e.AttachSchema(s, srcTable) + dstTableName := s.DestinationTable(dstTable) + + _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 BIGINT + ); + `, srcTableName)) + require.NoError(t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, srcTable), + TableMappings: e2e.TableMappings(s, srcTable, dstTable), + Destination: s.Peer(), + } + + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + + // wait for PeerFlowStatusQuery to finish setup + // and then insert and mutate schema repeatedly. + tc := e2e.NewTemporalClient(t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) + e2e.EnvNoError(t, env, err) + t.Log("Inserted initial row in the source table") + + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize reinsert", srcTable, dstTable, "id,c1") + + expectedTableSchema := &protos.TableSchema{ + TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable), + Columns: []*protos.FieldDescription{ + { + Name: e2e.ExpectedDestinationIdentifier(s, "id"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c1"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: "_PEERDB_IS_DELETED", + Type: string(qvalue.QValueKindBoolean), + TypeModifier: -1, + }, + { + Name: "_PEERDB_SYNCED_AT", + Type: string(qvalue.QValueKindTimestamp), + TypeModifier: -1, + }, + }, + } + output, err := destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + e2e.EnvNoError(t, env, err) + e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + + // alter source table, add column c2 and insert another row. + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) + e2e.EnvNoError(t, env, err) + t.Log("Altered source table, added column c2") + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) + e2e.EnvNoError(t, env, err) + t.Log("Inserted row with added c2 in the source table") + + // verify we got our two rows, if schema did not match up it will error. + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize altered row", srcTable, dstTable, "id,c1,c2") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable), + Columns: []*protos.FieldDescription{ + { + Name: e2e.ExpectedDestinationIdentifier(s, "id"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c1"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: "_PEERDB_SYNCED_AT", + Type: string(qvalue.QValueKindTimestamp), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c2"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + }, + } + output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + e2e.EnvNoError(t, env, err) + e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + e2e.EnvEqualTablesWithNames(env, s, srcTable, dstTable, "id,c1,c2") + + // alter source table, add column c3, drop column c2 and insert another row. + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) + e2e.EnvNoError(t, env, err) + t.Log("Altered source table, dropped column c2 and added column c3") + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) + e2e.EnvNoError(t, env, err) + t.Log("Inserted row with added c3 in the source table") + + // verify we got our two rows, if schema did not match up it will error. + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize dropped c2 column", srcTable, dstTable, "id,c1,c3") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable), + Columns: []*protos.FieldDescription{ + { + Name: e2e.ExpectedDestinationIdentifier(s, "id"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c1"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: "_PEERDB_SYNCED_AT", + Type: string(qvalue.QValueKindTimestamp), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c2"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c3"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + }, + } + output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + e2e.EnvNoError(t, env, err) + e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + e2e.EnvEqualTablesWithNames(env, s, srcTable, dstTable, "id,c1,c3") + + // alter source table, drop column c3 and insert another row. + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + ALTER TABLE %s DROP COLUMN c3`, srcTableName)) + e2e.EnvNoError(t, env, err) + t.Log("Altered source table, dropped column c3") + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) + e2e.EnvNoError(t, env, err) + t.Log("Inserted row after dropping all columns in the source table") + + // verify we got our two rows, if schema did not match up it will error. + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize dropped c3 column", srcTable, dstTable, "id,c1") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable), + Columns: []*protos.FieldDescription{ + { + Name: e2e.ExpectedDestinationIdentifier(s, "id"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c1"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: "_PEERDB_SYNCED_AT", + Type: string(qvalue.QValueKindTimestamp), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c2"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c3"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + }, + } + output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + e2e.EnvNoError(t, env, err) + e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + e2e.EnvEqualTablesWithNames(env, s, srcTable, dstTable, "id,c1") + + env.Cancel() + + e2e.RequireEnvCanceled(t, env) +} diff --git a/flow/e2e/generic/peer_flow_test.go b/flow/e2e/generic/peer_flow_test.go deleted file mode 100644 index 20c5847df4..0000000000 --- a/flow/e2e/generic/peer_flow_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package e2e_generic - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/e2e/bigquery" - "github.com/PeerDB-io/peer-flow/e2e/postgres" - "github.com/PeerDB-io/peer-flow/e2e/snowflake" - "github.com/PeerDB-io/peer-flow/e2eshared" - peerflow "github.com/PeerDB-io/peer-flow/workflows" -) - -func TestGenericPG(t *testing.T) { - e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite)) -} - -func TestGenericSF(t *testing.T) { - e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite)) -} - -func TestGenericBQ(t *testing.T) { - e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite)) -} - -type Generic struct { - e2e.GenericSuite -} - -func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic { - return func(t *testing.T) Generic { - t.Helper() - return Generic{f(t)} - } -} - -func (s Generic) Test_Simple_Flow() { - t := s.T() - srcTable := "test_simple" - dstTable := "test_simple_dst" - srcSchemaTable := e2e.AttachSchema(s, srcTable) - - _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL, - myh HSTORE NOT NULL - ); - `, srcSchemaTable)) - require.NoError(t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: e2e.AddSuffix(s, "test_simple"), - TableMappings: e2e.TableMappings(s, srcTable, dstTable), - Destination: s.Peer(), - } - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - tc := e2e.NewTemporalClient(t) - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - - e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) - // insert 10 rows into the source table - for i := range 10 { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') - `, srcSchemaTable), testKey, testValue) - e2e.EnvNoError(t, env, err) - } - t.Log("Inserted 10 rows into the source table") - - e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,key,value,myh`) - env.Cancel() - e2e.RequireEnvCanceled(t, env) -} diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 2e69376b01..eeec5e373d 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -17,7 +17,6 @@ import ( "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -51,32 +50,6 @@ func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, ro return nil } -func (s PeerFlowE2ETestSuitePG) WaitForSchema( - env e2e.WorkflowRun, - reason string, - srcTableName string, - dstTableName string, - cols string, - expectedSchema *protos.TableSchema, -) { - s.t.Helper() - e2e.EnvWaitFor(s.t, env, 3*time.Minute, reason, func() bool { - s.t.Helper() - output, err := s.conn.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, - }) - if err != nil { - return false - } - tableSchema := output.TableNameSchemaMapping[dstTableName] - if !e2e.CompareTableSchemas(expectedSchema, tableSchema) { - s.t.Log("schemas unequal", expectedSchema, tableSchema) - return false - } - return s.comparePGTables(srcTableName, dstTableName, cols) == nil - }) -} - func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { srcTableName := s.attachSchemaSuffix("test_geospatial_pg") dstTableName := s.attachSchemaSuffix("test_geospatial_pg_dst") @@ -224,188 +197,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { e2e.RequireEnvCanceled(s.t, env) } -func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { - tc := e2e.NewTemporalClient(s.t) - - srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") - dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - c1 BIGINT - ); - `, srcTableName)) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_schema_changes"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.peer, - } - - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 1 - - // wait for PeerFlowStatusQuery to finish setup - // and then insert and mutate schema repeatedly. - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - - // insert first row. - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Inserted initial row in the source table") - - s.WaitForSchema(env, "normalizing first row", srcTableName, dstTableName, "id,c1", &protos.TableSchema{ - TableIdentifier: dstTableName, - PrimaryKeyColumns: []string{"id"}, - Columns: []*protos.FieldDescription{ - { - Name: "id", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "c1", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "_PEERDB_SYNCED_AT", - Type: string(qvalue.QValueKindTimestamp), - TypeModifier: -1, - }, - }, - }) - - // alter source table, add column c2 and insert another row. - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Altered source table, added column c2") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Inserted row with added c2 in the source table") - - s.WaitForSchema(env, "normalizing altered row", srcTableName, dstTableName, "id,c1,c2", &protos.TableSchema{ - TableIdentifier: dstTableName, - PrimaryKeyColumns: []string{"id"}, - Columns: []*protos.FieldDescription{ - { - Name: "id", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "c1", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "_PEERDB_SYNCED_AT", - Type: string(qvalue.QValueKindTimestamp), - TypeModifier: -1, - }, - { - Name: "c2", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - }, - }) - - // alter source table, add column c3, drop column c2 and insert another row. - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Altered source table, dropped column c2 and added column c3") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Inserted row with added c3 in the source table") - - s.WaitForSchema(env, "normalizing dropped column row", srcTableName, dstTableName, "id,c1,c3", &protos.TableSchema{ - TableIdentifier: dstTableName, - PrimaryKeyColumns: []string{"id"}, - Columns: []*protos.FieldDescription{ - { - Name: "id", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "c1", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "c2", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "_PEERDB_SYNCED_AT", - Type: string(qvalue.QValueKindTimestamp), - TypeModifier: -1, - }, - { - Name: "c3", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - }, - }) - - // alter source table, drop column c3 and insert another row. - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Altered source table, dropped column c3") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Inserted row after dropping all columns in the source table") - - s.WaitForSchema(env, "normalizing 2nd dropped column row", srcTableName, dstTableName, "id,c1", &protos.TableSchema{ - TableIdentifier: dstTableName, - PrimaryKeyColumns: []string{"id"}, - Columns: []*protos.FieldDescription{ - { - Name: "id", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "c1", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "_PEERDB_SYNCED_AT", - Type: string(qvalue.QValueKindTimestamp), - TypeModifier: -1, - }, - { - Name: "c2", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - { - Name: "c3", - Type: string(qvalue.QValueKindInt64), - TypeModifier: -1, - }, - }, - }) - - env.Cancel() - - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { tc := e2e.NewTemporalClient(s.t) diff --git a/flow/e2e/postgres/postgres.go b/flow/e2e/postgres/postgres.go index 23ca778c8d..8eafd6ade0 100644 --- a/flow/e2e/postgres/postgres.go +++ b/flow/e2e/postgres/postgres.go @@ -9,6 +9,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -32,6 +33,10 @@ func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { return s.conn } +func (s PeerFlowE2ETestSuitePG) DestinationConnector() connectors.Connector { + return s.conn +} + func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { return s.conn.Conn() } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 525d2c7256..56084a1a27 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strings" "testing" "time" @@ -16,7 +15,6 @@ import ( "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model/qvalue" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -516,218 +514,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { e2e.RequireEnvCanceled(s.t, env) } -func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { - tc := e2e.NewTemporalClient(s.t) - - srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_schema_changes") - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - c1 BIGINT - ); - `, srcTableName)) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_schema_changes"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.sfHelper.Peer, - } - - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 100 - - // wait for PeerFlowStatusQuery to finish setup - // and then insert and mutate schema repeatedly. - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Inserted initial row in the source table") - - e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", "test_simple_schema_changes", "id,c1") - - expectedTableSchema := &protos.TableSchema{ - TableIdentifier: strings.ToUpper(dstTableName), - Columns: []*protos.FieldDescription{ - { - Name: "ID", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "C1", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "_PEERDB_IS_DELETED", - Type: string(qvalue.QValueKindBoolean), - TypeModifier: -1, - }, - { - Name: "_PEERDB_SYNCED_AT", - Type: string(qvalue.QValueKindTimestamp), - TypeModifier: -1, - }, - }, - } - output, err := s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, - }) - e2e.EnvNoError(s.t, env, err) - e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) - - // alter source table, add column c2 and insert another row. - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Altered source table, added column c2") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Inserted row with added c2 in the source table") - - // verify we got our two rows, if schema did not match up it will error. - e2e.EnvWaitForEqualTables(env, s, "normalize altered row", "test_simple_schema_changes", "id,c1,c2") - expectedTableSchema = &protos.TableSchema{ - TableIdentifier: strings.ToUpper(dstTableName), - Columns: []*protos.FieldDescription{ - { - Name: "ID", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "C1", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "_PEERDB_SYNCED_AT", - Type: string(qvalue.QValueKindTimestamp), - TypeModifier: -1, - }, - { - Name: "C2", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - }, - } - output, err = s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, - }) - e2e.EnvNoError(s.t, env, err) - e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) - e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2") - - // alter source table, add column c3, drop column c2 and insert another row. - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Altered source table, dropped column c2 and added column c3") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Inserted row with added c3 in the source table") - - // verify we got our two rows, if schema did not match up it will error. - e2e.EnvWaitForEqualTables(env, s, "normalize dropped c2 column", "test_simple_schema_changes", "id,c1,c3") - expectedTableSchema = &protos.TableSchema{ - TableIdentifier: strings.ToUpper(dstTableName), - Columns: []*protos.FieldDescription{ - { - Name: "ID", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "C1", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "_PEERDB_SYNCED_AT", - Type: string(qvalue.QValueKindTimestamp), - TypeModifier: -1, - }, - { - Name: "C2", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "C3", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - }, - } - output, err = s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, - }) - e2e.EnvNoError(s.t, env, err) - e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) - e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3") - - // alter source table, drop column c3 and insert another row. - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Altered source table, dropped column c3") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - e2e.EnvNoError(s.t, env, err) - s.t.Log("Inserted row after dropping all columns in the source table") - - // verify we got our two rows, if schema did not match up it will error. - e2e.EnvWaitForEqualTables(env, s, "normalize dropped c3 column", "test_simple_schema_changes", "id,c1") - expectedTableSchema = &protos.TableSchema{ - TableIdentifier: strings.ToUpper(dstTableName), - Columns: []*protos.FieldDescription{ - { - Name: "ID", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "C1", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "_PEERDB_SYNCED_AT", - Type: string(qvalue.QValueKindTimestamp), - TypeModifier: -1, - }, - { - Name: "C2", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - { - Name: "C3", - Type: string(qvalue.QValueKindNumeric), - TypeModifier: -1, - }, - }, - } - output, err = s.connector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, - }) - e2e.EnvNoError(s.t, env, err) - e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) - e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") - - env.Cancel() - - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { tc := e2e.NewTemporalClient(s.t) diff --git a/flow/e2e/snowflake/snowflake.go b/flow/e2e/snowflake/snowflake.go index 45132ef601..06c46d1046 100644 --- a/flow/e2e/snowflake/snowflake.go +++ b/flow/e2e/snowflake/snowflake.go @@ -10,6 +10,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" @@ -35,6 +36,10 @@ func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { return s.conn } +func (s PeerFlowE2ETestSuiteSF) DestinationConnector() connectors.Connector { + return s.connector +} + func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { return s.Connector().Conn() } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index c018f32df2..dcaef74291 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -22,6 +22,7 @@ import ( "go.temporal.io/sdk/converter" "go.temporal.io/sdk/temporal" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -55,6 +56,7 @@ type RowSource interface { type GenericSuite interface { RowSource Peer() *protos.Peer + DestinationConnector() connectors.Connector DestinationTable(table string) string } @@ -112,13 +114,17 @@ func RequireEqualTables(suite RowSource, table string, cols string) { } func EnvEqualTables(env WorkflowRun, suite RowSource, table string, cols string) { + EnvEqualTablesWithNames(env, suite, table, table, cols) +} + +func EnvEqualTablesWithNames(env WorkflowRun, suite RowSource, srcTable string, dstTable string, cols string) { t := suite.T() t.Helper() - pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), table, cols) + pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), srcTable, cols) EnvNoError(t, env, err) - rows, err := suite.GetRows(table, cols) + rows, err := suite.GetRows(dstTable, cols) EnvNoError(t, env, err) EnvEqualRecordBatches(t, env, pgRows, rows) @@ -519,6 +525,19 @@ func GetOwnersSelectorStringsSF() [2]string { return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")} } +func ExpectedDestinationIdentifier(s GenericSuite, ident string) string { + switch s.DestinationConnector().(type) { + case *connsnowflake.SnowflakeConnector: + return strings.ToUpper(ident) + default: + return ident + } +} + +func ExpectedDestinationTableName(s GenericSuite, table string) string { + return ExpectedDestinationIdentifier(s, s.DestinationTable(table)) +} + type testWriter struct { *testing.T }