From 8aee197f278dc4d38b8569f60dbe26b60299ab8b Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 20 Dec 2024 17:31:01 +0530 Subject: [PATCH 1/2] [cdc] always initialize childToParentRelIDMapping (#2369) https://github.com/PeerDB-io/peerdb/pull/2323#discussion_r1888186520 --- flow/connectors/postgres/postgres.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 6cd3be11e..7dcdc3261 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -397,13 +397,9 @@ func pullCore[Items model.Items]( if err != nil { return err } - var childToParentRelIDMap map[uint32]uint32 - // only initialize the map if needed, escape hatch because custom publications may not have the right setting - if req.OverridePublicationName != "" || pgVersion < shared.POSTGRES_13 { - childToParentRelIDMap, err = GetChildToParentRelIDMap(ctx, c.conn, slices.Collect(maps.Keys(req.SrcTableIDNameMapping))) - if err != nil { - return fmt.Errorf("error getting child to parent relid map: %w", err) - } + childToParentRelIDMap, err := GetChildToParentRelIDMap(ctx, c.conn, slices.Collect(maps.Keys(req.SrcTableIDNameMapping))) + if err != nil { + return fmt.Errorf("error getting child to parent relid map: %w", err) } if err := c.MaybeStartReplication(ctx, slotName, publicationName, req.LastOffset, pgVersion); err != nil { From b87d0a34036de9188bdcf73a45aed0d91081ef5c Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 20 Dec 2024 20:10:33 +0530 Subject: [PATCH 2/2] generic e2e test for partitioned tables (#2383) --- flow/e2e/generic/generic_test.go | 51 ++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/flow/e2e/generic/generic_test.go b/flow/e2e/generic/generic_test.go index bded0a6d8..a56bcae47 100644 --- a/flow/e2e/generic/generic_test.go 
+++ b/flow/e2e/generic/generic_test.go @@ -299,3 +299,54 @@ func (s Generic) Test_Simple_Schema_Changes() { e2e.RequireEnvCanceled(t, env) }
+
+func (s Generic) Test_Partitioned_Table() { // CDC e2e test: rows inserted via a range-partitioned parent table must match in the destination table
+	t := s.T()
+	srcTable := "test_partition"
+	dstTable := "test_partition_dst"
+	srcSchemaTable := e2e.AttachSchema(s, srcTable)
+	// Source is a parent table range-partitioned on created_at with partitions for Q1-Q3 2024; the primary key includes created_at, the partition key.
+	_, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE %[1]s(
+			id SERIAL NOT NULL,
+			name TEXT,
+			created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT now(),
+			PRIMARY KEY (created_at, id)
+		) PARTITION BY RANGE(created_at);
+		CREATE TABLE %[1]s_2024q1
+			PARTITION OF %[1]s
+			FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
+		CREATE TABLE %[1]s_2024q2
+			PARTITION OF %[1]s
+			FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
+		CREATE TABLE %[1]s_2024q3
+			PARTITION OF %[1]s
+			FOR VALUES FROM ('2024-07-01') TO ('2024-10-01');
+	`, srcSchemaTable))
+	require.NoError(t, err)
+	// The mirror maps the parent table name (not the individual partitions) to dstTable on the suite's peer.
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:   e2e.AddSuffix(s, "test_partition"),
+		TableMappings: e2e.TableMappings(s, srcTable, dstTable),
+		Destination:   s.Peer().Name,
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(t)
+	// Execute peerflow.CDCFlowWorkflow with the generated config through a Temporal client.
+	tc := e2e.NewTemporalClient(t)
+	env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+
+	e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig) // NOTE(review): presumably waits until the flow is ready for CDC; confirm against the e2e helper
+	// insert 10 rows into the source table; max(1, i) maps i = 0..9 to months 1..9 (January twice), so every partition receives rows
+	for i := range 10 { // i = 0..9 (range-over-int, Go 1.22+)
+		testName := fmt.Sprintf("test_name_%d", i)
+		_, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
+		INSERT INTO %s(name, created_at) VALUES ($1, '2024-%d-01')
+		`, srcSchemaTable, max(1, i)), testName)
+		e2e.EnvNoError(t, env, err)
+	}
+	t.Log("Inserted 10 rows into the source table")
+	// Wait for source and destination to match on id, name and created_at, then cancel the flow and check the cancellation.
+	e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,name,created_at`)
+	env.Cancel()
+	e2e.RequireEnvCanceled(t, env)
+}