diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index 7c6cd9fdf36a5..c2c6cafc6a43a 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -615,6 +615,7 @@ pub mod tests { uncompressed_file_size: sst.uncompressed_file_size, sub_level_id: sst.get_sst_id(), table_infos: vec![sst], + vnode_partition_count: 0, }); } @@ -645,6 +646,7 @@ pub mod tests { sub_level_id, table_infos, uncompressed_file_size, + vnode_partition_count: 0, }); } @@ -729,6 +731,7 @@ pub mod tests { total_file_size, sub_level_id: 0, uncompressed_file_size, + vnode_partition_count: 0, } } @@ -751,6 +754,7 @@ pub mod tests { uncompressed_file_size: table.uncompressed_file_size, sub_level_id: idx as u64, table_infos: vec![table], + vnode_partition_count: 0, }) .collect_vec(), total_file_size, @@ -775,6 +779,7 @@ pub mod tests { .sum::(), sub_level_id: idx as u64, table_infos: table, + vnode_partition_count: 0, }) .collect_vec(), total_file_size: 0, @@ -809,6 +814,7 @@ pub mod tests { .iter() .map(|sst| sst.uncompressed_file_size) .sum::(), + vnode_partition_count: 0, }) .collect_vec(), total_file_size: 0, diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index c94308e71dca4..b2cc51989fc2d 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -38,6 +38,7 @@ pub use crate::hummock::compaction::level_selector::{ ManualCompactionSelector, SpaceReclaimCompactionSelector, TtlCompactionSelector, }; use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy}; +pub use crate::hummock::compaction::picker::{partition_level, SubLevelPartition}; use crate::hummock::compaction::picker::{CompactionInput, LocalPickerStatistic}; use crate::hummock::level_handler::LevelHandler; use crate::hummock::model::CompactionGroup; diff --git a/src/meta/src/hummock/compaction/picker/intral_sub_level_picker.rs b/src/meta/src/hummock/compaction/picker/intral_sub_level_picker.rs index 8ceba3f202fa7..3286a2d641ef4 100644 --- a/src/meta/src/hummock/compaction/picker/intral_sub_level_picker.rs +++ b/src/meta/src/hummock/compaction/picker/intral_sub_level_picker.rs @@ -1,3 +1,18 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Bound; use std::sync::Arc; use risingwave_common::hash::VirtualNode; @@ -119,7 +134,7 @@ impl CompactionPicker for IntraSubLevelPicker { input_levels, target_level: 0, target_sub_level_id: level.sub_level_id, - vnode_partition_count: vnode_partition_count, + vnode_partition_count, }); } @@ -224,16 +239,23 @@ pub fn partition_level( level: &Level, partitions: &mut Vec, ) -> bool { + assert_eq!(partition_vnode_count, partitions.len()); let mut left_idx = 0; let mut can_partition = true; let partition_size = VirtualNode::COUNT / partition_vnode_count; - for partition_id in 0..partition_vnode_count { + for (partition_id, partition) in partitions.iter_mut().enumerate() { let smallest_vnode = partition_id * partition_size; let largest_vnode = (partition_id + 1) * partition_size; let smallest_table_key = UserKey::prefix_of_vnode(table_id, VirtualNode::from_index(smallest_vnode)); - let largest_table_key = - UserKey::prefix_of_vnode(table_id, VirtualNode::from_index(largest_vnode)); + let largest_table_key = if largest_vnode >= VirtualNode::COUNT { + Bound::Unbounded + } else { + Bound::Excluded(UserKey::prefix_of_vnode( + table_id, + VirtualNode::from_index(largest_vnode), + )) + }; while left_idx < level.table_infos.len() { let key_range = level.table_infos[left_idx].key_range.as_ref().unwrap(); let ret = key_range.compare_right_with_user_key(smallest_table_key.as_ref()); @@ -243,7 +265,7 @@ pub fn partition_level( left_idx += 1; } if left_idx >= level.table_infos.len() { - partitions[partition_id].sub_levels.push(PartitionInfo { + partition.sub_levels.push(PartitionInfo { sub_level_id: level.sub_level_id, left_idx: 0, right_idx: 0, @@ -264,29 +286,45 @@ pub fn partition_level( let mut right_idx = left_idx; while right_idx < level.table_infos.len() { let key_range = level.table_infos[right_idx].key_range.as_ref().unwrap(); - let ret = key_range.compare_right_with_user_key(largest_table_key.as_ref()); + let ret = match &largest_table_key { + Bound::Excluded(key) => key_range.compare_right_with_user_key(key.as_ref()), + Bound::Unbounded => { + let right_key = FullKey::decode(&key_range.right); + assert!(right_key.user_key.table_id.table_id == table_id); + // We would assign partition_vnode_count to a level only when we compact all + // sstable of it, so there will never be another stale table in this sstable + // file. + std::cmp::Ordering::Less + } + _ => unreachable!(), + }; + if ret != std::cmp::Ordering::Less { break; } total_file_size += level.table_infos[right_idx].file_size; right_idx += 1; } + if right_idx < level.table_infos.len() - && FullKey::decode( - &level.table_infos[right_idx] - .key_range - .as_ref() - .unwrap() - .left, - ) - .user_key - .lt(&largest_table_key.as_ref()) + && match &largest_table_key { + Bound::Excluded(key) => FullKey::decode( + &level.table_infos[right_idx] + .key_range + .as_ref() + .unwrap() + .left, + ) + .user_key + .lt(&key.as_ref()), + _ => unreachable!(), + } { can_partition = false; break; } left_idx = right_idx; - partitions[partition_id].sub_levels.push(PartitionInfo { + partition.sub_levels.push(PartitionInfo { sub_level_id: level.sub_level_id, left_idx, right_idx, diff --git a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs index 0e0615a74d244..ab70ddfaf2c50 100644 --- a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs @@ -382,9 +382,7 @@ pub mod tests { generate_table(1, 1, 101, 200, 1), generate_table(2, 1, 222, 300, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -396,9 +394,7 @@ pub mod tests { generate_table(7, 1, 501, 800, 1), generate_table(8, 2, 301, 400, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; let mut levels = Levels { @@ -560,9 +556,7 @@ pub mod tests { generate_table(3, 1, 0, 100, 1), generate_table(4, 2, 2000, 3000, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -571,9 +565,7 @@ pub mod tests { generate_table(1, 1, 0, 100, 1), generate_table(2, 2, 2000, 3000, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; // Set internal_table_ids. @@ -615,9 +607,7 @@ pub mod tests { generate_table(3, 2, 200, 300, 1), generate_table(4, 2, 300, 400, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }]; let levels = Levels { levels, @@ -635,10 +625,7 @@ pub mod tests { let levels = vec![Level { level_idx: 1, level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }]; let levels = Levels { levels, @@ -1175,9 +1162,7 @@ pub mod tests { generate_table(3, 1, 101, 200, 1), generate_table(4, 1, 222, 300, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; assert_eq!(levels.len(), 4); @@ -1285,9 +1270,7 @@ pub mod tests { generate_table(6, 1, 444, 500, 1), generate_table(7, 1, 555, 600, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; assert_eq!(levels.len(), 4); diff --git a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs index 4c6952a702b34..2bc5f565b4b54 100644 --- a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs @@ -346,238 +346,6 @@ pub mod tests { assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 4); } - #[test] - fn test_pick_l0_multi_level() { - let levels = vec![ - Level { - level_idx: 1, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![ - generate_table(0, 1, 50, 99, 2), - generate_table(1, 1, 100, 149, 2), - generate_table(2, 1, 150, 249, 2), - generate_table(6, 1, 250, 300, 2), - generate_table(7, 1, 350, 400, 2), - generate_table(8, 1, 450, 500, 2), - ], - total_file_size: 800, - ..Default::default() - }, - Level { - level_idx: 2, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![ - generate_table(4, 1, 50, 199, 1), - generate_table(5, 1, 200, 399, 1), - generate_table(9, 1, 250, 300, 2), - generate_table(10, 1, 350, 400, 2), - generate_table(11, 1, 450, 500, 2), - ], - total_file_size: 250, - ..Default::default() - }, - Level { - level_idx: 3, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![ - generate_table(11, 1, 250, 300, 2), - generate_table(12, 1, 350, 400, 2), - generate_table(13, 1, 450, 500, 2), - ], - total_file_size: 150, - ..Default::default() - }, - Level { - level_idx: 4, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![ - generate_table(14, 1, 250, 300, 2), - generate_table(15, 1, 350, 400, 2), - generate_table(16, 1, 450, 500, 2), - ], - total_file_size: 150, - ..Default::default() - }, - ]; - - let levels_handlers = vec![ - LevelHandler::new(0), - LevelHandler::new(1), - LevelHandler::new(2), - ]; - - { - // no limit - let picker = NonOverlapSubLevelPicker::new( - 0, - 10000, - 1, - 10000, - Arc::new(RangeOverlapStrategy::default()), - ); - let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]); - assert_eq!(6, ret.len()); - } - - { - // limit max bytes - let picker = NonOverlapSubLevelPicker::new( - 0, - 100, - 1, - 10000, - Arc::new(RangeOverlapStrategy::default()), - ); - let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]); - assert_eq!(6, ret.len()); - } - - { - // limit max file_count - let picker = NonOverlapSubLevelPicker::new( - 0, - 10000, - 1, - 5, - Arc::new(RangeOverlapStrategy::default()), - ); - let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]); - assert_eq!(6, ret.len()); - } - } - - #[test] - fn test_pick_l0_multi_level2() { - let levels = vec![ - Level { - level_idx: 1, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![ - generate_table(0, 1, 50, 99, 2), - generate_table(1, 1, 100, 149, 2), - generate_table(2, 1, 150, 249, 2), - generate_table(6, 1, 250, 300, 2), - generate_table(7, 1, 350, 400, 2), - generate_table(8, 1, 450, 500, 2), - ], - total_file_size: 800, - ..Default::default() - }, - Level { - level_idx: 2, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![ - generate_table(4, 1, 50, 99, 1), - generate_table(5, 1, 150, 200, 1), - generate_table(9, 1, 250, 300, 2), - generate_table(10, 1, 350, 400, 2), - generate_table(11, 1, 450, 500, 2), - ], - total_file_size: 250, - ..Default::default() - }, - Level { - level_idx: 3, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![ - generate_table(11, 1, 250, 300, 2), - generate_table(12, 1, 350, 400, 2), - generate_table(13, 1, 450, 500, 2), - ], - total_file_size: 150, - ..Default::default() - }, - Level { - level_idx: 4, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![ - generate_table(14, 1, 250, 300, 2), - generate_table(15, 1, 350, 400, 2), - generate_table(16, 1, 450, 500, 2), - ], - total_file_size: 150, - ..Default::default() - }, - ]; - - let levels_handlers = vec![ - LevelHandler::new(0), - LevelHandler::new(1), - LevelHandler::new(2), - ]; - - { - // no limit - let picker = NonOverlapSubLevelPicker::new( - 0, - 10000, - 1, - 10000, - Arc::new(RangeOverlapStrategy::default()), - ); - let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]); - assert_eq!(6, ret.len()); - } - - { - // limit max bytes - let max_compaction_bytes = 100; - let picker = NonOverlapSubLevelPicker::new( - 0, - max_compaction_bytes, - 1, - 10000, - Arc::new(RangeOverlapStrategy::default()), - ); - let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]); - assert_eq!(6, ret.len()); - } - - { - // limit max file_count - let max_file_count = 2; - let picker = NonOverlapSubLevelPicker::new( - 0, - 10000, - 1, - max_file_count, - Arc::new(RangeOverlapStrategy::default()), - ); - let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]); - assert_eq!(6, ret.len()); - - for plan in ret { - let mut sst_id_set = BTreeSet::default(); - for sst in &plan.sstable_infos { - sst_id_set.insert(sst[0].get_sst_id()); - } - assert!(sst_id_set.len() <= max_file_count as usize); - } - } - - { - // limit min_depth - let min_depth = 3; - let picker = NonOverlapSubLevelPicker::new( - 1000, - 10000, - min_depth, - 10000, - Arc::new(RangeOverlapStrategy::default()), - ); - let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]); - assert_eq!(3, ret.len()); - - for plan in ret { - let mut sst_id_set = BTreeSet::default(); - for sst in &plan.sstable_infos { - sst_id_set.insert(sst[0].get_sst_id()); - } - assert!(plan.sstable_infos.len() >= min_depth); - } - } - } - #[test] fn test_trivial_move_bug() { let levels = vec![ diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index 10abc21e8fedb..dd233699dca7e 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -216,9 +216,7 @@ mod test { generate_table_with_ids_and_epochs(10, 1, 888, 1600, 1, vec![10], 0, 0), generate_table_with_ids_and_epochs(11, 1, 1600, 1800, 1, vec![10], 0, 0), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index af5c63253585b..5aff3f0edb39c 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -355,9 +355,7 @@ mod test { u64::MAX, ), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 353992c0bc41f..a6f265ecc257a 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -26,14 +26,20 @@ pub(crate) mod tests { use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::constants::hummock::CompactionFilterFlag; + use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::Epoch; use risingwave_common_service::observer_manager::NotificationClient; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; - use risingwave_hummock_sdk::key::{next_key, TABLE_PREFIX_LEN}; + use risingwave_hummock_sdk::key::{ + next_key, FullKey, PointRange, TableKey, UserKey, TABLE_PREFIX_LEN, + }; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; + use risingwave_hummock_sdk::HummockEpoch; use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; - use risingwave_meta::hummock::compaction::{default_level_selector, ManualCompactionOption}; + use risingwave_meta::hummock::compaction::{ + default_level_selector, partition_level, ManualCompactionOption, SubLevelPartition, + }; use risingwave_meta::hummock::test_utils::{ register_table_ids_to_compaction_group, setup_compute_env, setup_compute_env_with_config, unregister_table_ids_from_compaction_group, @@ -41,7 +47,7 @@ pub(crate) mod tests { use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient}; use risingwave_meta::storage::MetaStore; use risingwave_pb::common::{HostAddress, WorkerType}; - use risingwave_pb::hummock::{HummockVersion, TableOption}; + use risingwave_pb::hummock::{HummockVersion, Level, LevelType, TableOption}; use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::filter_key_extractor::{ @@ -50,10 +56,14 @@ pub(crate) mod tests { }; use risingwave_storage::hummock::compactor::{CompactionExecutor, Compactor, CompactorContext}; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; + use risingwave_storage::hummock::multi_builder::{ + CapacitySplitTableBuilder, LocalTableBuilderFactory, + }; use risingwave_storage::hummock::sstable_store::SstableStoreRef; + use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ CachePolicy, HummockStorage as GlobalHummockStorage, HummockStorage, MemoryLimiter, - SstableObjectIdManager, + MonotonicDeleteEvent, SstableBuilderOptions, SstableObjectIdManager, }; use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic}; use risingwave_storage::opts::StorageOpts; @@ -1234,4 +1244,79 @@ pub(crate) mod tests { assert_eq!(1, output_level_info.table_infos.len()); assert_eq!(252, output_level_info.table_infos[0].total_key_count); } + + #[tokio::test] + async fn test_compaction_delete_range_vnode_partition() { + let sstable_store = mock_sstable_store(); + let vnode_partition_count = 8; + let mut builder = CapacitySplitTableBuilder::new( + LocalTableBuilderFactory::new(1, sstable_store, SstableBuilderOptions::default()), + Arc::new(CompactorMetrics::unused()), + None, + true, + vnode_partition_count, + ); + let watermark: u64 = 100; + let ts: u64 = 99; + let watermark_suffix = watermark.to_be_bytes().to_vec(); + let ts_suffix = ts.to_be_bytes().to_vec(); + let table_id = TableId::new(1); + let v = vec![0u8; 10]; + for vnode_id in 0..VirtualNode::COUNT { + let key = VirtualNode::from_index(vnode_id).to_be_bytes().to_vec(); + let mut add_key = key.clone(); + add_key.extend_from_slice(&ts_suffix); + builder + .add_full_key( + FullKey::new(table_id, TableKey(add_key), 13).to_ref(), + HummockValue::Put(v.as_slice()), + true, + ) + .await + .unwrap(); + let mut end_key = key.clone(); + end_key.extend_from_slice(&watermark_suffix); + builder + .add_monotonic_delete(MonotonicDeleteEvent { + event_key: PointRange::from_user_key( + UserKey::new(table_id, TableKey(key)), + false, + ), + new_epoch: 10, + }) + .await + .unwrap(); + builder + .add_monotonic_delete(MonotonicDeleteEvent { + event_key: PointRange::from_user_key( + UserKey::new(table_id, TableKey(end_key)), + false, + ), + new_epoch: HummockEpoch::MAX, + }) + .await + .unwrap(); + } + let ret = builder.finish().await.unwrap(); + let table_infos = ret + .into_iter() + .map(|output| output.sst_info.sst_info) + .collect_vec(); + let level = Level { + level_idx: 0, + level_type: LevelType::Nonoverlapping as i32, + table_infos, + total_file_size: 100, + sub_level_id: 1, + uncompressed_file_size: 100, + vnode_partition_count, + }; + let mut partitions = vec![SubLevelPartition::default(); vnode_partition_count as usize]; + assert!(partition_level( + table_id.table_id, + vnode_partition_count as usize, + &level, + &mut partitions + )); + } }