Skip to content

Commit

Permalink
[cdc] better manual handling of partitioned tables (#2323)
Browse files Browse the repository at this point in the history
1. log when we're remapping child table OID to parent table OID
2. partition fetching query also allows non-partitioned tables,
extensions use this
3. in case we see an unknown table OID after checking
`childToParentRelIDMapping`, check again to ensure the table wasn't
created in the middle of CDC
4. remove log since it could log PII
  • Loading branch information
heavycrystal authored Dec 6, 2024
1 parent 572aa1d commit 8e87695
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 8 deletions.
55 changes: 50 additions & 5 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connpostgres

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
Expand Down Expand Up @@ -46,6 +47,7 @@ type PostgresCDCSource struct {
catalogPool *pgxpool.Pool
otelManager *otel_metrics.OtelManager
hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{}
hushWarnUnknownTableDetected map[uint32]struct{}
flowJobName string
}

Expand Down Expand Up @@ -77,21 +79,24 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
catalogPool: cdcConfig.CatalogPool,
otelManager: cdcConfig.OtelManager,
flowJobName: cdcConfig.FlowJobName,
hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}),
hushWarnUnknownTableDetected: make(map[uint32]struct{}),
flowJobName: cdcConfig.FlowJobName,
}
}

func GetChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]uint32, error) {
func GetChildToParentRelIDMap(ctx context.Context,
conn *pgx.Conn, parentTableOIDs []uint32,
) (map[uint32]uint32, error) {
query := `
SELECT parent.oid AS parentrelid, child.oid AS childrelid
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relkind='p';
WHERE parent.relkind IN ('p','r') AND parent.oid=ANY($1);
`

rows, err := conn.Query(ctx, query)
rows, err := conn.Query(ctx, query, parentTableOIDs)
if err != nil {
return nil, fmt.Errorf("error querying for child to parent relid map: %w", err)
}
Expand Down Expand Up @@ -679,7 +684,10 @@ func processMessage[Items model.Items](
p.commitLock = nil
case *pglogrepl.RelationMessage:
// treat all relation messages as corresponding to parent if partitioned.
msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)
msg.RelationID, err = p.checkIfUnknownTableInherits(ctx, msg.RelationID)
if err != nil {
return nil, err
}

if _, exists := p.srcTableIDNameMapping[msg.RelationID]; !exists {
return nil, nil
Expand Down Expand Up @@ -964,8 +972,45 @@ func processRelationMessage[Items model.Items](
func (p *PostgresCDCSource) getParentRelIDIfPartitioned(relID uint32) uint32 {
parentRelID, ok := p.childToParentRelIDMapping[relID]
if ok {
if _, ok := p.hushWarnUnknownTableDetected[relID]; !ok {
p.logger.Info("Detected child table in CDC stream, remapping to parent table",
slog.Uint64("childRelID", uint64(relID)),
slog.Uint64("parentRelID", uint64(parentRelID)),
slog.String("parentTableName", p.srcTableIDNameMapping[parentRelID]))
p.hushWarnUnknownTableDetected[relID] = struct{}{}
}
return parentRelID
}

return relID
}

// since we generate the child to parent mapping at the beginning of the CDC stream,
// some tables could be created after the CDC stream starts,
// and we need to check if they inherit from a known table
func (p *PostgresCDCSource) checkIfUnknownTableInherits(ctx context.Context,
relID uint32,
) (uint32, error) {
relID = p.getParentRelIDIfPartitioned(relID)

if _, ok := p.srcTableIDNameMapping[relID]; !ok {
var parentRelID uint32
err := p.conn.QueryRow(ctx,
`SELECT inhparent FROM pg_inherits WHERE inhrelid=$1`, relID).Scan(&parentRelID)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return relID, nil
}
return 0, fmt.Errorf("failed to query pg_inherits: %w", err)
}
p.childToParentRelIDMapping[relID] = parentRelID
p.hushWarnUnknownTableDetected[relID] = struct{}{}
p.logger.Info("Detected new child table in CDC stream, remapping to parent table",
slog.Uint64("childRelID", uint64(relID)),
slog.Uint64("parentRelID", uint64(parentRelID)),
slog.String("parentTableName", p.srcTableIDNameMapping[parentRelID]))
return parentRelID, nil
}

return relID, nil
}
4 changes: 3 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"log/slog"
"maps"
"slices"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -398,7 +400,7 @@ func pullCore[Items model.Items](
var childToParentRelIDMap map[uint32]uint32
// only initialize the map if needed, escape hatch because custom publications may not have the right setting
if req.OverridePublicationName != "" || pgVersion < shared.POSTGRES_13 {
childToParentRelIDMap, err = GetChildToParentRelIDMap(ctx, c.conn)
childToParentRelIDMap, err = GetChildToParentRelIDMap(ctx, c.conn, slices.Collect(maps.Keys(req.SrcTableIDNameMapping)))
if err != nil {
return fmt.Errorf("error getting child to parent relid map: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions flow/model/cdc_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (r *CDCStream[T]) AddRecord(ctx context.Context, record Record[T]) error {
case r.records <- record:
return nil
case <-ticker.C:
logger.Warn("waiting on adding record to stream", slog.Any("record", record))
logger.Warn("waiting on adding record to stream", slog.String("dstTableName", record.GetDestinationTableName()))
case <-ctx.Done():
logger.Warn("context cancelled while adding record to stream", slog.Any("record", record))
logger.Warn("context cancelled while adding record to stream", slog.String("dstTableName", record.GetDestinationTableName()))
return ctx.Err()
}
}
Expand Down

0 comments on commit 8e87695

Please sign in to comment.