From f2c00b22b808a3b057d94c65a543b91b737478ea Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 17 Nov 2024 15:06:39 -0600 Subject: [PATCH] refactor: reduce default batch and partition sizes for improved stability Reduces default batch and partition sizes to 250,000 rows across CDC and snapshot operations: - Lower CDC batch size for incremental syncs (previously 1,000,000) - Reduce snapshot partition size for initial loads (previously 500,000 in the backend and 1,000,000 in the UI) - Update UI defaults and tooltips to reflect new values This change helps prevent memory pressure and timeout issues when processing large datasets. --- flow/activities/flowable_core.go | 2 +- flow/workflows/snapshot_flow.go | 2 +- ui/app/mirrors/create/helpers/cdc.ts | 12 ++++++------ ui/app/mirrors/create/helpers/common.ts | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index db04efea30..d583044b0c 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -139,7 +139,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon batchSize := options.BatchSize if batchSize == 0 { - batchSize = 1_000_000 + batchSize = 250_000 } lastOffset, err := func() (int64, error) { diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index d4f494d1ff..9b21b7b384 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -166,7 +166,7 @@ func (s *SnapshotFlowExecution) cloneTable( numWorkers = s.config.SnapshotMaxParallelWorkers } - numRowsPerPartition := uint32(500000) + numRowsPerPartition := uint32(250000) if s.config.SnapshotNumRowsPerPartition > 0 { numRowsPerPartition = s.config.SnapshotNumRowsPerPartition } diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts index 99dd229cb3..957564d678 100644 --- a/ui/app/mirrors/create/helpers/cdc.ts +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -22,12 +22,12 @@ export const cdcSettings: 
MirrorSetting[] = [ setter( (curr: CDCConfig): CDCConfig => ({ ...curr, - maxBatchSize: (value as number) || 1000000, + maxBatchSize: (value as number) || 250000, }) ), - tips: 'The number of rows PeerDB will pull from source at a time. If left empty, the default value is 1,000,000 rows.', + tips: 'The number of rows PeerDB will pull from source at a time. If left empty, the default value is 250,000 rows.', type: 'number', - default: '1000000', + default: '250000', advanced: AdvancedSettingType.ALL, }, { @@ -78,11 +78,11 @@ export const cdcSettings: MirrorSetting[] = [ setter( (curr: CDCConfig): CDCConfig => ({ ...curr, - snapshotNumRowsPerPartition: parseInt(value as string, 10) || 1000000, + snapshotNumRowsPerPartition: parseInt(value as string, 10) || 250000, }) ), - tips: 'PeerDB splits up table data into partitions for increased performance. This setting controls the number of rows per partition. The default value is 1000000.', - default: '1000000', + tips: 'PeerDB splits up table data into partitions for increased performance. This setting controls the number of rows per partition. The default value is 250000.', + default: '250000', type: 'number', advanced: AdvancedSettingType.ALL, }, diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index d4ba5747ad..f29a2376c9 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -25,10 +25,10 @@ export const blankCDCSetting: CDCConfig = { destinationName: '', flowJobName: '', tableMappings: [], - maxBatchSize: 1000000, + maxBatchSize: 250000, doInitialSnapshot: true, publicationName: '', - snapshotNumRowsPerPartition: 1000000, + snapshotNumRowsPerPartition: 250000, snapshotMaxParallelWorkers: 4, snapshotNumTablesInParallel: 1, snapshotStagingPath: '',