From 1496e6a03bf52b3646bc3802a09972dc19dd1353 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 26 Jul 2024 13:57:23 +0800 Subject: [PATCH] feat(storage): provide end index for compactor iter to reduce io (#17426) --- .../hummock_test/src/compactor_tests.rs | 2 - src/storage/src/hummock/compactor/iterator.rs | 405 ++++++++++++++++-- .../src/storage_failpoints/test_iterator.rs | 1 - 3 files changed, 361 insertions(+), 47 deletions(-) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 78a49788b6c53..4c5d8c9c65a19 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -803,7 +803,6 @@ pub(crate) mod tests { filter_key_extractor_manager, ) .await; - hummock_manager_ref .report_compact_task( result_task.task_id, @@ -1932,7 +1931,6 @@ pub(crate) mod tests { ..Default::default() }; let (ret, fast_ret) = run_fast_and_normal_runner(compact_ctx.clone(), task).await; - let mut fast_tables = Vec::with_capacity(fast_ret.len()); let mut normal_tables = Vec::with_capacity(ret.len()); let mut stats = StoreLocalStatistic::default(); diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index e96164cb6be04..84fa03c7b71d6 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -37,8 +37,11 @@ use crate::monitor::StoreLocalStatistic; const PROGRESS_KEY_INTERVAL: usize = 100; /// Iterates over the KV-pairs of an SST while downloading it. +/// `SstableStreamIterator` encapsulates operations on `sstables`, constructing block streams and accessing the corresponding data via `block_metas`. +/// Note that a `block_meta` does not necessarily correspond to the entire sstable, but rather to a subset, which is documented via the `block_idx`. pub struct SstableStreamIterator { sstable_store: SstableStoreRef, + /// The block metas subset of the SST. block_metas: Vec, /// The downloading stream. block_stream: Option, @@ -46,7 +49,8 @@ pub struct SstableStreamIterator { /// Iterates over the KV-pairs of the current block. block_iter: Option, - seek_block_idx: usize, + /// Index of the current block. + block_idx: usize, /// Counts the time used for IO. stats_ptr: Arc, @@ -78,7 +82,6 @@ impl SstableStreamIterator { block_metas: Vec, sstable_info: SstableInfo, existing_table_ids: HashSet, - start_block_idx: usize, stats: &StoreLocalStatistic, task_progress: Arc, sstable_store: SstableStoreRef, @@ -88,7 +91,7 @@ impl SstableStreamIterator { block_stream: None, block_iter: None, block_metas, - seek_block_idx: start_block_idx, + block_idx: 0, stats_ptr: stats.remote_io_time.clone(), existing_table_ids, sstable_info, @@ -104,7 +107,7 @@ impl SstableStreamIterator { .sstable_store .get_stream_for_blocks( self.sstable_info.object_id, - &self.block_metas[self.seek_block_idx..], + &self.block_metas[self.block_idx..], ) .verbose_instrument_await("stream_iter_get_stream") .await?; @@ -156,7 +159,7 @@ impl SstableStreamIterator { /// `self.block_iter` to `None`. async fn next_block(&mut self) -> HummockResult<()> { // Check if we want and if we can load the next block. - if self.seek_block_idx < self.block_metas.len() { + if self.block_idx < self.block_metas.len() { loop { let now = Instant::now(); let ret = match &mut self.block_stream { @@ -172,7 +175,7 @@ impl SstableStreamIterator { let mut block_iter = BlockIterator::new(BlockHolder::from_owned_block(block)); block_iter.seek_to_first(); - self.seek_block_idx += 1; + self.block_idx += 1; self.block_iter = Some(block_iter); return Ok(()); } @@ -190,7 +193,7 @@ impl SstableStreamIterator { .fetch_add(add as u64, atomic::Ordering::Relaxed); } } - self.seek_block_idx = self.block_metas.len(); + self.block_idx = self.block_metas.len(); self.block_iter = None; Ok(()) @@ -344,6 +347,7 @@ impl ConcatSstableIterator { (None, true) => None, (None, false) => Some(FullKey::decode(&self.key_range.left)), }; + self.cur_idx = idx; while self.cur_idx < self.sstables.len() { let table_info = &self.sstables[self.cur_idx]; @@ -361,49 +365,28 @@ impl ConcatSstableIterator { .sstable(table_info, &mut self.stats) .verbose_instrument_await("stream_iter_sstable") .await?; - let block_metas = &sstable.meta.block_metas; - let mut start_index = match seek_key { - None => 0, + + let filter_key_range = match seek_key { Some(seek_key) => { - // start_index points to the greatest block whose smallest_key <= seek_key. - block_metas - .partition_point(|block| { - seek_key.cmp(&FullKey::decode(&block.smallest_key)) != Ordering::Less - }) - .saturating_sub(1) + KeyRange::new(seek_key.encode().into(), self.key_range.right.clone()) } + None => self.key_range.clone(), }; - let end_index = if self.key_range.right.is_empty() { - block_metas.len() - } else { - block_metas.partition_point(|block| { - KeyComparator::compare_encoded_full_key( - &block.smallest_key, - &self.key_range.right, - ) != Ordering::Greater - }) - }; - while start_index < end_index { - let start_block_table_id = block_metas[start_index].table_id(); - if self - .existing_table_ids - .contains(&block_metas[start_index].table_id().table_id) - { - break; - } - start_index += &block_metas[(start_index + 1)..] - .partition_point(|block_meta| block_meta.table_id() == start_block_table_id) - + 1; - } - if start_index >= end_index { + + let block_metas = Self::filter_block_metas( + &sstable.meta.block_metas, + &self.existing_table_ids, + filter_key_range, + ); + + if block_metas.is_empty() { found = false; } else { self.task_progress.inc_num_pending_read_io(); let mut sstable_iter = SstableStreamIterator::new( - sstable.meta.block_metas.clone(), + block_metas, table_info.clone(), self.existing_table_ids.clone(), - start_index, &self.stats, self.task_progress.clone(), self.sstable_store.clone(), @@ -426,6 +409,94 @@ impl ConcatSstableIterator { } Ok(()) } + + pub fn filter_block_metas( + block_metas: &Vec, + existing_table_ids: &HashSet, + key_range: KeyRange, + ) -> Vec { + if block_metas.is_empty() { + return vec![]; + } + + let mut start_index = if key_range.left.is_empty() { + 0 + } else { + // start_index points to the greatest block whose smallest_key <= seek_key. + block_metas + .partition_point(|block| { + KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key) + != Ordering::Less + }) + .saturating_sub(1) + }; + + let mut end_index = if key_range.right.is_empty() { + block_metas.len() + } else { + let ret = block_metas.partition_point(|block| { + KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right) + != Ordering::Greater + }); + + if ret == 0 { + // not found + return vec![]; + } + + ret + } + .saturating_sub(1); + + // skip blocks that are not in existing_table_ids + while start_index <= end_index { + let start_block_table_id = block_metas[start_index].table_id().table_id(); + if existing_table_ids.contains(&start_block_table_id) { + break; + } + + // skip this table_id + let old_start_index = start_index; + let block_metas_to_search = &block_metas[start_index..=end_index]; + + start_index += block_metas_to_search.partition_point(|block_meta| { + block_meta.table_id().table_id() == start_block_table_id + }); + + if old_start_index == start_index { + // no more blocks with the same table_id + break; + } + } + + while start_index <= end_index { + let end_block_table_id = block_metas[end_index].table_id().table_id(); + if existing_table_ids.contains(&end_block_table_id) { + break; + } + + let old_end_index = end_index; + let block_metas_to_search = &block_metas[start_index..=end_index]; + + end_index = start_index + + block_metas_to_search + .partition_point(|block_meta| { + block_meta.table_id().table_id() < end_block_table_id + }) + .saturating_sub(1); + + if end_index == old_end_index { + // no more blocks with the same table_id + break; + } + } + + if start_index > end_index { + return vec![]; + } + + block_metas[start_index..=end_index].to_vec() + } } impl HummockIterator for ConcatSstableIterator { @@ -491,12 +562,12 @@ impl HummockIterator for ConcatSstableIterator { fn value_meta(&self) -> ValueMeta { let iter = self.sstable_iter.as_ref().expect("no table iter"); - // sstable_iter's seek_block_idx must have advanced at least one. + // sstable_iter's block_idx must have advanced at least one. // See SstableStreamIterator::next_block. - assert!(iter.seek_block_idx >= 1); + assert!(iter.block_idx >= 1); ValueMeta { object_id: Some(iter.sstable_info.object_id), - block_id: Some(iter.seek_block_idx as u64 - 1), + block_id: Some(iter.block_idx as u64 - 1), } } } @@ -569,7 +640,9 @@ impl> HummockIterator for MonitoredCompa #[cfg(test)] mod tests { use std::cmp::Ordering; + use std::collections::HashSet; + use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{next_full_key, prev_full_key, FullKey}; use risingwave_hummock_sdk::key_range::KeyRange; @@ -581,6 +654,7 @@ mod tests { TEST_KEYS_COUNT, }; use crate::hummock::value::HummockValue; + use crate::hummock::BlockMeta; #[tokio::test] async fn test_concat_iterator() { @@ -783,9 +857,252 @@ mod tests { .unwrap(); assert!(iter.is_valid()); assert_eq!(iter.key(), block_1_second_key.to_ref()); + // Use None seek key and result in the second KV of block 1. iter.seek_idx(0, None).await.unwrap(); assert!(iter.is_valid()); assert_eq!(iter.key(), block_1_second_key.to_ref()); } + + #[tokio::test] + async fn test_filter_block_metas() { + { + let block_metas = Vec::default(); + + let ret = ConcatSstableIterator::filter_block_metas( + &block_metas, + &HashSet::default(), + KeyRange::default(), + ); + + assert!(ret.is_empty()); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), + ..Default::default() + }, + ]; + + let ret = ConcatSstableIterator::filter_block_metas( + &block_metas, + &HashSet::from_iter(vec![1_u32, 2, 3].into_iter()), + KeyRange::default(), + ); + + assert_eq!(3, ret.len()); + assert_eq!( + 1, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + assert_eq!( + 3, + FullKey::decode(&ret[2].smallest_key) + .user_key + .table_id + .table_id() + ); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), + ..Default::default() + }, + ]; + + let ret = ConcatSstableIterator::filter_block_metas( + &block_metas, + &HashSet::from_iter(vec![2_u32, 3].into_iter()), + KeyRange::default(), + ); + + assert_eq!(2, ret.len()); + assert_eq!( + 2, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + assert_eq!( + 3, + FullKey::decode(&ret[1].smallest_key) + .user_key + .table_id + .table_id() + ); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), + ..Default::default() + }, + ]; + + let ret = ConcatSstableIterator::filter_block_metas( + &block_metas, + &HashSet::from_iter(vec![1_u32, 2_u32].into_iter()), + KeyRange::default(), + ); + + assert_eq!(2, ret.len()); + assert_eq!( + 1, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + assert_eq!( + 2, + FullKey::decode(&ret[1].smallest_key) + .user_key + .table_id + .table_id() + ); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), + ..Default::default() + }, + ]; + let ret = ConcatSstableIterator::filter_block_metas( + &block_metas, + &HashSet::from_iter(vec![2_u32].into_iter()), + KeyRange::default(), + ); + + assert_eq!(1, ret.len()); + assert_eq!( + 2, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), + ..Default::default() + }, + ]; + let ret = ConcatSstableIterator::filter_block_metas( + &block_metas, + &HashSet::from_iter(vec![2_u32].into_iter()), + KeyRange::default(), + ); + + assert_eq!(1, ret.len()); + assert_eq!( + 2, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + } + + { + let block_metas = vec![ + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), + ..Default::default() + }, + BlockMeta { + smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(), + ..Default::default() + }, + ]; + + let ret = ConcatSstableIterator::filter_block_metas( + &block_metas, + &HashSet::from_iter(vec![2_u32].into_iter()), + KeyRange::default(), + ); + + assert_eq!(1, ret.len()); + assert_eq!( + 2, + FullKey::decode(&ret[0].smallest_key) + .user_key + .table_id + .table_id() + ); + } + } } diff --git a/src/storage/src/storage_failpoints/test_iterator.rs b/src/storage/src/storage_failpoints/test_iterator.rs index 2a9fb64744371..c794c5c73bdce 100644 --- a/src/storage/src/storage_failpoints/test_iterator.rs +++ b/src/storage/src/storage_failpoints/test_iterator.rs @@ -403,7 +403,6 @@ async fn test_failpoints_compactor_iterator_recreate() { table.meta.block_metas.clone(), info, HashSet::from_iter(std::iter::once(0)), - 0, &stats, Arc::new(TaskProgress::default()), sstable_store,