refactor(storage): deprecate code of range delete (#18525)
wenym1 authored Sep 18, 2024
1 parent 6ca719f commit 3ee3b2c
Showing 23 changed files with 71 additions and 2,289 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/build.sh
@@ -60,7 +60,7 @@ cargo build \
   --timings


-artifacts=(risingwave sqlsmith compaction-test risingwave_regress_test risingwave_e2e_extended_mode_test risedev-dev delete-range-test)
+artifacts=(risingwave sqlsmith compaction-test risingwave_regress_test risingwave_e2e_extended_mode_test risedev-dev)

 echo "--- Show link info"
 ldd target/"$profile"/risingwave
8 changes: 0 additions & 8 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -228,14 +228,6 @@ pub async fn sst_dump_via_sstable_store(
     println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len());
     println!("Key Count: {}", sstable_meta.key_count);
     println!("Version: {}", sstable_meta.version);
-    println!(
-        "Monotonoic Deletes Count: {}",
-        sstable_meta.monotonic_tombstone_events.len()
-    );
-    for monotonic_delete in &sstable_meta.monotonic_tombstone_events {
-        println!("\tevent key: {:?}", monotonic_delete.event_key);
-        println!("\tnew epoch: {:?}", monotonic_delete.new_epoch);
-    }

     println!("Block Count: {}", sstable.block_count());
     for i in 0..sstable.block_count() {
108 changes: 40 additions & 68 deletions src/storage/hummock_sdk/src/key.rs
@@ -24,7 +24,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::VirtualNode;
 use risingwave_common_estimate_size::EstimateSize;
-use serde::{Deserialize, Serialize};

 use crate::{EpochWithGap, HummockEpoch};

@@ -441,14 +440,8 @@ impl CopyFromSlice for Bytes {
 ///
 /// Its name come from the assumption that Hummock is always accessed by a table-like structure
 /// identified by a [`TableId`].
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)]
-pub struct TableKey<T: AsRef<[u8]>>(
-    #[serde(bound(
-        serialize = "T: serde::Serialize + serde_bytes::Serialize",
-        deserialize = "T: serde::Deserialize<'de> + serde_bytes::Deserialize<'de>"
-    ))]
-    pub T,
-);
+#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
+pub struct TableKey<T: AsRef<[u8]>>(pub T);

 impl<T: AsRef<[u8]>> Debug for TableKey<T> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -542,15 +535,11 @@ pub fn gen_key_from_str(vnode: VirtualNode, payload: &str) -> TableKey<Bytes> {
 /// will group these two values into one struct for convenient filtering.
 ///
 /// The encoded format is | `table_id` | `table_key` |.
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)]
+#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
 pub struct UserKey<T: AsRef<[u8]>> {
     // When comparing `UserKey`, we first compare `table_id`, then `table_key`. So the order of
     // declaration matters.
     pub table_id: TableId,
-    #[serde(bound(
-        serialize = "T: serde::Serialize + serde_bytes::Serialize",
-        deserialize = "T: serde::Deserialize<'de> + serde_bytes::Deserialize<'de>"
-    ))]
     pub table_key: TableKey<T>,
 }

@@ -590,15 +579,6 @@ impl<T: AsRef<[u8]>> UserKey<T> {
         buf.put_slice(self.table_key.as_ref());
     }

-    /// Encode in to a buffer.
-    ///
-    /// length prefixed requires 4B more than its `encoded_len()`
-    pub fn encode_length_prefixed(&self, mut buf: impl BufMut) {
-        buf.put_u32(self.table_id.table_id());
-        buf.put_u32(self.table_key.as_ref().len() as u32);
-        buf.put_slice(self.table_key.as_ref());
-    }
-
     pub fn encode(&self) -> Vec<u8> {
         let mut ret = Vec::with_capacity(TABLE_PREFIX_LEN + self.table_key.as_ref().len());
         self.encode_into(&mut ret);
@@ -658,16 +638,6 @@ impl<T: AsRef<[u8]>> UserKey<T> {
     }
 }

-impl UserKey<Vec<u8>> {
-    pub fn decode_length_prefixed(buf: &mut &[u8]) -> Self {
-        let table_id = buf.get_u32();
-        let len = buf.get_u32() as usize;
-        let data = buf[..len].to_vec();
-        buf.advance(len);
-        UserKey::new(TableId::new(table_id), TableKey(data))
-    }
-}
-
 impl<T: AsRef<[u8]>> UserKey<T> {
     /// Use this method to override an old `UserKey<Vec<u8>>` with a `UserKey<&[u8]>` to own the
     /// table key without reallocating a new `UserKey` object.
@@ -882,48 +852,50 @@ impl<T: AsRef<[u8]> + Ord + Eq> PartialOrd for FullKey<T> {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
-pub struct PointRange<T: AsRef<[u8]>> {
-    // When comparing `PointRange`, we first compare `left_user_key`, then
-    // `is_exclude_left_key`. Therefore the order of declaration matters.
-    #[serde(bound(
-        serialize = "T: serde::Serialize + serde_bytes::Serialize",
-        deserialize = "T: serde::Deserialize<'de> + serde_bytes::Deserialize<'de>"
-    ))]
-    pub left_user_key: UserKey<T>,
-    /// `PointRange` represents the left user key itself if `is_exclude_left_key==false`
-    /// while represents the right δ Neighborhood of the left user key if
-    /// `is_exclude_left_key==true`.
-    pub is_exclude_left_key: bool,
-}
-
-impl<T: AsRef<[u8]>> PointRange<T> {
-    pub fn from_user_key(left_user_key: UserKey<T>, is_exclude_left_key: bool) -> Self {
-        Self {
-            left_user_key,
-            is_exclude_left_key,
-        }
-    }
-
-    pub fn as_ref(&self) -> PointRange<&[u8]> {
-        PointRange::from_user_key(self.left_user_key.as_ref(), self.is_exclude_left_key)
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.left_user_key.is_empty()
-    }
-}
-
-impl<'a> PointRange<&'a [u8]> {
-    pub fn to_vec(&self) -> PointRange<Vec<u8>> {
-        self.copy_into()
-    }
-
-    pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(&self) -> PointRange<T> {
-        PointRange {
-            left_user_key: self.left_user_key.copy_into(),
-            is_exclude_left_key: self.is_exclude_left_key,
-        }
-    }
-}
+pub mod range_delete_backward_compatibility_serde_struct {
+    use bytes::{Buf, BufMut};
+    use risingwave_common::catalog::TableId;
+    use serde::{Deserialize, Serialize};
+
+    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+    pub struct TableKey(Vec<u8>);
+
+    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+    pub struct UserKey {
+        // When comparing `UserKey`, we first compare `table_id`, then `table_key`. So the order of
+        // declaration matters.
+        pub table_id: TableId,
+        pub table_key: TableKey,
+    }
+
+    impl UserKey {
+        pub fn decode_length_prefixed(buf: &mut &[u8]) -> Self {
+            let table_id = buf.get_u32();
+            let len = buf.get_u32() as usize;
+            let data = buf[..len].to_vec();
+            buf.advance(len);
+            UserKey {
+                table_id: TableId::new(table_id),
+                table_key: TableKey(data),
+            }
+        }
+
+        pub fn encode_length_prefixed(&self, mut buf: impl BufMut) {
+            buf.put_u32(self.table_id.table_id());
+            buf.put_u32(self.table_key.0.as_slice().len() as u32);
+            buf.put_slice(self.table_key.0.as_slice());
+        }
+    }
+
+    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+    pub struct PointRange {
+        // When comparing `PointRange`, we first compare `left_user_key`, then
+        // `is_exclude_left_key`. Therefore the order of declaration matters.
+        pub left_user_key: UserKey,
+        /// `PointRange` represents the left user key itself if `is_exclude_left_key==false`
+        /// while represents the right δ Neighborhood of the left user key if
+        /// `is_exclude_left_key==true`.
+        pub is_exclude_left_key: bool,
+    }
+}

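The two length-prefixed helpers kept in the backward-compatibility module are exact inverses: encode writes | table_id (4B) | key length (4B) | key bytes |, and decode reads the same layout back. A round-trip check one might add inside the module (hypothetical test code, not part of the commit; it must live in module scope because the Vec<u8> inside TableKey is private):

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn length_prefixed_round_trip() {
            // Hypothetical test, not in the commit: encoding then decoding
            // must reproduce the original key.
            let key = UserKey {
                table_id: TableId::new(1),
                table_key: TableKey(b"foo".to_vec()),
            };
            let mut buf = Vec::new();
            key.encode_length_prefixed(&mut buf);
            // 4 bytes of table_id + 4 bytes of length prefix + 3 key bytes.
            assert_eq!(buf.len(), 4 + 4 + 3);
            let decoded = UserKey::decode_length_prefixed(&mut buf.as_slice());
            assert_eq!(decoded, key);
        }
    }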
1 change: 0 additions & 1 deletion src/storage/hummock_test/benches/bench_hummock_iter.rs
@@ -107,7 +107,6 @@ fn criterion_benchmark(c: &mut Criterion) {
             (Unbounded, Unbounded),
             epoch,
             ReadOptions {
-                ignore_range_tombstone: true,
                 prefetch_options: PrefetchOptions::default(),
                 cache_policy: CachePolicy::Fill(CacheContext::Default),
                 ..Default::default()
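Since this call site names the removed field explicitly, it has to be edited; constructors that rely solely on struct-update syntax survive such a removal untouched. A self-contained sketch of that property with hypothetical types (not RisingWave code):

    #[derive(Default)]
    struct Options {
        prefetch: bool,
        cache: bool,
        // A field deleted in a refactor vanishes from this definition;
        // call sites using `..Default::default()` keep compiling unless
        // they set the deleted field explicitly, as the benchmark did.
    }

    fn main() {
        let opts = Options {
            prefetch: true,
            ..Default::default()
        };
        assert!(opts.prefetch && !opts.cache);
    }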
2 changes: 0 additions & 2 deletions src/storage/hummock_trace/src/opts.rs
@@ -102,7 +102,6 @@ impl From<TracedTableId> for TableId {
 #[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
 pub struct TracedReadOptions {
     pub prefix_hint: Option<TracedBytes>,
-    pub ignore_range_tombstone: bool,
     pub prefetch_options: TracedPrefetchOptions,
     pub cache_policy: TracedCachePolicy,

@@ -116,7 +115,6 @@ impl TracedReadOptions {
     pub fn for_test(table_id: u32) -> Self {
         Self {
             prefix_hint: Some(TracedBytes::from(vec![0])),
-            ignore_range_tombstone: true,
             prefetch_options: TracedPrefetchOptions {
                 prefetch: true,
                 for_large_query: true,
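Because TracedReadOptions derives Encode/Decode, dropping a field also changes the serialized layout of recorded traces, so traces captured before this commit will not replay against the new struct. A minimal illustration with hypothetical structs, assuming the bincode 2.x codec sits behind these derives:

    use bincode::{config, decode_from_slice, encode_to_vec, Decode, Encode};

    // Hypothetical stand-ins for the traced options, before and after.
    #[derive(Encode, Decode, Debug, PartialEq)]
    struct OldOptions {
        prefix_hint: Option<Vec<u8>>,
        ignore_range_tombstone: bool,
    }

    #[derive(Encode, Decode, Debug, PartialEq)]
    struct NewOptions {
        prefix_hint: Option<Vec<u8>>,
    }

    fn main() {
        let old = OldOptions {
            prefix_hint: None,
            ignore_range_tombstone: true,
        };
        let bytes = encode_to_vec(&old, config::standard()).unwrap();
        // Decoding the old bytes as the new struct consumes fewer bytes
        // than were written: the two layouts are no longer compatible.
        let (new, read): (NewOptions, usize) =
            decode_from_slice(&bytes, config::standard()).unwrap();
        assert_eq!(new, NewOptions { prefix_hint: None });
        assert!(read < bytes.len());
    }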
13 changes: 2 additions & 11 deletions src/storage/src/hummock/compactor/compaction_utils.rs
@@ -39,14 +39,13 @@ use crate::hummock::compactor::{
     TtlCompactionFilter,
 };
 use crate::hummock::iterator::{
-    Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, SkipWatermarkIterator,
-    UserIterator,
+    Forward, HummockIterator, MergeIterator, SkipWatermarkIterator, UserIterator,
 };
 use crate::hummock::multi_builder::TableBuilderFactory;
 use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
 use crate::hummock::{
     CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
-    SstableBuilderOptions, SstableDeleteRangeIterator, SstableWriterFactory, SstableWriterOptions,
+    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
 };
 use crate::monitor::StoreLocalStatistic;

@@ -349,7 +348,6 @@ pub async fn check_compaction_result(
     }

     let mut table_iters = Vec::new();
-    let mut del_iter = ForwardMergeRangeIterator::default();
     for level in &compact_task.input_ssts {
         if level.table_infos.is_empty() {
             continue;
@@ -358,7 +356,6 @@
         // Do not need to filter the table because manager has done it.
         if level.level_type == PbLevelType::Nonoverlapping {
             debug_assert!(can_concat(&level.table_infos));
-            del_iter.add_concat_iter(level.table_infos.clone(), context.sstable_store.clone());

             table_iters.push(ConcatSstableIterator::new(
                 compact_task.existing_table_ids.clone(),
@@ -369,13 +366,7 @@
                 context.storage_opts.compactor_iter_max_io_retry_times,
             ));
         } else {
-            let mut stats = StoreLocalStatistic::default();
             for table_info in &level.table_infos {
-                let table = context
-                    .sstable_store
-                    .sstable(table_info, &mut stats)
-                    .await?;
-                del_iter.add_sst_iter(SstableDeleteRangeIterator::new(table));
                 table_iters.push(ConcatSstableIterator::new(
                     compact_task.existing_table_ids.clone(),
                     vec![table_info.clone()],
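With the delete-range iterator gone, the verification path reduces to a plain k-way merge over per-level iterators via the MergeIterator kept in the imports above. For context, a self-contained sketch of that merge idea over plain sorted vectors, a hypothetical simplification rather than the RisingWave type (the real iterators are async and compare full keys):

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    // Merge several individually sorted runs into one sorted stream:
    // the core idea behind a compaction merge iterator (simplified sketch).
    fn k_way_merge(runs: Vec<Vec<u64>>) -> Vec<u64> {
        // Min-heap of (next value, run index, offset in run) via Reverse.
        let mut heap = BinaryHeap::new();
        for (i, run) in runs.iter().enumerate() {
            if let Some(&v) = run.first() {
                heap.push(Reverse((v, i, 0usize)));
            }
        }
        let mut out = Vec::new();
        while let Some(Reverse((v, i, off))) = heap.pop() {
            out.push(v);
            if let Some(&next) = runs[i].get(off + 1) {
                heap.push(Reverse((next, i, off + 1)));
            }
        }
        out
    }

    fn main() {
        let merged = k_way_merge(vec![vec![1, 4, 7], vec![2, 5], vec![3, 6, 8]]);
        assert_eq!(merged, (1..=8).collect::<Vec<u64>>());
    }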
(The remaining 17 changed files are not shown here.)
