From e1524f2b9c4a2ebe9b8799ebef8acbc3caaaacd6 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 24 Jun 2024 18:24:49 +0800 Subject: [PATCH 1/7] feat(compactor): Provide end index for compactor iter to reduce io --- src/storage/src/hummock/compactor/iterator.rs | 293 +++++++++++++++--- 1 file changed, 247 insertions(+), 46 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 5431c51270efc..d2c87739b76d8 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -46,7 +46,7 @@ pub struct SstableStreamIterator { /// Iterates over the KV-pairs of the current block. block_iter: Option, - seek_block_idx: usize, + block_idx: usize, /// Counts the time used for IO. stats_ptr: Arc, @@ -78,7 +78,6 @@ impl SstableStreamIterator { block_metas: Vec, sstable_info: SstableInfo, existing_table_ids: HashSet, - start_block_idx: usize, stats: &StoreLocalStatistic, task_progress: Arc, sstable_store: SstableStoreRef, @@ -88,7 +87,7 @@ impl SstableStreamIterator { block_stream: None, block_iter: None, block_metas, - seek_block_idx: start_block_idx, + block_idx: 0, stats_ptr: stats.remote_io_time.clone(), existing_table_ids, sstable_info, @@ -104,7 +103,7 @@ impl SstableStreamIterator { .sstable_store .get_stream_for_blocks( self.sstable_info.object_id, - &self.block_metas[self.seek_block_idx..], + &self.block_metas[self.block_idx..], ) .verbose_instrument_await("stream_iter_get_stream") .await?; @@ -156,7 +155,7 @@ impl SstableStreamIterator { /// `self.block_iter` to `None`. async fn next_block(&mut self) -> HummockResult<()> { // Check if we want and if we can load the next block. - if self.seek_block_idx < self.block_metas.len() { + if self.block_idx < self.block_metas.len() { loop { let now = Instant::now(); let ret = match &mut self.block_stream { @@ -172,7 +171,7 @@ impl SstableStreamIterator { let mut block_iter = BlockIterator::new(BlockHolder::from_owned_block(block)); block_iter.seek_to_first(); - self.seek_block_idx += 1; + self.block_idx += 1; self.block_iter = Some(block_iter); return Ok(()); } @@ -190,7 +189,7 @@ impl SstableStreamIterator { .fetch_add(add as u64, atomic::Ordering::Relaxed); } } - self.seek_block_idx = self.block_metas.len(); + self.block_idx = self.block_metas.len(); self.block_iter = None; Ok(()) @@ -362,48 +361,22 @@ impl ConcatSstableIterator { .verbose_instrument_await("stream_iter_sstable") .await?; let block_metas = &sstable.meta.block_metas; - let mut start_index = match seek_key { - None => 0, - Some(seek_key) => { - // start_index points to the greatest block whose smallest_key <= seek_key. - block_metas - .partition_point(|block| { - seek_key.cmp(&FullKey::decode(&block.smallest_key)) != Ordering::Less - }) - .saturating_sub(1) - } - }; - let end_index = if self.key_range.right.is_empty() { - block_metas.len() - } else { - block_metas.partition_point(|block| { - KeyComparator::compare_encoded_full_key( - &block.smallest_key, - &self.key_range.right, - ) != Ordering::Greater - }) - }; - while start_index < end_index { - let start_block_table_id = block_metas[start_index].table_id(); - if self - .existing_table_ids - .contains(&block_metas[start_index].table_id().table_id) - { - break; - } - start_index += &block_metas[(start_index + 1)..] - .partition_point(|block_meta| block_meta.table_id() == start_block_table_id) - + 1; - } - if start_index >= end_index { + let (start_index, end_index) = Self::filter_block_metas( + block_metas, + seek_key, + &self.existing_table_ids, + &self.key_range, + ); + + if start_index > end_index { found = false; } else { self.task_progress.inc_num_pending_read_io(); + let block_metas = block_metas[start_index..=end_index].to_vec(); let mut sstable_iter = SstableStreamIterator::new( - sstable.meta.block_metas.clone(), + block_metas, table_info.clone(), self.existing_table_ids.clone(), - start_index, &self.stats, self.task_progress.clone(), self.sstable_store.clone(), @@ -426,6 +399,77 @@ impl ConcatSstableIterator { } Ok(()) } + + pub fn filter_block_metas( + block_metas: &Vec, + seek_key: Option>, + existing_table_ids: &HashSet, + key_range: &KeyRange, + ) -> (usize, usize) { + if block_metas.is_empty() { + return (usize::MAX, 0); + } + + let mut start_index = match seek_key { + None => 0, + Some(seek_key) => { + // start_index points to the greatest block whose smallest_key <= seek_key. + block_metas + .partition_point(|block| { + seek_key.cmp(&FullKey::decode(&block.smallest_key)) != Ordering::Less + }) + .saturating_sub(1) + } + }; + + let mut end_index = if key_range.right.is_empty() { + block_metas.len() + } else { + block_metas.partition_point(|block| { + KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right) + != Ordering::Greater + }) + }; + + if end_index == 0 { + return (usize::MAX, 0); + } + + end_index = end_index.saturating_sub(1); + + // skip blocks that are not in existing_table_ids + while start_index <= end_index { + let start_block_table_id = block_metas[start_index].table_id().table_id(); + if existing_table_ids.contains(&start_block_table_id) { + break; + } + + start_index = start_index + + block_metas[(start_index + 1)..=end_index] + .partition_point(|block_meta| { + block_meta.table_id().table_id() > start_block_table_id + }) + .saturating_sub(1); + } + + if end_index != 0 { + while start_index <= end_index { + let end_block_table_id = block_metas[end_index].table_id().table_id(); + if existing_table_ids.contains(&end_block_table_id) { + break; + } + + end_index = start_index + + block_metas[start_index..end_index] + .partition_point(|block_meta| { + block_meta.table_id().table_id() < end_block_table_id + }) + .saturating_sub(1); + } + } + + (start_index, end_index) + } } impl HummockIterator for ConcatSstableIterator { @@ -491,12 +535,12 @@ impl HummockIterator for ConcatSstableIterator { fn value_meta(&self) -> ValueMeta { let iter = self.sstable_iter.as_ref().expect("no table iter"); - // sstable_iter's seek_block_idx must have advanced at least one. + // sstable_iter's block_idx must have advanced at least one. // See SstableStreamIterator::next_block. - assert!(iter.seek_block_idx >= 1); + assert!(iter.block_idx >= 1); ValueMeta { object_id: Some(iter.sstable_info.object_id), - block_id: Some(iter.seek_block_idx as u64 - 1), + block_id: Some(iter.block_idx as u64 - 1), } } } @@ -569,7 +613,9 @@ impl> HummockIterator for MonitoredCompa #[cfg(test)] mod tests { use std::cmp::Ordering; + use std::collections::HashSet; + use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{next_full_key, prev_full_key, FullKey}; use risingwave_hummock_sdk::key_range::KeyRange; @@ -581,6 +627,7 @@ mod tests { TEST_KEYS_COUNT, }; use crate::hummock::value::HummockValue; + use crate::hummock::BlockMeta; #[tokio::test] async fn test_concat_iterator() { @@ -788,4 +835,158 @@ mod tests { assert!(iter.is_valid()); assert_eq!(iter.key(), block_1_second_key.to_ref()); } + + #[tokio::test] + async fn test_filter_block_metas() { + { + let block_metas = Vec::default(); + let seek_key = None; + + let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + &block_metas, + seek_key, + &HashSet::default(), + &KeyRange::default(), + ); + + assert_eq!(usize::MAX, start_index); + assert_eq!(0, end_index); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + ]; + let seek_key = None; + + let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + &block_metas, + seek_key, + &HashSet::from_iter(vec![1_u32, 2, 3].into_iter()), + &KeyRange::default(), + ); + + assert_eq!(0, start_index); + assert_eq!(2, end_index); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + ]; + let seek_key = None; + + let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + &block_metas, + seek_key, + &HashSet::from_iter(vec![2_u32, 3].into_iter()), + &KeyRange::default(), + ); + + assert_eq!(1, start_index); + assert_eq!(2, end_index); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + ]; + let seek_key = None; + + let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + &block_metas, + seek_key, + &HashSet::from_iter(vec![1_u32, 2_u32].into_iter()), + &KeyRange::default(), + ); + + assert_eq!(0, start_index); + assert_eq!(1, end_index); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + ]; + let seek_key = None; + + let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + &block_metas, + seek_key, + &HashSet::from_iter(vec![2_u32].into_iter()), + &KeyRange::default(), + ); + + assert_eq!(1, start_index); + assert_eq!(1, end_index); + } + } } From 1378f8243f51a6d913e690d6ca7a7bcc657508ad Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 24 Jun 2024 18:40:44 +0800 Subject: [PATCH 2/7] chore(compactor): add more test --- src/storage/src/hummock/compactor/iterator.rs | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index d2c87739b76d8..cb245b5d36fab 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -988,5 +988,97 @@ mod tests { assert_eq!(1, start_index); assert_eq!(1, end_index); } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + ]; + let seek_key = None; + + let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + &block_metas, + seek_key, + &HashSet::from_iter(vec![2_u32].into_iter()), + &KeyRange::default(), + ); + + assert_eq!(3, start_index); + assert_eq!(4, end_index); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) + .encode() + .into(), + ..Default::default() + }, + ]; + let seek_key = None; + + let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + &block_metas, + seek_key, + &HashSet::from_iter(vec![2_u32].into_iter()), + &KeyRange::default(), + ); + + assert_eq!(1, start_index); + assert_eq!(1, end_index); + } } } From 4cb3f0d580907793846a73ac7f32c4e7aa2bb55f Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 24 Jun 2024 19:18:47 +0800 Subject: [PATCH 3/7] fix(test): fix compile --- src/storage/src/storage_failpoints/test_iterator.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/storage/src/storage_failpoints/test_iterator.rs b/src/storage/src/storage_failpoints/test_iterator.rs index 2a9fb64744371..c794c5c73bdce 100644 --- a/src/storage/src/storage_failpoints/test_iterator.rs +++ b/src/storage/src/storage_failpoints/test_iterator.rs @@ -403,7 +403,6 @@ async fn test_failpoints_compactor_iterator_recreate() { table.meta.block_metas.clone(), info, HashSet::from_iter(std::iter::once(0)), - 0, &stats, Arc::new(TaskProgress::default()), sstable_store, From c36d5062c3164f511d67c2d297d49b712374b8c1 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 25 Jun 2024 11:33:47 +0800 Subject: [PATCH 4/7] fix(compactor): fix panic --- src/storage/src/hummock/compactor/iterator.rs | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index cb245b5d36fab..2cc21153d42c9 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -444,12 +444,18 @@ impl ConcatSstableIterator { break; } - start_index = start_index - + block_metas[(start_index + 1)..=end_index] - .partition_point(|block_meta| { - block_meta.table_id().table_id() > start_block_table_id - }) - .saturating_sub(1); + // skip this table_id + let old_start_index = start_index; + let block_metas_to_search = &block_metas[start_index..=end_index]; + + start_index += block_metas_to_search.partition_point(|block_meta| { + block_meta.table_id().table_id() == start_block_table_id + }); + + if old_start_index == start_index { + // no more blocks with the same table_id + break; + } } if end_index != 0 { @@ -459,12 +465,20 @@ impl ConcatSstableIterator { break; } + let old_end_index = end_index; + let block_metas_to_search = &block_metas[start_index..=end_index]; + end_index = start_index - + block_metas[start_index..end_index] + + block_metas_to_search .partition_point(|block_meta| { block_meta.table_id().table_id() < end_block_table_id }) .saturating_sub(1); + + if end_index == old_end_index { + // no more blocks with the same table_id + break; + } } } From f7689dd4372db8e30e248094e6621e331a63b3db Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 25 Jun 2024 11:38:51 +0800 Subject: [PATCH 5/7] fix(compactor): fix compile --- src/storage/src/hummock/compactor/iterator.rs | 90 +++++-------------- 1 file changed, 23 insertions(+), 67 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 2cc21153d42c9..1eb005ae5d374 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -870,21 +870,15 @@ mod tests { { let block_metas = vec![ BlockMeta { - smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), ..Default::default() }, ]; @@ -904,21 +898,15 @@ mod tests { { let block_metas = vec![ BlockMeta { - smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), ..Default::default() }, ]; @@ -938,21 +926,15 @@ mod tests { { let block_metas = vec![ BlockMeta { - smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), ..Default::default() }, ]; @@ -972,21 +954,15 @@ mod tests { { let block_metas = vec![ BlockMeta { - smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), ..Default::default() }, ]; @@ -1006,33 +982,23 @@ mod tests { { let block_metas = vec![ BlockMeta { - smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), ..Default::default() }, ]; @@ -1046,39 +1012,29 @@ mod tests { ); assert_eq!(3, start_index); - assert_eq!(4, end_index); + assert_eq!(3, end_index); } { let block_metas = vec![ BlockMeta { - smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), ..Default::default() }, BlockMeta { - smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0) - .encode() - .into(), + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), ..Default::default() }, ]; From aba510bbe9f45d4f6f1ec639096df91590f49625 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 8 Jul 2024 21:31:39 +0800 Subject: [PATCH 6/7] refactor(compactor): address comment --- src/storage/src/hummock/compactor/iterator.rs | 228 +++++++++++------- 1 file changed, 140 insertions(+), 88 deletions(-) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 1eb005ae5d374..bb43ad82211e6 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -37,8 +37,11 @@ use crate::monitor::StoreLocalStatistic; const PROGRESS_KEY_INTERVAL: usize = 100; /// Iterates over the KV-pairs of an SST while downloading it. +/// `SstableStreamIterator` encapsulates operations on `sstables`, constructing block streams and accessing the corresponding data via `block_metas`. +/// Note that a `block_meta` does not necessarily correspond to the entire sstable, but rather to a subset, which is documented via the `block_idx`. pub struct SstableStreamIterator { sstable_store: SstableStoreRef, + /// The block metas subset of the SST. block_metas: Vec, /// The downloading stream. block_stream: Option, @@ -46,6 +49,7 @@ pub struct SstableStreamIterator { /// Iterates over the KV-pairs of the current block. block_iter: Option, + /// Index of the current block. block_idx: usize, /// Counts the time used for IO. @@ -343,6 +347,7 @@ impl ConcatSstableIterator { (None, true) => None, (None, false) => Some(FullKey::decode(&self.key_range.left)), }; + self.cur_idx = idx; while self.cur_idx < self.sstables.len() { let table_info = &self.sstables[self.cur_idx]; @@ -360,19 +365,24 @@ impl ConcatSstableIterator { .sstable(table_info, &mut self.stats) .verbose_instrument_await("stream_iter_sstable") .await?; - let block_metas = &sstable.meta.block_metas; - let (start_index, end_index) = Self::filter_block_metas( - block_metas, - seek_key, + + let filter_key_range = match seek_key { + Some(seek_key) => { + KeyRange::new(seek_key.encode().into(), self.key_range.right.clone()) + } + None => self.key_range.clone(), + }; + + let block_metas = Self::filter_block_metas( + &sstable.meta.block_metas, &self.existing_table_ids, - &self.key_range, + filter_key_range, ); - if start_index > end_index { + if block_metas.is_empty() { found = false; } else { self.task_progress.inc_num_pending_read_io(); - let block_metas = block_metas[start_index..=end_index].to_vec(); let mut sstable_iter = SstableStreamIterator::new( block_metas, table_info.clone(), @@ -402,24 +412,23 @@ impl ConcatSstableIterator { pub fn filter_block_metas( block_metas: &Vec, - seek_key: Option>, existing_table_ids: &HashSet, - key_range: &KeyRange, - ) -> (usize, usize) { + key_range: KeyRange, + ) -> Vec { if block_metas.is_empty() { - return (usize::MAX, 0); + return vec![]; } - let mut start_index = match seek_key { - None => 0, - Some(seek_key) => { - // start_index points to the greatest block whose smallest_key <= seek_key. - block_metas - .partition_point(|block| { - seek_key.cmp(&FullKey::decode(&block.smallest_key)) != Ordering::Less - }) - .saturating_sub(1) - } + let mut start_index = if key_range.left.is_empty() { + 0 + } else { + // start_index points to the greatest block whose smallest_key <= seek_key. + block_metas + .partition_point(|block| { + KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key) + != Ordering::Less + }) + .saturating_sub(1) }; let mut end_index = if key_range.right.is_empty() { @@ -429,14 +438,14 @@ impl ConcatSstableIterator { KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right) != Ordering::Greater }) - }; + } + .saturating_sub(1); if end_index == 0 { - return (usize::MAX, 0); + // not found + return vec![]; } - end_index = end_index.saturating_sub(1); - // skip blocks that are not in existing_table_ids while start_index <= end_index { let start_block_table_id = block_metas[start_index].table_id().table_id(); @@ -458,31 +467,33 @@ impl ConcatSstableIterator { } } - if end_index != 0 { - while start_index <= end_index { - let end_block_table_id = block_metas[end_index].table_id().table_id(); - if existing_table_ids.contains(&end_block_table_id) { - break; - } + while start_index <= end_index { + let end_block_table_id = block_metas[end_index].table_id().table_id(); + if existing_table_ids.contains(&end_block_table_id) { + break; + } - let old_end_index = end_index; - let block_metas_to_search = &block_metas[start_index..=end_index]; + let old_end_index = end_index; + let block_metas_to_search = &block_metas[start_index..=end_index]; - end_index = start_index - + block_metas_to_search - .partition_point(|block_meta| { - block_meta.table_id().table_id() < end_block_table_id - }) - .saturating_sub(1); + end_index = start_index + + block_metas_to_search + .partition_point(|block_meta| { + block_meta.table_id().table_id() < end_block_table_id + }) + .saturating_sub(1); - if end_index == old_end_index { - // no more blocks with the same table_id - break; - } + if end_index == old_end_index { + // no more blocks with the same table_id + break; } } - (start_index, end_index) + if start_index > end_index { + return vec![]; + } + + block_metas[start_index..=end_index].to_vec() } } @@ -844,6 +855,7 @@ mod tests { .unwrap(); assert!(iter.is_valid()); assert_eq!(iter.key(), block_1_second_key.to_ref()); + // Use None seek key and result in the second KV of block 1. iter.seek_idx(0, None).await.unwrap(); assert!(iter.is_valid()); @@ -854,17 +866,14 @@ mod tests { async fn test_filter_block_metas() { { let block_metas = Vec::default(); - let seek_key = None; - let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + let ret = ConcatSstableIterator::filter_block_metas( &block_metas, - seek_key, &HashSet::default(), - &KeyRange::default(), + KeyRange::default(), ); - assert_eq!(usize::MAX, start_index); - assert_eq!(0, end_index); + assert!(ret.is_empty()); } { @@ -882,17 +891,28 @@ mod tests { ..Default::default() }, ]; - let seek_key = None; - let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + let ret = ConcatSstableIterator::filter_block_metas( &block_metas, - seek_key, &HashSet::from_iter(vec![1_u32, 2, 3].into_iter()), - &KeyRange::default(), + KeyRange::default(), ); - assert_eq!(0, start_index); - assert_eq!(2, end_index); + assert_eq!(3, ret.len()); + assert_eq!( + 1, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + assert_eq!( + 3, + FullKey::decode(&ret[2].smallest_key) + .user_key + .table_id + .table_id() + ); } { @@ -910,17 +930,28 @@ mod tests { ..Default::default() }, ]; - let seek_key = None; - let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + let ret = ConcatSstableIterator::filter_block_metas( &block_metas, - seek_key, &HashSet::from_iter(vec![2_u32, 3].into_iter()), - &KeyRange::default(), + KeyRange::default(), ); - assert_eq!(1, start_index); - assert_eq!(2, end_index); + assert_eq!(2, ret.len()); + assert_eq!( + 2, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + assert_eq!( + 3, + FullKey::decode(&ret[1].smallest_key) + .user_key + .table_id + .table_id() + ); } { @@ -938,17 +969,28 @@ mod tests { ..Default::default() }, ]; - let seek_key = None; - let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + let ret = ConcatSstableIterator::filter_block_metas( &block_metas, - seek_key, &HashSet::from_iter(vec![1_u32, 2_u32].into_iter()), - &KeyRange::default(), + KeyRange::default(), ); - assert_eq!(0, start_index); - assert_eq!(1, end_index); + assert_eq!(2, ret.len()); + assert_eq!( + 1, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + assert_eq!( + 2, + FullKey::decode(&ret[1].smallest_key) + .user_key + .table_id + .table_id() + ); } { @@ -966,17 +1008,20 @@ mod tests { ..Default::default() }, ]; - let seek_key = None; - - let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + let ret = ConcatSstableIterator::filter_block_metas( &block_metas, - seek_key, &HashSet::from_iter(vec![2_u32].into_iter()), - &KeyRange::default(), + KeyRange::default(), ); - assert_eq!(1, start_index); - assert_eq!(1, end_index); + assert_eq!(1, ret.len()); + assert_eq!( + 2, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); } { @@ -1002,17 +1047,20 @@ mod tests { ..Default::default() }, ]; - let seek_key = None; - - let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + let ret = ConcatSstableIterator::filter_block_metas( &block_metas, - seek_key, &HashSet::from_iter(vec![2_u32].into_iter()), - &KeyRange::default(), + KeyRange::default(), ); - assert_eq!(3, start_index); - assert_eq!(3, end_index); + assert_eq!(1, ret.len()); + assert_eq!( + 2, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); } { @@ -1038,17 +1086,21 @@ mod tests { ..Default::default() }, ]; - let seek_key = None; - let (start_index, end_index) = ConcatSstableIterator::filter_block_metas( + let ret = ConcatSstableIterator::filter_block_metas( &block_metas, - seek_key, &HashSet::from_iter(vec![2_u32].into_iter()), - &KeyRange::default(), + KeyRange::default(), ); - assert_eq!(1, start_index); - assert_eq!(1, end_index); + assert_eq!(1, ret.len()); + assert_eq!( + 2, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); } } } From 486ca0ff0656180056e7e05693fec302e91ce7e7 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 12 Jul 2024 15:27:02 +0800 Subject: [PATCH 7/7] fix(compactor): fix error --- src/storage/hummock_test/src/compactor_tests.rs | 16 ---------------- src/storage/src/hummock/compactor/iterator.rs | 16 +++++++++------- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index e3c19f54e4340..6af79fffdeac7 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -797,7 +797,6 @@ pub(crate) mod tests { filter_key_extractor_manager, ) .await; - hummock_manager_ref .report_compact_task( result_task.task_id, @@ -1389,10 +1388,6 @@ pub(crate) mod tests { normal_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap()); } assert!(fast_ret.iter().all(|f| f.file_size < capacity * 6 / 5)); - println!( - "fast sstables file size: {:?}", - fast_ret.iter().map(|f| f.file_size).collect_vec(), - ); assert!(can_concat(&ret)); assert!(can_concat(&fast_ret)); let read_options = Arc::new(SstableIteratorReadOptions::default()); @@ -1522,7 +1517,6 @@ pub(crate) mod tests { sstable_store.clone(), ) .await; - println!("generate ssts size: {}", sst.file_size); ssts.push(sst); } let select_file_count = ssts.len() / 2; @@ -1869,11 +1863,6 @@ pub(crate) mod tests { max_sst_file_size = std::cmp::max(max_sst_file_size, sst_info.file_size); sst_infos.push(sst_info); } - println!( - "input data: {}", - sst_infos.iter().map(|sst| sst.file_size).sum::(), - ); - let target_file_size = max_sst_file_size / 4; let mut table_watermarks = BTreeMap::default(); let key_count = KEY_COUNT / VirtualNode::COUNT * 2; @@ -1927,11 +1916,6 @@ pub(crate) mod tests { ..Default::default() }; let (ret, fast_ret) = run_fast_and_normal_runner(compact_ctx.clone(), task).await; - println!( - "normal compact result data: {}, fast compact result data: {}", - ret.iter().map(|sst| sst.file_size).sum::(), - fast_ret.iter().map(|sst| sst.file_size).sum::(), - ); // check_compaction_result(compact_ctx.sstable_store, ret.clone(), fast_ret, target_file_size).await; let mut fast_tables = Vec::with_capacity(fast_ret.len()); let mut normal_tables = Vec::with_capacity(ret.len()); diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index bb43ad82211e6..debdfb6eec448 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -434,17 +434,19 @@ impl ConcatSstableIterator { let mut end_index = if key_range.right.is_empty() { block_metas.len() } else { - block_metas.partition_point(|block| { + let ret = block_metas.partition_point(|block| { KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right) != Ordering::Greater - }) - } - .saturating_sub(1); + }); - if end_index == 0 { - // not found - return vec![]; + if ret == 0 { + // not found + return vec![]; + } + + ret } + .saturating_sub(1); // skip blocks that are not in existing_table_ids while start_index <= end_index {