Skip to content

Commit

Permalink
resync for Postgres CDC
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jun 20, 2024
1 parent f1f858a commit 98acc01
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 1 deletion.
1 change: 1 addition & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ var (

_ RenameTablesConnector = &connsnowflake.SnowflakeConnector{}
_ RenameTablesConnector = &connbigquery.BigQueryConnector{}
_ RenameTablesConnector = &connpostgres.PostgresConnector{}

_ ValidationConnector = &connsnowflake.SnowflakeConnector{}
_ ValidationConnector = &connclickhouse.ClickhouseConnector{}
Expand Down
83 changes: 83 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,3 +1230,86 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro

return nil
}

func (c *PostgresConnector) RenameTables(ctx context.Context, req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
renameTablesTx, err := c.conn.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("unable to begin transaction for rename tables: %w", err)
}
defer shared.RollbackTx(renameTablesTx, c.logger)

for _, renameRequest := range req.RenameTableOptions {
srcTable, err := utils.ParseSchemaTable(renameRequest.CurrentName)
if err != nil {
return nil, fmt.Errorf("unable to parse source %s: %w", renameRequest.CurrentName, err)
}
src := srcTable.String()

dstTable, err := utils.ParseSchemaTable(renameRequest.NewName)
if err != nil {
return nil, fmt.Errorf("unable to parse destination %s: %w", renameRequest.NewName, err)
}
dst := dstTable.String()

c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", src))

if req.SyncedAtColName != nil && *req.SyncedAtColName != "" {
_, err = renameTablesTx.Exec(ctx,
fmt.Sprintf("UPDATE %s SET %s=now()", src, QuoteIdentifier(*req.SyncedAtColName)))
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w", src, err)
}
}

if req.SoftDeleteColName != nil && *req.SoftDeleteColName != "" {
columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns))
for _, col := range renameRequest.TableSchema.Columns {
columnNames = append(columnNames, QuoteIdentifier(col.Name))
}

pkeyColumnNames := make([]string, 0, len(renameRequest.TableSchema.PrimaryKeyColumns))
for _, col := range renameRequest.TableSchema.PrimaryKeyColumns {
pkeyColumnNames = append(pkeyColumnNames, QuoteIdentifier(col))
}

allCols := strings.Join(columnNames, ",")
pkeyCols := strings.Join(pkeyColumnNames, ",")

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dst))

_, err = renameTablesTx.Exec(ctx,
fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
src, fmt.Sprintf("%s,%s", allCols, QuoteIdentifier(*req.SoftDeleteColName)), allCols, *req.SoftDeleteColName,
dst, pkeyCols, pkeyCols, src))
if err != nil {
return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dst, err)
}
}

// renaming and dropping such that the _resync table is the new destination
c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", src, dst))

// drop the dst table if exists
_, err = renameTablesTx.Exec(ctx, "DROP TABLE IF EXISTS "+dst)
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", dst, err)
}

// rename the src table to dst
_, err = renameTablesTx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", src, dstTable.Table))
if err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w", src, dst, err)
}

c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", src, dst))
}

err = renameTablesTx.Commit(ctx)
if err != nil {
return nil, fmt.Errorf("unable to commit transaction for rename tables: %w", err)
}

return &protos.RenameTablesOutput{
FlowJobName: req.FlowJobName,
}, nil
}
3 changes: 2 additions & 1 deletion ui/app/mirrors/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ export default async function ViewMirror({
mirrorStatus.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_SETUP] &&
(dbType.valueOf() === DBType.BIGQUERY.valueOf() ||
dbType.valueOf() === DBType.SNOWFLAKE.valueOf());
dbType.valueOf() === DBType.SNOWFLAKE.valueOf() ||
dbType.valueOf() === DBType.POSTGRES.valueOf());

actionsDropdown = (
<MirrorActions
Expand Down

0 comments on commit 98acc01

Please sign in to comment.