diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 093612623cce8..f61991c0fa274 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -533,6 +533,12 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorCon .iter() .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); + let has_split_sst = compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .any(|sst| sst.sst_id != sst.object_id); + let compact_table_ids: HashSet = HashSet::from_iter( compact_task .input_ssts @@ -546,6 +552,7 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorCon && all_ssts_are_blocked_filter && !has_tombstone && !has_ttl + && !has_split_sst && single_table && compact_task.target_level > 0 && compact_task.input_ssts.len() == 2 diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 9851afc47f148..2c2fa3b781528 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -242,6 +242,9 @@ impl ConcatSstableIterator { let stats_ptr = self.stats.remote_io_time.clone(); let now = Instant::now(); self.task_progress.inc_num_pending_read_io(); + + // Fast compact only support the single table compaction.(not split sst) + // So we don't need to filter the block_metas with table_id and key_range let block_stream = self .sstable_store .get_stream_for_blocks(sstable.id, &sstable.meta.block_metas) diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index df92a6f0d0aa8..d1bb5f9753f84 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -57,7 +57,9 @@ pub struct SstableStreamIterator { /// For key sanity check of divided SST and debugging sstable_info: SstableInfo, - existing_table_ids: HashSet, + + /// To Filter out the blocks + sstable_table_ids: HashSet, task_progress: Arc, io_retry_times: usize, max_io_retry_times: usize, @@ -86,16 +88,17 @@ impl SstableStreamIterator { pub fn new( block_metas: Vec, sstable_info: SstableInfo, - existing_table_ids: HashSet, stats: &StoreLocalStatistic, task_progress: Arc, sstable_store: SstableStoreRef, max_io_retry_times: usize, ) -> Self { + let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned()); + // filter the block meta with key range let block_metas = filter_block_metas( &block_metas, - &existing_table_ids, + &sstable_table_ids, sstable_info.key_range.clone(), ); @@ -109,7 +112,7 @@ impl SstableStreamIterator { block_metas, block_idx: 0, stats_ptr: stats.remote_io_time.clone(), - existing_table_ids, + sstable_table_ids, sstable_info, sstable_store, task_progress, @@ -137,7 +140,7 @@ impl SstableStreamIterator { async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> { while let Some(block_iter) = self.block_iter.as_mut() { if self - .existing_table_ids + .sstable_table_ids .contains(&block_iter.table_id().table_id) { return Ok(()); @@ -461,7 +464,6 @@ impl ConcatSstableIterator { let mut sstable_iter = SstableStreamIterator::new( block_metas, table_info.clone(), - self.existing_table_ids.clone(), &self.stats, self.task_progress.clone(), self.sstable_store.clone(), diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index bb1bdbdf9ee1b..b9f29c5740e4b 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -776,6 +776,7 @@ mod tests { meta.clone(), sstable_store.clone(), writer_opts, + vec![SST_ID as u32], ) .await .unwrap(); @@ -806,6 +807,7 @@ mod tests { meta.clone(), sstable_store.clone(), writer_opts, + vec![SST_ID as u32], ) .await .unwrap(); diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 03faa196a08b0..a9b1e9dfb31b7 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -175,6 +175,7 @@ pub async fn put_sst( mut meta: SstableMeta, sstable_store: SstableStoreRef, mut options: SstableWriterOptions, + table_ids: Vec, ) -> HummockResult { options.policy = CachePolicy::NotFill; let mut writer = sstable_store @@ -199,6 +200,7 @@ pub async fn put_sst( file_size: meta.estimated_size as u64, meta_offset: meta.meta_offset, uncompressed_file_size: meta.estimated_size as u64, + table_ids, ..Default::default() }; let writer_output = writer.finish(meta).await?; diff --git a/src/storage/src/storage_failpoints/test_iterator.rs b/src/storage/src/storage_failpoints/test_iterator.rs index c794c5c73bdce..cb05f0b788c29 100644 --- a/src/storage/src/storage_failpoints/test_iterator.rs +++ b/src/storage/src/storage_failpoints/test_iterator.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::ops::Bound::Unbounded; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -392,6 +391,7 @@ async fn test_failpoints_compactor_iterator_recreate() { meta.clone(), sstable_store.clone(), default_writer_opt_for_test(), + vec![table_id as u32], ) .await .unwrap(); @@ -402,7 +402,6 @@ async fn test_failpoints_compactor_iterator_recreate() { let mut sstable_iter = SstableStreamIterator::new( table.meta.block_metas.clone(), info, - HashSet::from_iter(std::iter::once(0)), &stats, Arc::new(TaskProgress::default()), sstable_store, diff --git a/src/storage/src/storage_failpoints/test_sstable.rs b/src/storage/src/storage_failpoints/test_sstable.rs index c89791767b4d0..fa7bd970039f0 100644 --- a/src/storage/src/storage_failpoints/test_sstable.rs +++ b/src/storage/src/storage_failpoints/test_sstable.rs @@ -104,6 +104,7 @@ async fn test_failpoints_vacuum_and_metadata() { meta.clone(), sstable_store.clone(), default_writer_opt_for_test(), + vec![table_id as u32], ) .await; assert!(result.is_err()); @@ -118,6 +119,7 @@ async fn test_failpoints_vacuum_and_metadata() { meta, sstable_store.clone(), default_writer_opt_for_test(), + vec![table_id as u32], ) .await .unwrap();