From 98ba6f2e64c04c6e82ffacf75a0921dfb8eb1472 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 7 Dec 2024 00:16:57 +0000 Subject: [PATCH] move schema changes into activity before normalize --- flow/activities/flowable.go | 6 ++-- flow/activities/flowable_core.go | 20 +++++++++++ flow/workflows/local_activities.go | 14 -------- flow/workflows/sync_flow.go | 55 +----------------------------- 4 files changed, 25 insertions(+), 70 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3a872819a..b1eec8b70 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -333,8 +333,10 @@ func (a *FlowableActivity) MaintainPull( } close(normDone) }() - defer func() { <-normDone }() - defer close(normalize) // TODO race, this will cause sync to panic if it tries to send to normalize after maintainpull ends + defer func() { + close(normalize) // TODO race, this will cause sync to panic if it tries to send to normalize after maintainpull ends + <-normDone + }() for { select { diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 40285423f..08af39036 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -326,6 +326,26 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon activity.RecordHeartbeat(ctx, pushedRecordsWithCount) a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount) + tableSchemaDeltasCount := len(res.TableSchemaDeltas) + if tableSchemaDeltasCount > 0 { + modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) + for _, tableSchemaDelta := range res.TableSchemaDeltas { + modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) + } + + if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{ + PeerName: config.SourceName, + TableIdentifiers: modifiedSrcTables, + TableMappings: options.TableMappings, + FlowName: config.FlowJobName, + System: config.System, + }); err != nil { + err = fmt.Errorf("failed to execute schema update at source: %w", err) + a.Alerter.LogFlowError(ctx, flowName, err) + return nil, err + } + } + if recordBatchSync.NeedsNormalize() { parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env) if err != nil { diff --git a/flow/workflows/local_activities.go b/flow/workflows/local_activities.go index 7a3e80f24..923bcaea5 100644 --- a/flow/workflows/local_activities.go +++ b/flow/workflows/local_activities.go @@ -15,20 +15,6 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -func getParallelSyncNormalize(wCtx workflow.Context, logger log.Logger, env map[string]string) bool { - checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{ - StartToCloseTimeout: time.Minute, - }) - - getParallelFuture := workflow.ExecuteLocalActivity(checkCtx, peerdbenv.PeerDBEnableParallelSyncNormalize, env) - var parallel bool - if err := getParallelFuture.Get(checkCtx, ¶llel); err != nil { - logger.Warn("Failed to get status of parallel sync-normalize", slog.Any("error", err)) - return false - } - return parallel -} - func getQRepOverwriteFullRefreshMode(wCtx workflow.Context, logger log.Logger, env map[string]string) bool { checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{ StartToCloseTimeout: time.Minute, diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index eff4607d0..b4bc0f29f 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -64,16 +64,6 @@ func SyncFlowWorkflow( stop = true }) - var waitSelector workflow.Selector - parallel := getParallelSyncNormalize(ctx, logger, config.Env) - if !parallel { - waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait") - waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) - stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) { - stop = true - }) - } - syncFlowCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: time.Minute, @@ -81,7 +71,6 @@ func SyncFlowWorkflow( }) for !stop && ctx.Err() == nil { var syncDone bool - mustWait := waitSelector != nil currentSyncFlowNum += 1 logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum)) @@ -102,56 +91,14 @@ func SyncFlowWorkflow( } else if childSyncFlowRes != nil { totalRecordsSynced += childSyncFlowRes.NumRecordsSynced logger.Info("Total records synced", slog.Int64("totalRecordsSynced", totalRecordsSynced)) - - // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. - tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) - if tableSchemaDeltasCount > 0 { - modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) - for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas { - modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName) - } - - getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - }) - getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.SetupTableSchema, - &protos.SetupTableSchemaBatchInput{ - PeerName: config.SourceName, - TableIdentifiers: modifiedSrcTables, - TableMappings: options.TableMappings, - FlowName: config.FlowJobName, - System: config.System, - }) - - if err := getModifiedSchemaFuture.Get(ctx, nil); err != nil { - logger.Error("failed to execute schema update at source", slog.Any("error", err)) - } - } - - mustWait = false - } else { - mustWait = false } }) for ctx.Err() == nil && ((!syncDone && !syncErr) || selector.HasPending()) { selector.Select(ctx) } - if ctx.Err() != nil { - break - } - restart := syncErr || workflow.GetInfo(ctx).GetContinueAsNewSuggested() - if !stop && !syncErr && mustWait { - waitSelector.Select(ctx) - if restart { - // must flush selector for signals received while waiting - for ctx.Err() == nil && selector.HasPending() { - selector.Select(ctx) - } - break - } - } else if restart { + if syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() { break } }