diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index ed9d2b720d..d833715728 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -38,9 +38,16 @@ type CheckConnectionResult struct {
     NeedsSetupMetadataTables bool
 }
 
+type NormalizeBatchRequest struct {
+    Done    chan struct{}
+    BatchID int64
+}
+
 type CdcCacheEntry struct {
-    connector connectors.CDCPullConnectorCore
-    done      chan struct{}
+    connector     connectors.CDCPullConnectorCore
+    syncDone      chan struct{}
+    normalize     chan NormalizeBatchRequest
+    normalizeDone chan struct{}
 }
 
 type FlowableActivity struct {
@@ -296,17 +303,32 @@ func (a *FlowableActivity) MaintainPull(
         return err
     }
 
-    done := make(chan struct{})
+    normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env)
+    if err != nil {
+        a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+        return err
+    }
+
+    // syncDone will be closed by UnmaintainPull,
+    // whereas normalizeDone will be closed by the normalize goroutine.
+    // Wait on normalizeDone at the end so the final normalize is not interrupted.
+    syncDone := make(chan struct{})
+    normalize := make(chan NormalizeBatchRequest, normalizeBufferSize)
+    normalizeDone := make(chan struct{})
     a.CdcCacheRw.Lock()
     a.CdcCache[sessionID] = CdcCacheEntry{
-        connector: srcConn,
-        done:      done,
+        connector:     srcConn,
+        syncDone:      syncDone,
+        normalize:     normalize,
+        normalizeDone: normalizeDone,
     }
     a.CdcCacheRw.Unlock()
 
     ticker := time.NewTicker(15 * time.Second)
     defer ticker.Stop()
 
+    go a.normalizeLoop(ctx, config, syncDone, normalize, normalizeDone)
+
     for {
         select {
         case <-ticker.C:
@@ -318,7 +340,7 @@ func (a *FlowableActivity) MaintainPull(
                 a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
                 return temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", err)
             }
-        case <-done:
+        case <-syncDone:
             return nil
         case <-ctx.Done():
             a.CdcCacheRw.Lock()
@@ -330,12 +352,15 @@ func (a *FlowableActivity) MaintainPull(
 }
 
 func (a *FlowableActivity) UnmaintainPull(ctx context.Context, sessionID string) error {
+    var normalizeDone chan struct{}
     a.CdcCacheRw.Lock()
     if entry, ok := a.CdcCache[sessionID]; ok {
-        close(entry.done)
+        close(entry.syncDone)
         delete(a.CdcCache, sessionID)
+        normalizeDone = entry.normalizeDone
     }
     a.CdcCacheRw.Unlock()
+    <-normalizeDone
     return nil
 }
 
@@ -344,7 +369,7 @@ func (a *FlowableActivity) SyncRecords(
     config *protos.FlowConnectionConfigs,
     options *protos.SyncFlowOptions,
     sessionID string,
-) (*model.SyncCompositeResponse, error) {
+) (model.SyncRecordsResult, error) {
     var adaptStream func(stream *model.CDCStream[model.RecordItems]) (*model.CDCStream[model.RecordItems], error)
     if config.Script != "" {
         var onErr context.CancelCauseFunc
@@ -375,9 +400,10 @@ func (a *FlowableActivity) SyncRecords(
             return stream, nil
         }
     }
-    return syncCore(ctx, a, config, options, sessionID, adaptStream,
+    numRecords, err := syncCore(ctx, a, config, options, sessionID, adaptStream,
         connectors.CDCPullConnector.PullRecords,
         connectors.CDCSyncConnector.SyncRecords)
+    return model.SyncRecordsResult{NumRecordsSynced: numRecords}, err
 }
 
 func (a *FlowableActivity) SyncPg(
@@ -385,7 +411,7 @@ func (a *FlowableActivity) SyncPg(
     config *protos.FlowConnectionConfigs,
     options *protos.SyncFlowOptions,
     sessionID string,
-) (*model.SyncCompositeResponse, error) {
+) (int64, error) {
     return syncCore(ctx, a, config, options, sessionID, nil,
         connectors.CDCPullPgConnector.PullPg,
         connectors.CDCSyncPgConnector.SyncPg)
@@ -393,22 +419,22 @@ func (a *FlowableActivity) SyncPg(
 
 func (a *FlowableActivity) StartNormalize(
     ctx context.Context,
-    input *protos.StartNormalizeInput,
-) (*model.NormalizeResponse, error) {
-    conn := input.FlowConnectionConfigs
-    ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
+    config *protos.FlowConnectionConfigs,
+    batchID int64,
+) error {
+    ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
     logger := activity.GetLogger(ctx)
 
     dstConn, err := connectors.GetByNameAs[connectors.CDCNormalizeConnector](
         ctx,
-        input.FlowConnectionConfigs.Env,
+        config.Env,
         a.CatalogPool,
-        conn.DestinationName,
+        config.DestinationName,
     )
     if errors.Is(err, errors.ErrUnsupported) {
-        return nil, monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
+        return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID)
     } else if err != nil {
-        return nil, fmt.Errorf("failed to get normalize connector: %w", err)
+        return fmt.Errorf("failed to get normalize connector: %w", err)
     }
     defer connectors.CloseConnector(ctx, dstConn)
 
@@ -417,41 +443,34 @@ func (a *FlowableActivity) StartNormalize(
     })
     defer shutdown()
 
-    tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName)
+    tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName)
     if err != nil {
-        return nil, fmt.Errorf("failed to get table name schema mapping: %w", err)
+        return fmt.Errorf("failed to get table name schema mapping: %w", err)
     }
 
+    logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID))
     res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
-        FlowJobName:            input.FlowConnectionConfigs.FlowJobName,
-        Env:                    input.FlowConnectionConfigs.Env,
+        FlowJobName:            config.FlowJobName,
+        Env:                    config.Env,
         TableNameSchemaMapping: tableNameSchemaMapping,
-        TableMappings:          input.FlowConnectionConfigs.TableMappings,
-        SyncBatchID:            input.SyncBatchID,
-        SoftDeleteColName:      input.FlowConnectionConfigs.SoftDeleteColName,
-        SyncedAtColName:        input.FlowConnectionConfigs.SyncedAtColName,
+        TableMappings:          config.TableMappings,
+        SoftDeleteColName:      config.SoftDeleteColName,
+        SyncedAtColName:        config.SyncedAtColName,
+        SyncBatchID:            batchID,
     })
     if err != nil {
-        a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
-        return nil, fmt.Errorf("failed to normalized records: %w", err)
-    }
-    dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
-    if err != nil {
-        return nil, fmt.Errorf("failed to get peer type: %w", err)
+        a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+        return fmt.Errorf("failed to normalize records: %w", err)
     }
-    if dstType == protos.DBType_POSTGRES {
-        err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
-            input.SyncBatchID)
-        if err != nil {
-            return nil, fmt.Errorf("failed to update end time for cdc batch: %w", err)
+    if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg {
+        if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil {
+            return fmt.Errorf("failed to update end time for cdc batch: %w", err)
         }
     }
 
-    // log the number of batches normalized
-    logger.Info(fmt.Sprintf("normalized records from batch %d to batch %d",
-        res.StartBatchID, res.EndBatchID))
+    logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID))
 
-    return res, nil
+    return nil
 }
 
 // SetupQRepMetadataTables sets up the metadata tables for QReplication.
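The flowable.go changes above hang a per-session normalize goroutine off of `MaintainPull`: `syncDone` is closed by `UnmaintainPull`, `normalize` carries batch requests from sync to the goroutine, and `normalizeDone` is closed by the goroutine itself, so `UnmaintainPull` can wait for a normalize in flight rather than interrupt it. A minimal standalone sketch of that three-channel lifecycle (illustrative only, not PeerDB code; all names here are invented):

```go
package main

import (
	"fmt"
	"time"
)

type request struct{ batchID int64 }

// consumer stands in for normalizeLoop: it drains requests until the stopper
// closes syncDone, then announces its own exit by closing normalizeDone.
func consumer(syncDone <-chan struct{}, normalize <-chan request, normalizeDone chan<- struct{}) {
	defer close(normalizeDone)
	for {
		select {
		case req := <-normalize:
			fmt.Println("normalizing batch", req.batchID)
			time.Sleep(10 * time.Millisecond) // stand-in for NormalizeRecords
		case <-syncDone:
			return
		}
	}
}

func main() {
	syncDone := make(chan struct{})
	normalize := make(chan request, 4) // buffered, like PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE
	normalizeDone := make(chan struct{})
	go consumer(syncDone, normalize, normalizeDone)

	normalize <- request{batchID: 1}
	close(syncDone) // what UnmaintainPull does first...
	<-normalizeDone // ...before waiting here so a normalize in flight can finish
}
```

Note that, as in the real loop, a request still sitting in the buffer when `syncDone` closes may never be processed; the wait only guarantees the goroutine has exited cleanly.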
diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index a9f58a5f3f..b9bad18036 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -50,7 +50,9 @@ func heartbeatRoutine(
     )
 }
 
-func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, a *FlowableActivity, sessionID string) (TPull, error) {
+func waitForCdcCache[TPull connectors.CDCPullConnectorCore](
+    ctx context.Context, a *FlowableActivity, sessionID string,
+) (TPull, chan NormalizeBatchRequest, error) {
     var none TPull
     logger := activity.GetLogger(ctx)
     attempt := 0
@@ -63,9 +65,9 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
         a.CdcCacheRw.RUnlock()
         if ok {
             if conn, ok := entry.connector.(TPull); ok {
-                return conn, nil
+                return conn, entry.normalize, nil
             }
-            return none, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
+            return none, nil, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector)
         }
         activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval))
         attempt += 1
@@ -74,7 +76,7 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context,
                 slog.Int("attempt", attempt), slog.String("sessionID", sessionID))
         }
         if err := ctx.Err(); err != nil {
-            return none, err
+            return none, nil, err
         }
         time.Sleep(waitInterval)
         if attempt == 300 {
@@ -107,6 +109,34 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa
     return tableNameSchemaMapping, nil
 }
 
+func (a *FlowableActivity) applySchemaDeltas(
+    ctx context.Context,
+    config *protos.FlowConnectionConfigs,
+    options *protos.SyncFlowOptions,
+    schemaDeltas []*protos.TableSchemaDelta,
+) error {
+    tableSchemaDeltasCount := len(schemaDeltas)
+    if tableSchemaDeltasCount > 0 {
+        modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
+        for _, tableSchemaDelta := range schemaDeltas {
+            modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
+        }
+
+        if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{
+            PeerName:         config.SourceName,
+            TableIdentifiers: modifiedSrcTables,
+            TableMappings:    options.TableMappings,
+            FlowName:         config.FlowJobName,
+            System:           config.System,
+        }); err != nil {
+            err = fmt.Errorf("failed to execute schema update at source: %w", err)
+            a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+            return err
+        }
+    }
+    return nil
+}
+
 func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
     ctx context.Context,
     a *FlowableActivity,
@@ -116,7 +146,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
     adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
     pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
     sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
-) (*model.SyncCompositeResponse, error) {
+) (int64, error) {
     flowName := config.FlowJobName
     ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
     logger := activity.GetLogger(ctx)
@@ -130,12 +160,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
         tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
     }
 
-    srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID)
+    srcConn, normChan, err := waitForCdcCache[TPull](ctx, a, sessionID)
     if err != nil {
-        return nil, err
+        return 0, err
     }
     if err := srcConn.ConnectionActive(ctx); err != nil {
-        return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
+        return 0, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
     }
 
     batchSize := options.BatchSize
@@ -154,7 +184,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
     }()
     if err != nil {
         a.Alerter.LogFlowError(ctx, flowName, err)
-        return nil, err
+        return 0, err
     }
     logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
 
     channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx, config.Env)
     if err != nil {
-        return nil, fmt.Errorf("failed to get CDC channel buffer size: %w", err)
+        return 0, fmt.Errorf("failed to get CDC channel buffer size: %w", err)
     }
-    recordBatchPull := model.NewCDCStream[Items](int(channelBufferSize))
+    recordBatchPull := model.NewCDCStream[Items](channelBufferSize)
     recordBatchSync := recordBatchPull
     if adaptStream != nil {
         var err error
         if recordBatchSync, err = adaptStream(recordBatchPull); err != nil {
-            return nil, err
+            return 0, err
         }
     }
 
     tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, flowName)
     if err != nil {
-        return nil, err
+        return 0, err
     }
 
     startTime := time.Now()
@@ -212,30 +242,24 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
             a.Alerter.LogFlowError(ctx, flowName, err)
         }
         if temporal.IsApplicationError(err) {
-            return nil, err
+            return 0, err
         } else {
-            return nil, fmt.Errorf("failed in pull records when: %w", err)
+            return 0, fmt.Errorf("failed in pull records when: %w", err)
         }
     }
     logger.Info("no records to push")
 
     dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
     if err != nil {
-        return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
+        return 0, fmt.Errorf("failed to recreate destination connector: %w", err)
     }
     defer connectors.CloseConnector(ctx, dstConn)
 
     if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
-        return nil, fmt.Errorf("failed to sync schema: %w", err)
+        return 0, fmt.Errorf("failed to sync schema: %w", err)
     }
 
-    return &model.SyncCompositeResponse{
-        SyncResponse: &model.SyncResponse{
-            CurrentSyncBatchID: -1,
-            TableSchemaDeltas:  recordBatchSync.SchemaDeltas,
-        },
-        NeedsNormalize: false,
-    }, nil
+    return -1, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
 }
 
 var syncStartTime time.Time
@@ -252,6 +276,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
             return err
         }
         syncBatchID += 1
+        logger.Info("begin pulling records for batch", slog.Int64("SyncBatchID", syncBatchID))
 
         if err := monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, monitoring.CDCBatchInfo{
             BatchID:     syncBatchID,
@@ -279,6 +304,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
             return fmt.Errorf("failed to push records: %w", err)
         }
 
+        logger.Info("finished pulling records for batch", slog.Int64("SyncBatchID", syncBatchID))
         return nil
     })
 
@@ -289,47 +315,68 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
             a.Alerter.LogFlowError(ctx, flowName, err)
         }
         if temporal.IsApplicationError(err) {
-            return nil, err
+            return 0, err
         } else {
-            return nil, fmt.Errorf("failed to pull records: %w", err)
+            return 0, fmt.Errorf("failed to pull records: %w", err)
         }
     }
 
-    numRecords := res.NumRecordsSynced
     syncDuration := time.Since(syncStartTime)
-
-    logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))
-
     lastCheckpoint := recordBatchSync.GetLastCheckpoint()
     srcConn.UpdateReplStateLastOffset(lastCheckpoint)
 
     if err := monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
-        ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(numRecords), lastCheckpoint,
+        ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint,
     ); err != nil {
         a.Alerter.LogFlowError(ctx, flowName, err)
-        return nil, err
+        return 0, err
     }
 
     if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint); err != nil {
         a.Alerter.LogFlowError(ctx, flowName, err)
-        return nil, err
+        return 0, err
     }
 
     if res.TableNameRowsMapping != nil {
         if err := monitoring.AddCDCBatchTablesForFlow(
             ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, res.TableNameRowsMapping,
         ); err != nil {
-            return nil, err
+            return 0, err
         }
     }
 
-    pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
+    pushedRecordsWithCount := fmt.Sprintf("pushed %d records for batch %d in %v",
+        res.NumRecordsSynced, res.CurrentSyncBatchID, syncDuration.Truncate(time.Second))
     activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
     a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)
 
-    return &model.SyncCompositeResponse{
-        SyncResponse:   res,
-        NeedsNormalize: recordBatchSync.NeedsNormalize(),
-    }, nil
+    if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
+        return 0, err
+    }
+
+    if recordBatchSync.NeedsNormalize() {
+        parallel, err := peerdbenv.PeerDBEnableParallelSyncNormalize(ctx, config.Env)
+        if err != nil {
+            return 0, err
+        }
+        var done chan struct{}
+        if !parallel {
+            done = make(chan struct{})
+        }
+        select {
+        case normChan <- NormalizeBatchRequest{BatchID: res.CurrentSyncBatchID, Done: done}:
+        case <-ctx.Done():
+            return 0, nil
+        }
+        if done != nil {
+            select {
+            case <-done:
+            case <-ctx.Done():
+                return 0, nil
            }
+        }
+    }
+
+    return res.NumRecordsSynced, nil
 }
 
 func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
@@ -582,3 +629,51 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
 
     return currentSnapshotXmin, nil
 }
+
+// Suitable to be run as a goroutine
+func (a *FlowableActivity) normalizeLoop(
+    ctx context.Context,
+    config *protos.FlowConnectionConfigs,
+    syncDone <-chan struct{},
+    normalize <-chan NormalizeBatchRequest,
+    normalizeDone chan struct{},
+) {
+    defer close(normalizeDone)
+    logger := activity.GetLogger(ctx)
+
+    for {
+        select {
+        case req := <-normalize:
+        retryLoop:
+            for {
+                if err := a.StartNormalize(ctx, config, req.BatchID); err != nil {
+                    a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+                    for {
+                        // update req to latest normalize request & retry
+                        select {
+                        case req = <-normalize:
+                        case <-syncDone:
+                            logger.Info("[normalize-loop] syncDone closed before retry")
+                            return
+                        case <-ctx.Done():
+                            logger.Info("[normalize-loop] context closed before retry")
+                            return
+                        default:
+                            time.Sleep(30 * time.Second)
+                            continue retryLoop
+                        }
+                    }
+                } else if req.Done != nil {
+                    close(req.Done)
+                }
+                break
+            }
+        case <-syncDone:
+            logger.Info("[normalize-loop] syncDone closed")
+            return
+        case <-ctx.Done():
+            logger.Info("[normalize-loop] context closed")
+            return
+        }
+    }
+}
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 17a33b2cae..ffe0016445 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -388,18 +388,17 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 
 // NormalizeRecords normalizes raw table to destination table,
 // one batch at a time from the previous normalized batch to the currently synced batch.
-func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
+func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error) {
     rawTableName := c.getRawTableName(req.FlowJobName)
 
     normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
     if err != nil {
-        return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
+        return model.NormalizeResponse{}, fmt.Errorf("failed to get batch for the current mirror: %v", err)
     }
 
     // normalize has caught up with sync, chill until more records are loaded.
     if normBatchID >= req.SyncBatchID {
-        return &model.NormalizeResponse{
-            Done:         false,
+        return model.NormalizeResponse{
             StartBatchID: normBatchID,
             EndBatchID:   req.SyncBatchID,
         }, nil
@@ -413,17 +412,16 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
             SyncedAtColName:   req.SyncedAtColName,
         })
         if mergeErr != nil {
-            return nil, mergeErr
+            return model.NormalizeResponse{}, mergeErr
         }
 
         err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
         if err != nil {
-            return nil, err
+            return model.NormalizeResponse{}, err
         }
     }
 
-    return &model.NormalizeResponse{
-        Done:         true,
+    return model.NormalizeResponse{
         StartBatchID: normBatchID + 1,
         EndBatchID:   req.SyncBatchID,
     }, nil
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index 36e510e405..60a79a6d9f 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -229,27 +229,26 @@ func getOrderedOrderByColumns(
 func (c *ClickHouseConnector) NormalizeRecords(
     ctx context.Context,
     req *model.NormalizeRecordsRequest,
-) (*model.NormalizeResponse, error) {
+) (model.NormalizeResponse, error) {
     // fix for potential consistency issues
     time.Sleep(3 * time.Second)
     normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
     if err != nil {
         c.logger.Error("[clickhouse] error while getting last sync and normalize batch id", "error", err)
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
 
     // normalize has caught up with sync, chill until more records are loaded.
     if normBatchID >= req.SyncBatchID {
-        return &model.NormalizeResponse{
-            Done:         false,
+        return model.NormalizeResponse{
             StartBatchID: normBatchID,
             EndBatchID:   req.SyncBatchID,
         }, nil
     }
 
     if err := c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID); err != nil {
-        return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
+        return model.NormalizeResponse{}, fmt.Errorf("failed to copy avro stages to destination: %w", err)
     }
 
     destinationTableNames, err := c.getDistinctTableNamesInBatch(
@@ -261,22 +260,21 @@ func (c *ClickHouseConnector) NormalizeRecords(
     )
     if err != nil {
         c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err)
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
 
     enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
 
     parallelNormalize, err := peerdbenv.PeerDBClickHouseParallelNormalize(ctx, req.Env)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
     parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
-    if parallelNormalize > 1 {
-        c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize))
-    }
+    c.logger.Info("[clickhouse] normalizing batch",
+        slog.Int64("StartBatchID", normBatchID), slog.Int64("EndBatchID", req.SyncBatchID), slog.Int("connections", parallelNormalize))
 
     queries := make(chan string)
     rawTbl := c.getRawTableName(req.FlowJobName)
@@ -356,7 +354,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
                 clickHouseType, err = colType.ToDWHColumnType(ctx, req.Env, protos.DBType_CLICKHOUSE, column)
                 if err != nil {
                     close(queries)
-                    return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
+                    return model.NormalizeResponse{}, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
                 }
 
                 if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
@@ -458,21 +456,20 @@ func (c *ClickHouseConnector) NormalizeRecords(
             c.logger.Error("[clickhouse] context canceled while normalizing",
                 slog.Any("error", errCtx.Err()),
                 slog.Any("cause", context.Cause(errCtx)))
-            return nil, context.Cause(errCtx)
+            return model.NormalizeResponse{}, context.Cause(errCtx)
         }
     }
     close(queries)
     if err := group.Wait(); err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
 
     if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID); err != nil {
         c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err))
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
 
-    return &model.NormalizeResponse{
-        Done:         true,
+    return model.NormalizeResponse{
         StartBatchID: normBatchID + 1,
         EndBatchID:   req.SyncBatchID,
     }, nil
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index afdf244947..47dc0ece06 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -198,7 +198,7 @@ type CDCNormalizeConnector interface {
     // NormalizeRecords merges records pushed earlier into the destination table.
     // This method should be idempotent, and should be able to be called multiple times with the same request.
-    NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error)
+    NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error)
 }
 
 type CreateTablesFromExistingConnector interface {
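The `CDCNormalizeConnector` interface above moves from a `*model.NormalizeResponse` pointer to a value return, which pairs naturally with the removal of the `Done` flag: callers no longer branch on nil or on a boolean, they just get batch IDs, or the zero value alongside an error. A generic sketch of the pattern (not PeerDB code, names simplified):

```go
package main

import (
	"errors"
	"fmt"
)

type normalizeResponse struct {
	StartBatchID int64
	EndBatchID   int64
}

// Before: (*normalizeResponse, error) forced callers to nil-check even on
// success paths. After: (normalizeResponse, error) always yields a usable
// struct, and "already caught up" is expressed by the batch IDs alone.
func normalize(syncBatchID, normBatchID int64) (normalizeResponse, error) {
	if syncBatchID < 0 {
		return normalizeResponse{}, errors.New("invalid batch id")
	}
	if normBatchID >= syncBatchID { // caught up: nothing to do, not an error
		return normalizeResponse{StartBatchID: normBatchID, EndBatchID: syncBatchID}, nil
	}
	return normalizeResponse{StartBatchID: normBatchID + 1, EndBatchID: syncBatchID}, nil
}

func main() {
	res, err := normalize(5, 3)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("normalized batches %d..%d\n", res.StartBatchID, res.EndBatchID)
}
```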
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 02c61b2ebb..6cd3be11e1 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -611,32 +611,29 @@ func syncRecordsCore[Items model.Items](
 func (c *PostgresConnector) NormalizeRecords(
     ctx context.Context,
     req *model.NormalizeRecordsRequest,
-) (*model.NormalizeResponse, error) {
+) (model.NormalizeResponse, error) {
     rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
 
     jobMetadataExists, err := c.jobMetadataExists(ctx, req.FlowJobName)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
     // no SyncFlow has run, chill until more records are loaded.
     if !jobMetadataExists {
         c.logger.Info("no metadata found for mirror")
-        return &model.NormalizeResponse{
-            Done: false,
-        }, nil
+        return model.NormalizeResponse{}, nil
     }
 
     normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
     if err != nil {
-        return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
+        return model.NormalizeResponse{}, fmt.Errorf("failed to get batch for the current mirror: %v", err)
     }
 
     // normalize has caught up with sync, chill until more records are loaded.
     if normBatchID >= req.SyncBatchID {
         c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d",
             req.SyncBatchID, normBatchID))
-        return &model.NormalizeResponse{
-            Done:         false,
+        return model.NormalizeResponse{
             StartBatchID: normBatchID,
             EndBatchID:   req.SyncBatchID,
         }, nil
@@ -645,23 +642,23 @@ func (c *PostgresConnector) NormalizeRecords(
     destinationTableNames, err := c.getDistinctTableNamesInBatch(
         ctx, req.FlowJobName, req.SyncBatchID, normBatchID, req.TableNameSchemaMapping)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
     unchangedToastColumnsMap, err := c.getTableNametoUnchangedCols(ctx, req.FlowJobName,
         req.SyncBatchID, normBatchID)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
 
     normalizeRecordsTx, err := c.conn.Begin(ctx)
     if err != nil {
-        return nil, fmt.Errorf("error starting transaction for normalizing records: %w", err)
+        return model.NormalizeResponse{}, fmt.Errorf("error starting transaction for normalizing records: %w", err)
     }
     defer shared.RollbackTx(normalizeRecordsTx, c.logger)
 
     pgversion, err := c.MajorVersion(ctx)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
     totalRowsAffected := 0
     normalizeStmtGen := normalizeStmtGenerator{
@@ -689,7 +686,7 @@ func (c *PostgresConnector) NormalizeRecords(
                 slog.String("destinationTableName", destinationTableName),
                 slog.Any("error", err),
             )
-            return nil, fmt.Errorf("error executing normalize statement for table %s: %w", destinationTableName, err)
+            return model.NormalizeResponse{}, fmt.Errorf("error executing normalize statement for table %s: %w", destinationTableName, err)
         }
         totalRowsAffected += int(ct.RowsAffected())
     }
@@ -699,16 +696,15 @@ func (c *PostgresConnector) NormalizeRecords(
     // updating metadata with new normalizeBatchID
     err = c.updateNormalizeMetadata(ctx, req.FlowJobName, req.SyncBatchID, normalizeRecordsTx)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
     // transaction commits
     err = normalizeRecordsTx.Commit(ctx)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
 
-    return &model.NormalizeResponse{
-        Done:         true,
+    return model.NormalizeResponse{
         StartBatchID: normBatchID + 1,
         EndBatchID:   req.SyncBatchID,
     }, nil
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 124560e90e..9bb66e5922 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -475,17 +475,16 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
 }
 
 // NormalizeRecords normalizes raw table to destination table.
-func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
+func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error) {
     ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName)
 
     normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
     if err != nil {
-        return nil, err
+        return model.NormalizeResponse{}, err
     }
 
     // normalize has caught up with sync, chill until more records are loaded.
     if normBatchID >= req.SyncBatchID {
-        return &model.NormalizeResponse{
-            Done:         false,
+        return model.NormalizeResponse{
             StartBatchID: normBatchID,
             EndBatchID:   req.SyncBatchID,
         }, nil
@@ -501,17 +500,16 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
         },
         )
         if mergeErr != nil {
-            return nil, mergeErr
+            return model.NormalizeResponse{}, mergeErr
         }
 
         err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
         if err != nil {
-            return nil, err
+            return model.NormalizeResponse{}, err
         }
     }
 
-    return &model.NormalizeResponse{
-        Done:         true,
+    return model.NormalizeResponse{
         StartBatchID: normBatchID + 1,
         EndBatchID:   req.SyncBatchID,
     }, nil
diff --git a/flow/model/model.go b/flow/model/model.go
index d2f66693c0..63e0922cd2 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -169,15 +169,11 @@ type SyncResponse struct {
     CurrentSyncBatchID int64
 }
 
-type NormalizePayload struct {
-    TableNameSchemaMapping map[string]*protos.TableSchema
-    Done                   bool
-    SyncBatchID            int64
+type SyncRecordsResult struct {
+    NumRecordsSynced int64
 }
 
 type NormalizeResponse struct {
-    // Flag to depict if normalization is done
-    Done         bool
     StartBatchID int64
     EndBatchID   int64
 }
diff --git a/flow/model/signals.go b/flow/model/signals.go
index 7e98ab343f..31a89ff778 100644
--- a/flow/model/signals.go
+++ b/flow/model/signals.go
@@ -138,11 +138,3 @@ var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
 var SyncStopSignal = TypedSignal[struct{}]{
     Name: "sync-stop",
 }
-
-var NormalizeSignal = TypedSignal[NormalizePayload]{
-    Name: "normalize",
-}
-
-var NormalizeDoneSignal = TypedSignal[struct{}]{
-    Name: "normalize-done",
-}
diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go
index 98a47d8fdc..636c740b67 100644
--- a/flow/peerdbenv/dynamicconf.go
+++ b/flow/peerdbenv/dynamicconf.go
@@ -27,6 +27,15 @@ var DynamicSettings = [...]*protos.DynamicSetting{
         ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
         TargetForSetting: protos.DynconfTarget_ALL,
     },
+    {
+        Name: "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE",
+        Description: "Advanced setting: changes the buffer size of the channel PeerDB uses to queue normalize requests, " +
+            "use with PEERDB_PARALLEL_SYNC_NORMALIZE",
+        DefaultValue:     "0",
+        ValueType:        protos.DynconfValueType_INT,
+        ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
+        TargetForSetting: protos.DynconfTarget_ALL,
+    },
     {
         Name:        "PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS",
         Description: "Frequency of flushing to queue, applicable for PeerDB Streams mirrors only",
@@ -341,8 +350,12 @@ func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context, env map[strin
     return dynamicConfBool(ctx, env, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS")
 }
 
-func PeerDBCDCChannelBufferSize(ctx context.Context, env map[string]string) (int64, error) {
-    return dynamicConfSigned[int64](ctx, env, "PEERDB_CDC_CHANNEL_BUFFER_SIZE")
+func PeerDBCDCChannelBufferSize(ctx context.Context, env map[string]string) (int, error) {
+    return dynamicConfSigned[int](ctx, env, "PEERDB_CDC_CHANNEL_BUFFER_SIZE")
+}
+
+func PeerDBNormalizeChannelBufferSize(ctx context.Context, env map[string]string) (int, error) {
+    return dynamicConfSigned[int](ctx, env, "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE")
 }
 
 func PeerDBQueueFlushTimeoutSeconds(ctx context.Context, env map[string]string) (time.Duration, error) {
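The default of "0" for `PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE` makes the normalize channel unbuffered: even with parallel sync-normalize enabled, a sync can hand off a batch only when the normalize goroutine is ready to receive it, while a larger buffer lets batches queue up behind a slow normalize. A tiny demonstration of what the buffer buys (illustrative only, not PeerDB code):

```go
package main

import (
	"fmt"
	"time"
)

// trySend reports whether a send would block when no receiver is draining ch.
func trySend(ch chan int64, batchID int64) {
	select {
	case ch <- batchID:
		fmt.Println("queued batch", batchID)
	case <-time.After(50 * time.Millisecond):
		fmt.Println("would block on batch", batchID)
	}
}

func main() {
	unbuffered := make(chan int64, 0) // PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE=0
	buffered := make(chan int64, 2)   // PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE=2

	trySend(unbuffered, 1) // no receiver ready: would block
	trySend(buffered, 1)   // fits in the buffer
	trySend(buffered, 2)   // still fits
	trySend(buffered, 3)   // buffer full: would block
}
```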
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index 0c97af9b7d..9533ecddec 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -331,7 +331,6 @@ func CDCFlowWorkflow(
 
     mirrorNameSearch := shared.NewSearchAttributes(cfg.FlowJobName)
 
-    var syncCountLimit int
     if state.ActiveSignal == model.PauseSignal {
         selector := workflow.NewNamedSelector(ctx, "PauseLoop")
         selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
@@ -356,7 +355,6 @@ func CDCFlowWorkflow(
                 if err := processCDCFlowConfigUpdate(ctx, logger, cfg, state, mirrorNameSearch); err != nil {
                     return state, err
                 }
-                syncCountLimit = int(state.SyncFlowOptions.NumberOfSyncs)
                 logger.Info("wiping flow state after state update processing")
                 // finished processing, wipe it
                 state.FlowConfigUpdate = nil
@@ -492,10 +490,8 @@ func CDCFlowWorkflow(
     }
 
     syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, originalRunID)
-    normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID)
 
     var restart, finished bool
-    syncCount := 0
 
     syncFlowOpts := workflow.ChildWorkflowOptions{
         WorkflowID:        syncFlowID,
         ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
@@ -507,96 +503,42 @@ func CDCFlowWorkflow(
     }
     syncCtx := workflow.WithChildOptions(ctx, syncFlowOpts)
 
-    normalizeFlowOpts := workflow.ChildWorkflowOptions{
-        WorkflowID:        normalizeFlowID,
-        ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
-        RetryPolicy: &temporal.RetryPolicy{
-            MaximumAttempts: 20,
-        },
-        TypedSearchAttributes: mirrorNameSearch,
-        WaitForCancellation:   true,
-    }
-    normCtx := workflow.WithChildOptions(ctx, normalizeFlowOpts)
-
-    handleError := func(name string, err error) {
-        var panicErr *temporal.PanicError
-        if errors.As(err, &panicErr) {
-            logger.Error(
-                "panic in flow",
-                slog.String("name", name),
-                slog.Any("error", panicErr.Error()),
-                slog.String("stack", panicErr.StackTrace()),
-            )
-        } else {
-            logger.Error("error in flow", slog.String("name", name), slog.Any("error", err))
-        }
-    }
-
     syncFlowFuture := workflow.ExecuteChildWorkflow(syncCtx, SyncFlowWorkflow, cfg, state.SyncFlowOptions)
-    normFlowFuture := workflow.ExecuteChildWorkflow(normCtx, NormalizeFlowWorkflow, cfg, nil)
 
     mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
-    mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
+    mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
+        finished = true
+    })
     mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) {
         if err := f.Get(ctx, nil); err != nil {
-            handleError("sync", err)
-        }
-
-        logger.Info("sync finished, finishing normalize")
-        syncFlowFuture = nil
-        restart = true
-        if normFlowFuture != nil {
-            err := model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{
-                Done:        true,
-                SyncBatchID: -1,
-            }).Get(ctx, nil)
-            if err != nil {
-                logger.Warn("failed to signal normalize done, finishing", slog.Any("error", err))
-                finished = true
+            var panicErr *temporal.PanicError
+            if errors.As(err, &panicErr) {
+                logger.Error(
+                    "panic in sync flow",
+                    slog.Any("error", panicErr.Error()),
+                    slog.String("stack", panicErr.StackTrace()),
+                )
+            } else {
+                logger.Error("error in sync flow", slog.Any("error", err))
             }
+        } else {
+            logger.Info("sync finished")
         }
-    })
-    mainLoopSelector.AddFuture(normFlowFuture, func(f workflow.Future) {
-        err := f.Get(ctx, nil)
-        if err != nil {
-            handleError("normalize", err)
-        }
-
-        logger.Info("normalize finished, finishing")
-        normFlowFuture = nil
+        syncFlowFuture = nil
         restart = true
         finished = true
+        if state.SyncFlowOptions.NumberOfSyncs > 0 {
+            state.ActiveSignal = model.PauseSignal
+        }
     })
 
     flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
         state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
-    })
-
-    normChan := model.NormalizeSignal.GetSignalChannel(ctx)
-    normChan.AddToSelector(mainLoopSelector, func(payload model.NormalizePayload, _ bool) {
-        if normFlowFuture != nil {
-            _ = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, payload).Get(ctx, nil)
+        if state.ActiveSignal == model.PauseSignal {
+            finished = true
         }
     })
 
-    parallel := getParallelSyncNormalize(ctx, logger, cfg.Env)
-    if !parallel {
-        normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
-        normDoneChan.Drain()
-        normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) {
-            syncCount += 1
-            if syncCount == syncCountLimit {
-                logger.Info("sync count limit reached, pausing",
-                    slog.Int("limit", syncCountLimit),
-                    slog.Int("count", syncCount))
-                state.ActiveSignal = model.PauseSignal
-            }
-            if syncFlowFuture != nil {
-                _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil)
-            }
-        })
-    }
-
     addCdcPropertiesSignalListener(ctx, logger, mainLoopSelector, state)
 
     state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
@@ -610,22 +552,17 @@ func CDCFlowWorkflow(
             return state, err
         }
 
-        if state.ActiveSignal == model.PauseSignal || workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
+        if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
             restart = true
             if syncFlowFuture != nil {
-                err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil)
-                if err != nil {
+                if err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil); err != nil {
                     logger.Warn("failed to send sync-stop, finishing", slog.Any("error", err))
                     finished = true
                 }
             }
         }
 
-        if restart {
-            if state.ActiveSignal == model.PauseSignal {
-                finished = true
-            }
-
+        if restart || finished {
             for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) {
                 mainLoopSelector.Select(ctx)
             }
diff --git a/flow/workflows/local_activities.go b/flow/workflows/local_activities.go
index 7a3e80f240..923bcaea58 100644
--- a/flow/workflows/local_activities.go
+++ b/flow/workflows/local_activities.go
@@ -15,20 +15,6 @@ import (
     "github.com/PeerDB-io/peer-flow/shared"
 )
 
-func getParallelSyncNormalize(wCtx workflow.Context, logger log.Logger, env map[string]string) bool {
-    checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{
-        StartToCloseTimeout: time.Minute,
-    })
-
-    getParallelFuture := workflow.ExecuteLocalActivity(checkCtx, peerdbenv.PeerDBEnableParallelSyncNormalize, env)
-    var parallel bool
-    if err := getParallelFuture.Get(checkCtx, &parallel); err != nil {
-        logger.Warn("Failed to get status of parallel sync-normalize", slog.Any("error", err))
-        return false
-    }
-    return parallel
-}
-
 func getQRepOverwriteFullRefreshMode(wCtx workflow.Context, logger log.Logger, env map[string]string) bool {
     checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{
         StartToCloseTimeout: time.Minute,
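With `getParallelSyncNormalize` gone, no workflow reads the setting anymore; the decision lives in the activity, and retries are handled by `normalizeLoop` in flowable_core.go above instead of by the retry policy of the `NormalizeFlowWorkflow` deleted below. On failure the loop coalesces: it jumps to the newest queued request before retrying, so a backlog of failed batches collapses into one retry of the latest batch. A standalone sketch of that labeled-loop pattern (illustrative only, not PeerDB code; the sleep is shortened from the real 30 seconds):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

func loop(requests <-chan int64, stop <-chan struct{}) {
	failuresLeft := 2
	process := func(batchID int64) error { // stand-in for StartNormalize
		if failuresLeft > 0 {
			failuresLeft--
			return errors.New("transient failure")
		}
		fmt.Println("processed batch", batchID)
		return nil
	}
	for {
		select {
		case batchID := <-requests:
		retryLoop:
			for {
				if err := process(batchID); err != nil {
					for {
						select {
						case batchID = <-requests: // coalesce: adopt the newest request
						case <-stop:
							return
						default:
							time.Sleep(10 * time.Millisecond)
							continue retryLoop // retry the latest batch
						}
					}
				}
				break
			}
		case <-stop:
			return
		}
	}
}

func main() {
	requests := make(chan int64, 4)
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() { defer close(done); loop(requests, stop) }()
	requests <- 1
	requests <- 2
	time.Sleep(100 * time.Millisecond) // batch 1's retries are absorbed by batch 2
	close(stop)
	<-done
}
```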
diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go
deleted file mode 100644
index 3211e6ca06..0000000000
--- a/flow/workflows/normalize_flow.go
+++ /dev/null
@@ -1,121 +0,0 @@
-package peerflow
-
-import (
-    "log/slog"
-    "time"
-
-    "go.temporal.io/sdk/log"
-    "go.temporal.io/sdk/workflow"
-
-    "github.com/PeerDB-io/peer-flow/generated/protos"
-    "github.com/PeerDB-io/peer-flow/model"
-    "github.com/PeerDB-io/peer-flow/shared"
-)
-
-type NormalizeState struct {
-    LastSyncBatchID int64
-    SyncBatchID     int64
-    Wait            bool
-    Stop            bool
-}
-
-func NewNormalizeState() *NormalizeState {
-    return &NormalizeState{
-        LastSyncBatchID: -1,
-        SyncBatchID:     -1,
-        Wait:            true,
-        Stop:            false,
-    }
-}
-
-// returns whether workflow should finish
-// signals are flushed when ProcessLoop returns
-func ProcessLoop(ctx workflow.Context, logger log.Logger, selector workflow.Selector, state *NormalizeState) bool {
-    for ctx.Err() == nil && selector.HasPending() {
-        selector.Select(ctx)
-    }
-
-    if ctx.Err() != nil {
-        logger.Info("normalize canceled")
-        return true
-    } else if state.Stop && state.LastSyncBatchID == state.SyncBatchID {
-        logger.Info("normalize finished")
-        return true
-    }
-    return false
-}
-
-func NormalizeFlowWorkflow(
-    ctx workflow.Context,
-    config *protos.FlowConnectionConfigs,
-    state *NormalizeState,
-) error {
-    parent := workflow.GetInfo(ctx).ParentWorkflowExecution
-    logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
-
-    if state == nil {
-        state = NewNormalizeState()
-    }
-
-    normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
-        StartToCloseTimeout: 7 * 24 * time.Hour,
-        HeartbeatTimeout:    time.Minute,
-    })
-
-    selector := workflow.NewNamedSelector(ctx, "NormalizeLoop")
-    selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
-    model.NormalizeSignal.GetSignalChannel(ctx).AddToSelector(selector, func(s model.NormalizePayload, _ bool) {
-        if s.Done {
-            state.Stop = true
-        }
-        if s.SyncBatchID > state.SyncBatchID {
-            state.SyncBatchID = s.SyncBatchID
-        }
-
-        state.Wait = false
-    })
-
-    for state.Wait && ctx.Err() == nil {
-        selector.Select(ctx)
-    }
-    if ProcessLoop(ctx, logger, selector, state) {
-        return ctx.Err()
-    }
-
-    if state.LastSyncBatchID != state.SyncBatchID {
-        state.LastSyncBatchID = state.SyncBatchID
-
-        logger.Info("executing normalize")
-        startNormalizeInput := &protos.StartNormalizeInput{
-            FlowConnectionConfigs: config,
-            SyncBatchID:           state.SyncBatchID,
-        }
-        fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)
-
-        var normalizeResponse *model.NormalizeResponse
-        if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
-            logger.Info("Normalize errored", slog.Any("error", err))
-        } else if normalizeResponse != nil {
-            logger.Info("Normalize finished", slog.Any("result", normalizeResponse))
-        }
-    }
-
-    if ctx.Err() == nil && !state.Stop {
-        parallel := getParallelSyncNormalize(ctx, logger, config.Env)
-
-        if !parallel {
-            _ = model.NormalizeDoneSignal.SignalExternalWorkflow(
-                ctx,
-                parent.ID,
-                "",
-                struct{}{},
-            ).Get(ctx, nil)
-        }
-    }
-
-    state.Wait = true
-    if ProcessLoop(ctx, logger, selector, state) {
-        return ctx.Err()
-    }
-    return workflow.NewContinueAsNewError(ctx, NormalizeFlowWorkflow, config, state)
-}
diff --git a/flow/workflows/register.go b/flow/workflows/register.go
index 2c4b32ba3c..4c458319cc 100644
--- a/flow/workflows/register.go
+++ b/flow/workflows/register.go
@@ -7,7 +7,6 @@ import (
 func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry) {
     w.RegisterWorkflow(CDCFlowWorkflow)
     w.RegisterWorkflow(DropFlowWorkflow)
-    w.RegisterWorkflow(NormalizeFlowWorkflow)
     w.RegisterWorkflow(SetupFlowWorkflow)
     w.RegisterWorkflow(SyncFlowWorkflow)
     w.RegisterWorkflow(QRepFlowWorkflow)
diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go
index 7dce9ffec5..f6d75d4ee5 100644
--- a/flow/workflows/sync_flow.go
+++ b/flow/workflows/sync_flow.go
@@ -18,7 +18,6 @@ func SyncFlowWorkflow(
     config *protos.FlowConnectionConfigs,
     options *protos.SyncFlowOptions,
 ) error {
-    parent := workflow.GetInfo(ctx).ParentWorkflowExecution
     logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
 
     sessionOptions := &workflow.SessionOptions{
@@ -48,7 +47,7 @@ func SyncFlowWorkflow(
     )
 
     var stop, syncErr bool
-    currentSyncFlowNum := 0
+    currentSyncFlowNum := int32(0)
     totalRecordsSynced := int64(0)
 
     selector := workflow.NewNamedSelector(ctx, "SyncLoop")
@@ -65,19 +64,6 @@ func SyncFlowWorkflow(
         stop = true
     })
 
-    var waitSelector workflow.Selector
-    parallel := getParallelSyncNormalize(ctx, logger, config.Env)
-    if !parallel {
-        waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait")
-        waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
-        waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
-        waitChan.Drain()
-        waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {})
-        stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {
-            stop = true
-        })
-    }
-
     syncFlowCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
         StartToCloseTimeout: 7 * 24 * time.Hour,
         HeartbeatTimeout:    time.Minute,
@@ -85,10 +71,9 @@ func SyncFlowWorkflow(
     })
     for !stop && ctx.Err() == nil {
         var syncDone bool
-        mustWait := waitSelector != nil
 
         currentSyncFlowNum += 1
-        logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum))
+        logger.Info("executing sync flow", slog.Int("count", int(currentSyncFlowNum)))
 
         var syncFlowFuture workflow.Future
         if config.System == protos.TypeSystem_Q {
@@ -99,69 +84,20 @@ func SyncFlowWorkflow(
         selector.AddFuture(syncFlowFuture, func(f workflow.Future) {
             syncDone = true
 
-            var childSyncFlowRes *model.SyncCompositeResponse
-            if err := f.Get(ctx, &childSyncFlowRes); err != nil {
+            var syncResult model.SyncRecordsResult
+            if err := f.Get(ctx, &syncResult); err != nil {
                 logger.Error("failed to execute sync flow", slog.Any("error", err))
                 syncErr = true
-            } else if childSyncFlowRes != nil {
-                totalRecordsSynced += childSyncFlowRes.SyncResponse.NumRecordsSynced
-                logger.Info("Total records synced",
-                    slog.Int64("totalRecordsSynced", totalRecordsSynced))
-
-                // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
-                tableSchemaDeltasCount := len(childSyncFlowRes.SyncResponse.TableSchemaDeltas)
-                if tableSchemaDeltasCount > 0 {
-                    modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
-                    for _, tableSchemaDelta := range childSyncFlowRes.SyncResponse.TableSchemaDeltas {
-                        modifiedSrcTables = append(modifiedSrcTables, tableSchemaDelta.SrcTableName)
-                    }
-
-                    getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
-                        StartToCloseTimeout: 5 * time.Minute,
-                    })
-                    getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.SetupTableSchema,
-                        &protos.SetupTableSchemaBatchInput{
-                            PeerName:         config.SourceName,
-                            TableIdentifiers: modifiedSrcTables,
-                            TableMappings:    options.TableMappings,
-                            FlowName:         config.FlowJobName,
-                            System:           config.System,
-                        })
-
-                    if err := getModifiedSchemaFuture.Get(ctx, nil); err != nil {
-                        logger.Error("failed to execute schema update at source", slog.Any("error", err))
-                    }
-                }
-
-                if childSyncFlowRes.NeedsNormalize {
-                    err := model.NormalizeSignal.SignalExternalWorkflow(
-                        ctx,
-                        parent.ID,
-                        "",
-                        model.NormalizePayload{
-                            Done:        false,
-                            SyncBatchID: childSyncFlowRes.SyncResponse.CurrentSyncBatchID,
-                        },
-                    ).Get(ctx, nil)
-                    if err != nil {
-                        logger.Error("failed to trigger normalize, so skip wait", slog.Any("error", err))
-                        mustWait = false
-                    }
-                } else {
-                    mustWait = false
-                }
             } else {
-                mustWait = false
+                totalRecordsSynced += syncResult.NumRecordsSynced
+                logger.Info("Total records synced",
+                    slog.Int64("numRecordsSynced", syncResult.NumRecordsSynced), slog.Int64("totalRecordsSynced", totalRecordsSynced))
             }
         })
 
         for ctx.Err() == nil && ((!syncDone && !syncErr) || selector.HasPending()) {
             selector.Select(ctx)
         }
 
-        if ctx.Err() != nil {
-            break
-        }
-
-        restart := syncErr || workflow.GetInfo(ctx).GetContinueAsNewSuggested()
         if syncErr {
             logger.Info("sync flow error, sleeping for 30 seconds...")
@@ -171,16 +107,8 @@ func SyncFlowWorkflow(
             }
         }
 
-        if !stop && !syncErr && mustWait {
-            waitSelector.Select(ctx)
-            if restart {
-                // must flush selector for signals received while waiting
-                for ctx.Err() == nil && selector.HasPending() {
-                    selector.Select(ctx)
-                }
-                break
-            }
-        } else if restart {
+        if (options.NumberOfSyncs > 0 && currentSyncFlowNum >= options.NumberOfSyncs) ||
+            syncErr || ctx.Err() != nil || workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
             break
         }
     }
@@ -204,7 +132,7 @@ func SyncFlowWorkflow(
         logger.Warn("UnmaintainPull failed", slog.Any("error", err))
     }
 
-    if stop {
+    if stop || currentSyncFlowNum >= options.NumberOfSyncs {
         return nil
     } else if _, stop := stopChan.ReceiveAsync(); stop {
         // if sync flow erroring may outrace receiving stop
diff --git a/protos/flow.proto b/protos/flow.proto
index 42170a5630..b917b6dae5 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -117,11 +117,6 @@ message SyncFlowOptions {
   int32 number_of_syncs = 7;
 }
 
-message StartNormalizeInput {
-  FlowConnectionConfigs flow_connection_configs = 1;
-  int64 SyncBatchID = 3;
-}
-
 message EnsurePullabilityBatchInput {
   string flow_job_name = 2;
   repeated string source_table_identifiers = 3;
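With `StartNormalize` now invoked in-process from `normalizeLoop` rather than scheduled as an activity by a workflow, the `StartNormalizeInput` wrapper message has no remaining caller, which is why it can be dropped from flow.proto. The sync loop's stop conditions are likewise consolidated into one test; a standalone sketch of the reworked exit condition in `SyncFlowWorkflow` (illustrative only; a `shouldBreak` helper like this does not exist in the code):

```go
package main

import "fmt"

// shouldBreak mirrors the single break condition in the reworked sync loop:
// exit on a configured sync limit, an error, cancellation, or Temporal's
// continue-as-new hint; the old normalize-wait selector is gone entirely.
func shouldBreak(numberOfSyncs, currentSyncFlowNum int32, syncErr, canceled, continueAsNewSuggested bool) bool {
	return (numberOfSyncs > 0 && currentSyncFlowNum >= numberOfSyncs) ||
		syncErr || canceled || continueAsNewSuggested
}

func main() {
	fmt.Println(shouldBreak(3, 3, false, false, false))   // true: hit the configured sync limit
	fmt.Println(shouldBreak(0, 100, false, false, false)) // false: 0 means unlimited
	fmt.Println(shouldBreak(0, 1, false, false, true))    // true: Temporal suggests continue-as-new
}
```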