# Patch summary (commentary only; everything from the first "diff --git" header on is unchanged).
#
# Purpose: make the capacity of the normalize request channel configurable through a new
# dynamic setting, PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE.
#
# flow/peerdbenv/dynamicconf.go
#   - Adds a DynamicSettings entry named PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE
#     (DefaultValue "0", ValueType INT, ApplyMode IMMEDIATE, TargetForSetting ALL).
#   - Adds PeerDBNormalizeChannelBufferSize, which reads that setting via dynamicConfSigned[int].
#   - Changes PeerDBCDCChannelBufferSize to return int instead of int64
#     (dynamicConfSigned[int64] becomes dynamicConfSigned[int]).
#
# flow/activities/flowable.go (MaintainPull)
#   - Reads the new setting before the channels are created; on error it reports through
#     a.Alerter.LogFlowError and returns the error.
#   - The normalize channel, previously created without a capacity, is now created with
#     normalizeBufferSize as its capacity.
#   - Presumably the default "0" parses to 0 and so keeps the channel unbuffered as before;
#     confirm against dynamicConfSigned.
#
# flow/activities/flowable_core.go (syncCore)
#   - Drops the int(...) conversion around channelBufferSize when calling model.NewCDCStream,
#     matching the new int return type of PeerDBCDCChannelBufferSize.
#
# NOTE(review): Go's make panics at run time when given a negative channel size. The value is
#   read through dynamicConfSigned, whose validation is not visible in this patch. Confirm
#   that negative values are rejected upstream, or clamp/validate before the make call.
# NOTE(review): this capture has lost the patch's original line breaks and indentation. The
#   Description string literal of the new setting is split across the two physical lines
#   below; a Go interpreted string literal cannot contain a raw newline, so the split is
#   presumably a capture artifact. Verify against the original patch.
diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index bc314b455b..ca002234c2 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -303,11 +303,17 @@ func (a *FlowableActivity) MaintainPull( return err } + normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err + } + // syncDone will be closed by UnmaintainPull, // whereas normalizeDone will be closed by the normalize goroutine // Wait on normalizeDone at end to not interrupt final normalize syncDone := make(chan struct{}) - normalize := make(chan NormalizeBatchRequest) + normalize := make(chan NormalizeBatchRequest, normalizeBufferSize) normalizeDone := make(chan struct{}) a.CdcCacheRw.Lock() a.CdcCache[sessionID] = CdcCacheEntry{ diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 4361f4d5e1..b9bad18036 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -195,7 +195,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon if err != nil { return 0, fmt.Errorf("failed to get CDC channel buffer size: %w", err) } - recordBatchPull := model.NewCDCStream[Items](int(channelBufferSize)) + recordBatchPull := model.NewCDCStream[Items](channelBufferSize) recordBatchSync := recordBatchPull if adaptStream != nil { var err error diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 98a47d8fdc..6f745978a9 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -27,6 +27,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, + { + Name: "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE", + Description: "Advanced setting: changes buffer size of channel PeerDB uses for queueing normalizing, use with 
PEERDB_PARALLEL_SYNC_NORMALIZE", + DefaultValue: "0", + ValueType: protos.DynconfValueType_INT, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, + TargetForSetting: protos.DynconfTarget_ALL, + }, { Name: "PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS", Description: "Frequency of flushing to queue, applicable for PeerDB Streams mirrors only", @@ -341,8 +349,12 @@ func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context, env map[strin return dynamicConfBool(ctx, env, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS") } -func PeerDBCDCChannelBufferSize(ctx context.Context, env map[string]string) (int64, error) { - return dynamicConfSigned[int64](ctx, env, "PEERDB_CDC_CHANNEL_BUFFER_SIZE") +func PeerDBCDCChannelBufferSize(ctx context.Context, env map[string]string) (int, error) { + return dynamicConfSigned[int](ctx, env, "PEERDB_CDC_CHANNEL_BUFFER_SIZE") +} + +func PeerDBNormalizeChannelBufferSize(ctx context.Context, env map[string]string) (int, error) { + return dynamicConfSigned[int](ctx, env, "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE") } func PeerDBQueueFlushTimeoutSeconds(ctx context.Context, env map[string]string) (time.Duration, error) {