Skip to content

Commit

Permalink
Fix resync logic for postgres to postgres (#2240)
Browse files Browse the repository at this point in the history
This PR fixes the intended drop if exists logic for _resync tables when
a Postgres to Postgres mirror is resynced, along with a casing issue

Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and serprex authored Nov 13, 2024
1 parent 9641aec commit 1f969d1
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,15 +891,17 @@ func (c *PostgresConnector) SetupNormalizedTable(
if tableAlreadyExists {
c.logger.Info("[postgres] table already exists, skipping",
slog.String("table", tableIdentifier))
if config.IsResync {
err := c.ExecuteCommand(ctx, fmt.Sprintf(dropTableIfExistsSQL,
QuoteIdentifier(parsedNormalizedTable.Schema),
QuoteIdentifier(parsedNormalizedTable.Table)))
if err != nil {
return false, fmt.Errorf("error while dropping _resync table: %w", err)
}
if !config.IsResync {
return true, nil
}

err := c.ExecuteCommand(ctx, fmt.Sprintf(dropTableIfExistsSQL,
QuoteIdentifier(parsedNormalizedTable.Schema),
QuoteIdentifier(parsedNormalizedTable.Table)))
if err != nil {
return false, fmt.Errorf("error while dropping _resync table: %w", err)
}
return true, nil
c.logger.Info("[postgres] dropped resync table for resync", slog.String("resyncTable", parsedNormalizedTable.String()))
}

// convert the column names and types to Postgres types
Expand Down Expand Up @@ -1437,7 +1439,7 @@ func (c *PostgresConnector) RenameTables(
}

// rename the src table to dst
_, err = c.execWithLoggingTx(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", src, dstTable.Table), renameTablesTx)
_, err = c.execWithLoggingTx(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", src, QuoteIdentifier(dstTable.Table)), renameTablesTx)
if err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w", src, dst, err)
}
Expand Down

0 comments on commit 1f969d1

Please sign in to comment.