From bc5fff545fc1f6fd22fd32fe5b063b6ed3fe62c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 19 Dec 2024 03:51:32 +0000 Subject: [PATCH] remove table name schema migration (#2375) migration introduced in 0.18.1 --- flow/activities/flowable.go | 32 -------------------------------- flow/workflows/cdc_flow.go | 16 ---------------- protos/flow.proto | 2 +- 3 files changed, 1 insertion(+), 49 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 0051d0d0f..84a9234bb 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -137,38 +137,6 @@ func (a *FlowableActivity) CreateRawTable( return res, nil } -func (a *FlowableActivity) MigrateTableSchema( - ctx context.Context, - flowName string, - schemas map[string]*protos.TableSchema, -) error { - logger := activity.GetLogger(ctx) - tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{}) - if err != nil { - return err - } - defer shared.RollbackTx(tx, logger) - - for k, v := range schemas { - processedBytes, err := proto.Marshal(v) - if err != nil { - return err - } - if _, err := tx.Exec( - ctx, - "insert into table_schema_mapping(flow_name, table_name, table_schema) values ($1, $2, $3) "+ - "on conflict (flow_name, table_name) do update set table_schema = $3", - flowName, - k, - processedBytes, - ); err != nil { - return err - } - } - - return tx.Commit(ctx) -} - // SetupTableSchema populates table_schema_mapping func (a *FlowableActivity) SetupTableSchema( ctx context.Context, diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 1bb86dd53..c43b56a9c 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -368,22 +368,6 @@ func CDCFlowWorkflow( originalRunID := workflow.GetInfo(ctx).OriginalRunID - // MIGRATION TableNameSchemaMapping moved to catalog - if state.SyncFlowOptions.TableNameSchemaMapping != nil { - migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - }) - if err := workflow.ExecuteActivity( - migrateCtx, - flowable.MigrateTableSchema, - cfg.FlowJobName, - state.SyncFlowOptions.TableNameSchemaMapping, - ).Get(migrateCtx, nil); err != nil { - return state, fmt.Errorf("failed to migrate TableNameSchemaMapping: %w", err) - } - state.SyncFlowOptions.TableNameSchemaMapping = nil - } - // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead diff --git a/protos/flow.proto b/protos/flow.proto index b917b6dae..09a6393dc 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -109,10 +109,10 @@ message CreateTablesFromExistingOutput { } message SyncFlowOptions { + reserved 5; uint32 batch_size = 1; uint64 idle_timeout_seconds = 3; map src_table_id_name_mapping = 4; - map table_name_schema_mapping = 5; repeated TableMapping table_mappings = 6; int32 number_of_syncs = 7; }