From b9b1c4d168245d52351ca01e8ada42c1ef163be4 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 9 Feb 2024 04:13:33 +0530 Subject: [PATCH 1/3] don't overwrite options for child workflows when ContinueAsNew --- flow/workflows/cdc_flow.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 5244e9e4f1..04ecfbc45d 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -362,16 +362,21 @@ func CDCFlowWorkflowWithConfig( } } - state.SyncFlowOptions = &protos.SyncFlowOptions{ - BatchSize: cfg.MaxBatchSize, - // this means the env variable assignment path is never hit - IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, - SrcTableIdNameMapping: state.SrcTableIdNameMapping, - TableNameSchemaMapping: state.TableNameSchemaMapping, - TableMappings: state.TableMappings, + // when we carry forward state, don't remake the options + if state.SyncFlowOptions != nil { + state.SyncFlowOptions = &protos.SyncFlowOptions{ + BatchSize: cfg.MaxBatchSize, + // this means the env variable assignment path is never hit + IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, + SrcTableIdNameMapping: state.SrcTableIdNameMapping, + TableNameSchemaMapping: state.TableNameSchemaMapping, + TableMappings: state.TableMappings, + } } - state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{ - TableNameSchemaMapping: state.TableNameSchemaMapping, + if state.NormalizeFlowOptions != nil { + state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{ + TableNameSchemaMapping: state.TableNameSchemaMapping, + } } currentSyncFlowNum := 0 From 52dda5f8ed8ceb515692ad6741c2a0f797d0893c Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 9 Feb 2024 04:20:13 +0530 Subject: [PATCH 2/3] did the opposite in if statement --- flow/workflows/cdc_flow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 04ecfbc45d..1dba282ed7 
100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -363,7 +363,7 @@ func CDCFlowWorkflowWithConfig( } // when we carry forward state, don't remake the options - if state.SyncFlowOptions != nil { + if state.SyncFlowOptions == nil { state.SyncFlowOptions = &protos.SyncFlowOptions{ BatchSize: cfg.MaxBatchSize, // this means the env variable assignment path is never hit @@ -373,7 +373,7 @@ func CDCFlowWorkflowWithConfig( TableMappings: state.TableMappings, } } - if state.NormalizeFlowOptions != nil { + if state.NormalizeFlowOptions == nil { state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{ TableNameSchemaMapping: state.TableNameSchemaMapping, } From f213c632c998f760751c0515a7692e4861457a13 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 9 Feb 2024 18:56:28 +0530 Subject: [PATCH 3/3] removed NormalizeFlowOptions too, per comment --- flow/workflows/cdc_flow.go | 18 +++++------------- flow/workflows/normalize_flow.go | 10 +++------- protos/flow.proto | 4 ---- 3 files changed, 8 insertions(+), 24 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 1dba282ed7..2a8faab05e 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -29,7 +29,7 @@ type CDCFlowWorkflowState struct { Progress []string // Accumulates status for sync flows spawned. SyncFlowStatuses []*model.SyncResponse - // Accumulates status for sync flows spawned. + // Accumulates status for normalize flows spawned. NormalizeFlowStatuses []model.NormalizeResponse // Current signalled state of the peer flow. 
ActiveSignal shared.CDCFlowSignal @@ -48,8 +48,6 @@ type CDCFlowWorkflowState struct { FlowConfigUpdates []*protos.CDCFlowConfigUpdate // options passed to all SyncFlows SyncFlowOptions *protos.SyncFlowOptions - // options passed to all NormalizeFlows - NormalizeFlowOptions *protos.NormalizeFlowOptions // initially copied from config, all changes are made here though TableMappings []*protos.TableMapping } @@ -61,8 +59,9 @@ func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWo tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping)) } return &CDCFlowWorkflowState{ - Progress: []string{"started"}, - SyncFlowStatuses: nil, + Progress: []string{"started"}, + // 1 more than the limit of 10 + SyncFlowStatuses: make([]*model.SyncResponse, 0, 11), NormalizeFlowStatuses: nil, ActiveSignal: shared.NoopSignal, SyncFlowErrors: nil, @@ -79,7 +78,6 @@ func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWo TableNameSchemaMapping: nil, FlowConfigUpdates: nil, SyncFlowOptions: nil, - NormalizeFlowOptions: nil, TableMappings: tableMappings, } } @@ -262,7 +260,7 @@ func CDCFlowWorkflowWithConfig( if cfg.Resync { for _, mapping := range state.TableMappings { oldName := mapping.DestinationTableIdentifier - newName := fmt.Sprintf("%s_resync", oldName) + newName := oldName + "_resync" mapping.DestinationTableIdentifier = newName } } @@ -373,11 +371,6 @@ func CDCFlowWorkflowWithConfig( TableMappings: state.TableMappings, } } - if state.NormalizeFlowOptions == nil { - state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{ - TableNameSchemaMapping: state.TableNameSchemaMapping, - } - } currentSyncFlowNum := 0 totalRecordsSynced := int64(0) @@ -401,7 +394,6 @@ func CDCFlowWorkflowWithConfig( normCtx, NormalizeFlowWorkflow, cfg, - state.NormalizeFlowOptions, ) var normWaitChan workflow.ReceiveChannel diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 
8b51faeac3..f37544b14a 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -1,7 +1,6 @@ package peerflow import ( - "fmt" "log/slog" "time" @@ -15,10 +14,8 @@ import ( func NormalizeFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, - options *protos.NormalizeFlowOptions, ) (*model.NormalizeFlowResponse, error) { logger := workflow.GetLogger(ctx) - tableNameSchemaMapping := options.TableNameSchemaMapping normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 7 * 24 * time.Hour, @@ -28,12 +25,13 @@ func NormalizeFlowWorkflow(ctx workflow.Context, results := make([]model.NormalizeResponse, 0, 4) errors := make([]string, 0) syncChan := workflow.GetSignalChannel(ctx, shared.NormalizeSyncSignalName) + var tableNameSchemaMapping map[string]*protos.TableSchema var stopLoop, canceled bool var lastSyncBatchID, syncBatchID int64 lastSyncBatchID = -1 syncBatchID = -1 - selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-normalize", config.FlowJobName)) + selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-normalize") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) @@ -46,9 +44,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context, if s.SyncBatchID > syncBatchID { syncBatchID = s.SyncBatchID } - if len(s.TableNameSchemaMapping) != 0 { - tableNameSchemaMapping = s.TableNameSchemaMapping - } + tableNameSchemaMapping = s.TableNameSchemaMapping }) for !stopLoop { selector.Select(ctx) diff --git a/protos/flow.proto b/protos/flow.proto index fc407a7b9f..e51c873b6e 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -106,10 +106,6 @@ message SyncFlowOptions { repeated TableMapping table_mappings = 6; } -message NormalizeFlowOptions { - map<string, TableSchema> table_name_schema_mapping = 1; -} - message LastSyncState { int64 checkpoint = 1; }