Skip to content

Commit

Permalink
PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 17, 2024
1 parent 46bdbdb commit 11e2103
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
8 changes: 7 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,17 @@ func (a *FlowableActivity) MaintainPull(
return err
}

normalizeBufferSize, err := peerdbenv.PeerDBNormalizeChannelBufferSize(ctx, config.Env)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}

// syncDone will be closed by UnmaintainPull,
// whereas normalizeDone will be closed by the normalize goroutine
// Wait on normalizeDone at end to not interrupt final normalize
syncDone := make(chan struct{})
normalize := make(chan NormalizeBatchRequest)
normalize := make(chan NormalizeBatchRequest, normalizeBufferSize)
normalizeDone := make(chan struct{})
a.CdcCacheRw.Lock()
a.CdcCache[sessionID] = CdcCacheEntry{
Expand Down
2 changes: 1 addition & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
if err != nil {
return 0, fmt.Errorf("failed to get CDC channel buffer size: %w", err)
}
recordBatchPull := model.NewCDCStream[Items](int(channelBufferSize))
recordBatchPull := model.NewCDCStream[Items](channelBufferSize)
recordBatchSync := recordBatchPull
if adaptStream != nil {
var err error
Expand Down
17 changes: 15 additions & 2 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE",
Description: "Advanced setting: changes buffer size of channel PeerDB uses for queueing normalizing, " +
"use with PEERDB_PARALLEL_SYNC_NORMALIZE",
DefaultValue: "0",
ValueType: protos.DynconfValueType_INT,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS",
Description: "Frequency of flushing to queue, applicable for PeerDB Streams mirrors only",
Expand Down Expand Up @@ -341,8 +350,12 @@ func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context, env map[strin
return dynamicConfBool(ctx, env, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS")
}

func PeerDBCDCChannelBufferSize(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_CDC_CHANNEL_BUFFER_SIZE")
func PeerDBCDCChannelBufferSize(ctx context.Context, env map[string]string) (int, error) {
return dynamicConfSigned[int](ctx, env, "PEERDB_CDC_CHANNEL_BUFFER_SIZE")
}

func PeerDBNormalizeChannelBufferSize(ctx context.Context, env map[string]string) (int, error) {
return dynamicConfSigned[int](ctx, env, "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE")
}

func PeerDBQueueFlushTimeoutSeconds(ctx context.Context, env map[string]string) (time.Duration, error) {
Expand Down

0 comments on commit 11e2103

Please sign in to comment.