From 2a588424eac92582065c39283a7121f29d5785a7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Fri, 6 Dec 2024 21:57:23 +0000
Subject: [PATCH] move normalize into sync activity

normalize can still run in parallel; notably, sync batches will no longer
run ahead when normalize is slow

---
 flow/activities/flowable.go             |  57 ++++++++---
 flow/activities/flowable_core.go        |  51 ++++++----
 flow/connectors/bigquery/bigquery.go    |  12 +--
 flow/connectors/clickhouse/normalize.go |  24 ++---
 flow/connectors/core.go                 |   2 +-
 flow/connectors/postgres/postgres.go    |  26 ++---
 flow/connectors/snowflake/snowflake.go  |  12 +--
 flow/model/signals.go                   |   8 --
 flow/workflows/cdc_flow.go              |  62 ------------
 flow/workflows/normalize_flow.go        | 121 ------------------------
 flow/workflows/register.go              |   1 -
 flow/workflows/sync_flow.go             |  30 +-----
 12 files changed, 123 insertions(+), 283 deletions(-)
 delete mode 100644 flow/workflows/normalize_flow.go

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index ed9d2b720d..3a872819af 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -38,9 +38,15 @@ type CheckConnectionResult struct {
 	NeedsSetupMetadataTables bool
 }
 
+type NormalizeBatchRequest struct {
+	Done    chan model.NormalizeResponse
+	BatchID int64
+}
+
 type CdcCacheEntry struct {
 	connector connectors.CDCPullConnectorCore
 	done      chan struct{}
+	normalize chan NormalizeBatchRequest
 }
 
 type FlowableActivity struct {
@@ -297,16 +303,39 @@ func (a *FlowableActivity) MaintainPull(
 	}
 
 	done := make(chan struct{})
+	normalize := make(chan NormalizeBatchRequest)
 	a.CdcCacheRw.Lock()
 	a.CdcCache[sessionID] = CdcCacheEntry{
 		connector: srcConn,
 		done:      done,
+		normalize: normalize,
 	}
 	a.CdcCacheRw.Unlock()
 
 	ticker := time.NewTicker(15 * time.Second)
 	defer ticker.Stop()
 
+	normDone := make(chan struct{})
+	go func() {
+		for req := range normalize {
+			res, err := a.StartNormalize(ctx, &protos.StartNormalizeInput{
+				FlowConnectionConfigs: config,
+				SyncBatchID:           req.BatchID,
+			})
+			if err != nil {
+				a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+			} else if req.Done != nil {
+				req.Done <- res
+			}
+			if req.Done != nil {
+				close(req.Done)
+			}
+		}
+		close(normDone)
+	}()
+	defer func() { <-normDone }()
+	defer close(normalize) // TODO: race; sync will panic if it sends to normalize after MaintainPull ends
+
 	for {
 		select {
 		case <-ticker.C:
@@ -344,7 +373,7 @@ func (a *FlowableActivity) SyncRecords(
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
 	sessionID string,
-) (*model.SyncCompositeResponse, error) {
+) (*model.SyncResponse, error) {
 	var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
 	if config.Script != "" {
 		var onErr context.CancelCauseFunc
@@ -385,7 +414,7 @@ func (a *FlowableActivity) SyncPg(
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
 	sessionID string,
-) (*model.SyncCompositeResponse, error) {
+) (*model.SyncResponse, error) {
 	return syncCore(ctx, a, config, options, sessionID, nil,
 		connectors.CDCPullPgConnector.PullPg,
 		connectors.CDCSyncPgConnector.SyncPg)
@@ -394,7 +423,7 @@ func (a *FlowableActivity) StartNormalize(
 	ctx context.Context,
 	input *protos.StartNormalizeInput,
-) (*model.NormalizeResponse, error) {
+) (model.NormalizeResponse, error) {
 	conn := input.FlowConnectionConfigs
 	ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
 	logger := activity.GetLogger(ctx)
@@ -406,9 +435,10 @@ func (a *FlowableActivity) StartNormalize(
 		conn.DestinationName,
 	)
 	if errors.Is(err, errors.ErrUnsupported) {
-		return nil, monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
+		return model.NormalizeResponse{},
+			monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
 	} else if err != nil {
-		return nil, fmt.Errorf("failed to get normalize connector: %w", err)
+		return model.NormalizeResponse{}, fmt.Errorf("failed to get normalize connector: %w", err)
 	}
 	defer connectors.CloseConnector(ctx, dstConn)
 
@@ -419,7 +449,7 @@ func (a *FlowableActivity) StartNormalize(
 
 	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get table name schema mapping: %w", err)
+		return model.NormalizeResponse{}, fmt.Errorf("failed to get table name schema mapping: %w", err)
 	}
 
 	res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
@@ -433,17 +463,20 @@ func (a *FlowableActivity) StartNormalize(
 	})
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
-		return nil, fmt.Errorf("failed to normalized records: %w", err)
+		return model.NormalizeResponse{}, fmt.Errorf("failed to normalize records: %w", err)
 	}
 	dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get peer type: %w", err)
+		return model.NormalizeResponse{}, fmt.Errorf("failed to get peer type: %w", err)
 	}
 	if dstType == protos.DBType_POSTGRES {
-		err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
-			input.SyncBatchID)
-		if err != nil {
-			return nil, fmt.Errorf("failed to update end time for cdc batch: %w", err)
+		if err := monitoring.UpdateEndTimeForCDCBatch(
+			ctx,
+			a.CatalogPool,
+			input.FlowConnectionConfigs.FlowJobName,
+			input.SyncBatchID,
+		); err != nil {
+			return model.NormalizeResponse{}, fmt.Errorf("failed to update end time for cdc batch: %w", err)
 		}
 	}
 
diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index a9f58a5f3f..40285423f9 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -3,6 +3,7 @@ package activities
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"log/slog"
 	"reflect"
@@ -50,7 +51,9 @@ func heartbeatRoutine(
 	)
 }
 
-func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, a *FlowableActivity, sessionID string) (TPull, error) {
+func waitForCdcCache[TPull connectors.CDCPullConnectorCore](
+	ctx context.Context, a *FlowableActivity, sessionID string,
+) (TPull, chan NormalizeBatchRequest, error) {
 	var none TPull
 	logger := activity.GetLogger(ctx)
 	attempt := 0
@@ -63,9 +66,9 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
 		a.CdcCacheRw.RUnlock()
 		if ok {
 			if conn, ok := entry.connector.(TPull); ok {
-				return conn, nil
+				return conn, entry.normalize, nil
 			}
-			return none, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
+			return none, nil, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
 		}
 		activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval))
 		attempt += 1
@@ -74,7 +77,7 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
 				slog.Int("attempt", attempt), slog.String("sessionID", sessionID))
 		}
 		if err := ctx.Err(); err != nil {
-			return none, err
+			return none, nil, err
 		}
 		time.Sleep(waitInterval)
 		if attempt == 300 {
@@ -116,7 +119,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
 	pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
 	sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
-) (*model.SyncCompositeResponse, error) {
+) (*model.SyncResponse, error) {
 	flowName := config.FlowJobName
 	ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
 	logger := activity.GetLogger(ctx)
@@ -130,7 +133,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
 	}
 
-	srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID)
+	srcConn, normChan, err := waitForCdcCache[TPull](ctx, a, sessionID)
 	if err != nil {
 		return nil, err
 	}
@@ -229,12 +232,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 			return nil, fmt.Errorf("failed to sync schema: %w", err)
 		}
 
-		return &model.SyncCompositeResponse{
-			SyncResponse: &model.SyncResponse{
-				CurrentSyncBatchID: -1,
-				TableSchemaDeltas:  recordBatchSync.SchemaDeltas,
-			},
-			NeedsNormalize: false,
+		return &model.SyncResponse{
+			CurrentSyncBatchID: -1,
+			TableSchemaDeltas:  recordBatchSync.SchemaDeltas,
 		}, nil
 	}
 
@@ -326,10 +326,29 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
 	a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)
 
-	return &model.SyncCompositeResponse{
-		SyncResponse:   res,
-		NeedsNormalize: recordBatchSync.NeedsNormalize(),
-	}, nil
+	if recordBatchSync.NeedsNormalize() {
+		parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env)
+		if err != nil {
+			return nil, err
+		}
+		var done chan model.NormalizeResponse
+		if !parallel {
+			done = make(chan model.NormalizeResponse)
+		}
+		normChan <- NormalizeBatchRequest{
+			BatchID: res.CurrentSyncBatchID,
+			Done:    done,
+		}
+		if done != nil {
+			if normRes, ok := <-done; !ok {
+				return nil, errors.New("failed to normalize")
+			} else {
+				a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("normalized from %d to %d", normRes.StartBatchID, normRes.EndBatchID))
+			}
+		}
+	}
+
+	return res, nil
 }
 
 func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index d6504322ca..7523b6fa8e 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -383,17 +383,17 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 
 // NormalizeRecords normalizes raw table to destination table,
 // one batch at a time from the previous normalized batch to the currently synced batch.
-func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
+func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error) {
 	rawTableName := c.getRawTableName(req.FlowJobName)
 
 	normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
+		return model.NormalizeResponse{}, fmt.Errorf("failed to get batch for the current mirror: %v", err)
 	}
 
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
-		return &model.NormalizeResponse{
+		return model.NormalizeResponse{
 			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
@@ -408,16 +408,16 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 			SyncedAtColName: req.SyncedAtColName,
 		})
 		if mergeErr != nil {
-			return nil, mergeErr
+			return model.NormalizeResponse{}, mergeErr
 		}
 
 		err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
 		if err != nil {
-			return nil, err
+			return model.NormalizeResponse{}, err
 		}
 	}
 
-	return &model.NormalizeResponse{
+	return model.NormalizeResponse{
 		Done:         true,
 		StartBatchID: normBatchID + 1,
 		EndBatchID:   req.SyncBatchID,
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index fabe07a35f..ab630b0543 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -231,19 +231,19 @@ func getOrderedOrderByColumns(
 func (c *ClickHouseConnector) NormalizeRecords(
 	ctx context.Context,
 	req *model.NormalizeRecordsRequest,
-) (*model.NormalizeResponse, error) {
+) (model.NormalizeResponse, error) {
 	// fix for potential consistency issues
 	time.Sleep(3 * time.Second)
 
 	normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
 	if err != nil {
 		c.logger.Error("[clickhouse] error while getting last sync and normalize batch id", "error", err)
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
-		return &model.NormalizeResponse{
+		return model.NormalizeResponse{
 			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
@@ -251,7 +251,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
 	}
 
 	if err := c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID); err != nil {
-		return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
+		return model.NormalizeResponse{}, fmt.Errorf("failed to copy avro stages to destination: %w", err)
 	}
 
 	destinationTableNames, err := c.getDistinctTableNamesInBatch(
@@ -262,17 +262,17 @@ func (c *ClickHouseConnector) NormalizeRecords(
 	)
 	if err != nil {
 		c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err)
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 
 	enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 
 	parallelNormalize, err := peerdbenv.PeerDBClickHouseParallelNormalize(ctx, req.Env)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 	parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
 	if parallelNormalize > 1 {
@@ -357,7 +357,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
 				clickHouseType, err = colType.ToDWHColumnType(ctx, req.Env, protos.DBType_CLICKHOUSE, column)
 				if err != nil {
 					close(queries)
-					return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
+					return model.NormalizeResponse{}, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
 				}
 
 				if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
@@ -459,20 +459,20 @@ func (c *ClickHouseConnector) NormalizeRecords(
 			c.logger.Error("[clickhouse] context canceled while normalizing",
 				slog.Any("error", errCtx.Err()),
 				slog.Any("cause", context.Cause(errCtx)))
-			return nil, context.Cause(errCtx)
+			return model.NormalizeResponse{}, context.Cause(errCtx)
 		}
 	}
 	close(queries)
 	if err := group.Wait(); err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 
 	if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID); err != nil {
 		c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err))
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 
-	return &model.NormalizeResponse{
+	return model.NormalizeResponse{
 		Done:         true,
 		StartBatchID: normBatchID + 1,
 		EndBatchID:   req.SyncBatchID,
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index 0991a50978..040b824b2b 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -198,7 +198,7 @@ type CDCNormalizeConnector interface {
 
 	// NormalizeRecords merges records pushed earlier into the destination table.
 	// This method should be idempotent, and should be able to be called multiple times with the same request.
-	NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error)
+	NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error)
 }
 
 type CreateTablesFromExistingConnector interface {
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 435df65ed4..f17269b5ab 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -611,31 +611,31 @@ func syncRecordsCore[Items model.Items](
 func (c *PostgresConnector) NormalizeRecords(
 	ctx context.Context,
 	req *model.NormalizeRecordsRequest,
-) (*model.NormalizeResponse, error) {
+) (model.NormalizeResponse, error) {
 	rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
 
 	jobMetadataExists, err := c.jobMetadataExists(ctx, req.FlowJobName)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 
 	// no SyncFlow has run, chill until more records are loaded.
 	if !jobMetadataExists {
 		c.logger.Info("no metadata found for mirror")
-		return &model.NormalizeResponse{
+		return model.NormalizeResponse{
 			Done: false,
 		}, nil
 	}
 
 	normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
+		return model.NormalizeResponse{}, fmt.Errorf("failed to get batch for the current mirror: %v", err)
 	}
 
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
 		c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d", req.SyncBatchID, normBatchID))
-		return &model.NormalizeResponse{
+		return model.NormalizeResponse{
 			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
@@ -645,23 +645,23 @@ func (c *PostgresConnector) NormalizeRecords(
 	destinationTableNames, err := c.getDistinctTableNamesInBatch(
 		ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 	unchangedToastColumnsMap, err := c.getTableNametoUnchangedCols(ctx, req.FlowJobName,
 		req.SyncBatchID, normBatchID)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 
 	normalizeRecordsTx, err := c.conn.Begin(ctx)
 	if err != nil {
-		return nil, fmt.Errorf("error starting transaction for normalizing records: %w", err)
+		return model.NormalizeResponse{}, fmt.Errorf("error starting transaction for normalizing records: %w", err)
 	}
 	defer shared.RollbackTx(normalizeRecordsTx, c.logger)
 
 	pgversion, err := c.MajorVersion(ctx)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 	totalRowsAffected := 0
 	normalizeStmtGen := normalizeStmtGenerator{
@@ -689,7 +689,7 @@ func (c *PostgresConnector) NormalizeRecords(
 				slog.String("destinationTableName", destinationTableName),
 				slog.Any("error", err),
 			)
-			return nil, fmt.Errorf("error executing normalize statement for table %s: %w", destinationTableName, err)
+			return model.NormalizeResponse{}, fmt.Errorf("error executing normalize statement for table %s: %w", destinationTableName, err)
 		}
 		totalRowsAffected += int(ct.RowsAffected())
 	}
@@ -699,15 +699,15 @@ func (c *PostgresConnector) NormalizeRecords(
 	// updating metadata with new normalizeBatchID
 	err = c.updateNormalizeMetadata(ctx, req.FlowJobName, req.SyncBatchID, normalizeRecordsTx)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 	// transaction commits
 	err = normalizeRecordsTx.Commit(ctx)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 
-	return &model.NormalizeResponse{
+	return model.NormalizeResponse{
 		Done:         true,
 		StartBatchID: normBatchID + 1,
 		EndBatchID:   req.SyncBatchID,
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 124560e90e..3fadbff670 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -475,16 +475,16 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
 }
 
 // NormalizeRecords normalizes raw table to destination table.
-func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
+func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error) {
 	ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName)
 
 	normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
 	if err != nil {
-		return nil, err
+		return model.NormalizeResponse{}, err
 	}
 	// normalize has caught up with sync, chill until more records are loaded.
 	if normBatchID >= req.SyncBatchID {
-		return &model.NormalizeResponse{
+		return model.NormalizeResponse{
 			Done:         false,
 			StartBatchID: normBatchID,
 			EndBatchID:   req.SyncBatchID,
@@ -501,16 +501,16 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
 			},
 		)
 		if mergeErr != nil {
-			return nil, mergeErr
+			return model.NormalizeResponse{}, mergeErr
 		}
 
 		err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
 		if err != nil {
-			return nil, err
+			return model.NormalizeResponse{}, err
 		}
 	}
 
-	return &model.NormalizeResponse{
+	return model.NormalizeResponse{
 		Done:         true,
 		StartBatchID: normBatchID + 1,
 		EndBatchID:   req.SyncBatchID,
diff --git a/flow/model/signals.go b/flow/model/signals.go
index 7e98ab343f..31a89ff778 100644
--- a/flow/model/signals.go
+++ b/flow/model/signals.go
@@ -138,11 +138,3 @@ var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
 var SyncStopSignal = TypedSignal[struct{}]{
 	Name: "sync-stop",
 }
-
-var NormalizeSignal = TypedSignal[NormalizePayload]{
-	Name: "normalize",
-}
-
-var NormalizeDoneSignal = TypedSignal[struct{}]{
-	Name: "normalize-done",
-}
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index 0c97af9b7d..60d42e0243 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -331,7 +331,6 @@ func CDCFlowWorkflow(
 
 	mirrorNameSearch := shared.NewSearchAttributes(cfg.FlowJobName)
 
-	var syncCountLimit int
 	if state.ActiveSignal == model.PauseSignal {
 		selector := workflow.NewNamedSelector(ctx, "PauseLoop")
 		selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
@@ -356,7 +355,6 @@ func CDCFlowWorkflow(
 			if err := processCDCFlowConfigUpdate(ctx, logger, cfg, state, mirrorNameSearch); err != nil {
 				return state, err
 			}
-			syncCountLimit = int(state.SyncFlowOptions.NumberOfSyncs)
 			logger.Info("wiping flow state after state update processing")
 			// finished processing, wipe it
 			state.FlowConfigUpdate = nil
@@ -492,10 +490,8 @@ func CDCFlowWorkflow(
 	}
 
 	syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, originalRunID)
-	normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID)
 
 	var restart, finished bool
-	syncCount := 0
 
 	syncFlowOpts := workflow.ChildWorkflowOptions{
 		WorkflowID:        syncFlowID,
 		ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
 		RetryPolicy: &temporal.RetryPolicy{
 			MaximumAttempts: 20,
 		},
 		TypedSearchAttributes: mirrorNameSearch,
 		WaitForCancellation:   true,
 	}
 	syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts)
 
-	normalizeFlowOpts := workflow.ChildWorkflowOptions{
-		WorkflowID:        normalizeFlowID,
-		ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
-		RetryPolicy: &temporal.RetryPolicy{
-			MaximumAttempts: 20,
-		},
-		TypedSearchAttributes: mirrorNameSearch,
-		WaitForCancellation:   true,
-	}
-	normCtx := workflow.WithChildOptions(ctx, normalizeFlowOpts)
-
 	handleError := func(name string, err error) {
 		var panicErr *temporal.PanicError
 		if errors.As(err, &panicErr) {
@@ -533,7 +518,6 @@ func CDCFlowWorkflow(
 	}
 
 	syncFlowFuture := workflow.ExecuteChildWorkflow(syncCtx, SyncFlowWorkflow, cfg, state.SyncFlowOptions)
-	normFlowFuture := workflow.ExecuteChildWorkflow(normCtx, NormalizeFlowWorkflow, cfg, nil)
 
 	mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
 	mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
@@ -545,58 +529,12 @@ func CDCFlowWorkflow(
 		logger.Info("sync finished, finishing normalize")
 		syncFlowFuture = nil
 		restart = true
-		if normFlowFuture != nil {
-			err := model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{
-				Done:        true,
-				SyncBatchID: -1,
-			}).Get(ctx, nil)
-			if err != nil {
-				logger.Warn("failed to signal normalize done, finishing", slog.Any("error", err))
-				finished = true
-			}
-		}
-	})
-	mainLoopSelector.AddFuture(normFlowFuture, func(f workflow.Future) {
-		err := f.Get(ctx, nil)
-		if err != nil {
-			handleError("normalize", err)
-		}
-
-		logger.Info("normalize finished, finishing")
-		normFlowFuture = nil
-		restart = true
-		finished = true
 	})
 
 	flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
 		state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
 	})
 
-	normChan := model.NormalizeSignal.GetSignalChannel(ctx)
-	normChan.AddToSelector(mainLoopSelector, func(payload model.NormalizePayload, _ bool) {
-		if normFlowFuture != nil {
-			_ = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, payload).Get(ctx, nil)
-		}
-	})
-
-	parallel := getParallelSyncNormalize(ctx, logger, cfg.Env)
-	if !parallel {
-		normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
-		normDoneChan.Drain()
-		normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) {
-			syncCount += 1
-			if syncCount == syncCountLimit {
-				logger.Info("sync count limit reached, pausing",
-					slog.Int("limit", syncCountLimit),
-					slog.Int("count", syncCount))
-				state.ActiveSignal = model.PauseSignal
-			}
-			if syncFlowFuture != nil {
-				_ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil)
-			}
-		})
-	}
-
 	addCdcPropertiesSignalListener(ctx, logger, mainLoopSelector, state)
 
 	state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go
deleted file mode 100644
index 3211e6ca06..0000000000
--- a/flow/workflows/normalize_flow.go
+++ /dev/null
@@ -1,121 +0,0 @@
-package peerflow
-
-import (
-	"log/slog"
-	"time"
-
-	"go.temporal.io/sdk/log"
-	"go.temporal.io/sdk/workflow"
-
-	"github.com/PeerDB-io/peer-flow/generated/protos"
-	"github.com/PeerDB-io/peer-flow/model"
-	"github.com/PeerDB-io/peer-flow/shared"
-)
-
-type NormalizeState struct {
-	LastSyncBatchID int64
-	SyncBatchID     int64
-	Wait            bool
-	Stop            bool
-}
-
-func NewNormalizeState() *NormalizeState {
-	return &NormalizeState{
-		LastSyncBatchID: -1,
-		SyncBatchID:     -1,
-		Wait:            true,
-		Stop:            false,
-	}
-}
-
-// returns whether workflow should finish
-// signals are flushed when ProcessLoop returns
-func ProcessLoop(ctx workflow.Context, logger log.Logger, selector workflow.Selector, state *NormalizeState) bool {
-	for ctx.Err() == nil && selector.HasPending() {
-		selector.Select(ctx)
-	}
-
-	if ctx.Err() != nil {
-		logger.Info("normalize canceled")
-		return true
-	} else if state.Stop && state.LastSyncBatchID == state.SyncBatchID {
-		logger.Info("normalize finished")
-		return true
-	}
-	return false
-}
-
-func NormalizeFlowWorkflow(
-	ctx workflow.Context,
-	config *protos.FlowConnectionConfigs,
-	state *NormalizeState,
-) error {
-	parent := workflow.GetInfo(ctx).ParentWorkflowExecution
-	logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
-
-	if state == nil {
-		state = NewNormalizeState()
-	}
-
-	normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
-		StartToCloseTimeout: 7 * 24 * time.Hour,
-		HeartbeatTimeout:    time.Minute,
-	})
-
-	selector := workflow.NewNamedSelector(ctx, "NormalizeLoop")
-	selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
-	model.NormalizeSignal.GetSignalChannel(ctx).AddToSelector(selector, func(s model.NormalizePayload, _ bool) {
-		if s.Done {
-			state.Stop = true
-		}
-		if s.SyncBatchID > state.SyncBatchID {
-			state.SyncBatchID = s.SyncBatchID
-		}
-
-		state.Wait = false
-	})
-
-	for state.Wait && ctx.Err() == nil {
-		selector.Select(ctx)
-	}
-	if ProcessLoop(ctx, logger, selector, state) {
-		return ctx.Err()
-	}
-
-	if state.LastSyncBatchID != state.SyncBatchID {
-		state.LastSyncBatchID = state.SyncBatchID
-
-		logger.Info("executing normalize")
-		startNormalizeInput := &protos.StartNormalizeInput{
-			FlowConnectionConfigs: config,
-			SyncBatchID:           state.SyncBatchID,
-		}
-		fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)
-
-		var normalizeResponse *model.NormalizeResponse
-		if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
-			logger.Info("Normalize errored", slog.Any("error", err))
-		} else if normalizeResponse != nil {
-			logger.Info("Normalize finished", slog.Any("result", normalizeResponse))
-		}
-	}
-
-	if ctx.Err() == nil && !state.Stop {
-		parallel := getParallelSyncNormalize(ctx, logger, config.Env)
-
-		if !parallel {
-			_ = model.NormalizeDoneSignal.SignalExternalWorkflow(
-				ctx,
-				parent.ID,
-				"",
-				struct{}{},
-			).Get(ctx, nil)
-		}
-	}
-
-	state.Wait = true
-	if ProcessLoop(ctx, logger, selector, state) {
-		return ctx.Err()
-	}
-	return workflow.NewContinueAsNewError(ctx, NormalizeFlowWorkflow, config, state)
-}
diff --git a/flow/workflows/register.go b/flow/workflows/register.go
index 2c4b32ba3c..4c458319cc 100644
--- a/flow/workflows/register.go
+++ b/flow/workflows/register.go
@@ -7,7 +7,6 @@ import (
 func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry) {
 	w.RegisterWorkflow(CDCFlowWorkflow)
 	w.RegisterWorkflow(DropFlowWorkflow)
-	w.RegisterWorkflow(NormalizeFlowWorkflow)
 	w.RegisterWorkflow(SetupFlowWorkflow)
 	w.RegisterWorkflow(SyncFlowWorkflow)
 	w.RegisterWorkflow(QRepFlowWorkflow)
diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go
index 8b00364dd2..eff4607d05 100644
--- a/flow/workflows/sync_flow.go
+++ b/flow/workflows/sync_flow.go
@@ -18,7 +18,6 @@ func SyncFlowWorkflow(
 	config *protos.FlowConnectionConfigs,
 	options *protos.SyncFlowOptions,
 ) error {
-	parent := workflow.GetInfo(ctx).ParentWorkflowExecution
 	logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
 
 	sessionOptions := &workflow.SessionOptions{
@@ -70,9 +69,6 @@ func SyncFlowWorkflow(
 	if !parallel {
 		waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait")
 		waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
-		waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
-		waitChan.Drain()
-		waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {})
 		stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {
 			stop = true
 		})
@@ -99,19 +95,19 @@ func SyncFlowWorkflow(
 	selector.AddFuture(syncFlowFuture, func(f workflow.Future) {
 		syncDone = true
 
-		var childSyncFlowRes *model.SyncCompositeResponse
+		var childSyncFlowRes *model.SyncResponse
 		if err := f.Get(ctx, &childSyncFlowRes); err != nil {
 			logger.Error("failed to execute sync flow", slog.Any("error", err))
 			syncErr = true
 		} else if childSyncFlowRes != nil {
-			totalRecordsSynced += childSyncFlowRes.SyncResponse.NumRecordsSynced
+			totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
 			logger.Info("Total records synced",
 				slog.Int64("totalRecordsSynced", totalRecordsSynced))
 
 			// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
-			tableSchemaDeltasCount := len(childSyncFlowRes.SyncResponse.TableSchemaDeltas)
+			tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)
 			if tableSchemaDeltasCount > 0 {
 				modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
-				for _, tableSchemaDelta := range childSyncFlowRes.SyncResponse.TableSchemaDeltas {
+				for _, tableSchemaDelta := range childSyncFlowRes.TableSchemaDeltas {
 					modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
 				}
@@ -132,23 +128,7 @@ func SyncFlowWorkflow(
 				}
 			}
 
-			if childSyncFlowRes.NeedsNormalize {
-				err := model.NormalizeSignal.SignalExternalWorkflow(
-					ctx,
-					parent.ID,
-					"",
-					model.NormalizePayload{
-						Done:        false,
-						SyncBatchID: childSyncFlowRes.SyncResponse.CurrentSyncBatchID,
-					},
-				).Get(ctx, nil)
-				if err != nil {
-					logger.Error("failed to trigger normalize, so skip wait", slog.Any("error", err))
-					mustWait = false
-				}
-			} else {
-				mustWait = false
-			}
+			mustWait = false
 		} else {
 			mustWait = false
 		}
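
Note (reviewer sketch, not part of the patch): the core of this change is the channel handoff between syncCore and the normalize loop that MaintainPull now owns. Below is a minimal, self-contained Go sketch of that protocol under simplified assumptions: stand-in types, a Sleep in place of StartNormalize, and hypothetical names everywhere except NormalizeBatchRequest, whose shape is taken from the patch.

package main

import (
	"fmt"
	"time"
)

// Mirrors the NormalizeBatchRequest added in flow/activities/flowable.go:
// a nil Done channel means fire-and-forget (parallel sync/normalize);
// a non-nil Done channel makes the sender wait for the batch to normalize.
type NormalizeBatchRequest struct {
	Done    chan NormalizeResponse
	BatchID int64
}

// Simplified stand-in for model.NormalizeResponse.
type NormalizeResponse struct {
	StartBatchID int64
	EndBatchID   int64
}

func main() {
	normalize := make(chan NormalizeBatchRequest)
	normDone := make(chan struct{})

	// Consumer: the per-session normalize loop that MaintainPull now owns.
	go func() {
		defer close(normDone)
		for req := range normalize {
			time.Sleep(10 * time.Millisecond) // stand-in for StartNormalize
			if req.Done != nil {
				req.Done <- NormalizeResponse{StartBatchID: req.BatchID, EndBatchID: req.BatchID}
				close(req.Done)
			}
		}
	}()

	// Producer: what syncCore does after each batch. With parallel
	// normalize disabled it passes a Done channel and blocks on it,
	// so sync cannot run ahead of normalize.
	parallel := false
	for batchID := int64(1); batchID <= 3; batchID++ {
		var done chan NormalizeResponse
		if !parallel {
			done = make(chan NormalizeResponse)
		}
		normalize <- NormalizeBatchRequest{BatchID: batchID, Done: done}
		if done != nil {
			if res, ok := <-done; ok {
				fmt.Printf("normalized batches %d..%d\n", res.StartBatchID, res.EndBatchID)
			} else {
				fmt.Println("normalize failed") // closed without a response signals failure
			}
		}
	}

	// Session teardown, in the same order as the activity's defers:
	// close the request channel, then wait for the loop to drain.
	close(normalize)
	<-normDone
}

The unbuffered channels are what provide the backpressure the commit message describes: with parallel normalize disabled the producer hands over a Done channel and blocks until the batch is normalized, so a new sync batch cannot start early, and teardown (close followed by a wait on normDone) mirrors the two defers added to MaintainPull.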