Skip to content

Commit

Permalink
Merge branch 'main' into ignore-read-only-drop-publication
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 20, 2024
2 parents b58383e + b87d0a3 commit 99cb438
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
10 changes: 3 additions & 7 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,9 @@ func pullCore[Items model.Items](
if err != nil {
return err
}
var childToParentRelIDMap map[uint32]uint32
// only initialize the map if needed, escape hatch because custom publications may not have the right setting
if req.OverridePublicationName != "" || pgVersion < shared.POSTGRES_13 {
childToParentRelIDMap, err = GetChildToParentRelIDMap(ctx, c.conn, slices.Collect(maps.Keys(req.SrcTableIDNameMapping)))
if err != nil {
return fmt.Errorf("error getting child to parent relid map: %w", err)
}
childToParentRelIDMap, err := GetChildToParentRelIDMap(ctx, c.conn, slices.Collect(maps.Keys(req.SrcTableIDNameMapping)))
if err != nil {
return fmt.Errorf("error getting child to parent relid map: %w", err)
}

if err := c.MaybeStartReplication(ctx, slotName, publicationName, req.LastOffset, pgVersion); err != nil {
Expand Down
51 changes: 51 additions & 0 deletions flow/e2e/generic/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,54 @@ func (s Generic) Test_Simple_Schema_Changes() {

e2e.RequireEnvCanceled(t, env)
}

func (s Generic) Test_Partitioned_Table() {
t := s.T()
srcTable := "test_partition"
dstTable := "test_partition_dst"
srcSchemaTable := e2e.AttachSchema(s, srcTable)

_, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %[1]s(
id SERIAL NOT NULL,
name TEXT,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT now(),
PRIMARY KEY (created_at, id)
) PARTITION BY RANGE(created_at);
CREATE TABLE %[1]s_2024q1
PARTITION OF %[1]s
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE %[1]s_2024q2
PARTITION OF %[1]s
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
CREATE TABLE %[1]s_2024q3
PARTITION OF %[1]s
FOR VALUES FROM ('2024-07-01') TO ('2024-10-01');
`, srcSchemaTable))
require.NoError(t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: e2e.AddSuffix(s, "test_partition"),
TableMappings: e2e.TableMappings(s, srcTable, dstTable),
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(t)

tc := e2e.NewTemporalClient(t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)

e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
// insert 10 rows into the source table
for i := range 10 {
testName := fmt.Sprintf("test_name_%d", i)
_, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(name, created_at) VALUES ($1, '2024-%d-01')
`, srcSchemaTable, max(1, i)), testName)
e2e.EnvNoError(t, env, err)
}
t.Log("Inserted 10 rows into the source table")

e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,name,created_at`)
env.Cancel()
e2e.RequireEnvCanceled(t, env)
}

0 comments on commit 99cb438

Please sign in to comment.