From 181a88abd4e6a77bdc9ec77e9a73d434d6e2ec65 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 28 Nov 2023 20:42:05 +0800 Subject: [PATCH] fix epoch Signed-off-by: Little-Wallace --- src/storage/src/hummock/compactor/compactor_runner.rs | 3 ++- .../src/hummock/iterator/delete_range_iterator.rs | 9 +++++---- src/storage/src/hummock/sstable/builder.rs | 9 +++++---- .../src/hummock/sstable/delete_range_aggregator.rs | 5 +++-- src/storage/src/hummock/sstable/multi_builder.rs | 11 ++++++----- 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 9afef69151a8a..21d58d8cea994 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -19,6 +19,7 @@ use await_tree::InstrumentAwait; use bytes::Bytes; use futures::{stream, FutureExt, StreamExt}; use itertools::Itertools; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compact::{ compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task, }; @@ -675,7 +676,7 @@ where del_iter.seek(full_key.user_key); if !task_config.gc_delete_keys && del_iter.is_valid() - && del_iter.earliest_epoch() != HummockEpoch::MAX + && del_iter.earliest_epoch() < MAX_EPOCH { sst_builder .add_monotonic_delete(MonotonicDeleteEvent { diff --git a/src/storage/src/hummock/iterator/delete_range_iterator.rs b/src/storage/src/hummock/iterator/delete_range_iterator.rs index 4d943ce63d3fc..fc6649fe1cb3a 100644 --- a/src/storage/src/hummock/iterator/delete_range_iterator.rs +++ b/src/storage/src/hummock/iterator/delete_range_iterator.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeSet, BinaryHeap}; use std::future::Future; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{PointRange, UserKey}; use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::hummock::SstableInfo; @@ -299,7 +300,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { } for node in &self.tmp_buffer { let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { + if epoch < MAX_EPOCH { self.current_epochs.remove(&epoch); } } @@ -308,7 +309,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.next().await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { + if epoch < MAX_EPOCH { self.current_epochs.insert(epoch); } self.heap.push(node); @@ -329,7 +330,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.rewind().await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { + if epoch < MAX_EPOCH { self.current_epochs.insert(epoch); } self.heap.push(node); @@ -348,7 +349,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.seek(target_user_key).await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { + if epoch < MAX_EPOCH { self.current_epochs.insert(epoch); } self.heap.push(node); diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index bb6d190468f8c..6882ba187b304 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -17,6 +17,7 @@ use std::collections::BTreeSet; use std::sync::Arc; use bytes::{Bytes, BytesMut}; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN}; use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap}; use risingwave_hummock_sdk::{HummockEpoch, KeyComparator, LocalSstableInfo}; @@ -197,9 +198,9 @@ impl SstableBuilder { if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id { self.table_ids.insert(table_id); } - if event.new_epoch == HummockEpoch::MAX + if event.new_epoch >= MAX_EPOCH && self.monotonic_deletes.last().map_or(true, |last| { - last.new_epoch == HummockEpoch::MAX + last.new_epoch >= MAX_EPOCH && last.event_key.left_user_key.table_id == event.event_key.left_user_key.table_id }) @@ -207,7 +208,7 @@ impl SstableBuilder { // This range would never delete any key so we can merge it with last range. return; } - if event.new_epoch != HummockEpoch::MAX { + if event.new_epoch < MAX_EPOCH { self.epoch_set.insert(event.new_epoch); } self.range_tombstone_size += event.encoded_size(); @@ -493,7 +494,7 @@ impl SstableBuilder { let mut tombstone_max_epoch = u64::MIN; for monotonic_delete in &meta.monotonic_tombstone_events { - if monotonic_delete.new_epoch != HummockEpoch::MAX { + if monotonic_delete.new_epoch < MAX_EPOCH { tombstone_min_epoch = cmp::min(tombstone_min_epoch, monotonic_delete.new_epoch); tombstone_max_epoch = cmp::max(tombstone_max_epoch, monotonic_delete.new_epoch); } diff --git a/src/storage/src/hummock/sstable/delete_range_aggregator.rs b/src/storage/src/hummock/sstable/delete_range_aggregator.rs index 2b82e1f3c17f9..952283ce8cff4 100644 --- a/src/storage/src/hummock/sstable/delete_range_aggregator.rs +++ b/src/storage/src/hummock/sstable/delete_range_aggregator.rs @@ -18,6 +18,7 @@ use std::future::Future; use std::sync::Arc; use itertools::Itertools; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{PointRange, UserKey}; use risingwave_hummock_sdk::HummockEpoch; @@ -172,13 +173,13 @@ impl CompactionDeleteRangesBuilder { for monotonic_deletes in self.events { let mut last_exit_epoch = HummockEpoch::MAX; for delete_event in monotonic_deletes { - if last_exit_epoch != HummockEpoch::MAX { + if last_exit_epoch < MAX_EPOCH { let entry = ret.entry(delete_event.event_key.clone()).or_default(); entry.0.push(TombstoneEnterExitEvent { tombstone_epoch: last_exit_epoch, }); } - if delete_event.new_epoch != HummockEpoch::MAX { + if delete_event.new_epoch < MAX_EPOCH { let entry = ret.entry(delete_event.event_key).or_default(); entry.1.push(TombstoneEnterExitEvent { tombstone_epoch: delete_event.new_epoch, diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 4baabb4fdafe6..cc9ea5c4d261d 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use bytes::Bytes; use num_integer::Integer; use risingwave_common::hash::VirtualNode; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{FullKey, PointRange, UserKey}; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use tokio::task::JoinHandle; @@ -214,7 +215,7 @@ where } if need_seal_current && let Some(event) = builder.last_range_tombstone() - && event.new_epoch != HummockEpoch::MAX + && event.new_epoch < MAX_EPOCH { last_range_tombstone_epoch = event.new_epoch; if event @@ -247,7 +248,7 @@ where let mut builder = self.builder_factory.open_builder().await?; // If last_range_tombstone_epoch is not MAX, it means that we cut one range-tombstone to // two half and add the right half as a new range to next sstable. - if need_seal_current && last_range_tombstone_epoch != HummockEpoch::MAX { + if need_seal_current && last_range_tombstone_epoch < MAX_EPOCH { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: PointRange::from_user_key(full_key.user_key.to_vec(), false), new_epoch: last_range_tombstone_epoch, @@ -305,9 +306,9 @@ where pub async fn add_monotonic_delete(&mut self, event: MonotonicDeleteEvent) -> HummockResult<()> { if let Some(builder) = self.current_builder.as_mut() && builder.reach_capacity() - && event.new_epoch != HummockEpoch::MAX + && event.new_epoch < MAX_EPOCH { - if builder.last_range_tombstone_epoch() != HummockEpoch::MAX { + if builder.last_range_tombstone_epoch() < MAX_EPOCH { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: event.event_key.clone(), new_epoch: HummockEpoch::MAX, @@ -317,7 +318,7 @@ where } if self.current_builder.is_none() { - if event.new_epoch == HummockEpoch::MAX { + if event.new_epoch >= MAX_EPOCH { return Ok(()); }