Skip to content

Commit

Permalink
do not split small group
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed May 27, 2024
1 parent cfad254 commit a4a2c69
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 72 deletions.
11 changes: 9 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ pub struct MetaConfig {
#[serde(default = "default::meta::split_group_size_limit")]
pub split_group_size_limit: u64,

#[serde(default = "default::meta::max_group_size")]
pub max_group_size: u64,

#[serde(default = "default::meta::cut_table_size_limit")]
pub cut_table_size_limit: u64,

Expand Down Expand Up @@ -1235,11 +1238,15 @@ pub mod default {
}

pub fn move_table_size_limit() -> u64 {
10 * 1024 * 1024 * 1024 // 10GB
64 * 1024 * 1024 * 1024 // 64GB
}

pub fn split_group_size_limit() -> u64 {
64 * 1024 * 1024 * 1024 // 64GB
256 * 1024 * 1024 * 1024 // 256GB
}

/// Default upper bound on the total size of one compaction group: 1 TiB.
pub fn max_group_size() -> u64 {
    // 1 TiB expressed in bytes (2^40).
    1u64 << 40
}

pub fn partition_vnode_count() -> u32 {
Expand Down
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.meta
.periodic_split_compact_group_interval_sec,
split_group_size_limit: config.meta.split_group_size_limit,
max_group_size: config.meta.max_group_size,
min_table_split_size: config.meta.move_table_size_limit,
table_write_throughput_threshold: config.meta.table_write_throughput_threshold,
min_table_split_write_throughput: config.meta.min_table_split_write_throughput,
Expand Down
195 changes: 125 additions & 70 deletions src/meta/src/hummock/manager/timer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -21,7 +21,7 @@ use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{HummockLevelsExt, get_compaction_group_ids};
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_pb::hummock::compact_task::{self, TaskStatus};
Expand All @@ -37,6 +37,12 @@ use crate::hummock::manager::HISTORY_TABLE_INFO_STATISTIC_TIME;
use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
use crate::hummock::{HummockManager, TASK_NORMAL};

/// Decision made for a single state table when evaluating whether it should be
/// moved out of its current compaction group.
pub enum TableAlignRule {
    /// Leave the table in its current group; no split is worthwhile.
    NoOptimization,
    /// Move the table into a shared group together with other medium tables.
    SplitToSharedGroup,
    /// Move the table into its own dedicated compaction group; the payload is
    /// the vnode partition count to apply to the new group.
    SplitToDedicatedCg(u32),
}

impl HummockManager {
pub fn hummock_timer_task(hummock_manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -439,36 +445,106 @@ impl HummockManager {
1,
params.checkpoint_frequency() * barrier_interval_ms / 1000,
);
let created_tables = match self.metadata_manager.get_created_table_ids().await {
Ok(created_tables) => created_tables,
Err(err) => {
tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
return;
}
};
let created_tables: HashSet<u32> = HashSet::from_iter(created_tables);
let table_write_throughput = self.history_table_throughput.read().clone();
let mut group_infos = self.calculate_compaction_group_statistic().await;
group_infos.sort_by_key(|group| group.group_size);
group_infos.reverse();

for group in &group_infos {
for group in &mut group_infos {
if group.table_statistic.len() == 1 {
// no need to handle the separate compaction group
continue;
}

let mut split_table_ids = vec![];
for (table_id, table_size) in &group.table_statistic {
self.calculate_table_align_rule(
let rule = self.calculate_table_align_rule(
&table_write_throughput,
table_id,
table_size,
!created_tables.contains(table_id),
*table_size,
checkpoint_secs,
group.group_id,
group.group_size,
)
.await;
);
match rule {
TableAlignRule::NoOptimization => {
continue;
}

e @ (TableAlignRule::SplitToSharedGroup
| TableAlignRule::SplitToDedicatedCg(_)) => {
split_table_ids.push((*table_id, *table_size, e));
}
}
}
if split_table_ids.is_empty() || self.check_group_has_stale_sst(group.group_id).await {
continue;
}
let mut last_group_size = 0;
let mut last_group_split_table_ids = vec![];
for (table_id, table_size, rule) in split_table_ids {
match rule {
TableAlignRule::NoOptimization => {
unreachable!("table align rule can not be NoOptimization");
}
TableAlignRule::SplitToSharedGroup => {
last_group_split_table_ids.push(table_id);
last_group_size += table_size;
if last_group_size > self.env.opts.split_group_size_limit
&& group.group_size
> last_group_size + self.env.opts.split_group_size_limit
{
let ret = self
.move_state_table_to_compaction_group(
group.group_id,
&last_group_split_table_ids,
0,
)
.await;
match ret {
Ok((new_group_id, table_vnode_partition_count)) => {
group.group_size -= last_group_size;
tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}",
table_id, group.group_id, new_group_id, table_vnode_partition_count);
}
Err(e) => {
tracing::info!(
error = %e.as_report(),
"failed to move state table [{}] from group-{}",
table_id,
group.group_id,
)
}
}
last_group_split_table_ids.clear();
last_group_size = 0;
}
}
TableAlignRule::SplitToDedicatedCg(partition_vnode_count) => {
let ret = self
.move_state_table_to_compaction_group(
group.group_id,
&[table_id],
partition_vnode_count,
)
.await;
match ret {
Ok((new_group_id, table_vnode_partition_count)) => {
group.group_size -= table_size;
tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}",
table_id, group.group_id, new_group_id, table_vnode_partition_count);
}
Err(e) => {
tracing::info!(
error = %e.as_report(),
"failed to move state table [{}] from group-{}",
table_id,
group.group_id,
)
}
}
}
}
}
}
}
Expand All @@ -486,87 +562,66 @@ impl HummockManager {
}
}

async fn calculate_table_align_rule(
fn calculate_table_align_rule(
&self,
table_write_throughput: &HashMap<u32, VecDeque<u64>>,
table_id: &u32,
table_size: &u64,
is_creating_table: bool,
table_size: u64,
checkpoint_secs: u64,
parent_group_id: u64,
group_size: u64,
) {
) -> TableAlignRule {
let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
let partition_vnode_count = self.env.opts.partition_vnode_count;
let window_size = HISTORY_TABLE_INFO_STATISTIC_TIME / (checkpoint_secs as usize);

let mut is_high_write_throughput = false;
let mut is_low_write_throughput = true;
if let Some(history) = table_write_throughput.get(table_id) {
if !is_creating_table {
if history.len() >= window_size {
is_high_write_throughput = history.iter().all(|throughput| {
*throughput / checkpoint_secs
> self.env.opts.table_write_throughput_threshold
});
is_low_write_throughput = history.iter().any(|throughput| {
*throughput / checkpoint_secs
< self.env.opts.min_table_split_write_throughput
});
}
} else {
// For creating table, relax the checking restrictions to make the data alignment behavior more sensitive.
let sum = history.iter().sum::<u64>();
is_low_write_throughput = sum
< self.env.opts.min_table_split_write_throughput
* history.len() as u64
* checkpoint_secs;
if history.len() >= window_size {
is_high_write_throughput = history.iter().all(|throughput| {
*throughput / checkpoint_secs > self.env.opts.table_write_throughput_threshold
});
}
}

let state_table_size = *table_size;

// 1. Avoid splitting a creating table
// 2. Avoid splitting a is_low_write_throughput creating table
// 3. Avoid splitting a non-high throughput medium-sized table
if is_creating_table
|| (is_low_write_throughput)
|| (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput)
{
return;
let state_table_size = table_size;

// 1. Avoid splitting a small table.
// 2. Splitting a high throughput medium-sized table
// 3. Avoid splitting a non-high throughput table
if state_table_size < self.env.opts.min_table_split_size {
return TableAlignRule::NoOptimization;
} else if is_high_write_throughput {
return TableAlignRule::SplitToDedicatedCg(partition_vnode_count);
} else if group_size < self.env.opts.max_group_size {
return TableAlignRule::NoOptimization;
}

// do not split a large table and a small table because it would increase IOPS
// of small table.
if parent_group_id != default_group_id && parent_group_id != mv_group_id {
let rest_group_size = group_size - state_table_size;
if rest_group_size < state_table_size
&& rest_group_size < self.env.opts.min_table_split_size
&& rest_group_size < self.env.opts.split_group_size_limit
{
return;
return TableAlignRule::NoOptimization;
}
}

let ret = self
.move_state_table_to_compaction_group(
parent_group_id,
&[*table_id],
partition_vnode_count,
)
.await;
match ret {
Ok((new_group_id, table_vnode_partition_count)) => {
tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, table_vnode_partition_count);
}
Err(e) => {
tracing::info!(
error = %e.as_report(),
"failed to move state table [{}] from group-{}",
table_id,
parent_group_id,
)
}
if table_size > self.env.opts.split_group_size_limit {
TableAlignRule::SplitToDedicatedCg(0)
} else {
TableAlignRule::SplitToSharedGroup
}
}

/// Reports whether the given compaction group still contains SSTs that
/// reference table ids outside the group's member set (i.e. data a future
/// compaction could reclaim). Unknown group ids are treated as having none.
pub async fn check_group_has_stale_sst(&self, group_id: u64) -> bool {
    let guard = self.versioning.read().await;
    guard
        .current_version
        .levels
        .get(&group_id)
        .map_or(false, |group| group.check_reclaim_sst_exist())
}
}
3 changes: 3 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ pub struct MetaOpts {
/// The size limit to move a state-table to other group.
pub min_table_split_size: u64,

pub max_group_size: u64,

/// Whether config object storage bucket lifecycle to purge stale data.
pub do_not_config_object_storage_lifecycle: bool,

Expand Down Expand Up @@ -317,6 +319,7 @@ impl MetaOpts {
periodic_tombstone_reclaim_compaction_interval_sec: 60,
periodic_split_compact_group_interval_sec: 60,
split_group_size_limit: 5 * 1024 * 1024 * 1024,
max_group_size: 20 * 1024 * 1024 * 1024,
min_table_split_size: 2 * 1024 * 1024 * 1024,
compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,32 @@ impl Levels {
}
delete_sst_ids_set.is_empty()
}

/// Returns `true` if any SST in this group still carries data for a table id
/// that is no longer a member of the group. Such data is stale and will be
/// reclaimed by a later compaction, so callers typically avoid splitting the
/// group while this holds.
pub fn check_reclaim_sst_exist(&self) -> bool {
    // Table ids currently registered to this compaction group.
    // `iter().copied()` avoids cloning the whole member vector just to build
    // the set (the original `clone().into_iter()` allocated needlessly).
    let existed_table_ids: HashSet<u32> = self.member_table_ids.iter().copied().collect();

    // Scan l0 sub-levels first, then the leveled levels — same order as before —
    // short-circuiting on the first stale table id found.
    // NOTE(review): `l0` is assumed to be `Some` for every initialized group;
    // the original panicked on `None` and that behavior is kept.
    self.l0
        .as_ref()
        .unwrap()
        .sub_levels
        .iter()
        .chain(self.levels.iter())
        .any(|level| {
            level.table_infos.iter().any(|sst| {
                sst.table_ids
                    .iter()
                    .any(|table_id| !existed_table_ids.contains(table_id))
            })
        })
}
}

pub fn build_initial_compaction_group_levels(
Expand Down

0 comments on commit a4a2c69

Please sign in to comment.