Skip to content

Commit

Permalink
just fetch childToParentRelIDMapping for tables we want
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 5, 2024
1 parent 49c2e28 commit 5307cdb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
8 changes: 5 additions & 3 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,18 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
}
}

func GetChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]uint32, error) {
func GetChildToParentRelIDMap(ctx context.Context,
conn *pgx.Conn, parentTableOIDs []uint32,
) (map[uint32]uint32, error) {
query := `
SELECT parent.oid AS parentrelid, child.oid AS childrelid
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relkind IN ('p','r');
WHERE parent.relkind IN ('p','r') AND parent.oid=ANY($1);
`

rows, err := conn.Query(ctx, query)
rows, err := conn.Query(ctx, query, parentTableOIDs)
if err != nil {
return nil, fmt.Errorf("error querying for child to parent relid map: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"log/slog"
"maps"
"slices"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -398,7 +400,7 @@ func pullCore[Items model.Items](
var childToParentRelIDMap map[uint32]uint32
// only initialize the map if needed, escape hatch because custom publications may not have the right setting
if req.OverridePublicationName != "" || pgVersion < shared.POSTGRES_13 {
childToParentRelIDMap, err = GetChildToParentRelIDMap(ctx, c.conn)
childToParentRelIDMap, err = GetChildToParentRelIDMap(ctx, c.conn, slices.Collect(maps.Keys(req.SrcTableIDNameMapping)))
if err != nil {
return fmt.Errorf("error getting child to parent relid map: %w", err)
}
Expand Down

0 comments on commit 5307cdb

Please sign in to comment.