Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): fix compactor sst switch condition #13790

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ pub struct StorageConfig {
pub enable_fast_compaction: bool,
#[serde(default = "default::storage::max_preload_io_retry_times")]
pub max_preload_io_retry_times: usize,

#[serde(default = "default::storage::compactor_fast_max_compact_delete_ratio")]
pub compactor_fast_max_compact_delete_ratio: u32,

#[serde(default = "default::storage::compactor_fast_max_compact_task_size")]
pub compactor_fast_max_compact_task_size: u64,

#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,

Expand Down Expand Up @@ -1150,6 +1157,14 @@ pub mod default {
pub fn mem_table_spill_threshold() -> usize {
4 << 20
}

pub fn compactor_fast_max_compact_delete_ratio() -> u32 {
40
}

pub fn compactor_fast_max_compact_task_size() -> u64 {
2 * 1024 * 1024 * 1024 // 2g
}
}

pub mod streaming {
Expand Down
2 changes: 2 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ compact_iter_recreate_timeout_ms = 600000
compactor_max_sst_size = 536870912
enable_fast_compaction = true
max_preload_io_retry_times = 3
compactor_fast_max_compact_delete_ratio = 40
compactor_fast_max_compact_task_size = 2147483648
mem_table_spill_threshold = 4194304

[storage.data_file_cache]
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ mod rw_event_logs;
mod rw_fragments;
mod rw_functions;
mod rw_hummock_branched_objects;
mod rw_hummock_compact_task_assignment;
mod rw_hummock_compact_task_progress;
mod rw_hummock_compaction_group_configs;
mod rw_hummock_compaction_status;
mod rw_hummock_meta_configs;
mod rw_hummock_pinned_snapshots;
mod rw_hummock_pinned_versions;
Expand Down Expand Up @@ -60,9 +60,9 @@ pub use rw_event_logs::*;
pub use rw_fragments::*;
pub use rw_functions::*;
pub use rw_hummock_branched_objects::*;
pub use rw_hummock_compact_task_assignment::*;
pub use rw_hummock_compact_task_progress::*;
pub use rw_hummock_compaction_group_configs::*;
pub use rw_hummock_compaction_status::*;
pub use rw_hummock_meta_configs::*;
pub use rw_hummock_pinned_snapshots::*;
pub use rw_hummock_pinned_versions::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use serde_json::json;
use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_COMPACT_TASK_ASSIGNMENT: BuiltinTable = BuiltinTable {
name: "RW_HUMMOCK_COMPACT_TASK_ASSIGNMENT",
name: "rw_hummock_compact_task_assignment",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int64, "compaction_group_id"),
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2931,7 +2931,7 @@ impl HummockManager {
.await;
match ret {
Ok((new_group_id, table_vnode_partition_count)) => {
tracing::info!("move state table [{}] from group-{} to group-{} success, Allow split by table: false", table_id, parent_group_id, new_group_id);
tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, table_vnode_partition_count);
return TableAlignRule::SplitToDedicatedCg((
new_group_id,
table_vnode_partition_count,
Expand Down
9 changes: 4 additions & 5 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ use crate::hummock::{
SstableDeleteRangeIterator, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
const FAST_COMPACT_MAX_COMPACT_SIZE: u64 = 2 * 1024 * 1024 * 1024; // 2GB
const FAST_COMPACT_MAX_DELETE_RATIO: u64 = 40; // 40%
pub struct CompactorRunner {
compact_task: CompactTask,
compactor: Compactor,
Expand Down Expand Up @@ -372,7 +370,7 @@ pub async fn compact(

let delete_key_count = sstable_infos
.iter()
.map(|table_info| table_info.stale_key_count)
.map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
.sum::<u64>();
let total_key_count = sstable_infos
.iter()
Expand All @@ -385,8 +383,9 @@ pub async fn compact(
&& single_table
&& compact_task.target_level > 0
&& compact_task.input_ssts.len() == 2
&& compaction_size < FAST_COMPACT_MAX_COMPACT_SIZE
&& delete_key_count * 100 < FAST_COMPACT_MAX_DELETE_RATIO * total_key_count
&& compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
&& delete_key_count * 100
< context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
&& compact_task.task_type() == TaskType::Dynamic;
if !optimize_by_copy_block {
match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
Expand Down
6 changes: 1 addition & 5 deletions src/storage/src/hummock/sstable/multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,7 @@ where
if switch_builder {
need_seal_current = true;
} else if builder.reach_capacity() {
let is_split_table = self
.table_partition_vnode
.contains_key(&full_key.user_key.table_id.table_id());

if !is_split_table || builder.reach_max_sst_size() {
if !self.is_target_level_l0_or_lbase || builder.reach_max_sst_size() {
need_seal_current = true;
} else {
need_seal_current = self.is_target_level_l0_or_lbase && vnode_changed;
Expand Down
6 changes: 6 additions & 0 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub struct StorageOpts {
/// enable FastCompactorRunner.
pub enable_fast_compaction: bool,
pub max_preload_io_retry_times: usize,
pub compactor_fast_max_compact_delete_ratio: u32,
pub compactor_fast_max_compact_task_size: u64,

pub mem_table_spill_threshold: usize,

Expand Down Expand Up @@ -248,6 +250,10 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
enable_fast_compaction: c.storage.enable_fast_compaction,
mem_table_spill_threshold: c.storage.mem_table_spill_threshold,
object_store_config: c.storage.object_store.clone(),
compactor_fast_max_compact_delete_ratio: c
.storage
.compactor_fast_max_compact_delete_ratio,
compactor_fast_max_compact_task_size: c.storage.compactor_fast_max_compact_task_size,
}
}
}
Loading