From c9b8cfe70de8becf9c4309208910cc3cdd081986 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 4 Dec 2023 15:31:43 +0800 Subject: [PATCH 1/5] fix(storage): fix sst switch condiction --- src/storage/src/hummock/sstable/multi_builder.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index b90cc2239d42..89576429342a 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -198,11 +198,7 @@ where if switch_builder { need_seal_current = true; } else if builder.reach_capacity() { - let is_split_table = self - .table_partition_vnode - .contains_key(&full_key.user_key.table_id.table_id()); - - if !is_split_table || builder.reach_max_sst_size() { + if !self.is_target_level_l0_or_lbase || builder.reach_max_sst_size() { need_seal_current = true; } else { need_seal_current = self.is_target_level_l0_or_lbase && vnode_changed; From 698d8b4d8c39b43f92f36c1954ce9d719606a85a Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 4 Dec 2023 19:22:04 +0800 Subject: [PATCH 2/5] refactor(storage): refactor compactor config --- src/common/src/config.rs | 63 +++++++++++++------ .../src/hummock/compactor/compactor_runner.rs | 19 ++++-- src/storage/src/hummock/compactor/mod.rs | 10 ++- src/storage/src/opts.rs | 19 +++--- 4 files changed, 71 insertions(+), 40 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 5c8c91db2a3a..b581caab8407 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -532,12 +532,6 @@ pub struct StorageConfig { #[serde(default)] pub compactor_memory_limit_mb: Option, - /// Compactor calculates the maximum number of tasks that can be executed on the node based on - /// worker_num and compactor_max_task_multiplier. - /// max_pull_task_count = worker_num * compactor_max_task_multiplier - #[serde(default = "default::storage::compactor_max_task_multiplier")] - pub compactor_max_task_multiplier: f32, - /// The percentage of memory available when compactor is deployed separately. /// non_reserved_memory_bytes = system_memory_available_bytes * compactor_memory_available_proportion #[serde(default = "default::storage::compactor_memory_available_proportion")] @@ -573,25 +567,21 @@ pub struct StorageConfig { #[serde(default = "default::storage::max_version_pinning_duration_sec")] pub max_version_pinning_duration_sec: u64, - #[serde(default = "default::storage::compactor_max_sst_key_count")] - pub compactor_max_sst_key_count: u64, - #[serde(default = "default::storage::compact_iter_recreate_timeout_ms")] - pub compact_iter_recreate_timeout_ms: u64, - #[serde(default = "default::storage::compactor_max_sst_size")] - pub compactor_max_sst_size: u64, - #[serde(default = "default::storage::enable_fast_compaction")] - pub enable_fast_compaction: bool, - #[serde(default = "default::storage::max_preload_io_retry_times")] - pub max_preload_io_retry_times: usize, #[serde(default, flatten)] pub unrecognized: Unrecognized, + #[serde(default = "default::storage::max_preload_io_retry_times")] + pub max_preload_io_retry_times: usize, + /// The spill threshold for mem table. #[serde(default = "default::storage::mem_table_spill_threshold")] pub mem_table_spill_threshold: usize, #[serde(default)] pub object_store: ObjectStoreConfig, + + #[serde(default)] + pub compactor: CompactorConfig, } #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] @@ -892,6 +882,28 @@ pub struct S3ObjectStoreConfig { pub object_store_req_retry_max_attempts: usize, } +/// The subsections `[storage.object_store]`. +#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] +pub struct CompactorConfig { + /// Compactor calculates the maximum number of tasks that can be executed on the node based on + /// worker_num and compactor_max_task_multiplier. + /// max_pull_task_count = worker_num * compactor_max_task_multiplier + #[serde(default = "default::storage::compactor_max_task_multiplier")] + pub compactor_max_task_multiplier: f32, + #[serde(default = "default::compactor::compactor_max_sst_key_count")] + pub compactor_max_sst_key_count: u64, + #[serde(default = "default::compactor::compact_iter_recreate_timeout_ms")] + pub compact_iter_recreate_timeout_ms: u64, + #[serde(default = "default::compactor::compactor_max_sst_size")] + pub compactor_max_sst_size: u64, + #[serde(default = "default::compactor::enable_fast_compaction")] + pub enable_fast_compaction: bool, + #[serde(default = "default::compactor::fast_max_compact_delete_ratio")] + pub fast_max_compact_delete_ratio: u32, + #[serde(default = "default::compactor::fast_max_compact_task_size")] + pub fast_max_compact_task_size: u64, +} + impl SystemConfig { #![allow(deprecated)] pub fn into_init_system_params(self) -> SystemParams { @@ -1128,6 +1140,16 @@ pub mod default { 3 * 3600 } + pub fn max_preload_io_retry_times() -> usize { + 3 + } + + pub fn mem_table_spill_threshold() -> usize { + 4 << 20 + } + } + + pub mod compactor { pub fn compactor_max_sst_key_count() -> u64 { 2 * 1024 * 1024 // 200w } @@ -1144,11 +1166,12 @@ pub mod default { true } - pub fn max_preload_io_retry_times() -> usize { - 3 + pub fn fast_max_compact_delete_ratio() -> u32 { + 40 } - pub fn mem_table_spill_threshold() -> usize { - 4 << 20 + + pub fn fast_max_compact_task_size() -> u64 { + 2 * 1024 * 1024 * 1024 // 2g } } diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index d4f7e95d34c0..0583a75eae89 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -53,8 +53,6 @@ use crate::hummock::{ SstableDeleteRangeIterator, SstableStoreRef, }; use crate::monitor::{CompactorMetrics, StoreLocalStatistic}; -const FAST_COMPACT_MAX_COMPACT_SIZE: u64 = 2 * 1024 * 1024 * 1024; // 2GB -const FAST_COMPACT_MAX_DELETE_RATIO: u64 = 40; // 40% pub struct CompactorRunner { compact_task: CompactTask, compactor: Compactor, @@ -372,21 +370,30 @@ pub async fn compact( let delete_key_count = sstable_infos .iter() - .map(|table_info| table_info.stale_key_count) + .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count) .sum::(); let total_key_count = sstable_infos .iter() .map(|table_info| table_info.total_key_count) .sum::(); - let optimize_by_copy_block = context.storage_opts.enable_fast_compaction + let optimize_by_copy_block = context.storage_opts.compactor_config.enable_fast_compaction && all_ssts_are_blocked_filter && !has_tombstone && !has_ttl && single_table && compact_task.target_level > 0 && compact_task.input_ssts.len() == 2 - && compaction_size < FAST_COMPACT_MAX_COMPACT_SIZE - && delete_key_count * 100 < FAST_COMPACT_MAX_DELETE_RATIO * total_key_count + && compaction_size + < context + .storage_opts + .compactor_config + .fast_max_compact_task_size + && delete_key_count * 100 + < context + .storage_opts + .compactor_config + .fast_max_compact_delete_ratio as u64 + * total_key_count && compact_task.task_type() == TaskType::Dynamic; if !optimize_by_copy_block { match generate_splits(&sstable_infos, compaction_size, context.clone()).await { diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 156bc55f9e67..1bfa80ecc293 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -308,11 +308,17 @@ pub fn start_compactor( let pull_task_ack = Arc::new(AtomicBool::new(true)); assert_ge!( - compactor_context.storage_opts.compactor_max_task_multiplier, + compactor_context + .storage_opts + .compactor_config + .compactor_max_task_multiplier, 0.0 ); let max_pull_task_count = (cpu_core_num as f32 - * compactor_context.storage_opts.compactor_max_task_multiplier) + * compactor_context + .storage_opts + .compactor_config + .compactor_max_task_multiplier) .ceil() as u32; let join_handle = tokio::spawn(async move { diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 4cbefd7da24e..18fe91aaee48 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -13,7 +13,8 @@ // limitations under the License. use risingwave_common::config::{ - extract_storage_memory_config, ObjectStoreConfig, RwConfig, StorageMemoryConfig, + extract_storage_memory_config, CompactorConfig, ObjectStoreConfig, RwConfig, + StorageMemoryConfig, }; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::system_param::system_params_for_test; @@ -122,16 +123,13 @@ pub struct StorageOpts { /// object store read timeout. pub object_store_read_timeout_ms: u64, - pub compactor_max_sst_key_count: u64, - pub compactor_max_task_multiplier: f32, - pub compactor_max_sst_size: u64, - /// enable FastCompactorRunner. - pub enable_fast_compaction: bool, pub max_preload_io_retry_times: usize, pub mem_table_spill_threshold: usize, pub object_store_config: ObjectStoreConfig, + + pub compactor_config: CompactorConfig, } impl Default for StorageOpts { @@ -232,22 +230,19 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .storage .object_store .object_store_streaming_read_timeout_ms, - compact_iter_recreate_timeout_ms: c.storage.compact_iter_recreate_timeout_ms, + compact_iter_recreate_timeout_ms: c.storage.compactor.compact_iter_recreate_timeout_ms, object_store_streaming_upload_timeout_ms: c .storage .object_store .object_store_streaming_upload_timeout_ms, object_store_read_timeout_ms: c.storage.object_store.object_store_read_timeout_ms, object_store_upload_timeout_ms: c.storage.object_store.object_store_upload_timeout_ms, - max_preload_io_retry_times: c.storage.max_preload_io_retry_times, backup_storage_url: p.backup_storage_url().to_string(), backup_storage_directory: p.backup_storage_directory().to_string(), - compactor_max_sst_key_count: c.storage.compactor_max_sst_key_count, - compactor_max_task_multiplier: c.storage.compactor_max_task_multiplier, - compactor_max_sst_size: c.storage.compactor_max_sst_size, - enable_fast_compaction: c.storage.enable_fast_compaction, + max_preload_io_retry_times: c.storage.max_preload_io_retry_times, mem_table_spill_threshold: c.storage.mem_table_spill_threshold, object_store_config: c.storage.object_store.clone(), + compactor_config: c.storage.compactor.clone(), } } } From aa63f3e1cd23734d38f262ebf5bbacfcef15bc4b Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 4 Dec 2023 19:23:02 +0800 Subject: [PATCH 3/5] chore(storage): rename --- src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs | 4 ++-- ...action_status.rs => rw_hummock_compact_task_assignment.rs} | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename src/frontend/src/catalog/system_catalog/rw_catalog/{rw_hummock_compaction_status.rs => rw_hummock_compact_task_assignment.rs} (98%) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 2c783039a1ab..401ce0b3b6c2 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -22,9 +22,9 @@ mod rw_event_logs; mod rw_fragments; mod rw_functions; mod rw_hummock_branched_objects; +mod rw_hummock_compact_task_assignment; mod rw_hummock_compact_task_progress; mod rw_hummock_compaction_group_configs; -mod rw_hummock_compaction_status; mod rw_hummock_meta_configs; mod rw_hummock_pinned_snapshots; mod rw_hummock_pinned_versions; @@ -60,9 +60,9 @@ pub use rw_event_logs::*; pub use rw_fragments::*; pub use rw_functions::*; pub use rw_hummock_branched_objects::*; +pub use rw_hummock_compact_task_assignment::*; pub use rw_hummock_compact_task_progress::*; pub use rw_hummock_compaction_group_configs::*; -pub use rw_hummock_compaction_status::*; pub use rw_hummock_meta_configs::*; pub use rw_hummock_pinned_snapshots::*; pub use rw_hummock_pinned_versions::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_status.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compact_task_assignment.rs similarity index 98% rename from src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_status.rs rename to src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compact_task_assignment.rs index f2519c46b32a..222b9d2ca833 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_status.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compact_task_assignment.rs @@ -22,7 +22,7 @@ use serde_json::json; use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; pub const RW_HUMMOCK_COMPACT_TASK_ASSIGNMENT: BuiltinTable = BuiltinTable { - name: "RW_HUMMOCK_COMPACT_TASK_ASSIGNMENT", + name: "rw_hummock_compact_task_assignment", schema: RW_CATALOG_SCHEMA_NAME, columns: &[ (DataType::Int64, "compaction_group_id"), From fe81d5130259a68c75f3f0ae531be52122a20836 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 4 Dec 2023 19:24:08 +0800 Subject: [PATCH 4/5] chore(storage): refine log --- src/meta/src/hummock/manager/mod.rs | 2 +- src/storage/src/hummock/sstable/builder.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 703470c3d621..cd7baa4546aa 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -2931,7 +2931,7 @@ impl HummockManager { .await; match ret { Ok((new_group_id, table_vnode_partition_count)) => { - tracing::info!("move state table [{}] from group-{} to group-{} success, Allow split by table: false", table_id, parent_group_id, new_group_id); + tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, table_vnode_partition_count); return TableAlignRule::SplitToDedicatedCg(( new_group_id, table_vnode_partition_count, diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index bc946488a720..602a8725d08f 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -65,7 +65,7 @@ impl From<&StorageOpts> for SstableBuilderOptions { restart_interval: DEFAULT_RESTART_INTERVAL, bloom_false_positive: options.bloom_false_positive, compression_algorithm: CompressionAlgorithm::None, - max_sst_size: options.compactor_max_sst_size, + max_sst_size: options.compactor_config.compactor_max_sst_size, } } } From 1a6207e2bb0ef4e9574bad03c7830dd92464a0e9 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 4 Dec 2023 19:49:29 +0800 Subject: [PATCH 5/5] refactor(storage): revert config breaking change --- src/common/src/config.rs | 72 +++++++++---------- src/config/example.toml | 2 + .../src/hummock/compactor/compactor_runner.rs | 14 +--- src/storage/src/hummock/compactor/mod.rs | 10 +-- src/storage/src/hummock/sstable/builder.rs | 2 +- src/storage/src/opts.rs | 25 +++++-- 6 files changed, 58 insertions(+), 67 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index b581caab8407..a1e5d5fde757 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -532,6 +532,12 @@ pub struct StorageConfig { #[serde(default)] pub compactor_memory_limit_mb: Option, + /// Compactor calculates the maximum number of tasks that can be executed on the node based on + /// worker_num and compactor_max_task_multiplier. + /// max_pull_task_count = worker_num * compactor_max_task_multiplier + #[serde(default = "default::storage::compactor_max_task_multiplier")] + pub compactor_max_task_multiplier: f32, + /// The percentage of memory available when compactor is deployed separately. /// non_reserved_memory_bytes = system_memory_available_bytes * compactor_memory_available_proportion #[serde(default = "default::storage::compactor_memory_available_proportion")] @@ -567,21 +573,32 @@ pub struct StorageConfig { #[serde(default = "default::storage::max_version_pinning_duration_sec")] pub max_version_pinning_duration_sec: u64, - #[serde(default, flatten)] - pub unrecognized: Unrecognized, - + #[serde(default = "default::storage::compactor_max_sst_key_count")] + pub compactor_max_sst_key_count: u64, + #[serde(default = "default::storage::compact_iter_recreate_timeout_ms")] + pub compact_iter_recreate_timeout_ms: u64, + #[serde(default = "default::storage::compactor_max_sst_size")] + pub compactor_max_sst_size: u64, + #[serde(default = "default::storage::enable_fast_compaction")] + pub enable_fast_compaction: bool, #[serde(default = "default::storage::max_preload_io_retry_times")] pub max_preload_io_retry_times: usize, + #[serde(default = "default::storage::compactor_fast_max_compact_delete_ratio")] + pub compactor_fast_max_compact_delete_ratio: u32, + + #[serde(default = "default::storage::compactor_fast_max_compact_task_size")] + pub compactor_fast_max_compact_task_size: u64, + + #[serde(default, flatten)] + pub unrecognized: Unrecognized, + /// The spill threshold for mem table. #[serde(default = "default::storage::mem_table_spill_threshold")] pub mem_table_spill_threshold: usize, #[serde(default)] pub object_store: ObjectStoreConfig, - - #[serde(default)] - pub compactor: CompactorConfig, } #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] @@ -882,28 +899,6 @@ pub struct S3ObjectStoreConfig { pub object_store_req_retry_max_attempts: usize, } -/// The subsections `[storage.object_store]`. -#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] -pub struct CompactorConfig { - /// Compactor calculates the maximum number of tasks that can be executed on the node based on - /// worker_num and compactor_max_task_multiplier. - /// max_pull_task_count = worker_num * compactor_max_task_multiplier - #[serde(default = "default::storage::compactor_max_task_multiplier")] - pub compactor_max_task_multiplier: f32, - #[serde(default = "default::compactor::compactor_max_sst_key_count")] - pub compactor_max_sst_key_count: u64, - #[serde(default = "default::compactor::compact_iter_recreate_timeout_ms")] - pub compact_iter_recreate_timeout_ms: u64, - #[serde(default = "default::compactor::compactor_max_sst_size")] - pub compactor_max_sst_size: u64, - #[serde(default = "default::compactor::enable_fast_compaction")] - pub enable_fast_compaction: bool, - #[serde(default = "default::compactor::fast_max_compact_delete_ratio")] - pub fast_max_compact_delete_ratio: u32, - #[serde(default = "default::compactor::fast_max_compact_task_size")] - pub fast_max_compact_task_size: u64, -} - impl SystemConfig { #![allow(deprecated)] pub fn into_init_system_params(self) -> SystemParams { @@ -1140,16 +1135,6 @@ pub mod default { 3 * 3600 } - pub fn max_preload_io_retry_times() -> usize { - 3 - } - - pub fn mem_table_spill_threshold() -> usize { - 4 << 20 - } - } - - pub mod compactor { pub fn compactor_max_sst_key_count() -> u64 { 2 * 1024 * 1024 // 200w } @@ -1166,11 +1151,18 @@ pub mod default { true } - pub fn fast_max_compact_delete_ratio() -> u32 { + pub fn max_preload_io_retry_times() -> usize { + 3 + } + pub fn mem_table_spill_threshold() -> usize { + 4 << 20 + } + + pub fn compactor_fast_max_compact_delete_ratio() -> u32 { 40 } - pub fn fast_max_compact_task_size() -> u64 { + pub fn compactor_fast_max_compact_task_size() -> u64 { 2 * 1024 * 1024 * 1024 // 2g } } diff --git a/src/config/example.toml b/src/config/example.toml index f142c90eef75..fddd8f34bb79 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -110,6 +110,8 @@ compact_iter_recreate_timeout_ms = 600000 compactor_max_sst_size = 536870912 enable_fast_compaction = true max_preload_io_retry_times = 3 +compactor_fast_max_compact_delete_ratio = 40 +compactor_fast_max_compact_task_size = 2147483648 mem_table_spill_threshold = 4194304 [storage.data_file_cache] diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 0583a75eae89..a137b1f101a6 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -376,24 +376,16 @@ pub async fn compact( .iter() .map(|table_info| table_info.total_key_count) .sum::(); - let optimize_by_copy_block = context.storage_opts.compactor_config.enable_fast_compaction + let optimize_by_copy_block = context.storage_opts.enable_fast_compaction && all_ssts_are_blocked_filter && !has_tombstone && !has_ttl && single_table && compact_task.target_level > 0 && compact_task.input_ssts.len() == 2 - && compaction_size - < context - .storage_opts - .compactor_config - .fast_max_compact_task_size + && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size && delete_key_count * 100 - < context - .storage_opts - .compactor_config - .fast_max_compact_delete_ratio as u64 - * total_key_count + < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count && compact_task.task_type() == TaskType::Dynamic; if !optimize_by_copy_block { match generate_splits(&sstable_infos, compaction_size, context.clone()).await { diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 1bfa80ecc293..156bc55f9e67 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -308,17 +308,11 @@ pub fn start_compactor( let pull_task_ack = Arc::new(AtomicBool::new(true)); assert_ge!( - compactor_context - .storage_opts - .compactor_config - .compactor_max_task_multiplier, + compactor_context.storage_opts.compactor_max_task_multiplier, 0.0 ); let max_pull_task_count = (cpu_core_num as f32 - * compactor_context - .storage_opts - .compactor_config - .compactor_max_task_multiplier) + * compactor_context.storage_opts.compactor_max_task_multiplier) .ceil() as u32; let join_handle = tokio::spawn(async move { diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 602a8725d08f..bc946488a720 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -65,7 +65,7 @@ impl From<&StorageOpts> for SstableBuilderOptions { restart_interval: DEFAULT_RESTART_INTERVAL, bloom_false_positive: options.bloom_false_positive, compression_algorithm: CompressionAlgorithm::None, - max_sst_size: options.compactor_config.compactor_max_sst_size, + max_sst_size: options.compactor_max_sst_size, } } } diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 18fe91aaee48..5650fc48ce19 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -13,8 +13,7 @@ // limitations under the License. use risingwave_common::config::{ - extract_storage_memory_config, CompactorConfig, ObjectStoreConfig, RwConfig, - StorageMemoryConfig, + extract_storage_memory_config, ObjectStoreConfig, RwConfig, StorageMemoryConfig, }; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::system_param::system_params_for_test; @@ -123,13 +122,18 @@ pub struct StorageOpts { /// object store read timeout. pub object_store_read_timeout_ms: u64, + pub compactor_max_sst_key_count: u64, + pub compactor_max_task_multiplier: f32, + pub compactor_max_sst_size: u64, + /// enable FastCompactorRunner. + pub enable_fast_compaction: bool, pub max_preload_io_retry_times: usize, + pub compactor_fast_max_compact_delete_ratio: u32, + pub compactor_fast_max_compact_task_size: u64, pub mem_table_spill_threshold: usize, pub object_store_config: ObjectStoreConfig, - - pub compactor_config: CompactorConfig, } impl Default for StorageOpts { @@ -230,19 +234,26 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .storage .object_store .object_store_streaming_read_timeout_ms, - compact_iter_recreate_timeout_ms: c.storage.compactor.compact_iter_recreate_timeout_ms, + compact_iter_recreate_timeout_ms: c.storage.compact_iter_recreate_timeout_ms, object_store_streaming_upload_timeout_ms: c .storage .object_store .object_store_streaming_upload_timeout_ms, object_store_read_timeout_ms: c.storage.object_store.object_store_read_timeout_ms, object_store_upload_timeout_ms: c.storage.object_store.object_store_upload_timeout_ms, + max_preload_io_retry_times: c.storage.max_preload_io_retry_times, backup_storage_url: p.backup_storage_url().to_string(), backup_storage_directory: p.backup_storage_directory().to_string(), - max_preload_io_retry_times: c.storage.max_preload_io_retry_times, + compactor_max_sst_key_count: c.storage.compactor_max_sst_key_count, + compactor_max_task_multiplier: c.storage.compactor_max_task_multiplier, + compactor_max_sst_size: c.storage.compactor_max_sst_size, + enable_fast_compaction: c.storage.enable_fast_compaction, mem_table_spill_threshold: c.storage.mem_table_spill_threshold, object_store_config: c.storage.object_store.clone(), - compactor_config: c.storage.compactor.clone(), + compactor_fast_max_compact_delete_ratio: c + .storage + .compactor_fast_max_compact_delete_ratio, + compactor_fast_max_compact_task_size: c.storage.compactor_fast_max_compact_task_size, } } }