Skip to content

Commit

Permalink
fix(compaction): clean table write throughput statistic when unregistering
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Nov 22, 2024
1 parent d2cf5b3 commit 38092fe
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 8 deletions.
3 changes: 2 additions & 1 deletion src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ impl HummockManager {
params.checkpoint_frequency() * barrier_interval_ms / 1000,
);

let mut table_throughput_statistic_manager = self.history_table_throughput.write();
let mut table_throughput_statistic_manager =
self.table_write_throughput_statistic_manager.write();
let timestamp = chrono::Utc::now().timestamp();

for (table_id, stat) in table_stats {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ impl HummockManager {
.cloned()
.filter(|table_id| !valid_ids.contains(table_id))
.collect_vec();

// As we have released versioning lock, the version that `to_unregister` is calculated from
// may not be the same as the one used in unregister_table_ids. It is OK.
self.unregister_table_ids(to_unregister).await
Expand Down Expand Up @@ -280,6 +281,18 @@ impl HummockManager {
if table_ids.peek().is_none() {
return Ok(());
}

{
// Remove table write throughput statistics for the tables being unregistered.
// The caller requires the returned future to be `Send`; the `parking_lot` write guard is not `Send`,
// so this block scopes the `write` lock to ensure the guard is dropped before the `.await` point below.
// The table write throughput statistic tolerates inconsistency (e.g. if `unregister_table_ids` later fails), so it is safe to clean it up in advance.
let mut table_write_throughput_statistic_manager =
self.table_write_throughput_statistic_manager.write();
for table_id in table_ids.by_ref() {
table_write_throughput_statistic_manager.remove_table(table_id.table_id);
}
}

let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let mut version = HummockVersionTransaction::new(
Expand Down Expand Up @@ -356,6 +369,7 @@ impl HummockManager {
version.latest_version(),
)));
commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

// No need to handle DeltaType::GroupDestroy during time travel.
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,8 @@ impl HummockManager {
.opts
.compact_task_table_size_partition_threshold_high;
// check latest write throughput
let history_table_throughput_guard = self.history_table_throughput.read();
let history_table_throughput_guard =
self.table_write_throughput_statistic_manager.read();
let timestamp = chrono::Utc::now().timestamp();
for (table_id, compact_table_size) in table_size_info {
let write_throughput = history_table_throughput_guard
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ pub struct HummockManager {
version_checkpoint_path: String,
version_archive_dir: String,
pause_version_checkpoint: AtomicBool,
history_table_throughput: parking_lot::RwLock<TableWriteThroughputStatisticManager>,
table_write_throughput_statistic_manager:
parking_lot::RwLock<TableWriteThroughputStatisticManager>,

// for compactor
// `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream
Expand Down Expand Up @@ -284,7 +285,7 @@ impl HummockManager {
version_checkpoint_path,
version_archive_dir,
pause_version_checkpoint: AtomicBool::new(false),
history_table_throughput: parking_lot::RwLock::new(
table_write_throughput_statistic_manager: parking_lot::RwLock::new(
TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
),
compactor_streams_change_tx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ impl TableWriteThroughputStatisticManager {
.filter(move |statistic| !statistic.is_expired(window_secs, timestamp_secs))
})
}

/// Removes all tracked write-throughput statistics for the given table.
///
/// Called when a table is unregistered (see `HummockManager::unregister_table_ids`
/// in this commit) so stale throughput history does not linger in the manager.
/// Removing an id that was never tracked is a no-op — `remove` on the underlying
/// collection simply finds no entry.
///
/// # Arguments
/// * `table_id` - raw table id (the `u32` inside the project's `TableId` wrapper,
///   judging by the caller passing `table_id.table_id`).
pub fn remove_table(&mut self, table_id: u32) {
    self.table_throughput.remove(&table_id);
}
}
11 changes: 7 additions & 4 deletions src/meta/src/hummock/manager/timer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,10 @@ impl HummockManager {
.compaction_group_count
.set(compaction_group_count as i64);

let tables_throughput =
hummock_manager.history_table_throughput.read().clone();
let tables_throughput = hummock_manager
.table_write_throughput_statistic_manager
.read()
.clone();

let current_version_levels = &hummock_manager
.versioning
Expand Down Expand Up @@ -565,7 +567,7 @@ impl HummockManager {
/// 1. `state table throughput`: If the table is in a high throughput state and it belongs to a multi table group, then an attempt will be made to split the table into separate compaction groups to increase its throughput and reduce the impact on write amplification.
/// 2. `group size`: If the group size has exceeded the set upper limit, e.g. `max_group_size` * `split_group_size_ratio`
async fn on_handle_schedule_group_split(&self) {
let table_write_throughput = self.history_table_throughput.read().clone();
let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();
let mut group_infos = self.calculate_compaction_group_statistic().await;
group_infos.sort_by_key(|group| group.group_size);
group_infos.reverse();
Expand Down Expand Up @@ -607,7 +609,8 @@ impl HummockManager {
return;
}
};
let table_write_throughput_statistic_manager = self.history_table_throughput.read().clone();
let table_write_throughput_statistic_manager =
self.table_write_throughput_statistic_manager.read().clone();
let mut group_infos = self.calculate_compaction_group_statistic().await;
// sort by first table id for deterministic merge order
group_infos.sort_by_key(|group| {
Expand Down

0 comments on commit 38092fe

Please sign in to comment.