feat(meta): split by table according to write throughput (#15547)
Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace authored May 27, 2024
1 parent 1401d56 commit ac93e24
Showing 14 changed files with 206 additions and 215 deletions.
39 changes: 36 additions & 3 deletions src/common/src/config.rs
@@ -309,9 +309,12 @@ pub struct MetaConfig {
#[serde(default)]
pub do_not_config_object_storage_lifecycle: bool,

/// Count of partitions in a split group. Meta assigns this value to every new group that is automatically split off from the default group.
/// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual nodes of one state table.
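/// For example, assuming the usual `VirtualNode::COUNT` of 256 and the default value of 16, each partition covers 256 / 16 = 16 consecutive virtual nodes.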
#[serde(default = "default::meta::partition_vnode_count")]
pub partition_vnode_count: u32,

/// The write-throughput threshold that triggers a group split. Increase this value to avoid splitting off too many groups that receive little write traffic.
#[serde(default = "default::meta::table_write_throughput_threshold")]
pub table_write_throughput_threshold: u64,

@@ -334,8 +337,23 @@ pub struct MetaConfig {
#[config_doc(nested)]
pub compaction_config: CompactionConfig,

#[serde(default = "default::meta::hybird_partition_vnode_count")]
pub hybird_partition_vnode_count: u32,
/// Count of partitions for tables in the default group and the materialized-view group.
/// The meta node decides, based on a strategy, whether to cut file boundaries at vnode-aligned positions.
/// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual nodes of one state table.
/// Set it to zero to disable this feature.
#[serde(default = "default::meta::hybrid_partition_vnode_count")]
pub hybrid_partition_vnode_count: u32,

/// The table-size threshold within one compact task that decides whether a table (in the default group or the materialized-view group) is partitioned into `hybrid_partition_vnode_count` parts.
/// Set it to the maximum value of a 64-bit integer to disable this feature.
#[serde(default = "default::meta::compact_task_table_size_partition_threshold_low")]
pub compact_task_table_size_partition_threshold_low: u64,

/// The table-size threshold within one compact task that decides whether a table (in the default group or the materialized-view group) is partitioned into `partition_vnode_count` parts.
/// Set it to the maximum value of a 64-bit integer to disable this feature.
#[serde(default = "default::meta::compact_task_table_size_partition_threshold_high")]
pub compact_task_table_size_partition_threshold_high: u64,

#[serde(default = "default::meta::event_log_enabled")]
pub event_log_enabled: bool,
/// Keeps the latest N events per channel.
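Taken together, the two thresholds and the two partition counts above form a tiered decision. A minimal standalone sketch of the tiering (a hypothetical helper, not code from this commit, and ignoring the write-throughput refinement the real logic adds):

```rust
/// Hypothetical helper illustrating the tiered partitioning decision;
/// the parameters mirror the config fields above.
fn pick_partition_count(
    table_size: u64,
    threshold_low: u64,  // compact_task_table_size_partition_threshold_low
    threshold_high: u64, // compact_task_table_size_partition_threshold_high
    partition_vnode_count: u32,
    hybrid_partition_vnode_count: u32,
) -> Option<u32> {
    if table_size > threshold_high && partition_vnode_count > 0 {
        // Very large table: split into the full per-group partition count.
        Some(partition_vnode_count)
    } else if table_size > threshold_low && hybrid_partition_vnode_count > 0 {
        // Moderately large table: split into the coarser hybrid count.
        Some(hybrid_partition_vnode_count)
    } else {
        // Small table: no vnode partitioning.
        None
    }
}
```

The actual decision, including the write-throughput and `target_file_size_base` checks, lives in `calculate_vnode_partition` in src/meta/src/hummock/manager/compaction.rs later in this diff.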
@@ -675,6 +693,9 @@ pub struct StorageConfig {
#[serde(default = "default::storage::sstable_id_remote_fetch_number")]
pub sstable_id_remote_fetch_number: u32,

#[serde(default = "default::storage::min_sstable_size_mb")]
pub min_sstable_size_mb: u32,

#[serde(default)]
pub data_file_cache: FileCacheConfig,

@@ -1260,10 +1281,18 @@ pub mod default {
1024 * 1024 * 1024 // 1GB
}

pub fn hybird_partition_vnode_count() -> u32 {
pub fn hybrid_partition_vnode_count() -> u32 {
4
}

pub fn compact_task_table_size_partition_threshold_low() -> u64 {
128 * 1024 * 1024 // 128MB
}

pub fn compact_task_table_size_partition_threshold_high() -> u64 {
512 * 1024 * 1024 // 512MB
}

pub fn event_log_enabled() -> bool {
true
}
@@ -1401,6 +1430,10 @@ pub mod default {
10
}

pub fn min_sstable_size_mb() -> u32 {
32
}

pub fn min_sst_size_for_streaming_upload() -> u64 {
// 32MB
32 * 1024 * 1024
9 changes: 6 additions & 3 deletions src/config/docs.md
@@ -20,6 +20,8 @@ This page is automatically generated by `./risedev generate-example-config`
|--------|-------------|---------|
| backend | | "Mem" |
| collect_gc_watermark_spin_interval_sec | The spin interval when collecting global GC watermark in hummock. | 5 |
| compact_task_table_size_partition_threshold_high | The table-size threshold within one compact task that decides whether a table (in the default group or the materialized-view group) is partitioned into `partition_vnode_count` parts. Set it to the maximum value of a 64-bit integer to disable this feature. | 536870912 |
| compact_task_table_size_partition_threshold_low | The table-size threshold within one compact task that decides whether a table (in the default group or the materialized-view group) is partitioned into `hybrid_partition_vnode_count` parts. Set it to the maximum value of a 64-bit integer to disable this feature. | 134217728 |
| compaction_task_max_heartbeat_interval_secs | | 30 |
| compaction_task_max_progress_interval_secs | | 600 |
| cut_table_size_limit | | 1073741824 |
@@ -36,7 +38,7 @@ This page is automatically generated by `./risedev generate-example-config`
| event_log_enabled | | true |
| full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 |
| hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 |
| hybird_partition_vnode_count | | 4 |
| hybrid_partition_vnode_count | Count of partitions for tables in the default group and the materialized-view group. The meta node decides, based on a strategy, whether to cut file boundaries at vnode-aligned positions. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual nodes of one state table. Set it to zero to disable this feature. | 4 |
| max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 |
| meta_leader_lease_secs | | 30 |
| min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 |
@@ -47,14 +49,14 @@ This page is automatically generated by `./risedev generate-example-config`
| parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 |
| parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 |
| parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 |
| partition_vnode_count | | 16 |
| partition_vnode_count | Count of partitions in a split group. Meta assigns this value to every new group that is automatically split off from the default group. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual nodes of one state table. For example, assuming the usual `VirtualNode::COUNT` of 256 and the default value of 16, each partition covers 256 / 16 = 16 consecutive virtual nodes. | 16 |
| periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 |
| periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 |
| periodic_split_compact_group_interval_sec | | 10 |
| periodic_tombstone_reclaim_compaction_interval_sec | | 600 |
| periodic_ttl_reclaim_compaction_interval_sec | Schedule `ttl_reclaim` compaction for all compaction groups with this interval. | 1800 |
| split_group_size_limit | | 68719476736 |
| table_write_throughput_threshold | | 16777216 |
| table_write_throughput_threshold | The write-throughput threshold that triggers a group split. Increase this value to avoid splitting off too many groups that receive little write traffic. | 16777216 |
| unrecognized | | |
| vacuum_interval_sec | Interval of invoking a vacuum job, to remove stale metadata from meta store and objects from object store. | 30 |
| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 10 |
@@ -122,6 +124,7 @@ This page is automatically generated by `./risedev generate-example-config`
| meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | |
| meta_file_cache | | |
| min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. | 33554432 |
| min_sstable_size_mb | | 32 |
| object_store | | |
| prefetch_buffer_capacity_mb | max memory usage for large query | |
| share_buffer_compaction_worker_threads_number | Worker threads number of dedicated tokio runtime for share buffer compaction. 0 means use tokio's default value (number of CPU core). | 4 |
5 changes: 4 additions & 1 deletion src/config/example.toml
@@ -48,7 +48,9 @@ table_write_throughput_threshold = 16777216
min_table_split_write_throughput = 4194304
compaction_task_max_heartbeat_interval_secs = 30
compaction_task_max_progress_interval_secs = 600
hybird_partition_vnode_count = 4
hybrid_partition_vnode_count = 4
compact_task_table_size_partition_threshold_low = 134217728
compact_task_table_size_partition_threshold_high = 536870912
event_log_enabled = true
event_log_channel_max_size = 10
enable_dropped_column_reclaim = false
@@ -128,6 +130,7 @@ share_buffer_upload_concurrency = 8
compactor_max_task_multiplier = 3.0
compactor_memory_available_proportion = 0.8
sstable_id_remote_fetch_number = 10
min_sstable_size_mb = 32
min_sst_size_for_streaming_upload = 33554432
max_sub_compaction = 4
max_concurrent_compaction_task_number = 16
8 changes: 7 additions & 1 deletion src/meta/node/src/lib.rs
@@ -343,6 +343,12 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
table_write_throughput_threshold: config.meta.table_write_throughput_threshold,
min_table_split_write_throughput: config.meta.min_table_split_write_throughput,
partition_vnode_count: config.meta.partition_vnode_count,
compact_task_table_size_partition_threshold_low: config
.meta
.compact_task_table_size_partition_threshold_low,
compact_task_table_size_partition_threshold_high: config
.meta
.compact_task_table_size_partition_threshold_high,
do_not_config_object_storage_lifecycle: config
.meta
.do_not_config_object_storage_lifecycle,
@@ -352,7 +358,7 @@
compaction_task_max_progress_interval_secs,
compaction_config: Some(config.meta.compaction_config),
cut_table_size_limit: config.meta.cut_table_size_limit,
hybird_partition_vnode_count: config.meta.hybird_partition_vnode_count,
hybrid_partition_vnode_count: config.meta.hybrid_partition_vnode_count,
event_log_enabled: config.meta.event_log_enabled,
event_log_channel_max_size: config.meta.event_log_channel_max_size,
advertise_addr: opts.advertise_addr,
@@ -45,14 +45,6 @@ impl CompactionPicker for IntraCompactionPicker {
return None;
}

let is_l0_pending_compact =
level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]);

if is_l0_pending_compact {
stats.skip_by_pending_files += 1;
return None;
}

if let Some(ret) = self.pick_l0_trivial_move_file(l0, level_handlers, stats) {
return Some(ret);
}
11 changes: 3 additions & 8 deletions src/meta/src/hummock/compaction/selector/level_selector.rs
@@ -314,14 +314,9 @@ impl DynamicLevelSelectorCore {
if level_idx < ctx.base_level || level_idx >= self.config.max_level as usize {
continue;
}
let upper_level = if level_idx == ctx.base_level {
0
} else {
level_idx - 1
};
let total_size = level.total_file_size
+ handlers[upper_level].get_pending_output_file_size(level.level_idx)
- handlers[level_idx].get_pending_output_file_size(level.level_idx + 1);
let output_file_size =
handlers[level_idx].get_pending_output_file_size(level.level_idx + 1);
let total_size = level.total_file_size.saturating_sub(output_file_size);
if total_size == 0 {
continue;
}
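The switch to `saturating_sub` guards against unsigned underflow: the pending output size attributed to a level can, at least transiently, exceed that level's recorded total file size. A standalone illustration (assumed values, not code from this commit):

```rust
fn main() {
    let total_file_size: u64 = 100;
    // More bytes are pending to be compacted out of this level than it
    // currently records.
    let pending_output_file_size: u64 = 140;

    // Plain `total_file_size - pending_output_file_size` would underflow:
    // a panic in debug builds, or a wrap to an enormous value in release
    // builds that would make the level look huge to the selector.
    let total_size = total_file_size.saturating_sub(pending_output_file_size);
    assert_eq!(total_size, 0); // clamped to zero instead
}
```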
126 changes: 103 additions & 23 deletions src/meta/src/hummock/manager/compaction.rs
@@ -12,6 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::{Instant, SystemTime};
@@ -709,15 +723,6 @@ impl HummockManager {
}
}

let table_to_vnode_partition = match self
.group_to_table_vnode_partition
.read()
.get(&compaction_group_id)
{
Some(table_to_vnode_partition) => table_to_vnode_partition.clone(),
None => BTreeMap::default(),
};

while let Some(compact_task) = compact_status.get_compact_task(
version
.latest_version()
@@ -809,24 +814,18 @@
"SUCCESS",
])
.inc();

version.apply_compact_task(&compact_task);
trivial_tasks.push(compact_task);
if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
break 'outside;
}
} else {
if group_config.compaction_config.split_weight_by_vnode > 0 {
for table_id in &compact_task.existing_table_ids {
compact_task
.table_vnode_partition
.insert(*table_id, vnode_partition_count);
}
} else {
compact_task.table_vnode_partition = table_to_vnode_partition.clone();
}
compact_task
.table_vnode_partition
.retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
self.calculate_vnode_partition(
&mut compact_task,
group_config.compaction_config.as_ref(),
)
.await;
compact_task.table_watermarks = version
.latest_version()
.safe_epoch_table_watermarks(&compact_task.existing_table_ids);
@@ -879,7 +878,6 @@
{
unschedule_groups.push(compaction_group_id);
}

stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
}

@@ -890,7 +888,6 @@
compact_task_assignment,
version
)?;

self.metrics
.compact_task_batch_count
.with_label_values(&["batch_trivial_move"])
@@ -1141,6 +1138,7 @@ impl HummockManager {
///
/// Returning `Ok(false)` indicates that either the task was not found,
/// or the task is not owned by `context_id` when `context_id` is not `None`.
pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
let mut guard = self.compaction.write().await;
let deterministic_mode = self.env.opts.compaction_deterministic_test;
Expand Down Expand Up @@ -1450,6 +1448,88 @@ impl HummockManager {
}
}
}

pub(crate) async fn calculate_vnode_partition(
&self,
compact_task: &mut CompactTask,
compaction_config: &CompactionConfig,
) {
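// Vnode partitioning only applies to tasks whose output lands in L0 or the
// base level; tasks targeting deeper levels are left untouched.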
if compact_task.target_level > compact_task.base_level {
return;
}
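// If the group's compaction config already pins a per-vnode split weight
// (e.g. a group that was split off from the default group), apply that
// weight to every table in the task.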
if compaction_config.split_weight_by_vnode > 0 {
for table_id in &compact_task.existing_table_ids {
compact_task
.table_vnode_partition
.insert(*table_id, compact_task.split_weight_by_vnode);
}
} else {
let mut table_size_info: HashMap<u32, u64> = HashMap::default();
let mut existing_table_ids: HashSet<u32> = HashSet::default();
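// Estimate each table's share of the task by spreading every SST's file size
// evenly across the tables it contains; the per-table sizes accumulated below
// are therefore estimates rather than exact byte counts.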
for input_ssts in &compact_task.input_ssts {
for sst in &input_ssts.table_infos {
existing_table_ids.extend(sst.table_ids.iter());
for table_id in &sst.table_ids {
*table_size_info.entry(*table_id).or_default() +=
sst.file_size / (sst.table_ids.len() as u64);
}
}
}
compact_task
.existing_table_ids
.retain(|table_id| existing_table_ids.contains(table_id));

let hybrid_vnode_count = self.env.opts.hybrid_partition_vnode_count;
let default_partition_count = self.env.opts.partition_vnode_count;
// We must ensure the partition thresholds are large enough to avoid producing too many small files.
let compact_task_table_size_partition_threshold_low = self
.env
.opts
.compact_task_table_size_partition_threshold_low;
let compact_task_table_size_partition_threshold_high = self
.env
.opts
.compact_task_table_size_partition_threshold_high;
use risingwave_common::system_param::reader::SystemParamsRead;
let params = self.env.system_params_reader().await;
let barrier_interval_ms = params.barrier_interval_ms() as u64;
let checkpoint_secs = std::cmp::max(
1,
params.checkpoint_frequency() * barrier_interval_ms / 1000,
);
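// Worked example with assumed values: barrier_interval_ms = 1000 and
// checkpoint_frequency = 10 give checkpoint_secs = max(1, 10 * 1000 / 1000) = 10,
// so dividing the per-checkpoint write size below by checkpoint_secs
// approximates bytes written per second.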
// Check the latest write throughput of each table.
let history_table_throughput = self.history_table_throughput.read();
for (table_id, compact_table_size) in table_size_info {
let write_throughput = history_table_throughput
.get(&table_id)
.map(|que| que.back().cloned().unwrap_or(0))
.unwrap_or(0)
/ checkpoint_secs;
if compact_table_size > compact_task_table_size_partition_threshold_high
&& default_partition_count > 0
{
compact_task
.table_vnode_partition
.insert(table_id, default_partition_count);
} else if (compact_table_size > compact_task_table_size_partition_threshold_low
|| (write_throughput > self.env.opts.table_write_throughput_threshold
&& compact_table_size > compaction_config.target_file_size_base))
&& hybrid_vnode_count > 0
{
// Partition a table with large write throughput, while making sure it is not too small to be worth splitting.
compact_task
.table_vnode_partition
.insert(table_id, hybrid_vnode_count);
} else if compact_table_size > compaction_config.target_file_size_base {
// Use a single partition for a small table that still exceeds the target file size.
compact_task.table_vnode_partition.insert(table_id, 1);
}
}
compact_task
.table_vnode_partition
.retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
}
}
}

#[cfg(any(test, feature = "test"))]
