diff --git a/flow/e2e/generic/generic_test.go b/flow/e2e/generic/generic_test.go index bded0a6d89..2b0d97e74b 100644 --- a/flow/e2e/generic/generic_test.go +++ b/flow/e2e/generic/generic_test.go @@ -299,3 +299,54 @@ func (s Generic) Test_Simple_Schema_Changes() { e2e.RequireEnvCanceled(t, env) } + +func (s Generic) Test_Partitioned_Table() { + t := s.T() + srcTable := "test_partition" + dstTable := "test_partition_dst" + srcSchemaTable := e2e.AttachSchema(s, srcTable) + + _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %[1]s( + id SERIAL NOT NULL, + name TEXT, + created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT now(), + PRIMARY KEY (created_at, id) + ) PARTITION BY RANGE(created_at); + CREATE TABLE %[1]s_2024q1 + PARTITION OF %[1]s + FOR VALUES FROM ('2024-01-01') TO ('2024-04-01'); + CREATE TABLE %[1]s_2024q2 + PARTITION OF %[1]s + FOR VALUES FROM ('2024-04-01') TO ('2024-07-01'); + CREATE TABLE %[1]s_2024q3 + PARTITION OF %[1]s + FOR VALUES FROM ('2024-07-01') TO ('2024-10-01'); + `, srcSchemaTable)) + require.NoError(t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "test_partition"), + TableMappings: e2e.TableMappings(s, srcTable, dstTable), + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(t) + + tc := e2e.NewTemporalClient(t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + + e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig) + // insert 10 rows into the source table + for i := range 10 { + testName := fmt.Sprintf("test_name_%d", i) + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(name, created_at) VALUES ($1, '2024-%d-01') + `, srcSchemaTable, i), testName) + e2e.EnvNoError(t, env, err) + } + t.Log("Inserted 10 rows into the source table") + + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,name,created_at`) + env.Cancel() + e2e.RequireEnvCanceled(t, env) +}