diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index b3161161e1..c8a853287e 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -891,15 +891,17 @@ func (c *PostgresConnector) SetupNormalizedTable( if tableAlreadyExists { c.logger.Info("[postgres] table already exists, skipping", slog.String("table", tableIdentifier)) - if config.IsResync { - err := c.ExecuteCommand(ctx, fmt.Sprintf(dropTableIfExistsSQL, - QuoteIdentifier(parsedNormalizedTable.Schema), - QuoteIdentifier(parsedNormalizedTable.Table))) - if err != nil { - return false, fmt.Errorf("error while dropping _resync table: %w", err) - } + if !config.IsResync { + return true, nil + } + + err := c.ExecuteCommand(ctx, fmt.Sprintf(dropTableIfExistsSQL, + QuoteIdentifier(parsedNormalizedTable.Schema), + QuoteIdentifier(parsedNormalizedTable.Table))) + if err != nil { + return false, fmt.Errorf("error while dropping _resync table: %w", err) } - return true, nil + c.logger.Info("[postgres] dropped resync table for resync", slog.String("resyncTable", parsedNormalizedTable.String())) } // convert the column names and types to Postgres types @@ -1437,7 +1439,7 @@ func (c *PostgresConnector) RenameTables( } // rename the src table to dst - _, err = c.execWithLoggingTx(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", src, dstTable.Table), renameTablesTx) + _, err = c.execWithLoggingTx(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", src, QuoteIdentifier(dstTable.Table)), renameTablesTx) if err != nil { return nil, fmt.Errorf("unable to rename table %s to %s: %w", src, dst, err) }