Skip to content

Commit

Permalink
remove table name schema migration (#2375)
Browse files Browse the repository at this point in the history
migration introduced in 0.18.1
  • Loading branch information
serprex authored Dec 19, 2024
1 parent 4bb22cd commit bc5fff5
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 49 deletions.
32 changes: 0 additions & 32 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,38 +137,6 @@ func (a *FlowableActivity) CreateRawTable(
return res, nil
}

func (a *FlowableActivity) MigrateTableSchema(
ctx context.Context,
flowName string,
schemas map[string]*protos.TableSchema,
) error {
logger := activity.GetLogger(ctx)
tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
}
defer shared.RollbackTx(tx, logger)

for k, v := range schemas {
processedBytes, err := proto.Marshal(v)
if err != nil {
return err
}
if _, err := tx.Exec(
ctx,
"insert into table_schema_mapping(flow_name, table_name, table_schema) values ($1, $2, $3) "+
"on conflict (flow_name, table_name) do update set table_schema = $3",
flowName,
k,
processedBytes,
); err != nil {
return err
}
}

return tx.Commit(ctx)
}

// SetupTableSchema populates table_schema_mapping
func (a *FlowableActivity) SetupTableSchema(
ctx context.Context,
Expand Down
16 changes: 0 additions & 16 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,22 +368,6 @@ func CDCFlowWorkflow(

originalRunID := workflow.GetInfo(ctx).OriginalRunID

// MIGRATION TableNameSchemaMapping moved to catalog
if state.SyncFlowOptions.TableNameSchemaMapping != nil {
migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
if err := workflow.ExecuteActivity(
migrateCtx,
flowable.MigrateTableSchema,
cfg.FlowJobName,
state.SyncFlowOptions.TableNameSchemaMapping,
).Get(migrateCtx, nil); err != nil {
return state, fmt.Errorf("failed to migrate TableNameSchemaMapping: %w", err)
}
state.SyncFlowOptions.TableNameSchemaMapping = nil
}

// we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled
// because Resync modifies TableMappings before Setup and also before Snapshot
// for safety, rely on the idempotency of SetupFlow instead
Expand Down
2 changes: 1 addition & 1 deletion protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ message CreateTablesFromExistingOutput {
}

message SyncFlowOptions {
reserved 5;
uint32 batch_size = 1;
uint64 idle_timeout_seconds = 3;
map<uint32, string> src_table_id_name_mapping = 4;
map<string, TableSchema> table_name_schema_mapping = 5;
repeated TableMapping table_mappings = 6;
int32 number_of_syncs = 7;
}
Expand Down

0 comments on commit bc5fff5

Please sign in to comment.