diff --git a/ci/scripts/build.sh b/ci/scripts/build.sh index f595bede10d24..b6a6a0fe3b542 100755 --- a/ci/scripts/build.sh +++ b/ci/scripts/build.sh @@ -60,7 +60,7 @@ cargo build \ --timings -artifacts=(risingwave sqlsmith compaction-test risingwave_regress_test risingwave_e2e_extended_mode_test risedev-dev delete-range-test) +artifacts=(risingwave sqlsmith compaction-test risingwave_regress_test risingwave_e2e_extended_mode_test risedev-dev) echo "--- Show link info" ldd target/"$profile"/risingwave diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 03a7b35a85192..51b776ad1b2c2 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -228,14 +228,6 @@ pub async fn sst_dump_via_sstable_store( println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len()); println!("Key Count: {}", sstable_meta.key_count); println!("Version: {}", sstable_meta.version); - println!( - "Monotonoic Deletes Count: {}", - sstable_meta.monotonic_tombstone_events.len() - ); - for monotonic_delete in &sstable_meta.monotonic_tombstone_events { - println!("\tevent key: {:?}", monotonic_delete.event_key); - println!("\tnew epoch: {:?}", monotonic_delete.new_epoch); - } println!("Block Count: {}", sstable.block_count()); for i in 0..sstable.block_count() { diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index 6a33d1ff1a09b..0f04440ec5489 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -24,7 +24,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common_estimate_size::EstimateSize; -use serde::{Deserialize, Serialize}; use crate::{EpochWithGap, HummockEpoch}; @@ -441,14 +440,8 @@ impl CopyFromSlice for Bytes { /// /// Its name come from the assumption that Hummock is always accessed by a table-like structure /// identified by a [`TableId`]. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)] -pub struct TableKey>( - #[serde(bound( - serialize = "T: serde::Serialize + serde_bytes::Serialize", - deserialize = "T: serde::Deserialize<'de> + serde_bytes::Deserialize<'de>" - ))] - pub T, -); +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct TableKey>(pub T); impl> Debug for TableKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -542,15 +535,11 @@ pub fn gen_key_from_str(vnode: VirtualNode, payload: &str) -> TableKey { /// will group these two values into one struct for convenient filtering. /// /// The encoded format is | `table_id` | `table_key` |. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] pub struct UserKey> { // When comparing `UserKey`, we first compare `table_id`, then `table_key`. So the order of // declaration matters. pub table_id: TableId, - #[serde(bound( - serialize = "T: serde::Serialize + serde_bytes::Serialize", - deserialize = "T: serde::Deserialize<'de> + serde_bytes::Deserialize<'de>" - ))] pub table_key: TableKey, } @@ -590,15 +579,6 @@ impl> UserKey { buf.put_slice(self.table_key.as_ref()); } - /// Encode in to a buffer. 
- /// - /// length prefixed requires 4B more than its `encoded_len()` - pub fn encode_length_prefixed(&self, mut buf: impl BufMut) { - buf.put_u32(self.table_id.table_id()); - buf.put_u32(self.table_key.as_ref().len() as u32); - buf.put_slice(self.table_key.as_ref()); - } - pub fn encode(&self) -> Vec { let mut ret = Vec::with_capacity(TABLE_PREFIX_LEN + self.table_key.as_ref().len()); self.encode_into(&mut ret); @@ -658,16 +638,6 @@ impl> UserKey { } } -impl UserKey> { - pub fn decode_length_prefixed(buf: &mut &[u8]) -> Self { - let table_id = buf.get_u32(); - let len = buf.get_u32() as usize; - let data = buf[..len].to_vec(); - buf.advance(len); - UserKey::new(TableId::new(table_id), TableKey(data)) - } -} - impl> UserKey { /// Use this method to override an old `UserKey>` with a `UserKey<&[u8]>` to own the /// table key without reallocating a new `UserKey` object. @@ -882,48 +852,50 @@ impl + Ord + Eq> PartialOrd for FullKey { } } -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -pub struct PointRange> { - // When comparing `PointRange`, we first compare `left_user_key`, then - // `is_exclude_left_key`. Therefore the order of declaration matters. - #[serde(bound( - serialize = "T: serde::Serialize + serde_bytes::Serialize", - deserialize = "T: serde::Deserialize<'de> + serde_bytes::Deserialize<'de>" - ))] - pub left_user_key: UserKey, - /// `PointRange` represents the left user key itself if `is_exclude_left_key==false` - /// while represents the right δ Neighborhood of the left user key if - /// `is_exclude_left_key==true`. - pub is_exclude_left_key: bool, -} +pub mod range_delete_backward_compatibility_serde_struct { + use bytes::{Buf, BufMut}; + use risingwave_common::catalog::TableId; + use serde::{Deserialize, Serialize}; -impl> PointRange { - pub fn from_user_key(left_user_key: UserKey, is_exclude_left_key: bool) -> Self { - Self { - left_user_key, - is_exclude_left_key, - } - } + #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] + pub struct TableKey(Vec); - pub fn as_ref(&self) -> PointRange<&[u8]> { - PointRange::from_user_key(self.left_user_key.as_ref(), self.is_exclude_left_key) + #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] + pub struct UserKey { + // When comparing `UserKey`, we first compare `table_id`, then `table_key`. So the order of + // declaration matters. + pub table_id: TableId, + pub table_key: TableKey, } - pub fn is_empty(&self) -> bool { - self.left_user_key.is_empty() - } -} + impl UserKey { + pub fn decode_length_prefixed(buf: &mut &[u8]) -> Self { + let table_id = buf.get_u32(); + let len = buf.get_u32() as usize; + let data = buf[..len].to_vec(); + buf.advance(len); + UserKey { + table_id: TableId::new(table_id), + table_key: TableKey(data), + } + } -impl<'a> PointRange<&'a [u8]> { - pub fn to_vec(&self) -> PointRange> { - self.copy_into() + pub fn encode_length_prefixed(&self, mut buf: impl BufMut) { + buf.put_u32(self.table_id.table_id()); + buf.put_u32(self.table_key.0.as_slice().len() as u32); + buf.put_slice(self.table_key.0.as_slice()); + } } - pub fn copy_into>(&self) -> PointRange { - PointRange { - left_user_key: self.left_user_key.copy_into(), - is_exclude_left_key: self.is_exclude_left_key, - } + #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] + pub struct PointRange { + // When comparing `PointRange`, we first compare `left_user_key`, then + // `is_exclude_left_key`. Therefore the order of declaration matters. 
+ pub left_user_key: UserKey, + /// `PointRange` represents the left user key itself if `is_exclude_left_key==false` + /// while represents the right δ Neighborhood of the left user key if + /// `is_exclude_left_key==true`. + pub is_exclude_left_key: bool, } } diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index 9f0cef22e4e43..f85367e408f10 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -107,7 +107,6 @@ fn criterion_benchmark(c: &mut Criterion) { (Unbounded, Unbounded), epoch, ReadOptions { - ignore_range_tombstone: true, prefetch_options: PrefetchOptions::default(), cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index ff8b43c15c458..562e989051395 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -102,7 +102,6 @@ impl From for TableId { #[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)] pub struct TracedReadOptions { pub prefix_hint: Option, - pub ignore_range_tombstone: bool, pub prefetch_options: TracedPrefetchOptions, pub cache_policy: TracedCachePolicy, @@ -116,7 +115,6 @@ impl TracedReadOptions { pub fn for_test(table_id: u32) -> Self { Self { prefix_hint: Some(TracedBytes::from(vec![0])), - ignore_range_tombstone: true, prefetch_options: TracedPrefetchOptions { prefetch: true, for_large_query: true, diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index f61991c0fa274..f91cf5eedb563 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -39,14 +39,13 @@ use crate::hummock::compactor::{ TtlCompactionFilter, }; use crate::hummock::iterator::{ - Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, SkipWatermarkIterator, - UserIterator, + Forward, HummockIterator, MergeIterator, SkipWatermarkIterator, UserIterator, }; use crate::hummock::multi_builder::TableBuilderFactory; use crate::hummock::sstable::DEFAULT_ENTRY_SIZE; use crate::hummock::{ CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder, - SstableBuilderOptions, SstableDeleteRangeIterator, SstableWriterFactory, SstableWriterOptions, + SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions, }; use crate::monitor::StoreLocalStatistic; @@ -349,7 +348,6 @@ pub async fn check_compaction_result( } let mut table_iters = Vec::new(); - let mut del_iter = ForwardMergeRangeIterator::default(); for level in &compact_task.input_ssts { if level.table_infos.is_empty() { continue; @@ -358,7 +356,6 @@ pub async fn check_compaction_result( // Do not need to filter the table because manager has done it. 
if level.level_type == PbLevelType::Nonoverlapping { debug_assert!(can_concat(&level.table_infos)); - del_iter.add_concat_iter(level.table_infos.clone(), context.sstable_store.clone()); table_iters.push(ConcatSstableIterator::new( compact_task.existing_table_ids.clone(), @@ -369,13 +366,7 @@ pub async fn check_compaction_result( context.storage_opts.compactor_iter_max_io_retry_times, )); } else { - let mut stats = StoreLocalStatistic::default(); for table_info in &level.table_infos { - let table = context - .sstable_store - .sstable(table_info, &mut stats) - .await?; - del_iter.add_sst_iter(SstableDeleteRangeIterator::new(table)); table_iters.push(ConcatSstableIterator::new( compact_task.existing_table_ids.clone(), vec![table_info.clone()], diff --git a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs deleted file mode 100644 index a7c5215439bfb..0000000000000 --- a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::future::Future; - -use risingwave_hummock_sdk::key::{FullKey, PointRange, UserKey}; -use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::HummockEpoch; - -use crate::hummock::iterator::DeleteRangeIterator; -use crate::hummock::sstable_store::SstableStoreRef; -use crate::hummock::{HummockResult, SstableDeleteRangeIterator}; -use crate::monitor::StoreLocalStatistic; - -pub struct ConcatDeleteRangeIterator { - sstables: Vec, - current: Option, - idx: usize, - sstable_store: SstableStoreRef, - stats: StoreLocalStatistic, -} - -impl ConcatDeleteRangeIterator { - pub fn new(sstables: Vec, sstable_store: SstableStoreRef) -> Self { - Self { - sstables, - sstable_store, - stats: StoreLocalStatistic::default(), - idx: 0, - current: None, - } - } - - async fn next_inner(&mut self) -> HummockResult<()> { - if let Some(iter) = self.current.as_mut() { - if iter.is_valid() { - if iter.is_last_range() - && self.idx + 1 < self.sstables.len() - && self.sstables[self.idx + 1].range_tombstone_count > 0 - && iter - .next_extended_user_key() - .left_user_key - .eq(&FullKey::decode(&self.sstables[self.idx].key_range.right).user_key) - { - // When the last range of the current sstable is equal to the first range of the - // next sstable, the `next` method would return two same `PointRange`. So we - // must skip one. 
- let exclusive_range_start = iter.next_extended_user_key().is_exclude_left_key; - let last_key_in_sst_start = iter - .next_extended_user_key() - .left_user_key - .eq(&FullKey::decode(&self.sstables[self.idx + 1].key_range.left).user_key); - iter.next().await?; - if !iter.is_valid() && last_key_in_sst_start { - self.seek_idx(self.idx + 1, None).await?; - let next_range = self.next_extended_user_key(); - debug_assert!(self.is_valid()); - if next_range.is_exclude_left_key == exclusive_range_start - && next_range - .left_user_key - .eq(&FullKey::decode(&self.sstables[self.idx].key_range.left) - .user_key) - { - self.current.as_mut().unwrap().next().await?; - } - return Ok(()); - } - } else { - iter.next().await?; - } - let mut idx = self.idx; - while idx + 1 < self.sstables.len() && !self.is_valid() { - self.seek_idx(idx + 1, None).await?; - idx += 1; - } - } - } - Ok(()) - } - - /// Seeks to a table, and then seeks to the key if `seek_key` is given. - async fn seek_idx( - &mut self, - idx: usize, - seek_key: Option>, - ) -> HummockResult<()> { - self.current.take(); - if idx < self.sstables.len() { - if self.sstables[idx].range_tombstone_count == 0 { - return Ok(()); - } - let table = self - .sstable_store - .sstable(&self.sstables[idx], &mut self.stats) - .await?; - let mut sstable_iter = SstableDeleteRangeIterator::new(table); - - if let Some(key) = seek_key { - sstable_iter.seek(key).await?; - } else { - sstable_iter.rewind().await?; - } - self.current = Some(sstable_iter); - self.idx = idx; - } - Ok(()) - } -} - -impl DeleteRangeIterator for ConcatDeleteRangeIterator { - type NextFuture<'a> = impl Future> + 'a; - type RewindFuture<'a> = impl Future> + 'a; - type SeekFuture<'a> = impl Future> + 'a; - - fn next_extended_user_key(&self) -> PointRange<&[u8]> { - self.current.as_ref().unwrap().next_extended_user_key() - } - - fn current_epoch(&self) -> HummockEpoch { - self.current.as_ref().unwrap().current_epoch() - } - - fn next(&mut self) -> Self::NextFuture<'_> { - self.next_inner() - } - - fn rewind(&mut self) -> Self::RewindFuture<'_> { - async move { - let mut idx = 0; - self.seek_idx(idx, None).await?; - while idx + 1 < self.sstables.len() && !self.is_valid() { - self.seek_idx(idx + 1, None).await?; - idx += 1; - } - Ok(()) - } - } - - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { - async move { - let mut idx = self - .sstables - .partition_point(|sst| { - FullKey::decode(&sst.key_range.left) - .user_key - .le(&target_user_key) - }) - .saturating_sub(1); // considering the boundary of 0 - self.seek_idx(idx, Some(target_user_key)).await?; - while idx + 1 < self.sstables.len() && !self.is_valid() { - self.seek_idx(idx + 1, None).await?; - idx += 1; - } - Ok(()) - } - } - - fn is_valid(&self) -> bool { - self.current - .as_ref() - .map(|iter| iter.is_valid()) - .unwrap_or(false) - } -} diff --git a/src/storage/src/hummock/iterator/delete_range_iterator.rs b/src/storage/src/hummock/iterator/delete_range_iterator.rs deleted file mode 100644 index bcc2f3e3ea26f..0000000000000 --- a/src/storage/src/hummock/iterator/delete_range_iterator.rs +++ /dev/null @@ -1,386 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{BTreeSet, BinaryHeap}; -use std::future::Future; - -use risingwave_common::util::epoch::is_max_epoch; -use risingwave_hummock_sdk::key::{PointRange, UserKey}; -use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::HummockEpoch; - -use crate::hummock::iterator::concat_delete_range_iterator::ConcatDeleteRangeIterator; -use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferDeleteRangeIterator; -use crate::hummock::sstable_store::SstableStoreRef; -use crate::hummock::{HummockResult, SstableDeleteRangeIterator}; - -/// `DeleteRangeIterator` defines the interface of all delete-range iterators, which is used to -/// filter keys deleted by some range tombstone -/// -/// After creating the iterator instance, -/// - if you want to iterate from the beginning, you need to then call its `rewind` method. -/// - if you want to iterate from some specific position, you need to then call its `seek` method. -pub trait DeleteRangeIterator { - type NextFuture<'a>: Future> + Send + 'a - where - Self: 'a; - type RewindFuture<'a>: Future> + Send + 'a - where - Self: 'a; - type SeekFuture<'a>: Future> + Send + 'a - where - Self: 'a; - /// Retrieves the next extended user key that changes current epoch. - /// - /// Note: - /// - Before calling this function, makes sure the iterator `is_valid`. - /// - This function should be straightforward and return immediately. - /// - /// # Panics - /// This function will panic if the iterator is invalid. - fn next_extended_user_key(&self) -> PointRange<&[u8]>; - - /// Retrieves the epoch of the current range delete. - /// It returns the epoch between the previous `next_user_key` (inclusive) and the current - /// `next_user_key` (not inclusive). When there is no range deletes, it will return - /// `HummockEpoch::MAX`. - /// - /// Note: - /// - Before calling this function, makes sure the iterator `is_valid`. - /// - This function should be straightforward and return immediately. - /// - /// # Panics - /// This function will panic if the iterator is invalid. - fn current_epoch(&self) -> HummockEpoch; - - /// Moves a valid iterator to the next tombstone. - /// - /// Note: - /// - Before calling this function, makes sure the iterator `is_valid`. - /// - After calling this function, you may first check whether the iterator `is_valid` again, - /// then get the new tombstone by calling `start_user_key`, `end_user_key` and - /// `current_epoch`. - /// - If the position after calling this is invalid, this function WON'T return an `Err`. You - /// should check `is_valid` before continuing the iteration. - /// - /// # Panics - /// This function will panic if the iterator is invalid. - fn next(&mut self) -> Self::NextFuture<'_>; - - /// Resets the position of the iterator. - /// - /// Note: - /// - Do not decide whether the position is valid or not by checking the returned error of this - /// function. This function WON'T return an `Err` if invalid. You should check `is_valid` - /// before starting iteration. 
- fn rewind(&mut self) -> Self::RewindFuture<'_>; - - /// Resets iterator and seeks to the first tombstone whose left-end >= provided key, we use this - /// method to skip tombstones which do not overlap with the provided key. - /// - /// Note: - /// - Do not decide whether the position is valid or not by checking the returned error of this - /// function. This function WON'T return an `Err` if invalid. You should check `is_valid` - /// before starting iteration. - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_>; - - /// Indicates whether the iterator can be used. - /// - /// Note: - /// - ONLY call `next_user_key`, `current_epoch` and `next` if `is_valid` returns `true`. - /// - This function should be straightforward and return immediately. - fn is_valid(&self) -> bool; -} - -pub enum RangeIteratorTyped { - Sst(SstableDeleteRangeIterator), - Batch(SharedBufferDeleteRangeIterator), - Concat(ConcatDeleteRangeIterator), -} - -impl DeleteRangeIterator for RangeIteratorTyped { - type NextFuture<'a> = impl Future> + 'a; - type RewindFuture<'a> = impl Future> + 'a; - type SeekFuture<'a> = impl Future> + 'a; - - fn next_extended_user_key(&self) -> PointRange<&[u8]> { - match self { - RangeIteratorTyped::Sst(sst) => sst.next_extended_user_key(), - RangeIteratorTyped::Batch(batch) => batch.next_extended_user_key(), - RangeIteratorTyped::Concat(batch) => batch.next_extended_user_key(), - } - } - - fn current_epoch(&self) -> HummockEpoch { - match self { - RangeIteratorTyped::Sst(sst) => sst.current_epoch(), - RangeIteratorTyped::Batch(batch) => batch.current_epoch(), - RangeIteratorTyped::Concat(batch) => batch.current_epoch(), - } - } - - fn next(&mut self) -> Self::NextFuture<'_> { - async move { - match self { - RangeIteratorTyped::Sst(sst) => sst.next().await, - RangeIteratorTyped::Batch(batch) => batch.next().await, - RangeIteratorTyped::Concat(iter) => iter.next().await, - } - } - } - - fn rewind(&mut self) -> Self::RewindFuture<'_> { - async move { - match self { - RangeIteratorTyped::Sst(sst) => sst.rewind().await, - RangeIteratorTyped::Batch(batch) => batch.rewind().await, - RangeIteratorTyped::Concat(iter) => iter.rewind().await, - } - } - } - - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { - async move { - match self { - RangeIteratorTyped::Sst(sst) => sst.seek(target_user_key).await, - RangeIteratorTyped::Batch(batch) => batch.seek(target_user_key).await, - RangeIteratorTyped::Concat(iter) => iter.seek(target_user_key).await, - } - } - } - - fn is_valid(&self) -> bool { - match self { - RangeIteratorTyped::Sst(sst) => sst.is_valid(), - RangeIteratorTyped::Batch(batch) => batch.is_valid(), - RangeIteratorTyped::Concat(iter) => iter.is_valid(), - } - } -} - -impl PartialEq for RangeIteratorTyped { - fn eq(&self, other: &Self) -> bool { - self.next_extended_user_key() - .eq(&other.next_extended_user_key()) - } -} - -impl PartialOrd for RangeIteratorTyped { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Eq for RangeIteratorTyped {} - -impl Ord for RangeIteratorTyped { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - other - .next_extended_user_key() - .cmp(&self.next_extended_user_key()) - } -} - -/// For each SST or batch delete range iterator, it represents the union set of delete ranges in the -/// corresponding SST/batch. Therefore delete ranges are then ordered and do not overlap with each -/// other in every `RangeIteratorTyped`. 
However, in each SST, since original delete ranges are -/// replaced with a union set of delete ranges, we lose exact information about whether a key -/// is deleted by a delete range in the same SST. Therefore we need to construct a -/// corresponding delete key (aka key tombstone) to represent this. -/// -/// In the `ForwardMergeRangeIterator`, assume that SST1 has delete range event -/// `<5, epoch1>`, `<8, epoch2>` and `<11, epoch3>` -/// and SST2 has delete range event -/// `<7, epoch4>`and `<9, epoch5>`. -/// Initially, `next_user_key` of `ForwardMergeRangeIterator` is 5, which is the earliest event and -/// current epochs is empty set at this time. -/// When `UserIterator` queries user key 5, `current_epochs` becomes `{epoch1}`, which means the key -/// fetched by `UserIterator` is deleted only if `key epoch <= epoch1 <= read_epoch`. -/// Simultaneously, `next_user_key` becomes 7, which means that the inequality will be kept until -/// the key fetched by `UserIterator` reaches user key 7. -/// For example, if the `UserIterator` queries user key 6 later, the delete condition is still -/// `key epoch <= epoch1 <= read_epoch`. -/// -/// When `UserIterator` queries user key 8, -/// `next_user_key` of SST1 is 11, `current_epoch` of SST1 is epoch2; -/// `next_user_key` of SST2 is 9, `current_epoch` of SST2 is epoch4; -/// Therefore `current_epochs` of `ForwardMergeRangeIterator` is `{epoch2, epoch4}`, -/// `next_user_key` of `ForwardMergeRangeIterator` is min(11, 9) == 9, -/// which means that `current_epochs` won't change until user key 9. -/// -/// We can then get the largest epoch which is not greater than read epoch in `{epoch2, epoch4}`. -/// The user key is deleted only if key epoch is below this epoch. -pub struct ForwardMergeRangeIterator { - heap: BinaryHeap, - unused_iters: Vec, - tmp_buffer: Vec, - read_epoch: HummockEpoch, - /// The correctness of the algorithm needs to be guaranteed by "the epoch of the - /// intervals covering each other must be different". - current_epochs: BTreeSet, -} - -impl Default for ForwardMergeRangeIterator { - fn default() -> Self { - ForwardMergeRangeIterator::new(HummockEpoch::MAX) - } -} - -impl ForwardMergeRangeIterator { - pub fn new(read_epoch: HummockEpoch) -> Self { - Self { - heap: BinaryHeap::new(), - unused_iters: vec![], - tmp_buffer: vec![], - read_epoch, - current_epochs: BTreeSet::new(), - } - } - - pub fn add_batch_iter(&mut self, iter: SharedBufferDeleteRangeIterator) { - self.unused_iters.push(RangeIteratorTyped::Batch(iter)); - } - - pub fn add_sst_iter(&mut self, iter: SstableDeleteRangeIterator) { - self.unused_iters.push(RangeIteratorTyped::Sst(iter)); - } - - pub fn add_concat_iter(&mut self, sstables: Vec, sstable_store: SstableStoreRef) { - self.unused_iters - .push(RangeIteratorTyped::Concat(ConcatDeleteRangeIterator::new( - sstables, - sstable_store, - ))) - } -} - -impl ForwardMergeRangeIterator { - pub async fn next_until(&mut self, target_user_key: UserKey<&[u8]>) -> HummockResult<()> { - let target_extended_user_key = PointRange::from_user_key(target_user_key, false); - while self.is_valid() && self.next_extended_user_key().le(&target_extended_user_key) { - self.next().await?; - } - Ok(()) - } - - pub fn earliest_delete_since(&self, epoch: HummockEpoch) -> HummockEpoch { - self.current_epochs - .range(epoch..) 
- .next() - .map_or(HummockEpoch::MAX, |ret| *ret) - } - - pub fn earliest_epoch(&self) -> HummockEpoch { - self.current_epochs - .first() - .map_or(HummockEpoch::MAX, |epoch| *epoch) - } -} - -impl DeleteRangeIterator for ForwardMergeRangeIterator { - type NextFuture<'a> = impl Future> + 'a; - type RewindFuture<'a> = impl Future> + 'a; - type SeekFuture<'a> = impl Future> + 'a; - - fn next_extended_user_key(&self) -> PointRange<&[u8]> { - self.heap.peek().unwrap().next_extended_user_key() - } - - fn current_epoch(&self) -> HummockEpoch { - self.current_epochs - .range(..=self.read_epoch) - .last() - .map_or(HummockEpoch::MIN, |epoch| *epoch) - } - - fn next(&mut self) -> Self::NextFuture<'_> { - async { - self.tmp_buffer - .push(self.heap.pop().expect("no inner iter")); - while let Some(node) = self.heap.peek() - && node.is_valid() - && node.next_extended_user_key() == self.tmp_buffer[0].next_extended_user_key() - { - self.tmp_buffer.push(self.heap.pop().unwrap()); - } - for node in &self.tmp_buffer { - let epoch = node.current_epoch(); - if !is_max_epoch(epoch) { - self.current_epochs.remove(&epoch); - } - } - // Correct because ranges in an epoch won't intersect. - for mut node in std::mem::take(&mut self.tmp_buffer) { - node.next().await?; - if node.is_valid() { - let epoch = node.current_epoch(); - if !is_max_epoch(epoch) { - self.current_epochs.insert(epoch); - } - self.heap.push(node); - } else { - // Put back to `unused_iters` - self.unused_iters.push(node); - } - } - Ok(()) - } - } - - fn rewind(&mut self) -> Self::RewindFuture<'_> { - async move { - self.current_epochs.clear(); - self.unused_iters.extend(self.heap.drain()); - for mut node in self.unused_iters.drain(..) { - node.rewind().await?; - if node.is_valid() { - let epoch = node.current_epoch(); - if !is_max_epoch(epoch) { - self.current_epochs.insert(epoch); - } - self.heap.push(node); - } - } - Ok(()) - } - } - - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { - async move { - self.current_epochs.clear(); - let mut iters = std::mem::take(&mut self.unused_iters); - iters.extend(self.heap.drain()); - for mut node in iters { - node.seek(target_user_key).await?; - if node.is_valid() { - let epoch = node.current_epoch(); - if !is_max_epoch(epoch) { - self.current_epochs.insert(epoch); - } - self.heap.push(node); - } else { - self.unused_iters.push(node); - } - } - Ok(()) - } - } - - fn is_valid(&self) -> bool { - self.heap - .peek() - .map(|node| node.is_valid()) - .unwrap_or(false) - } -} diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index fdfcd26a3a592..a205baac0aa96 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -46,15 +46,10 @@ use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third}; pub mod change_log; -mod concat_delete_range_iterator; -mod delete_range_iterator; mod skip_watermark; #[cfg(any(test, feature = "test"))] pub mod test_utils; -pub use delete_range_iterator::{ - DeleteRangeIterator, ForwardMergeRangeIterator, RangeIteratorTyped, -}; use risingwave_common::catalog::TableId; pub use skip_watermark::*; diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index f10b6deee503e..8c5410f5c4cde 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -90,16 +90,6 @@ pub async fn get_from_sstable_info( local_stats, ) { - if !read_options.ignore_range_tombstone 
{ - let delete_epoch = get_min_delete_range_epoch_from_sstable(&sstable, full_key.user_key); - if delete_epoch <= full_key.epoch_with_gap.pure_epoch() { - return Ok(Some(( - HummockValue::Delete, - EpochWithGap::new_from_epoch(delete_epoch), - ))); - } - } - return Ok(None); } @@ -113,17 +103,6 @@ pub async fn get_from_sstable_info( iter.seek(full_key).await?; // Iterator has sought passed the borders. if !iter.is_valid() { - if !read_options.ignore_range_tombstone { - let delete_epoch = - get_min_delete_range_epoch_from_sstable(iter.sst(), full_key.user_key); - if delete_epoch <= full_key.epoch_with_gap.pure_epoch() { - return Ok(Some(( - HummockValue::Delete, - EpochWithGap::new_from_epoch(delete_epoch), - ))); - } - } - return Ok(None); } @@ -131,16 +110,6 @@ pub async fn get_from_sstable_info( // or key next to it. let value = if iter.key().user_key == full_key.user_key { Some((iter.value().to_bytes(), iter.key().epoch_with_gap)) - } else if !read_options.ignore_range_tombstone { - let delete_epoch = get_min_delete_range_epoch_from_sstable(iter.sst(), full_key.user_key); - if delete_epoch <= full_key.epoch_with_gap.pure_epoch() { - Some(( - HummockValue::Delete, - EpochWithGap::new_from_epoch(delete_epoch), - )) - } else { - None - } } else { None }; diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 1f8b17fb6c662..53fccc922b2bc 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -14,7 +14,6 @@ use std::cmp::Ordering; use std::fmt::Debug; -use std::future::Future; use std::marker::PhantomData; use std::mem::size_of_val; use std::ops::Bound::Included; @@ -27,16 +26,15 @@ use bytes::Bytes; use prometheus::IntGauge; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; -use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, TableKeyRange, UserKey}; +use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey}; use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::iterator::{ - Backward, DeleteRangeIterator, DirectionEnum, Forward, HummockIterator, - HummockIteratorDirection, ValueMeta, + Backward, DirectionEnum, Forward, HummockIterator, HummockIteratorDirection, ValueMeta, }; use crate::hummock::utils::{range_overlap, MemoryTracker}; use crate::hummock::value::HummockValue; -use crate::hummock::{HummockEpoch, HummockResult, MonotonicDeleteEvent}; +use crate::hummock::{HummockEpoch, HummockResult}; use crate::mem_table::ImmId; use crate::store::ReadOptions; @@ -844,122 +842,6 @@ impl HummockIterator } } -pub struct SharedBufferDeleteRangeIterator { - monotonic_tombstone_events: Vec, - next_idx: usize, -} - -impl SharedBufferDeleteRangeIterator { - #[cfg(any(test, feature = "test"))] - pub(crate) fn new( - epoch: HummockEpoch, - table_id: TableId, - delete_ranges: Vec<(Bound, Bound)>, - ) -> Self { - use itertools::Itertools; - let point_range_pairs = delete_ranges - .into_iter() - .map(|(left_bound, right_bound)| { - ( - match left_bound { - Bound::Excluded(x) => PointRange::from_user_key( - UserKey::new(table_id, TableKey(x.to_vec())), - true, - ), - Bound::Included(x) => PointRange::from_user_key( - UserKey::new(table_id, TableKey(x.to_vec())), - false, - ), - Bound::Unbounded => unreachable!(), - }, - match right_bound { - Bound::Excluded(x) => PointRange::from_user_key( - UserKey::new(table_id, TableKey(x.to_vec())), - false, - ), - 
Bound::Included(x) => PointRange::from_user_key( - UserKey::new(table_id, TableKey(x.to_vec())), - true, - ), - Bound::Unbounded => PointRange::from_user_key( - UserKey::new( - TableId::new(table_id.table_id() + 1), - TableKey::default(), - ), - false, - ), - }, - ) - }) - .collect_vec(); - let mut monotonic_tombstone_events = Vec::with_capacity(point_range_pairs.len() * 2); - for (start_point_range, end_point_range) in point_range_pairs { - monotonic_tombstone_events.push(MonotonicDeleteEvent { - event_key: start_point_range, - new_epoch: epoch, - }); - monotonic_tombstone_events.push(MonotonicDeleteEvent { - event_key: end_point_range, - new_epoch: HummockEpoch::MAX, - }); - } - Self { - monotonic_tombstone_events, - next_idx: 0, - } - } -} - -impl DeleteRangeIterator for SharedBufferDeleteRangeIterator { - type NextFuture<'a> = impl Future> + 'a; - type RewindFuture<'a> = impl Future> + 'a; - type SeekFuture<'a> = impl Future> + 'a; - - fn next_extended_user_key(&self) -> PointRange<&[u8]> { - self.monotonic_tombstone_events[self.next_idx] - .event_key - .as_ref() - } - - fn current_epoch(&self) -> HummockEpoch { - if self.next_idx > 0 { - self.monotonic_tombstone_events[self.next_idx - 1].new_epoch - } else { - HummockEpoch::MAX - } - } - - fn next(&mut self) -> Self::NextFuture<'_> { - async move { - self.next_idx += 1; - Ok(()) - } - } - - fn rewind(&mut self) -> Self::RewindFuture<'_> { - async move { - self.next_idx = 0; - Ok(()) - } - } - - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'a> { - async move { - let target_extended_user_key = PointRange::from_user_key(target_user_key, false); - self.next_idx = self.monotonic_tombstone_events.partition_point( - |MonotonicDeleteEvent { event_key, .. }| { - event_key.as_ref().le(&target_extended_user_key) - }, - ); - Ok(()) - } - } - - fn is_valid(&self) -> bool { - self.next_idx < self.monotonic_tombstone_events.len() - } -} - #[cfg(test)] mod tests { use std::ops::Bound::Excluded; diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 32960c7b8f97d..49caf0ba02568 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
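The deleted `SharedBufferDeleteRangeIterator` above, like the SST and concat variants, drives all of its positioning off one sorted list of `MonotonicDeleteEvent`s via `partition_point`. Below is a minimal standalone sketch of that lookup; `Event` and `seek_epoch` are simplified stand-ins (real event keys are `PointRange`s over user keys, not plain byte strings):

```rust
// Standalone sketch of the positioning logic shared by the removed
// delete-range iterators. `Event` is a simplified stand-in for
// `MonotonicDeleteEvent`: real event keys are `PointRange`s, not byte strings.
const MAX_EPOCH: u64 = u64::MAX;

struct Event {
    event_key: Vec<u8>, // sorted ascending within one list
    new_epoch: u64,     // epoch in effect from this key up to the next event
}

/// Epoch of the range delete covering `target`, or `MAX_EPOCH` if none.
fn seek_epoch(events: &[Event], target: &[u8]) -> u64 {
    // Index of the first event strictly greater than `target`; the same
    // `partition_point(|event| event_key <= target)` call as in the diff.
    let idx = events.partition_point(|e| e.event_key.as_slice() <= target);
    if idx == 0 {
        MAX_EPOCH // no event at or before `target`, so nothing deletes it
    } else {
        events[idx - 1].new_epoch
    }
}

fn main() {
    // One tombstone [b"b", b"d") at epoch 5 becomes two monotonic events.
    let events = vec![
        Event { event_key: b"b".to_vec(), new_epoch: 5 },
        Event { event_key: b"d".to_vec(), new_epoch: MAX_EPOCH },
    ];
    assert_eq!(seek_epoch(&events, b"a"), MAX_EPOCH); // before the range
    assert_eq!(seek_epoch(&events, b"c"), 5);         // inside the range
    assert_eq!(seek_epoch(&events, b"d"), MAX_EPOCH); // right end is exclusive
}
```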
-use std::cmp; use std::collections::BTreeSet; use std::sync::Arc; use bytes::{Bytes, BytesMut}; -use risingwave_common::util::epoch::is_max_epoch; use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN}; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -411,6 +409,7 @@ impl SstableBuilder { .map(|block_meta| block_meta.uncompressed_size as u64) .sum::(); + #[expect(deprecated)] let mut meta = SstableMeta { block_metas: self.block_metas, bloom_filter, @@ -450,21 +449,6 @@ impl SstableBuilder { ) }); - // Expand the epoch of the whole sst by tombstone epoch - let (tombstone_min_epoch, tombstone_max_epoch) = { - let mut tombstone_min_epoch = HummockEpoch::MAX; - let mut tombstone_max_epoch = u64::MIN; - - for monotonic_delete in &meta.monotonic_tombstone_events { - if !is_max_epoch(monotonic_delete.new_epoch) { - tombstone_min_epoch = cmp::min(tombstone_min_epoch, monotonic_delete.new_epoch); - tombstone_max_epoch = cmp::max(tombstone_max_epoch, monotonic_delete.new_epoch); - } - } - - (tombstone_min_epoch, tombstone_max_epoch) - }; - let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() { (0, 0) } else { @@ -522,9 +506,9 @@ impl SstableBuilder { stale_key_count, total_key_count, uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64, - min_epoch: cmp::min(min_epoch, tombstone_min_epoch), - max_epoch: cmp::max(max_epoch, tombstone_max_epoch), - range_tombstone_count: meta.monotonic_tombstone_events.len() as u64, + min_epoch, + max_epoch, + range_tombstone_count: 0, sst_size: meta.estimated_size as u64, }; tracing::trace!( diff --git a/src/storage/src/hummock/sstable/delete_range_aggregator.rs b/src/storage/src/hummock/sstable/delete_range_aggregator.rs deleted file mode 100644 index a0c01d8cb80ea..0000000000000 --- a/src/storage/src/hummock/sstable/delete_range_aggregator.rs +++ /dev/null @@ -1,475 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
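The `builder.rs` hunk above wraps the remaining `monotonic_tombstone_events` use in `#[expect(deprecated)]`, since the field itself is marked `#[deprecated]` in the `sstable/mod.rs` hunk further down. A minimal sketch of that lint pattern follows; `Meta`, `tombstone_events`, and `decode_meta` are hypothetical names, and `#[expect]` assumes Rust 1.81+ (older toolchains would use `#[allow(deprecated)]`, which does not warn when the expectation goes unused):

```rust
// Sketch of the deprecation pattern applied to `SstableMeta` in this PR:
// the field survives so old SST metadata still decodes, but every remaining
// use site must opt in explicitly, so no new code picks it up silently.
pub struct Meta {
    #[deprecated = "kept only for backward-compatible (de)serialization"]
    pub tombstone_events: Vec<u8>,
}

#[expect(deprecated)] // intentional: the struct literal touches the field
fn decode_meta() -> Meta {
    Meta { tombstone_events: Vec::new() }
}

fn main() {
    let meta = decode_meta();
    // Any stray access still warns unless explicitly expected:
    #[expect(deprecated)]
    assert!(meta.tombstone_events.is_empty());
}
```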
- -use std::future::Future; - -#[cfg(test)] -use risingwave_common::util::epoch::is_max_epoch; -use risingwave_hummock_sdk::key::{PointRange, UserKey}; -use risingwave_hummock_sdk::HummockEpoch; - -use super::MonotonicDeleteEvent; -use crate::hummock::iterator::{DeleteRangeIterator, ForwardMergeRangeIterator}; -use crate::hummock::sstable_store::TableHolder; -use crate::hummock::{HummockResult, Sstable}; - -pub struct CompactionDeleteRangeIterator { - inner: ForwardMergeRangeIterator, -} - -impl CompactionDeleteRangeIterator { - pub fn new(inner: ForwardMergeRangeIterator) -> Self { - Self { inner } - } - - pub async fn next(&mut self) -> HummockResult<()> { - self.inner.next().await - } - - #[cfg(test)] - pub async fn get_tombstone_between( - self, - smallest_user_key: UserKey<&[u8]>, - largest_user_key: UserKey<&[u8]>, - ) -> HummockResult> { - let mut iter = self; - iter.seek(smallest_user_key).await?; - let extended_smallest_user_key = PointRange::from_user_key(smallest_user_key, false); - let extended_largest_user_key = PointRange::from_user_key(largest_user_key, false); - let mut monotonic_events = vec![]; - if !is_max_epoch(iter.earliest_epoch()) { - monotonic_events.push(MonotonicDeleteEvent { - event_key: extended_smallest_user_key.to_vec(), - new_epoch: iter.earliest_epoch(), - }); - } - - while iter.is_valid() { - if !extended_largest_user_key.is_empty() && iter.key().ge(&extended_largest_user_key) { - if !monotonic_events.is_empty() { - monotonic_events.push(MonotonicDeleteEvent { - event_key: extended_largest_user_key.to_vec(), - new_epoch: HummockEpoch::MAX, - }); - } - break; - } - - let event_key = iter.key().to_vec(); - iter.next().await?; - - monotonic_events.push(MonotonicDeleteEvent { - new_epoch: iter.earliest_epoch(), - event_key, - }); - } - - monotonic_events.dedup_by(|a, b| { - a.event_key.left_user_key.table_id == b.event_key.left_user_key.table_id - && a.new_epoch == b.new_epoch - }); - if !monotonic_events.is_empty() { - assert!(!is_max_epoch(monotonic_events.first().unwrap().new_epoch)); - assert!(is_max_epoch(monotonic_events.last().unwrap().new_epoch)); - } - Ok(monotonic_events) - } - - /// Return the earliest range-tombstone which deletes target-key. - /// Target-key must be given in order. - #[cfg(test)] - pub async fn earliest_delete_which_can_see_key_for_test( - &mut self, - target_user_key: UserKey<&[u8]>, - epoch: HummockEpoch, - ) -> HummockResult { - let target_extended_user_key = PointRange::from_user_key(target_user_key, false); - while self.inner.is_valid() - && self - .inner - .next_extended_user_key() - .le(&target_extended_user_key) - { - self.inner.next().await?; - } - Ok(self.earliest_delete_since(epoch)) - } - - pub fn key(&self) -> PointRange<&[u8]> { - self.inner.next_extended_user_key() - } - - pub fn is_valid(&self) -> bool { - self.inner.is_valid() - } - - pub fn earliest_epoch(&self) -> HummockEpoch { - self.inner.earliest_epoch() - } - - pub fn earliest_delete_since(&self, epoch: HummockEpoch) -> HummockEpoch { - self.inner.earliest_delete_since(epoch) - } - - /// seek to the first key which larger than `target_user_key`. 
- pub async fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> HummockResult<()> { - self.inner.seek(target_user_key).await - } - - pub async fn rewind(&mut self) -> HummockResult<()> { - self.inner.rewind().await - } -} - -pub struct SstableDeleteRangeIterator { - table: TableHolder, - next_idx: usize, -} - -impl SstableDeleteRangeIterator { - pub fn new(table: TableHolder) -> Self { - Self { table, next_idx: 0 } - } - - /// Retrieves whether `next_extended_user_key` is the last range of this SST file. - /// - /// Note: - /// - Before calling this function, makes sure the iterator `is_valid`. - /// - This function should return immediately. - /// - /// # Panics - /// This function will panic if the iterator is invalid. - pub fn is_last_range(&self) -> bool { - debug_assert!(self.next_idx < self.table.meta.monotonic_tombstone_events.len()); - self.next_idx + 1 == self.table.meta.monotonic_tombstone_events.len() - } -} - -impl DeleteRangeIterator for SstableDeleteRangeIterator { - type NextFuture<'a> = impl Future> + 'a; - type RewindFuture<'a> = impl Future> + 'a; - type SeekFuture<'a> = impl Future> + 'a; - - fn next_extended_user_key(&self) -> PointRange<&[u8]> { - self.table.meta.monotonic_tombstone_events[self.next_idx] - .event_key - .as_ref() - } - - fn current_epoch(&self) -> HummockEpoch { - if self.next_idx > 0 { - self.table.meta.monotonic_tombstone_events[self.next_idx - 1].new_epoch - } else { - HummockEpoch::MAX - } - } - - fn next(&mut self) -> Self::NextFuture<'_> { - async move { - self.next_idx += 1; - Ok(()) - } - } - - fn rewind(&mut self) -> Self::RewindFuture<'_> { - async move { - self.next_idx = 0; - Ok(()) - } - } - - fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) -> Self::SeekFuture<'_> { - async move { - let target_extended_user_key = PointRange::from_user_key(target_user_key, false); - self.next_idx = self.table.meta.monotonic_tombstone_events.partition_point( - |MonotonicDeleteEvent { event_key, .. }| { - event_key.as_ref().le(&target_extended_user_key) - }, - ); - Ok(()) - } - } - - fn is_valid(&self) -> bool { - self.next_idx < self.table.meta.monotonic_tombstone_events.len() - } -} - -pub fn get_min_delete_range_epoch_from_sstable( - table: &Sstable, - query_user_key: UserKey<&[u8]>, -) -> HummockEpoch { - let query_extended_user_key = PointRange::from_user_key(query_user_key, false); - let idx = table.meta.monotonic_tombstone_events.partition_point( - |MonotonicDeleteEvent { event_key, .. 
}| event_key.as_ref().le(&query_extended_user_key), - ); - if idx == 0 { - HummockEpoch::MAX - } else { - table.meta.monotonic_tombstone_events[idx - 1].new_epoch - } -} - -#[cfg(test)] -mod tests { - use std::ops::Bound; - - use bytes::Bytes; - use risingwave_common::catalog::TableId; - use risingwave_common::util::epoch::test_epoch; - - use super::*; - use crate::hummock::test_utils::delete_range::CompactionDeleteRangesBuilder; - use crate::hummock::test_utils::test_user_key; - - #[tokio::test] - pub async fn test_compaction_delete_range_iterator() { - let mut builder = CompactionDeleteRangesBuilder::default(); - let table_id = TableId::default(); - builder.add_delete_events_for_test( - 9, - table_id, - vec![ - ( - Bound::Included(Bytes::copy_from_slice(b"aaaaaa")), - Bound::Excluded(Bytes::copy_from_slice(b"bbbddd")), - ), - ( - Bound::Included(Bytes::copy_from_slice(b"bbbfff")), - Bound::Excluded(Bytes::copy_from_slice(b"ffffff")), - ), - ( - Bound::Included(Bytes::copy_from_slice(b"gggggg")), - Bound::Excluded(Bytes::copy_from_slice(b"hhhhhh")), - ), - ], - ); - builder.add_delete_events_for_test( - 12, - table_id, - vec![( - Bound::Included(Bytes::copy_from_slice(b"aaaaaa")), - Bound::Excluded(Bytes::copy_from_slice(b"bbbccc")), - )], - ); - builder.add_delete_events_for_test( - 8, - table_id, - vec![( - Bound::Excluded(Bytes::copy_from_slice(b"bbbeee")), - Bound::Included(Bytes::copy_from_slice(b"eeeeee")), - )], - ); - builder.add_delete_events_for_test( - 6, - table_id, - vec![( - Bound::Included(Bytes::copy_from_slice(b"bbbaab")), - Bound::Excluded(Bytes::copy_from_slice(b"bbbdddf")), - )], - ); - builder.add_delete_events_for_test( - 7, - table_id, - vec![( - Bound::Excluded(Bytes::copy_from_slice(b"hhhhhh")), - Bound::Unbounded, - )], - ); - let mut iter = builder.build_for_compaction(); - iter.rewind().await.unwrap(); - - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"bbb").as_ref(), - test_epoch(13) - ) - .await - .unwrap(), - HummockEpoch::MAX, - ); - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"bbb").as_ref(), - test_epoch(11) - ) - .await - .unwrap(), - test_epoch(12) - ); - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"bbb").as_ref(), - test_epoch(8) - ) - .await - .unwrap(), - test_epoch(9) - ); - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"bbbaaa").as_ref(), - test_epoch(8) - ) - .await - .unwrap(), - test_epoch(9) - ); - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"bbbccd").as_ref(), - test_epoch(8) - ) - .await - .unwrap(), - test_epoch(9) - ); - - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"bbbddd").as_ref(), - test_epoch(8) - ) - .await - .unwrap(), - HummockEpoch::MAX, - ); - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"bbbeee").as_ref(), - test_epoch(8) - ) - .await - .unwrap(), - HummockEpoch::MAX, - ); - - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"bbbeef").as_ref(), - test_epoch(10) - ) - .await - .unwrap(), - HummockEpoch::MAX, - ); - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"eeeeee").as_ref(), - test_epoch(8) - ) - .await - .unwrap(), - test_epoch(8) - ); - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"gggggg").as_ref(), - test_epoch(8) - ) - .await - .unwrap(), - test_epoch(9) - ); - assert_eq!( - 
iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"hhhhhh").as_ref(), - test_epoch(6) - ) - .await - .unwrap(), - HummockEpoch::MAX, - ); - assert_eq!( - iter.earliest_delete_which_can_see_key_for_test( - test_user_key(b"iiiiii").as_ref(), - test_epoch(6) - ) - .await - .unwrap(), - test_epoch(7) - ); - } - - #[tokio::test] - pub async fn test_delete_range_split() { - let table_id = TableId::default(); - let mut builder = CompactionDeleteRangesBuilder::default(); - builder.add_delete_events_for_test( - 13, - table_id, - vec![( - Bound::Included(Bytes::copy_from_slice(b"aaaa")), - Bound::Excluded(Bytes::copy_from_slice(b"cccc")), - )], - ); - builder.add_delete_events_for_test( - 10, - table_id, - vec![( - Bound::Excluded(Bytes::copy_from_slice(b"cccc")), - Bound::Excluded(Bytes::copy_from_slice(b"dddd")), - )], - ); - builder.add_delete_events_for_test( - 12, - table_id, - vec![( - Bound::Included(Bytes::copy_from_slice(b"cccc")), - Bound::Included(Bytes::copy_from_slice(b"eeee")), - )], - ); - builder.add_delete_events_for_test( - 15, - table_id, - vec![( - Bound::Excluded(Bytes::copy_from_slice(b"eeee")), - Bound::Excluded(Bytes::copy_from_slice(b"ffff")), - )], - ); - let compaction_delete_range = builder.build_for_compaction(); - let split_ranges = compaction_delete_range - .get_tombstone_between( - test_user_key(b"bbbb").as_ref(), - test_user_key(b"eeeeee").as_ref(), - ) - .await - .unwrap(); - assert_eq!(6, split_ranges.len()); - assert_eq!( - PointRange::from_user_key(test_user_key(b"bbbb"), false), - split_ranges[0].event_key - ); - assert_eq!( - PointRange::from_user_key(test_user_key(b"cccc"), false), - split_ranges[1].event_key - ); - assert_eq!( - PointRange::from_user_key(test_user_key(b"cccc"), true), - split_ranges[2].event_key - ); - assert_eq!( - PointRange::from_user_key(test_user_key(b"dddd"), false), - split_ranges[3].event_key - ); - assert_eq!( - PointRange::from_user_key(test_user_key(b"eeee"), true), - split_ranges[4].event_key - ); - assert_eq!( - PointRange::from_user_key(test_user_key(b"eeeeee"), false), - split_ranges[5].event_key - ); - } -} diff --git a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs index fe258db352143..287c387dd3270 100644 --- a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs @@ -101,10 +101,6 @@ impl SstableIterator { } } - pub(crate) fn sst(&self) -> &TableHolder { - &self.sst - } - /// Seeks to a block, and then seeks to the key if `seek_key` is given. 
async fn seek_idx( &mut self, diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index 1125cc919bd58..8460f179d9527 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -18,7 +18,7 @@ mod block; use std::collections::HashSet; -use std::fmt::{Debug, Display, Formatter}; +use std::fmt::{Debug, Formatter}; use std::ops::{BitXor, Bound, Range}; pub use block::*; @@ -43,20 +43,13 @@ pub use forward_sstable_iterator::*; use tracing::warn; mod backward_sstable_iterator; pub use backward_sstable_iterator::*; -use risingwave_hummock_sdk::key::{ - FullKey, KeyPayloadType, PointRange, TableKey, UserKey, UserKeyRangeRef, -}; +use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, UserKey, UserKeyRangeRef}; use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId}; -mod delete_range_aggregator; mod filter; mod sstable_object_id_manager; mod utils; -pub use delete_range_aggregator::{ - get_min_delete_range_epoch_from_sstable, CompactionDeleteRangeIterator, - SstableDeleteRangeIterator, -}; pub use filter::FilterBuilder; pub use sstable_object_id_manager::*; pub use utils::CompressionAlgorithm; @@ -72,69 +65,6 @@ const MAGIC: u32 = 0x5785ab73; const OLD_VERSION: u32 = 1; const VERSION: u32 = 2; -#[derive(Clone, PartialEq, Eq, Debug)] -// delete keys located in [start_user_key, end_user_key) -pub struct DeleteRangeTombstone { - pub start_user_key: PointRange>, - pub end_user_key: PointRange>, - pub sequence: HummockEpoch, -} - -impl PartialOrd for DeleteRangeTombstone { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for DeleteRangeTombstone { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.start_user_key - .cmp(&other.start_user_key) - .then_with(|| self.end_user_key.cmp(&other.end_user_key)) - .then_with(|| other.sequence.cmp(&self.sequence)) - } -} - -impl DeleteRangeTombstone { - pub fn new( - table_id: TableId, - start_table_key: Vec, - is_left_open: bool, - end_table_key: Vec, - is_right_close: bool, - sequence: HummockEpoch, - ) -> Self { - Self { - start_user_key: PointRange::from_user_key( - UserKey::new(table_id, TableKey(start_table_key)), - is_left_open, - ), - end_user_key: PointRange::from_user_key( - UserKey::new(table_id, TableKey(end_table_key)), - is_right_close, - ), - sequence, - } - } - - #[cfg(test)] - pub fn new_for_test( - table_id: TableId, - start_table_key: Vec, - end_table_key: Vec, - sequence: HummockEpoch, - ) -> Self { - Self::new( - table_id, - start_table_key, - false, - end_table_key, - false, - sequence, - ) - } -} - /// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, delete ranges /// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }` /// can be transformed into events below: @@ -148,24 +78,14 @@ impl DeleteRangeTombstone { /// next event key wmk2 (7) (not inclusive). /// If there is no range deletes between current event key and next event key, `new_epoch` will be /// `HummockEpoch::MAX`. 
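The doc comment above maps three watermark-guarded delete ranges onto a flat event list. Below is a worked sketch of that mapping using `u64` keys instead of `PointRange`s; `e1`/`e2`/`e3` are placeholder epoch values, and the `epoch_at` lookup mirrors the `partition_point` queries used throughout this PR:

```rust
// Worked version of the watermark example above: delete ranges
// { [0,5) in e1, [5,7) in e2, [7,11) in e3 } flatten into one event list.
// Keys are u64 for brevity; the real events carry extended user keys.
const MAX_EPOCH: u64 = u64::MAX;

fn main() {
    let (e1, e2, e3) = (101u64, 102, 103); // placeholder epoch values
    let events: Vec<(u64, u64)> = vec![(0, e1), (5, e2), (7, e3), (11, MAX_EPOCH)];

    // The epoch in effect at `key` is the `new_epoch` of the last event
    // at or before it.
    let epoch_at = |key: u64| {
        let idx = events.partition_point(|(k, _)| *k <= key);
        if idx == 0 { MAX_EPOCH } else { events[idx - 1].1 }
    };

    assert_eq!(epoch_at(3), e1);         // covered by [0, 5)
    assert_eq!(epoch_at(5), e2);         // wmk1 itself falls in [5, 7)
    assert_eq!(epoch_at(9), e3);         // covered by [7, 11)
    assert_eq!(epoch_at(12), MAX_EPOCH); // past wmk3: no range delete
}
```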
-#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] pub struct MonotonicDeleteEvent { - pub event_key: PointRange>, + pub event_key: + risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::PointRange, pub new_epoch: HummockEpoch, } impl MonotonicDeleteEvent { - #[cfg(test)] - pub fn new(table_id: TableId, event_key: Vec, new_epoch: HummockEpoch) -> Self { - Self { - event_key: PointRange::from_user_key( - UserKey::new(table_id, TableKey(event_key)), - false, - ), - new_epoch, - } - } - pub fn encode(&self, mut buf: impl BufMut) { self.event_key .left_user_key @@ -179,6 +99,7 @@ impl MonotonicDeleteEvent { } pub fn decode(buf: &mut &[u8]) -> Self { + use risingwave_hummock_sdk::key::range_delete_backward_compatibility_serde_struct::*; let user_key = UserKey::decode_length_prefixed(buf); let exclude_left_key_flag = buf.get_u8(); let is_exclude_left_key = match exclude_left_key_flag { @@ -188,26 +109,13 @@ impl MonotonicDeleteEvent { }; let new_epoch = buf.get_u64_le(); Self { - event_key: PointRange::from_user_key(user_key, is_exclude_left_key), + event_key: PointRange { + left_user_key: user_key, + is_exclude_left_key, + }, new_epoch, } } - - #[inline] - pub fn encoded_size(&self) -> usize { - // length prefixed requires 4B more than its `encoded_len()` - 4 + self.event_key.left_user_key.encoded_len() + 1 + 8 - } -} - -impl Display for MonotonicDeleteEvent { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Event key {:?} epoch {:?}", - self.event_key, self.new_epoch - ) - } } #[derive(Serialize, Deserialize)] @@ -387,6 +295,7 @@ pub struct SstableMeta { /// key wmk1 (5) and till the next event key wmk2 (7) (not inclusive). /// If there is no range deletes between current event key and next event key, `new_epoch` will /// be `HummockEpoch::MAX`. + #[deprecated] pub monotonic_tombstone_events: Vec, /// Format version, for further compatibility. 
pub version: u32, @@ -435,6 +344,7 @@ impl SstableMeta { buf.put_u32_le(self.key_count); put_length_prefixed_slice(&mut buf, &self.smallest_key); put_length_prefixed_slice(&mut buf, &self.largest_key); + #[expect(deprecated)] buf.put_u32_le( utils::checked_into_u32(self.monotonic_tombstone_events.len()).unwrap_or_else(|_| { let tmp_full_key = FullKey::decode(&self.smallest_key); @@ -445,6 +355,7 @@ impl SstableMeta { ) }), ); + #[expect(deprecated)] for monotonic_tombstone_event in &self.monotonic_tombstone_events { monotonic_tombstone_event.encode(&mut buf); } @@ -513,6 +424,7 @@ impl SstableMeta { "read non-empty range tombstones"); } + #[expect(deprecated)] Ok(Self { block_metas, bloom_filter, @@ -535,11 +447,6 @@ impl SstableMeta { .map(|block_meta| block_meta.encoded_size()) .sum::() + 4 // monotonic tombstone events len - + self - .monotonic_tombstone_events - .iter() - .map(|event| event.encoded_size()) - .sum::() + 4 // bloom filter len + self.bloom_filter.len() + 4 // estimated size @@ -585,6 +492,7 @@ mod tests { #[test] fn test_sstable_meta_enc_dec() { + #[expect(deprecated)] let meta = SstableMeta { block_metas: vec![ BlockMeta { diff --git a/src/storage/src/hummock/sstable/writer.rs b/src/storage/src/hummock/sstable/writer.rs index 99347411fef6e..9c0f3f06f1c28 100644 --- a/src/storage/src/hummock/sstable/writer.rs +++ b/src/storage/src/hummock/sstable/writer.rs @@ -324,7 +324,7 @@ impl SstableWriter for StreamingUploadWriter { t }); - assert!(!meta.block_metas.is_empty() || !meta.monotonic_tombstone_events.is_empty()); + assert!(!meta.block_metas.is_empty()); // Upload data to object store. self.object_uploader.finish().await?; @@ -496,6 +496,7 @@ mod tests { }); blocks.push(data.slice((i * 1000) as usize..((i + 1) * 1000) as usize)); } + #[expect(deprecated)] let meta = SstableMeta { block_metas, bloom_filter: vec![], diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 8d96e29f5426d..7f3bad46b644a 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
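`MonotonicDeleteEvent::{encode, decode}` above pin down the byte layout that the backward-compatibility path must keep readable. Here is a hand-rolled round trip of that layout, assuming the `bytes` crate as a dependency; the real code routes through `range_delete_backward_compatibility_serde_struct` rather than raw puts and gets:

```rust
// Round trip of the on-disk layout kept for backward compatibility (see
// `MonotonicDeleteEvent::{encode, decode}` above):
//   | table_id: u32 BE | key_len: u32 BE | table_key | flag: u8 | epoch: u64 LE |
use bytes::{Buf, BufMut};

fn main() {
    let (table_id, key, is_exclude, epoch) = (42u32, b"vnode+pk".as_slice(), true, 7u64);

    let mut buf = Vec::new();
    buf.put_u32(table_id);         // `BufMut::put_u32` writes big-endian
    buf.put_u32(key.len() as u32); // the length prefix costs 4 extra bytes
    buf.put_slice(key);
    buf.put_u8(is_exclude as u8);  // 0 = inclusive left key, 1 = exclusive
    buf.put_u64_le(epoch);         // the epoch is little-endian in this format

    let mut rd = &buf[..];
    assert_eq!(rd.get_u32(), table_id);
    let len = rd.get_u32() as usize;
    assert_eq!(&rd[..len], key);
    rd.advance(len);
    assert_eq!(rd.get_u8(), 1);
    assert_eq!(rd.get_u64_le(), epoch);
    assert!(rd.is_empty());
}
```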
-use std::collections::BTreeSet;
-use std::ops::Bound;
 use std::sync::Arc;
 
 use bytes::Bytes;
@@ -26,26 +24,23 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::config::EvictionConfig;
 use risingwave_common::hash::VirtualNode;
 use risingwave_common::util::epoch::test_epoch;
-use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, UserKey};
+use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
 use risingwave_hummock_sdk::key_range::KeyRange;
 use risingwave_hummock_sdk::sstable_info::SstableInfo;
 use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId};
 
 use super::iterator::test_utils::iterator_test_table_key_of;
 use super::{
-    HummockResult, InMemWriter, MonotonicDeleteEvent, SstableMeta, SstableWriterOptions,
-    DEFAULT_RESTART_INTERVAL,
+    HummockResult, InMemWriter, SstableMeta, SstableWriterOptions, DEFAULT_RESTART_INTERVAL,
 };
 use crate::filter_key_extractor::{FilterKeyExtractorImpl, FullKeyFilterKeyExtractor};
-use crate::hummock::iterator::ForwardMergeRangeIterator;
 use crate::hummock::shared_buffer::shared_buffer_batch::{
     SharedBufferBatch, SharedBufferItem, SharedBufferValue,
 };
 use crate::hummock::value::HummockValue;
 use crate::hummock::{
-    BlockedXor16FilterBuilder, CachePolicy, CompactionDeleteRangeIterator, DeleteRangeTombstone,
-    FilterBuilder, LruCache, Sstable, SstableBuilder, SstableBuilderOptions, SstableStoreRef,
-    SstableWriter, TableHolder, Xor16FilterBuilder,
+    BlockedXor16FilterBuilder, CachePolicy, FilterBuilder, LruCache, Sstable, SstableBuilder,
+    SstableBuilderOptions, SstableStoreRef, SstableWriter, TableHolder, Xor16FilterBuilder,
 };
 use crate::monitor::StoreLocalStatistic;
 use crate::opts::StorageOpts;
@@ -387,167 +382,3 @@ where
     .await
     .unwrap()
 }
-
-pub mod delete_range {
-    use super::*;
-    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferDeleteRangeIterator;
-
-    #[derive(Default)]
-    pub struct CompactionDeleteRangesBuilder {
-        iter: ForwardMergeRangeIterator,
-    }
-
-    impl CompactionDeleteRangesBuilder {
-        pub fn add_delete_events_for_test(
-            &mut self,
-            epoch: HummockEpoch,
-            table_id: TableId,
-            delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
-        ) {
-            self.iter
-                .add_batch_iter(SharedBufferDeleteRangeIterator::new(
-                    test_epoch(epoch),
-                    table_id,
-                    delete_ranges,
-                ));
-        }
-
-        pub fn build_for_compaction(self) -> CompactionDeleteRangeIterator {
-            CompactionDeleteRangeIterator::new(self.iter)
-        }
-    }
-
-    /// Assume that watermark1 is 5, watermark2 is 7, watermark3 is 11, delete ranges
-    /// `{ [0, wmk1) in epoch1, [wmk1, wmk2) in epoch2, [wmk2, wmk3) in epoch3 }`
-    /// can be transformed into events below:
-    /// `{ <0, +epoch1> <wmk1, -epoch1> <wmk1, +epoch2> <wmk2, -epoch2> <wmk2, +epoch3> <wmk3, -epoch3> }`
-    fn build_events(
-        delete_tombstones: &Vec<DeleteRangeTombstone>,
-    ) -> Vec<CompactionDeleteRangeEvent> {
-        let tombstone_len = delete_tombstones.len();
-        let mut events = Vec::with_capacity(tombstone_len * 2);
-        for DeleteRangeTombstone {
-            start_user_key,
-            end_user_key,
-            sequence,
-        } in delete_tombstones
-        {
-            events.push((start_user_key, 1, *sequence));
-            events.push((end_user_key, 0, *sequence));
-        }
-        events.sort();
-
-        let mut result = Vec::with_capacity(events.len());
-        for (user_key, group) in &events.into_iter().group_by(|(user_key, _, _)| *user_key) {
-            let (mut exit, mut enter) = (vec![], vec![]);
-            for (_, op, sequence) in group {
-                match op {
-                    0 => exit.push(TombstoneEnterExitEvent {
-                        tombstone_epoch: sequence,
-                    }),
-                    1 => {
-                        enter.push(TombstoneEnterExitEvent {
-                            tombstone_epoch: sequence,
-                        });
-                    }
-                    _ => unreachable!(),
-                }
-            }
-            result.push((user_key.clone(), exit, enter));
-        }
-
-        result
-    }
-
-    pub fn create_monotonic_events(
-        mut delete_range_tombstones: Vec<DeleteRangeTombstone>,
-    ) -> Vec<MonotonicDeleteEvent> {
-        delete_range_tombstones.sort();
-        let events = build_events(&delete_range_tombstones);
-        create_monotonic_events_from_compaction_delete_events(events)
-    }
-
-    fn create_monotonic_events_from_compaction_delete_events(
-        compaction_delete_range_events: Vec<CompactionDeleteRangeEvent>,
-    ) -> Vec<MonotonicDeleteEvent> {
-        let mut epochs = BTreeSet::new();
-        let mut monotonic_tombstone_events =
-            Vec::with_capacity(compaction_delete_range_events.len());
-        for event in compaction_delete_range_events {
-            apply_event(&mut epochs, &event);
-            monotonic_tombstone_events.push(MonotonicDeleteEvent {
-                event_key: event.0,
-                new_epoch: epochs.first().map_or(HummockEpoch::MAX, |epoch| *epoch),
-            });
-        }
-        monotonic_tombstone_events.dedup_by(|a, b| {
-            a.event_key.left_user_key.table_id == b.event_key.left_user_key.table_id
-                && a.new_epoch == b.new_epoch
-        });
-        monotonic_tombstone_events
-    }
-
-    #[derive(Clone)]
-    #[cfg(any(test, feature = "test"))]
-    pub struct TombstoneEnterExitEvent {
-        pub(crate) tombstone_epoch: HummockEpoch,
-    }
-
-    #[cfg(any(test, feature = "test"))]
-    pub type CompactionDeleteRangeEvent = (
-        // event key
-        PointRange<Vec<u8>>,
-        // Old tombstones which exit at the event key
-        Vec<TombstoneEnterExitEvent>,
-        // New tombstones which enter at the event key
-        Vec<TombstoneEnterExitEvent>,
-    );
-    /// We introduce `event` to avoid storing excessive range tombstones after compaction if there are
-    /// overlaps among range tombstones across different SSTs/batches in compaction.
-    /// The core idea contains two parts:
-    /// 1) we only need to keep the smallest epoch of the overlapping
-    /// range tombstone intervals since keys covered by the range tombstone in a lower level must have
-    /// smaller epochs;
-    /// 2) due to 1), we lose the information to delete a key by tombstone in a single
-    /// SST so we add a tombstone key in the data block.
-    /// We leverage `events` to calculate the epoch information mentioned above.
-    ///
-    /// e.g. Delete range [1, 5) at epoch1, delete range [3, 7) at epoch2 and delete range [10, 12) at
-    /// epoch3 will first be transformed into `events` below:
-    ///
-    /// `<1, +epoch1> <5, -epoch1> <3, +epoch2> <7, -epoch2> <10, +epoch3> <12, -epoch3>`
-    ///
-    /// Then `events` are sorted by user key:
-    ///
-    /// `<1, +epoch1> <3, +epoch2> <5, -epoch1> <7, -epoch2> <10, +epoch3> <12, -epoch3>`
-    ///
-    /// We rely on the fact that keys met in compaction are in order.
-    ///
-    /// When user key 0 comes, no events have happened yet so no range delete epoch. (will be
-    /// represented as range delete epoch MAX EPOCH)
-    ///
-    /// When user key 1 comes, event `<1, +epoch1>` happens so there is currently one range delete
-    /// epoch: epoch1.
-    ///
-    /// When user key 2 comes, no more events happen so the set remains `{epoch1}`.
-    ///
-    /// When user key 3 comes, event `<3, +epoch2>` happens so the range delete epoch set is now
-    /// `{epoch1, epoch2}`.
-    ///
-    /// When user key 5 comes, event `<5, -epoch1>` happens so epoch1 exits the set,
-    /// therefore the current range delete epoch set is `{epoch2}`.
-    ///
-    /// When user key 11 comes, events `<7, -epoch2>` and `<10, +epoch3>`
-    /// both happen, one after another. The set changes to `{epoch3}` from `{epoch2}`.
-    pub fn apply_event(epochs: &mut BTreeSet<HummockEpoch>, event: &CompactionDeleteRangeEvent) {
-        let (_, exit, enter) = event;
-        // Correct because ranges in an epoch won't intersect.
-        for TombstoneEnterExitEvent { tombstone_epoch } in exit {
-            epochs.remove(tombstone_epoch);
-        }
-        for TombstoneEnterExitEvent { tombstone_epoch } in enter {
-            epochs.insert(*tombstone_epoch);
-        }
-    }
-}
diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs
index ab80f712570ca..db21faa78c6cf 100644
--- a/src/storage/src/store.rs
+++ b/src/storage/src/store.rs
@@ -493,7 +493,6 @@ pub struct ReadOptions {
     /// If the `prefix_hint` is not None, it should be included in
     /// `key` or `key_range` in the read API.
     pub prefix_hint: Option<Bytes>,
-    pub ignore_range_tombstone: bool,
     pub prefetch_options: PrefetchOptions,
     pub cache_policy: CachePolicy,
@@ -509,7 +508,6 @@ impl From<TracedReadOptions> for ReadOptions {
     fn from(value: TracedReadOptions) -> Self {
         Self {
             prefix_hint: value.prefix_hint.map(|b| b.into()),
-            ignore_range_tombstone: value.ignore_range_tombstone,
             prefetch_options: value.prefetch_options.into(),
             cache_policy: value.cache_policy.into(),
             retention_seconds: value.retention_seconds,
@@ -524,7 +522,6 @@ impl From<ReadOptions> for TracedReadOptions {
     fn from(value: ReadOptions) -> Self {
         Self {
             prefix_hint: value.prefix_hint.map(|b| b.into()),
-            ignore_range_tombstone: value.ignore_range_tombstone,
             prefetch_options: value.prefetch_options.into(),
             cache_policy: value.cache_policy.into(),
             retention_seconds: value.retention_seconds,
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index 8c5f432f46c57..254e8e73095b1 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -503,7 +503,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
             read_committed,
             prefetch_options,
             cache_policy,
-            ..Default::default()
         };
         let pk_serializer = match self.output_row_in_key_indices.is_empty() {
             true => None,
diff --git a/src/tests/compaction_test/Cargo.toml b/src/tests/compaction_test/Cargo.toml
index 330e8b2a3637f..3bc86649ea3b3 100644
--- a/src/tests/compaction_test/Cargo.toml
+++ b/src/tests/compaction_test/Cargo.toml
@@ -56,10 +56,5 @@ name = "compaction-test"
 path = "src/bin/compaction.rs"
 test = false
 
-[[bin]]
-name = "delete-range-test"
-path = "src/bin/delete_range.rs"
-test = false
-
 [lints]
 workspace = true
diff --git a/src/tests/compaction_test/src/bin/delete_range.rs b/src/tests/compaction_test/src/bin/delete_range.rs
deleted file mode 100644
index 1861ca1b9b03f..0000000000000
--- a/src/tests/compaction_test/src/bin/delete_range.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#![cfg_attr(coverage, feature(coverage_attribute))]
-
-#[cfg_attr(coverage, coverage(off))]
-fn main() {
-    // Since we decided to record a watermark in every state table to replace delete-range, this test is not needed anymore. We keep it because we may need delete-range someday for other features.
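The sweep described by the doc comment above (removed together with `apply_event`) can be restated compactly: sort the (key, enter/exit, epoch) endpoints, keep a set of active epochs, and emit the smallest active epoch at each event key. A self-contained sketch under those assumptions, std only, with illustrative names:

    use std::collections::BTreeSet;

    // Endpoints are (key, is_enter, epoch): true where a range delete starts, false
    // where it ends. Tuple ordering makes exits sort before enters on the same key,
    // mirroring the 0/1 ops of the removed `build_events`.
    fn monotonic_events(mut endpoints: Vec<(Vec<u8>, bool, u64)>) -> Vec<(Vec<u8>, u64)> {
        endpoints.sort();
        let mut active: BTreeSet<u64> = BTreeSet::new();
        let mut out: Vec<(Vec<u8>, u64)> = Vec::new();
        for (key, is_enter, epoch) in endpoints {
            if is_enter {
                active.insert(epoch);
            } else {
                active.remove(&epoch);
            }
            // The smallest active epoch wins; u64::MAX means "no range delete here".
            let new_epoch = active.first().copied().unwrap_or(u64::MAX);
            if let Some((last_key, last_epoch)) = out.last_mut() {
                if *last_key == key {
                    // Several endpoints on one key: only the final epoch survives.
                    *last_epoch = new_epoch;
                    continue;
                }
                if *last_epoch == new_epoch {
                    // Epoch unchanged, so the previous event already covers this span.
                    continue;
                }
            }
            out.push((key, new_epoch));
        }
        out
    }

Feeding it the ranges from the walkthrough ([1, 5) at epoch1 and [3, 7) at epoch2, with epoch1 < epoch2) yields `<1, epoch1> <5, epoch2> <7, MAX>`, the same result the removed `create_monotonic_events` would produce after its dedup pass.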
-    use clap::Parser;
-
-    let opts = risingwave_compaction_test::CompactionTestOpts::parse();
-
-    risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::default());
-
-    risingwave_rt::main_okk(|_| risingwave_compaction_test::start_delete_range(opts))
-}
diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs
deleted file mode 100644
index 4fd246208b69a..0000000000000
--- a/src/tests/compaction_test/src/delete_range_runner.rs
+++ /dev/null
@@ -1,662 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::HashSet;
-use std::future::Future;
-use std::ops::{Bound, RangeBounds};
-use std::pin::{pin, Pin};
-use std::sync::Arc;
-use std::time::{Duration, SystemTime};
-
-use bytes::Bytes;
-use foyer::{CacheContext, HybridCacheBuilder};
-use rand::rngs::StdRng;
-use rand::{RngCore, SeedableRng};
-use risingwave_common::catalog::TableId;
-use risingwave_common::config::{
-    extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, RwConfig,
-};
-use risingwave_common::system_param::reader::SystemParamsRead;
-use risingwave_common::util::epoch::{test_epoch, EpochExt};
-use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
-use risingwave_hummock_sdk::key::TableKey;
-use risingwave_hummock_test::get_notification_client_for_test;
-use risingwave_hummock_test::local_state_store_test_utils::LocalStateStoreTestExt;
-use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder;
-use risingwave_meta::hummock::test_utils::setup_compute_env_with_config;
-use risingwave_meta::hummock::MockHummockMetaClient;
-use risingwave_object_store::object::build_remote_object_store;
-use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
-use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
-use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo};
-use risingwave_pb::meta::SystemParams;
-use risingwave_rpc_client::HummockMetaClient;
-use risingwave_storage::filter_key_extractor::{
-    FilterKeyExtractorImpl, FilterKeyExtractorManager, FullKeyFilterKeyExtractor,
-    RpcFilterKeyExtractorManager,
-};
-use risingwave_storage::hummock::compactor::{
-    start_compactor, CompactionExecutor, CompactorContext,
-};
-use risingwave_storage::hummock::sstable_store::SstableStoreRef;
-use risingwave_storage::hummock::utils::cmp_delete_range_left_bounds;
-use risingwave_storage::hummock::{
-    CachePolicy, HummockStorage, MemoryLimiter, SstableObjectIdManager, SstableStore,
-    SstableStoreConfig,
-};
-use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics};
-use risingwave_storage::opts::StorageOpts;
-use risingwave_storage::store::{
-    LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions,
-};
-use risingwave_storage::{StateStore, StateStoreIter};
-
-use crate::CompactionTestOpts;
-
-pub fn start_delete_range(opts: CompactionTestOpts) -> Pin<Box<dyn Future<Output = ()> +
Send>> { - // WARNING: don't change the function signature. Making it `async fn` will cause - // slow compile in release mode. - Box::pin(async move { - tracing::info!("Compaction delete-range test start with options {:?}", opts); - let prefix = opts.state_store.strip_prefix("hummock+"); - match prefix { - Some(s) => { - assert!( - s.starts_with("s3://") || s.starts_with("minio://"), - "Only support S3 and MinIO object store" - ); - } - None => { - panic!("Invalid state store"); - } - } - let ret = compaction_test_main(opts).await; - - match ret { - Ok(_) => { - tracing::info!("Compaction delete-range test Success"); - } - Err(e) => { - panic!("Compaction delete-range test Fail: {}", e); - } - } - }) -} -pub async fn compaction_test_main(opts: CompactionTestOpts) -> anyhow::Result<()> { - let config = load_config(&opts.config_path, NoOverride); - let compaction_config = - CompactionConfigBuilder::with_opt(&config.meta.compaction_config).build(); - compaction_test( - compaction_config, - config, - &opts.state_store, - 1000000, - 800, - 1, - ) - .await -} - -async fn compaction_test( - compaction_config: CompactionConfig, - config: RwConfig, - state_store_type: &str, - test_range: u64, - test_count: u64, - test_delete_ratio: u32, -) -> anyhow::Result<()> { - let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env_with_config(8080, compaction_config.clone()).await; - let meta_client = Arc::new(MockHummockMetaClient::new( - hummock_manager_ref.clone(), - worker_node.id, - )); - - let delete_key_table = PbTable { - id: 1, - schema_id: 1, - database_id: 1, - name: "delete-key-table".to_string(), - columns: vec![], - pk: vec![], - dependent_relations: vec![], - distribution_key: vec![], - stream_key: vec![], - owner: 0, - retention_seconds: None, - fragment_id: 0, - dml_fragment_id: None, - initialized_at_epoch: None, - vnode_col_index: None, - value_indices: vec![], - definition: "".to_string(), - handle_pk_conflict_behavior: 0, - version_column_index: None, - read_prefix_len_hint: 0, - optional_associated_source_id: None, - table_type: 0, - append_only: false, - row_id_index: None, - version: None, - watermark_indices: vec![], - dist_key_in_pk: vec![], - cardinality: None, - created_at_epoch: None, - cleaned_by_watermark: false, - stream_job_status: PbStreamJobStatus::Created.into(), - create_type: PbCreateType::Foreground.into(), - description: None, - incoming_sinks: vec![], - initialized_at_cluster_version: None, - created_at_cluster_version: None, - cdc_table_id: None, - }; - let mut delete_range_table = delete_key_table.clone(); - delete_range_table.id = 2; - delete_range_table.name = "delete-range-table".to_string(); - let group1 = CompactionGroupInfo { - id: StaticCompactionGroupId::StateDefault as _, - parent_id: 0, - member_table_ids: vec![1], - compaction_config: Some(compaction_config.clone()), - }; - let group2 = CompactionGroupInfo { - id: StaticCompactionGroupId::MaterializedView as _, - parent_id: 0, - member_table_ids: vec![2], - compaction_config: Some(compaction_config.clone()), - }; - hummock_manager_ref - .init_metadata_for_version_replay( - vec![delete_key_table, delete_range_table], - vec![group1, group2], - ) - .await?; - - let system_params = SystemParams { - sstable_size_mb: Some(128), - parallel_compact_size_mb: Some(512), - block_size_kb: Some(1024), - bloom_false_positive: Some(0.001), - data_directory: Some("hummock_001".to_string()), - backup_storage_url: Some("memory".to_string()), - backup_storage_directory: 
Some("backup".to_string()), - ..Default::default() - } - .into(); - let storage_memory_config = extract_storage_memory_config(&config); - let storage_opts = Arc::new(StorageOpts::from(( - &config, - &system_params, - &storage_memory_config, - ))); - let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused()); - let compactor_metrics = Arc::new(CompactorMetrics::unused()); - let object_store_metrics = Arc::new(ObjectStoreMetrics::unused()); - let remote_object_store = build_remote_object_store( - state_store_type.strip_prefix("hummock+").unwrap(), - object_store_metrics.clone(), - "Hummock", - Arc::new(ObjectStoreConfig::default()), - ) - .await; - let meta_cache = HybridCacheBuilder::new() - .memory(storage_memory_config.meta_cache_capacity_mb * (1 << 20)) - .with_shards(storage_memory_config.meta_cache_shard_num) - .storage() - .build() - .await?; - let block_cache = HybridCacheBuilder::new() - .memory(storage_memory_config.block_cache_capacity_mb * (1 << 20)) - .with_shards(storage_memory_config.block_cache_shard_num) - .storage() - .build() - .await?; - let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig { - store: Arc::new(remote_object_store), - path: system_params.data_directory().to_string(), - prefetch_buffer_capacity: storage_memory_config.prefetch_buffer_capacity_mb * (1 << 20), - max_prefetch_block_number: storage_opts.max_prefetch_block_number, - recent_filter: None, - state_store_metrics: state_store_metrics.clone(), - use_new_object_prefix_strategy: system_params.use_new_object_prefix_strategy(), - meta_cache, - block_cache, - })); - - let store = HummockStorage::new( - storage_opts.clone(), - sstable_store.clone(), - meta_client.clone(), - get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node), - Arc::new(RpcFilterKeyExtractorManager::default()), - state_store_metrics.clone(), - compactor_metrics.clone(), - None, - ) - .await?; - let sstable_object_id_manager = store.sstable_object_id_manager().clone(); - let filter_key_extractor_manager = match store.filter_key_extractor_manager().clone() { - FilterKeyExtractorManager::RpcFilterKeyExtractorManager( - rpc_filter_key_extractor_manager, - ) => rpc_filter_key_extractor_manager, - FilterKeyExtractorManager::StaticFilterKeyExtractorManager(_) => unreachable!(), - }; - - filter_key_extractor_manager.update( - 1, - Arc::new(FilterKeyExtractorImpl::FullKey( - FullKeyFilterKeyExtractor {}, - )), - ); - filter_key_extractor_manager.update( - 2, - Arc::new(FilterKeyExtractorImpl::FullKey( - FullKeyFilterKeyExtractor {}, - )), - ); - - let (compactor_thrd, compactor_shutdown_tx) = run_compactor_thread( - storage_opts, - sstable_store, - meta_client.clone(), - filter_key_extractor_manager, - sstable_object_id_manager, - compactor_metrics, - ); - run_compare_result( - &store, - meta_client.clone(), - test_range, - test_count, - test_delete_ratio, - ) - .await - .unwrap(); - let version = store.get_pinned_version().version().clone(); - let remote_version = meta_client.get_current_version().await.unwrap(); - println!( - "version-{}, remote version-{}", - version.id, remote_version.id - ); - for (group, levels) in &version.levels { - let l0 = &levels.l0; - println!( - "group-{}: l0 sz: {}, count: {}", - group, - l0.total_file_size, - l0.sub_levels - .iter() - .map(|level| level.table_infos.len()) - .sum::() - ); - } - - compactor_shutdown_tx.send(()).unwrap(); - compactor_thrd.await.unwrap(); - Ok(()) -} - -async fn run_compare_result( - hummock: &HummockStorage, - meta_client: Arc, - 
-    test_range: u64,
-    test_count: u64,
-    test_delete_ratio: u32,
-) -> Result<(), String> {
-    let init_epoch = test_epoch(hummock.get_pinned_version().max_committed_epoch() + 1);
-
-    let mut normal = NormalState::new(hummock, 1, init_epoch).await;
-    let mut delete_range = DeleteRangeState::new(hummock, 2, init_epoch).await;
-    let table_id_set = HashSet::from_iter([1.into(), 2.into()]);
-    const RANGE_BASE: u64 = 4000;
-    let range_mod = test_range / RANGE_BASE;
-
-    let seed = SystemTime::now()
-        .duration_since(SystemTime::UNIX_EPOCH)
-        .unwrap()
-        .as_secs();
-    println!("========== run with seed: {}", seed);
-    let mut rng = StdRng::seed_from_u64(seed);
-    let mut overlap_ranges = vec![];
-    for epoch_idx in 0..test_count {
-        let epoch = test_epoch(init_epoch / test_epoch(1) + epoch_idx);
-        for idx in 0..1000 {
-            let op = rng.next_u32() % 50;
-            let key_number = rng.next_u64() % test_range;
-            if op < test_delete_ratio {
-                let end_key = key_number + (rng.next_u64() % range_mod) + 1;
-                overlap_ranges.push((key_number, end_key, epoch, idx));
-                let start_key = format!("\0\0{:010}", key_number);
-                let end_key = format!("\0\0{:010}", end_key);
-                normal
-                    .delete_range(start_key.as_bytes(), end_key.as_bytes())
-                    .await;
-                delete_range
-                    .delete_range(start_key.as_bytes(), end_key.as_bytes())
-                    .await;
-            } else if op < test_delete_ratio + 5 {
-                let key = format!("\0\0{:010}", key_number);
-                let a = normal.get(key.as_bytes()).await;
-                let b = delete_range.get(key.as_bytes()).await;
-                assert!(
-                    a.eq(&b),
-                    "query {} {:?} vs {:?} in epoch-{}",
-                    key_number,
-                    a.map(|raw| String::from_utf8(raw.to_vec()).unwrap()),
-                    b.map(|raw| String::from_utf8(raw.to_vec()).unwrap()),
-                    epoch,
-                );
-            } else if op < test_delete_ratio + 10 {
-                let end_key = key_number + (rng.next_u64() % range_mod) + 1;
-                let start_key = format!("\0\0{:010}", key_number);
-                let end_key = format!("\0\0{:010}", end_key);
-                let ret1 = normal.scan(start_key.as_bytes(), end_key.as_bytes()).await;
-                let ret2 = delete_range
-                    .scan(start_key.as_bytes(), end_key.as_bytes())
-                    .await;
-                assert_eq!(ret1, ret2);
-            } else {
-                let overlap = overlap_ranges
-                    .iter()
-                    .any(|(left, right, _, _)| *left <= key_number && key_number < *right);
-                if overlap {
-                    continue;
-                }
-                let key = format!("\0\0{:010}", key_number);
-                let val = format!("val-{:010}-{:016}-{:016}", idx, key_number, epoch);
-                normal.insert(key.as_bytes(), val.as_bytes());
-                delete_range.insert(key.as_bytes(), val.as_bytes());
-            }
-        }
-        let next_epoch = epoch.next_epoch();
-        normal.commit(next_epoch).await?;
-        delete_range.commit(next_epoch).await?;
-        // let checkpoint = epoch % 10 == 0;
-        let ret = hummock
-            .seal_and_sync_epoch(epoch, table_id_set.clone())
-            .await
-            .unwrap();
-        meta_client
-            .commit_epoch(epoch, ret, false)
-            .await
-            .map_err(|e| format!("{:?}", e))?;
-        if (epoch / test_epoch(1)) % 200 == 0 {
-            tokio::time::sleep(Duration::from_secs(1)).await;
-        }
-    }
-    Ok(())
-}
-
-struct NormalState {
-    storage: <HummockStorage as StateStore>::Local,
-    table_id: TableId,
-}
-
-struct DeleteRangeState {
-    inner: NormalState,
-    delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
-}
-
-impl DeleteRangeState {
-    async fn new(hummock: &HummockStorage, table_id: u32, epoch: u64) -> Self {
-        Self {
-            inner: NormalState::new(hummock, table_id, epoch).await,
-            delete_ranges: vec![],
-        }
-    }
-}
-
-#[async_trait::async_trait]
-trait CheckState {
-    async fn delete_range(&mut self, left: &[u8], right: &[u8]);
-    async fn get(&self, key: &[u8]) -> Option<Bytes>;
-    async fn scan(&self, left: &[u8], right: &[u8]) -> Vec<(Bytes, Bytes)>;
-    fn insert(&mut self, key: &[u8], val: &[u8]);
-    async fn commit(&mut self, epoch: u64) -> Result<(), String>;
-}
-
-impl NormalState {
-    async fn new(hummock: &HummockStorage, table_id: u32, epoch: u64) -> Self {
-        let table_id = TableId::new(table_id);
-        let mut storage = hummock.new_local(NewLocalOptions::for_test(table_id)).await;
-        storage.init_for_test(epoch).await.unwrap();
-        Self { storage, table_id }
-    }
-
-    async fn commit_impl(
-        &mut self,
-        _delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
-        next_epoch: u64,
-    ) -> Result<(), String> {
-        // self.storage
-        //     .flush(delete_ranges)
-        //     .await
-        //     .map_err(|e| format!("{:?}", e))?;
-        self.storage.flush().await.map_err(|e| format!("{:?}", e))?;
-        self.storage
-            .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
-        Ok(())
-    }
-
-    async fn get_impl(&self, key: &[u8], ignore_range_tombstone: bool) -> Option<Bytes> {
-        self.storage
-            .get(
-                TableKey(Bytes::copy_from_slice(key)),
-                ReadOptions {
-                    ignore_range_tombstone,
-                    table_id: self.table_id,
-                    cache_policy: CachePolicy::Fill(CacheContext::Default),
-                    ..Default::default()
-                },
-            )
-            .await
-            .unwrap()
-    }
-
-    async fn scan_impl(
-        &self,
-        left: &[u8],
-        right: &[u8],
-        ignore_range_tombstone: bool,
-    ) -> Vec<(Bytes, Bytes)> {
-        let mut iter = pin!(self
-            .storage
-            .iter(
-                (
-                    Bound::Included(TableKey(Bytes::copy_from_slice(left))),
-                    Bound::Excluded(TableKey(Bytes::copy_from_slice(right))),
-                ),
-                ReadOptions {
-                    ignore_range_tombstone,
-                    table_id: self.table_id,
-                    read_version_from_backup: false,
-                    prefetch_options: PrefetchOptions::default(),
-                    cache_policy: CachePolicy::Fill(CacheContext::Default),
-                    ..Default::default()
-                },
-            )
-            .await
-            .unwrap(),);
-        let mut ret = vec![];
-        while let Some(item) = iter.try_next().await.unwrap() {
-            let (full_key, val) = item;
-            let tkey = Bytes::copy_from_slice(full_key.user_key.table_key.0);
-            ret.push((tkey, Bytes::copy_from_slice(val)));
-        }
-        ret
-    }
-}
-
-#[async_trait::async_trait]
-impl CheckState for NormalState {
-    async fn delete_range(&mut self, left: &[u8], right: &[u8]) {
-        let mut iter = self
-            .storage
-            .iter(
-                (
-                    Bound::Included(Bytes::copy_from_slice(left)).map(TableKey),
-                    Bound::Excluded(Bytes::copy_from_slice(right)).map(TableKey),
-                ),
-                ReadOptions {
-                    ignore_range_tombstone: true,
-                    table_id: self.table_id,
-                    read_version_from_backup: false,
-                    prefetch_options: PrefetchOptions::default(),
-                    cache_policy: CachePolicy::Fill(CacheContext::Default),
-                    ..Default::default()
-                },
-            )
-            .await
-            .unwrap();
-        let mut delete_item = Vec::new();
-        while let Some(item) = iter.try_next().await.unwrap() {
-            let (full_key, value) = item;
-            delete_item.push((
-                full_key.user_key.table_key.copy_into(),
-                Bytes::copy_from_slice(value),
-            ));
-        }
-        drop(iter);
-        for (key, value) in delete_item {
-            self.storage.delete(key, value).unwrap();
-        }
-    }
-
-    fn insert(&mut self, key: &[u8], val: &[u8]) {
-        self.storage
-            .insert(
-                TableKey(Bytes::from(key.to_vec())),
-                Bytes::copy_from_slice(val),
-                None,
-            )
-            .unwrap();
-    }
-
-    async fn get(&self, key: &[u8]) -> Option<Bytes> {
-        self.get_impl(key, true).await
-    }
-
-    async fn scan(&self, left: &[u8], right: &[u8]) -> Vec<(Bytes, Bytes)> {
-        self.scan_impl(left, right, true).await
-    }
-
-    async fn commit(&mut self, next_epoch: u64) -> Result<(), String> {
-        self.commit_impl(vec![], next_epoch).await
-    }
-}
-
-#[async_trait::async_trait]
-impl CheckState for DeleteRangeState {
-    async fn delete_range(&mut self, left: &[u8], right: &[u8]) {
-        self.delete_ranges.push((
-            Bound::Included(Bytes::copy_from_slice(left)),
-            Bound::Excluded(Bytes::copy_from_slice(right)),
-        ));
-    }
-
-    async fn get(&self, key: &[u8]) -> Option<Bytes> {
-        for delete_range in &self.delete_ranges {
-            if delete_range.contains(key) {
-                return None;
-            }
-        }
-        self.inner.get_impl(key, false).await
-    }
-
-    async fn scan(&self, left: &[u8], right: &[u8]) -> Vec<(Bytes, Bytes)> {
-        let mut ret = self.inner.scan_impl(left, right, false).await;
-        ret.retain(|(key, _)| {
-            for delete_range in &self.delete_ranges {
-                if delete_range.contains(key) {
-                    return false;
-                }
-            }
-            true
-        });
-        ret
-    }
-
-    fn insert(&mut self, key: &[u8], val: &[u8]) {
-        self.inner.insert(key, val);
-    }
-
-    async fn commit(&mut self, next_epoch: u64) -> Result<(), String> {
-        let mut delete_ranges = std::mem::take(&mut self.delete_ranges);
-        delete_ranges.sort_by(|a, b| cmp_delete_range_left_bounds(a.0.as_ref(), b.0.as_ref()));
-        self.inner.commit_impl(delete_ranges, next_epoch).await
-    }
-}
-
-fn run_compactor_thread(
-    storage_opts: Arc<StorageOpts>,
-    sstable_store: SstableStoreRef,
-    meta_client: Arc<MockHummockMetaClient>,
-    filter_key_extractor_manager: Arc<RpcFilterKeyExtractorManager>,
-    sstable_object_id_manager: Arc<SstableObjectIdManager>,
-    compactor_metrics: Arc<CompactorMetrics>,
-) -> (
-    tokio::task::JoinHandle<()>,
-    tokio::sync::oneshot::Sender<()>,
-) {
-    let filter_key_extractor_manager =
-        FilterKeyExtractorManager::RpcFilterKeyExtractorManager(filter_key_extractor_manager);
-    let compactor_context = CompactorContext {
-        storage_opts,
-        sstable_store,
-        compactor_metrics,
-        is_share_buffer_compact: false,
-        compaction_executor: Arc::new(CompactionExecutor::new(None)),
-        memory_limiter: MemoryLimiter::unlimit(),
-        task_progress_manager: Default::default(),
-        await_tree_reg: None,
-    };
-
-    start_compactor(
-        compactor_context,
-        meta_client,
-        sstable_object_id_manager,
-        filter_key_extractor_manager,
-    )
-}
-
-#[cfg(test)]
-mod tests {
-
-    use risingwave_common::config::RwConfig;
-    use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder;
-
-    use super::compaction_test;
-
-    #[ignore]
-    // TODO: may modify the test to use per vnode table watermark
-    #[tokio::test(flavor = "multi_thread", worker_threads = 3)]
-    async fn test_small_data() {
-        let config = RwConfig::default();
-        let mut compaction_config = CompactionConfigBuilder::new().build();
-        compaction_config.max_sub_compaction = 1;
-        compaction_config.level0_tier_compact_file_number = 2;
-        compaction_config.max_bytes_for_level_base = 512 * 1024;
-        compaction_config.sub_level_max_compaction_bytes = 256 * 1024;
-        compaction_test(
-            compaction_config.clone(),
-            config.clone(),
-            "hummock+memory",
-            1000000,
-            60,
-            10,
-        )
-        .await
-        .unwrap();
-    }
-}
diff --git a/src/tests/compaction_test/src/lib.rs b/src/tests/compaction_test/src/lib.rs
index e5fd10b10b176..70f6e20b62adc 100644
--- a/src/tests/compaction_test/src/lib.rs
+++ b/src/tests/compaction_test/src/lib.rs
@@ -27,10 +27,8 @@
 #![allow(rw::format_error)] // test code
 
 mod compaction_test_runner;
-mod delete_range_runner;
 
 use clap::Parser;
-pub use delete_range_runner::start_delete_range;
 
 use crate::compaction_test_runner::compaction_test_main;
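`DeleteRangeState` (deleted above) masks reads with its pending ranges via `RangeBounds::contains`, which works because `(Bound<K>, Bound<K>)` itself implements `RangeBounds<K>`. A std-only sketch of that check, using the same `"\0\0{:010}"` key layout as the test (names are illustrative):

    use std::ops::{Bound, RangeBounds};

    fn make_key(n: u64) -> Vec<u8> {
        format!("\0\0{:010}", n).into_bytes()
    }

    fn as_slice_bound(b: &Bound<Vec<u8>>) -> Bound<&[u8]> {
        match b {
            Bound::Included(v) => Bound::Included(v.as_slice()),
            Bound::Excluded(v) => Bound::Excluded(v.as_slice()),
            Bound::Unbounded => Bound::Unbounded,
        }
    }

    // A key is masked if any pending (not yet flushed) delete range contains it,
    // mirroring how `DeleteRangeState::get`/`scan` filtered results.
    fn is_deleted(key: &[u8], pending: &[(Bound<Vec<u8>>, Bound<Vec<u8>>)]) -> bool {
        pending
            .iter()
            .any(|(start, end)| (as_slice_bound(start), as_slice_bound(end)).contains(&key))
    }

    fn main() {
        let pending = vec![(Bound::Included(make_key(10)), Bound::Excluded(make_key(20)))];
        assert!(is_deleted(&make_key(15), &pending));
        assert!(!is_deleted(&make_key(20), &pending)); // right bound is exclusive
    }

The deleted test compared `Bytes` bounds against `&[u8]` keys directly, since the `bytes` crate provides the cross-type orderings; the slice conversion above is only needed to stay within std.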