Skip to content

Commit

Permalink
validation: list missing tables from publication (#2195)
Browse files Browse the repository at this point in the history
recently a customer needed to check which table was missing since we lacked this info
  • Loading branch information
serprex authored Oct 28, 2024
1 parent 83465e9 commit 556e95e
Showing 1 changed file with 37 additions and 21 deletions.
58 changes: 37 additions & 21 deletions flow/connectors/postgres/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,57 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context,
tableArr := make([]string, 0, len(tableNames))
for _, parsedTable := range tableNames {
var row pgx.Row
tableArr = append(tableArr, fmt.Sprintf(`(%s::text, %s::text)`,
tableArr = append(tableArr, fmt.Sprintf(`(%s::text,%s::text)`,
QuoteLiteral(parsedTable.Schema), QuoteLiteral(parsedTable.Table)))
err := c.conn.QueryRow(ctx,
fmt.Sprintf("SELECT * FROM %s.%s LIMIT 0;",
QuoteIdentifier(parsedTable.Schema), QuoteIdentifier(parsedTable.Table))).Scan(&row)
if err != nil && err != pgx.ErrNoRows {
if err := c.conn.QueryRow(ctx,
fmt.Sprintf("SELECT * FROM %s.%s LIMIT 0", QuoteIdentifier(parsedTable.Schema), QuoteIdentifier(parsedTable.Table)),
).Scan(&row); err != nil && err != pgx.ErrNoRows {
return err
}
}

tableStr := strings.Join(tableArr, ",")

if pubName != "" && !noCDC {
// Check if publication exists
err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
if err != nil {
var alltables bool
if err := c.conn.QueryRow(ctx, "SELECT puballtables FROM pg_publication WHERE pubname=$1", pubName).Scan(&alltables); err != nil {
if err == pgx.ErrNoRows {
return fmt.Errorf("publication does not exist: %s", pubName)
}
return fmt.Errorf("error while checking for publication existence: %w", err)
}

// Check if tables belong to publication
var pubTableCount int
err = c.conn.QueryRow(ctx, fmt.Sprintf(`
with source_table_components (sname, tname) as (values %s)
select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables
INNER JOIN source_table_components stc
ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount)
if err != nil {
return err
}
if !alltables {
// Check if tables belong to publication
tableStr := strings.Join(tableArr, ",")

rows, err := c.conn.Query(
ctx,
fmt.Sprintf(`select schemaname,tablename
from (values %s) as input(schemaname,tablename)
where not exists (
select * from pg_publication_tables pub
where pubname=$1 and pub.schemaname=input.schemaname and pub.tablename=input.tablename
)`, tableStr),
pubName,
)
if err != nil {
return err
}
missing, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (string, error) {
var schema string
var table string
if err := row.Scan(&schema, &table); err != nil {
return "", err
}
return fmt.Sprintf("%s.%s", QuoteIdentifier(schema), QuoteIdentifier(table)), nil
})
if err != nil {
return err
}

if pubTableCount != len(tableNames) {
return errors.New("not all tables belong to publication")
if len(missing) != 0 {
return errors.New("some tables missing from publication: " + strings.Join(missing, ", "))
}
}
}

Expand Down

0 comments on commit 556e95e

Please sign in to comment.