
Commit

add config
Li0k committed Oct 29, 2024
1 parent 742bcb4 commit 6ef8ba3
Showing 7 changed files with 97 additions and 30 deletions.
16 changes: 16 additions & 0 deletions src/common/src/config.rs
@@ -421,6 +421,14 @@ pub struct MetaConfig {

#[serde(default = "default::meta::periodic_scheduling_compaction_group_merge_interval_sec")]
pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

#[serde(default = "default::meta::periodic_table_stat_throuput_reclaim_interval_sec")]
pub periodic_table_stat_throuput_reclaim_interval_sec: u64,

+ /// The retention period for table write throughput statistics: statistics with `timestamp < now - table_stat_old_throuput_reclaim_interval_sec` are removed.
+ #[serde(default = "default::meta::table_stat_old_throuput_reclaim_interval_sec")]
+ pub table_stat_old_throuput_reclaim_interval_sec: u64,

#[serde(default)]
#[config_doc(nested)]
pub meta_store_config: MetaStoreConfig,
@@ -1578,6 +1586,14 @@ pub mod default {
pub fn periodic_scheduling_compaction_group_merge_interval_sec() -> u64 {
60 * 10 // 10min
}

+ pub fn periodic_table_stat_throuput_reclaim_interval_sec() -> u64 {
+ 60 // 1min
+ }

+ pub fn table_stat_old_throuput_reclaim_interval_sec() -> u64 {
+ 60 * 10 // 10min
+ }
}

pub mod server {
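In plain terms: the first knob is how often the meta node wakes up to reclaim, and the second is the age cutoff applied on each pass. A minimal sketch of the cutoff rule from the doc comment above (the helper name and the unix-seconds clock are illustrative assumptions, not part of the commit):

use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical helper: a statistic is stale once
// `timestamp < now - table_stat_old_throuput_reclaim_interval_sec`.
fn is_stale(stat_timestamp: i64, retention_sec: i64) -> bool {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before unix epoch")
        .as_secs() as i64;
    stat_timestamp < now - retention_sec
}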
6 changes: 6 additions & 0 deletions src/meta/node/src/lib.rs
@@ -398,6 +398,9 @@ pub fn start(
periodic_scheduling_compaction_group_merge_interval_sec: config
.meta
.periodic_scheduling_compaction_group_merge_interval_sec,
+ periodic_table_stat_throuput_reclaim_interval_sec: config
+ .meta
+ .periodic_table_stat_throuput_reclaim_interval_sec,
table_high_write_throughput_threshold: config
.meta
.table_high_write_throughput_threshold,
@@ -447,6 +450,9 @@ pub fn start(
table_stat_throuput_window_seconds_for_merge: config
.meta
.table_stat_throuput_window_seconds_for_merge,
+ table_stat_old_throuput_reclaim_interval_sec: config
+ .meta
+ .table_stat_old_throuput_reclaim_interval_sec,
object_store_config: config.storage.object_store,
max_trivial_move_task_count_per_loop: config
.meta
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/commit_epoch.rs
@@ -33,7 +33,7 @@ use risingwave_pb::hummock::compact_task::{self};
use risingwave_pb::hummock::CompactionConfig;
use sea_orm::TransactionTrait;

- use super::TableThroughputStatistic;
+ use super::TableWriteThroughputStatistic;
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::transaction::{
@@ -359,7 +359,7 @@ impl HummockManager {
for (table_id, stat) in table_stats {
let throughput = (stat.total_value_size + stat.total_key_size) as u64;
let entry = table_infos.entry(table_id).or_default();
- entry.push_back(TableThroughputStatistic {
+ entry.push_back(TableWriteThroughputStatistic {
throughput,
timestamp,
});
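For context, the loop above appends one TableWriteThroughputStatistic per table and commit to an in-memory, per-table deque. A self-contained sketch of that accumulation pattern (the free function and its parameters are illustrative, not the commit's API):

use std::collections::{HashMap, VecDeque};

#[derive(Debug, Clone)]
struct TableWriteThroughputStatistic {
    throughput: u64,
    timestamp: i64,
}

// Append one commit's per-table write volume to the in-memory history,
// mirroring the loop in `commit_epoch` above.
fn record(
    table_infos: &mut HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
    table_id: u32,
    total_key_size: u64,
    total_value_size: u64,
    timestamp: i64,
) {
    let throughput = total_value_size + total_key_size;
    table_infos
        .entry(table_id)
        .or_default()
        .push_back(TableWriteThroughputStatistic { throughput, timestamp });
}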
56 changes: 35 additions & 21 deletions
@@ -39,7 +39,7 @@ use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::{commit_multi_var, HummockManager};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
- use crate::hummock::TableThroughputStatistic;
+ use crate::hummock::TableWriteThroughputStatistic;

impl HummockManager {
pub async fn merge_compaction_group(
@@ -693,7 +693,7 @@ impl HummockManager {
/// Split the compaction group if the group is too large or contains high throughput tables.
pub async fn try_split_compaction_group(
&self,
- table_write_throughput: &HashMap<u32, VecDeque<TableThroughputStatistic>>,
+ table_write_throughput: &HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
checkpoint_secs: u64,
group: CompactionGroupStatistic,
) {
@@ -724,7 +724,7 @@ impl HummockManager {
/// Try to move the high throughput table to a dedicated compaction group.
pub async fn try_move_high_throughput_table_to_dedicated_cg(
&self,
- table_write_throughput: &HashMap<u32, VecDeque<TableThroughputStatistic>>,
+ table_write_throughput: &HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
table_id: &u32,
_table_size: &u64,
checkpoint_secs: u64,
@@ -831,7 +831,7 @@ impl HummockManager {

pub async fn try_merge_compaction_group(
&self,
- table_write_throughput: &HashMap<u32, VecDeque<TableThroughputStatistic>>,
+ table_write_throughput: &HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
group: &CompactionGroupStatistic,
next_group: &CompactionGroupStatistic,
checkpoint_secs: u64,
@@ -965,7 +965,7 @@ impl HummockManager {

/// Check if the table is high write throughput with the given threshold and ratio.
pub fn is_table_high_write_throughput(
- table_throughput: &VecDeque<TableThroughputStatistic>,
+ table_throughput: &VecDeque<TableWriteThroughputStatistic>,
sample_size: usize,
threshold: u64,
high_write_throughput_ratio: f64,
@@ -986,7 +986,7 @@ pub fn is_table_high_write_throughput(
}

pub fn is_table_low_write_throughput(
- table_throughput: &VecDeque<TableThroughputStatistic>,
+ table_throughput: &VecDeque<TableWriteThroughputStatistic>,
sample_size: usize,
threshold: u64,
low_write_throughput_ratio: f64,
@@ -1007,34 +1007,48 @@ pub fn is_table_low_write_throughput(
}

fn check_is_low_write_throughput_compaction_group(
- table_write_throughput: &HashMap<u32, VecDeque<TableThroughputStatistic>>,
+ table_write_throughput: &HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
sample_size: usize,
threshold: u64,
group: &CompactionGroupStatistic,
low_write_throughput_ratio: f64,
) -> bool {
- // check table exists
- let live_table = group
+ // check that the tables exist, and partition the table ids into two groups
+ let (live_table_with_enough_statistic, live_table_without_enough_statistic): (
+ Vec<StateTableId>,
+ Vec<StateTableId>,
+ ) = group
.table_statistic
.keys()
.filter(|table_id| table_write_throughput.contains_key(table_id))
- .filter(|table_id| table_write_throughput.get(table_id).unwrap().len() >= sample_size)
.cloned()
- .collect_vec();
+ .partition(|table_id| table_write_throughput.get(table_id).unwrap().len() >= sample_size);

+ // if no table in the group has enough statistics, return false
+ if live_table_with_enough_statistic.is_empty() {
+ // if none of the tables in the group has any statistics, the group is in a silent state; return true
+ if live_table_without_enough_statistic
+ .into_iter()
+ .all(|table_id| table_write_throughput.get(&table_id).unwrap().is_empty())
+ {
+ return true;
+ }

- if live_table.is_empty() {
return false;
}

- live_table.into_iter().all(|table_id| {
- let table_write_throughput = table_write_throughput.get(&table_id).unwrap();
- is_table_low_write_throughput(
- table_write_throughput,
- sample_size,
- threshold,
- low_write_throughput_ratio,
- )
- })
+ // check whether all tables with enough statistics have low write throughput
+ live_table_with_enough_statistic
+ .into_iter()
+ .all(|table_id| {
+ let table_write_throughput = table_write_throughput.get(&table_id).unwrap();
+ is_table_low_write_throughput(
+ table_write_throughput,
+ sample_size,
+ threshold,
+ low_write_throughput_ratio,
+ )
+ })
}

fn check_is_creating_compaction_group(
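The refactor above replaces a second .filter(...) plus .collect_vec() with a single .partition(...), so tables with too few samples are kept for the silent-group check instead of being discarded. A standalone illustration of the idiom (simplified types; a bare u64 stands in for the statistic):

use std::collections::{HashMap, VecDeque};

// Split table ids into "enough samples" vs. "not enough samples" in one pass.
fn split_by_sample_size(
    throughput: &HashMap<u32, VecDeque<u64>>,
    sample_size: usize,
) -> (Vec<u32>, Vec<u32>) {
    throughput
        .keys()
        .cloned()
        .partition(|table_id| throughput[table_id].len() >= sample_size)
}

fn main() {
    let mut history: HashMap<u32, VecDeque<u64>> = HashMap::new();
    history.entry(1).or_default().extend([10, 20, 30]);
    history.entry(2).or_default().push_back(5);
    let (enough, not_enough) = split_by_sample_size(&history, 2);
    println!("enough: {enough:?}, not enough: {not_enough:?}"); // enough: [1], not enough: [2]
}

Iterator::partition walks the input once and fills both buckets, which is what lets the new code distinguish "no statistics at all" (a silent group) from "not enough statistics yet".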
5 changes: 3 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
@@ -98,7 +98,8 @@ pub struct HummockManager {
version_checkpoint_path: String,
version_archive_dir: String,
pause_version_checkpoint: AtomicBool,
- history_table_throughput: parking_lot::RwLock<HashMap<u32, VecDeque<TableThroughputStatistic>>>,
+ history_table_throughput:
+ parking_lot::RwLock<HashMap<u32, VecDeque<TableWriteThroughputStatistic>>>,

// for compactor
// `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream
@@ -497,7 +498,7 @@ async fn write_exclusive_cluster_id(
}

#[derive(Debug, Clone)]
- pub struct TableThroughputStatistic {
+ pub struct TableWriteThroughputStatistic {
pub throughput: u64,
pub timestamp: i64,
}
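The commit wires the reclaim call to a timer (see timer_task.rs below), but the body of reclaim_table_write_throughput is not shown in this diff. A plausible sketch of the pruning it performs, assuming the struct layout above and a unix-seconds `now` (names and logic are a guess, not the commit's code):

use std::collections::{HashMap, VecDeque};

#[derive(Debug, Clone)]
pub struct TableWriteThroughputStatistic {
    pub throughput: u64,
    pub timestamp: i64,
}

// Hypothetical pruning pass: drop samples older than the retention cutoff,
// then remove tables whose history has become empty.
fn reclaim(
    history: &mut HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
    now: i64,
    retention_sec: i64,
) {
    let cutoff = now - retention_sec;
    for stats in history.values_mut() {
        // Samples are appended in timestamp order, so stale ones sit at the front.
        while stats.front().is_some_and(|s| s.timestamp < cutoff) {
            stats.pop_front();
        }
    }
    history.retain(|_, stats| !stats.is_empty());
}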
33 changes: 28 additions & 5 deletions src/meta/src/hummock/manager/timer_task.rs
@@ -56,6 +56,7 @@ impl HummockManager {
FullGc,

GroupScheduleMerge,
+ ReclaimTableWriteThroughputStatistic,
}
let mut check_compact_trigger_interval =
tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
@@ -181,6 +182,23 @@
triggers.push(Box::pin(group_scheduling_merge_trigger));
}

+ let periodic_table_stat_throuput_reclaim_interval_sec = hummock_manager
+ .env
+ .opts
+ .periodic_table_stat_throuput_reclaim_interval_sec;

+ if periodic_table_stat_throuput_reclaim_interval_sec > 0 {
+ let mut table_stat_throuput_reclaim_trigger_interval = tokio::time::interval(
+ Duration::from_secs(periodic_table_stat_throuput_reclaim_interval_sec),
+ );
+ table_stat_throuput_reclaim_trigger_interval
+ .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+ let table_stat_throuput_reclaim_trigger =
+ IntervalStream::new(table_stat_throuput_reclaim_trigger_interval)
+ .map(|_| HummockTimerEvent::ReclaimTableWriteThroughputStatistic);
+ triggers.push(Box::pin(table_stat_throuput_reclaim_trigger));
+ }

let event_stream = select_all(triggers);
use futures::pin_mut;
pin_mut!(event_stream);
@@ -213,11 +231,6 @@
continue;
}

- // TODO(li0k): replace with config after PR #18806
- const RECLAIM_TABLE_WRITE_THROUGHPUT_PERIOD_SEC: i64 = 60 * 10; // 10 minutes
- hummock_manager.reclaim_table_write_throughput(
- RECLAIM_TABLE_WRITE_THROUGHPUT_PERIOD_SEC,
- );
hummock_manager.on_handle_schedule_group_split().await;
}

@@ -448,6 +461,16 @@
tracing::info!("Start full GC from meta node.");
}
}

+ HummockTimerEvent::ReclaimTableWriteThroughputStatistic => {
+ hummock_manager.reclaim_table_write_throughput(
+ hummock_manager
+ .env
+ .opts
+ .table_stat_old_throuput_reclaim_interval_sec
+ as _,
+ );
+ }
}
}
}
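A runnable reduction of the trigger wiring added above, using the same tokio primitives (assumed dependencies: tokio with timer and macro features, tokio-stream, and futures; the event enum is a stand-in for HummockTimerEvent):

use std::time::Duration;

use futures::StreamExt;
use tokio_stream::wrappers::IntervalStream;

#[derive(Debug)]
enum TimerEvent {
    ReclaimTableWriteThroughputStatistic,
}

#[tokio::main]
async fn main() {
    // `Delay` makes a late tick reschedule from "now" instead of firing a burst,
    // which suits periodic maintenance such as statistics reclaim.
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let mut trigger = IntervalStream::new(interval)
        .map(|_| TimerEvent::ReclaimTableWriteThroughputStatistic)
        .take(2);

    while let Some(event) = trigger.next().await {
        println!("timer fired: {event:?}");
    }
}

In the production code each such stream is boxed, pushed into `triggers`, and merged with `select_all`, so a single event loop drives every periodic task.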
7 changes: 7 additions & 0 deletions src/meta/src/manager/env.rs
@@ -227,6 +227,11 @@ pub struct MetaOpts {
/// The window seconds of table throughput statistic history for merge compaction group.
pub table_stat_throuput_window_seconds_for_merge: usize,

+ pub periodic_table_stat_throuput_reclaim_interval_sec: u64,

+ /// The retention period for table write throughput statistics: statistics with `timestamp < now - table_stat_old_throuput_reclaim_interval_sec` are removed.
+ pub table_stat_old_throuput_reclaim_interval_sec: u64,

/// The configuration of the object store
pub object_store_config: ObjectStoreConfig,

@@ -324,6 +329,8 @@ impl MetaOpts {
table_stat_throuput_window_seconds_for_split: 60,
table_stat_throuput_window_seconds_for_merge: 240,
periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
+ periodic_table_stat_throuput_reclaim_interval_sec: 60,
+ table_stat_old_throuput_reclaim_interval_sec: 60 * 10,
license_key_path: None,
}
}
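With the test defaults above, the reclaim trigger fires every 60 seconds and each pass drops statistics older than 600 seconds, so a table's in-memory throughput history spans at most about ten minutes.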
