diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index bcb42b17954a..3fae8495b1b9 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -299,7 +299,8 @@ impl HummockManager { params.checkpoint_frequency() * barrier_interval_ms / 1000, ); - let mut table_throughput_statistic_manager = self.history_table_throughput.write(); + let mut table_throughput_statistic_manager = + self.table_write_throughput_statistic_manager.write(); let timestamp = chrono::Utc::now().timestamp(); for (table_id, stat) in table_stats { diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 02b63ab47de6..93a859e35538 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -164,6 +164,7 @@ impl HummockManager { .cloned() .filter(|table_id| !valid_ids.contains(table_id)) .collect_vec(); + // As we have released versioning lock, the version that `to_unregister` is calculated from // may not be the same as the one used in unregister_table_ids. It is OK. self.unregister_table_ids(to_unregister).await @@ -280,6 +281,18 @@ impl HummockManager { if table_ids.peek().is_none() { return Ok(()); } + + { + // Remove table write throughput statistics + // The Caller acquires `Send`, so we should safely use `write` lock before the await point. + // The table write throughput statistic accepts data inconsistencies (unregister table ids fail), so we can clean it up in advance. + let mut table_write_throughput_statistic_manager = + self.table_write_throughput_statistic_manager.write(); + for table_id in table_ids.by_ref() { + table_write_throughput_statistic_manager.remove_table(table_id.table_id); + } + } + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); let mut version = HummockVersionTransaction::new( @@ -356,6 +369,7 @@ impl HummockManager { version.latest_version(), ))); commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; + // No need to handle DeltaType::GroupDestroy during time travel. Ok(()) } diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index e383dc34516a..a6d5fd13024a 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -1473,7 +1473,8 @@ impl HummockManager { .opts .compact_task_table_size_partition_threshold_high; // check latest write throughput - let history_table_throughput_guard = self.history_table_throughput.read(); + let history_table_throughput_guard = + self.table_write_throughput_statistic_manager.read(); let timestamp = chrono::Utc::now().timestamp(); for (table_id, compact_table_size) in table_size_info { let write_throughput = history_table_throughput_guard diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 7fe7cd07fc0c..2e5118bbcd9a 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -97,7 +97,8 @@ pub struct HummockManager { version_checkpoint_path: String, version_archive_dir: String, pause_version_checkpoint: AtomicBool, - history_table_throughput: parking_lot::RwLock, + table_write_throughput_statistic_manager: + parking_lot::RwLock, // for compactor // `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream @@ -284,7 +285,7 @@ impl HummockManager { version_checkpoint_path, version_archive_dir, pause_version_checkpoint: AtomicBool::new(false), - history_table_throughput: parking_lot::RwLock::new( + table_write_throughput_statistic_manager: parking_lot::RwLock::new( TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time), ), compactor_streams_change_tx, diff --git a/src/meta/src/hummock/manager/table_write_throughput_statistic.rs b/src/meta/src/hummock/manager/table_write_throughput_statistic.rs index f5a685c63394..10719833838a 100644 --- a/src/meta/src/hummock/manager/table_write_throughput_statistic.rs +++ b/src/meta/src/hummock/manager/table_write_throughput_statistic.rs @@ -87,4 +87,8 @@ impl TableWriteThroughputStatisticManager { .filter(move |statistic| !statistic.is_expired(window_secs, timestamp_secs)) }) } + + pub fn remove_table(&mut self, table_id: u32) { + self.table_throughput.remove(&table_id); + } } diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index 35f6755966bd..709a7117b480 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -285,8 +285,10 @@ impl HummockManager { .compaction_group_count .set(compaction_group_count as i64); - let tables_throughput = - hummock_manager.history_table_throughput.read().clone(); + let tables_throughput = hummock_manager + .table_write_throughput_statistic_manager + .read() + .clone(); let current_version_levels = &hummock_manager .versioning @@ -565,7 +567,7 @@ impl HummockManager { /// 1. `state table throughput`: If the table is in a high throughput state and it belongs to a multi table group, then an attempt will be made to split the table into separate compaction groups to increase its throughput and reduce the impact on write amplification. /// 2. `group size`: If the group size has exceeded the set upper limit, e.g. `max_group_size` * `split_group_size_ratio` async fn on_handle_schedule_group_split(&self) { - let table_write_throughput = self.history_table_throughput.read().clone(); + let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone(); let mut group_infos = self.calculate_compaction_group_statistic().await; group_infos.sort_by_key(|group| group.group_size); group_infos.reverse(); @@ -607,7 +609,8 @@ impl HummockManager { return; } }; - let table_write_throughput_statistic_manager = self.history_table_throughput.read().clone(); + let table_write_throughput_statistic_manager = + self.table_write_throughput_statistic_manager.read().clone(); let mut group_infos = self.calculate_compaction_group_statistic().await; // sort by first table id for deterministic merge order group_infos.sort_by_key(|group| {