diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index 9b6d9db8a68fd..e5478fae769bb 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -118,6 +118,17 @@ pub const EPOCH_AVAILABLE_BITS: u64 = 16; pub const MAX_SPILL_TIMES: u16 = ((1 << EPOCH_AVAILABLE_BITS) - 1) as u16; pub const EPOCH_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1; pub const MAX_EPOCH: u64 = u64::MAX & !EPOCH_MASK; + +pub fn is_max_epoch(epoch: u64) -> bool { + // Since we have write `MAX_EPOCH` as max epoch to sstable in some previous version, + // it means that there may be two value in our system which represent infinite. We must check + // both of them for compatibility. See bug description in https://github.com/risingwavelabs/risingwave/issues/13717 + epoch >= MAX_EPOCH +} +pub fn is_compatibility_max_epoch(epoch: u64) -> bool { + // See bug description in https://github.com/risingwavelabs/risingwave/issues/13717 + epoch == MAX_EPOCH +} impl From for Epoch { fn from(epoch: u64) -> Self { Self(epoch) diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index f336a4e3bcbcb..aa807852182dd 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -15,7 +15,7 @@ use core::ops::Bound::Unbounded; use risingwave_common::catalog::TableId; -use risingwave_common::util::epoch::MAX_EPOCH; +use risingwave_common::util::epoch::is_max_epoch; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreReadExt}; @@ -31,8 +31,8 @@ pub async fn list_kv( let hummock = context .hummock_store(HummockServiceOpts::from_env(data_dir)?) .await?; - if epoch == MAX_EPOCH { - tracing::info!("using MAX_EPOCH as epoch"); + if is_max_epoch(epoch) { + tracing::info!("using MAX EPOCH as epoch"); } let scan_result = { let range = (Unbounded, Unbounded); diff --git a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs index d82385568e36f..7e56078e77ed2 100644 --- a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs +++ b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::util::epoch::MAX_EPOCH; +use risingwave_hummock_sdk::HummockEpoch; use crate::CtlContext; @@ -23,7 +23,7 @@ pub async fn list_version_deltas( ) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; let resp = meta_client - .list_version_deltas(start_id, num_epochs, MAX_EPOCH) + .list_version_deltas(start_id, num_epochs, HummockEpoch::MAX) .await?; println!("{:#?}", resp.version_deltas); Ok(()) diff --git a/src/ctl/src/cmd_impl/hummock/pause_resume.rs b/src/ctl/src/cmd_impl/hummock/pause_resume.rs index 30ad94df4c06e..d599ce2327861 100644 --- a/src/ctl/src/cmd_impl/hummock/pause_resume.rs +++ b/src/ctl/src/cmd_impl/hummock/pause_resume.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt; +use risingwave_hummock_sdk::HummockEpoch; use crate::CtlContext; @@ -62,7 +62,7 @@ pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> { let mut current_delta_id = base_version.id + 1; loop { let deltas = meta_client - .list_version_deltas(current_delta_id, delta_fetch_size, MAX_EPOCH) + .list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX) .await .unwrap(); if deltas.version_deltas.is_empty() { diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 281c9839afe1e..ff055b5e9d616 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -19,7 +19,7 @@ use anyhow::Result; use clap::{Args, Parser, Subcommand}; use cmd_impl::bench::BenchCommands; use cmd_impl::hummock::SstDumpArgs; -use risingwave_common::util::epoch::MAX_EPOCH; +use risingwave_hummock_sdk::HummockEpoch; use risingwave_meta::backup_restore::RestoreOpts; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; @@ -160,7 +160,7 @@ enum HummockCommands { DisableCommitEpoch, /// list all Hummock key-value pairs ListKv { - #[clap(short, long = "epoch", default_value_t = MAX_EPOCH)] + #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)] epoch: u64, #[clap(short, long = "table-id")] diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index d9fd237a3c658..ea382a03e2c5a 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use risingwave_common::system_param::reader::SystemParamsReader; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::Table; use risingwave_pb::common::WorkerNode; @@ -230,7 +229,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn list_version_deltas(&self) -> Result> { // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks. self.0 - .list_version_deltas(0, u32::MAX, MAX_EPOCH) + .list_version_deltas(0, u32::MAX, u64::MAX) .await .map(|v| v.version_deltas) } diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index dfcc3076c7793..aa4d16a4e1dcf 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -17,7 +17,6 @@ pub mod compaction_config; mod overlap_strategy; use risingwave_common::catalog::TableOption; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; @@ -29,7 +28,9 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use picker::{LevelCompactionPicker, TierCompactionPicker}; -use risingwave_hummock_sdk::{can_concat, CompactionGroupId, HummockCompactionTaskId}; +use risingwave_hummock_sdk::{ + can_concat, CompactionGroupId, HummockCompactionTaskId, HummockEpoch, +}; use risingwave_pb::hummock::compaction_config::CompactionMode; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactTask, CompactionConfig, KeyRange, LevelType}; @@ -130,7 +131,7 @@ impl CompactStatus { let compact_task = CompactTask { input_ssts: ret.input.input_levels, splits: vec![KeyRange::inf()], - watermark: MAX_EPOCH, + watermark: HummockEpoch::MAX, sorted_output_ssts: vec![], task_id, target_level: target_level_id as u32, diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index 8b8598e10b93b..6818b7f68570e 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -19,12 +19,11 @@ use std::time::{SystemTime, UNIX_EPOCH}; use itertools::{enumerate, Itertools}; use prost::Message; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ object_size_map, BranchedSstInfo, HummockVersionExt, }; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockContextId, HummockSstableObjectId, HummockVersionId, + CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, }; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::write_limits::WriteLimit; @@ -345,7 +344,7 @@ pub fn trigger_pin_unpin_snapshot_state( { metrics.min_pinned_epoch.set(m as i64); } else { - metrics.min_pinned_epoch.set(MAX_EPOCH as _); + metrics.min_pinned_epoch.set(HummockEpoch::MAX as _); } } diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index c7d643f972b3c..c6cb930e62738 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -26,7 +26,7 @@ mod key_cmp; use std::cmp::Ordering; pub use key_cmp::*; -use risingwave_common::util::epoch::{EPOCH_MASK, MAX_EPOCH}; +use risingwave_common::util::epoch::EPOCH_MASK; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; use risingwave_pb::hummock::SstableInfo; @@ -279,9 +279,14 @@ pub struct EpochWithGap(u64); impl EpochWithGap { #[allow(unused_variables)] pub fn new(epoch: u64, spill_offset: u16) -> Self { + // We only use 48 high bit to store epoch and use 16 low bit to store spill offset. But for MAX epoch, + // we still keep `u64::MAX` because we have use it in delete range and persist this value to sstable files. + // So for compatibility, we must skip checking it for u64::MAX. See bug description in https://github.com/risingwavelabs/risingwave/issues/13717 #[cfg(not(feature = "enable_test_epoch"))] { - debug_assert_eq!(epoch & EPOCH_MASK, 0); + debug_assert!( + ((epoch & EPOCH_MASK) == 0) || risingwave_common::util::epoch::is_max_epoch(epoch) + ); let epoch_with_gap = epoch + spill_offset as u64; EpochWithGap(epoch_with_gap) } @@ -300,7 +305,7 @@ impl EpochWithGap { } pub fn new_max_epoch() -> Self { - EpochWithGap(MAX_EPOCH) + EpochWithGap(HummockEpoch::MAX) } // return the epoch_with_gap(epoch + spill_offset) diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index 4ecc094eb8d7e..2e2efa14f872f 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -19,8 +19,8 @@ use bytes::Bytes; use criterion::{criterion_group, criterion_main, Criterion}; use futures::{pin_mut, TryStreamExt}; use risingwave_common::cache::CachePriority; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::TableKey; +use risingwave_hummock_sdk::HummockEpoch; use risingwave_hummock_test::get_notification_client_for_test; use risingwave_hummock_test::local_state_store_test_utils::LocalStateStoreTestExt; use risingwave_hummock_test::test_utils::TestIngestBatch; @@ -99,7 +99,7 @@ fn criterion_benchmark(c: &mut Criterion) { )) .unwrap(); } - hummock_storage.seal_current_epoch(MAX_EPOCH, SealCurrentEpochOptions::for_test()); + hummock_storage.seal_current_epoch(HummockEpoch::MAX, SealCurrentEpochOptions::for_test()); c.bench_function("bench-hummock-iter", move |b| { b.iter(|| { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index dcd840d19c875..9f5d6d2d92d47 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -19,14 +19,14 @@ use await_tree::InstrumentAwait; use bytes::Bytes; use futures::{stream, FutureExt, StreamExt}; use itertools::Itertools; -use risingwave_common::util::epoch::MAX_EPOCH; +use risingwave_common::util::epoch::is_max_epoch; use risingwave_hummock_sdk::compact::{ compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task, }; use risingwave_hummock_sdk::key::{FullKey, PointRange}; use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon}; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; -use risingwave_hummock_sdk::{can_concat, EpochWithGap}; +use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch}; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType}; use tokio::sync::oneshot::Receiver; @@ -157,7 +157,7 @@ impl CompactorRunner { .context .storage_opts .compact_iter_recreate_timeout_ms; - let mut del_iter = ForwardMergeRangeIterator::new(MAX_EPOCH); + let mut del_iter = ForwardMergeRangeIterator::new(HummockEpoch::MAX); for level in &self.compact_task.input_ssts { if level.table_infos.is_empty() { @@ -657,7 +657,7 @@ where del_iter.seek(full_key.user_key).await?; if !task_config.gc_delete_keys && del_iter.is_valid() - && del_iter.earliest_epoch() != MAX_EPOCH + && !is_max_epoch(del_iter.earliest_epoch()) { sst_builder .add_monotonic_delete(MonotonicDeleteEvent { @@ -680,7 +680,7 @@ where let mut last_key = FullKey::default(); let mut watermark_can_see_last_key = false; - let mut user_key_last_delete_epoch = MAX_EPOCH; + let mut user_key_last_delete_epoch = HummockEpoch::MAX; let mut local_stats = StoreLocalStatistic::default(); // Keep table stats changes due to dropping KV. @@ -716,7 +716,7 @@ where } last_key.set(iter_key); watermark_can_see_last_key = false; - user_key_last_delete_epoch = MAX_EPOCH; + user_key_last_delete_epoch = HummockEpoch::MAX; if value.is_delete() { local_stats.skip_delete_key_count += 1; } @@ -843,7 +843,7 @@ where sst_builder .add_monotonic_delete(MonotonicDeleteEvent { event_key: extended_largest_user_key, - new_epoch: MAX_EPOCH, + new_epoch: HummockEpoch::MAX, }) .await?; break; @@ -964,7 +964,7 @@ mod tests { .cloned() .collect_vec(); - let mut iter = ForwardMergeRangeIterator::new(MAX_EPOCH); + let mut iter = ForwardMergeRangeIterator::new(HummockEpoch::MAX); iter.add_concat_iter(sstable_infos, sstable_store); let ret = CompactionDeleteRangeIterator::new(iter) diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 7d75c9adf6d49..b0efa1ac067da 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -23,11 +23,10 @@ use itertools::Itertools; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, UserKey}; use risingwave_hummock_sdk::key_range::KeyRange; -use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, LocalSstableInfo}; +use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::compact_task; use tracing::error; @@ -241,7 +240,7 @@ async fn compact_shared_buffer( Box::new(sstable_object_id_manager.clone()), ); let mut forward_iters = Vec::with_capacity(payload.len()); - let mut del_iter = ForwardMergeRangeIterator::new(MAX_EPOCH); + let mut del_iter = ForwardMergeRangeIterator::new(HummockEpoch::MAX); for imm in &payload { forward_iters.push(imm.clone().into_forward_iter()); del_iter.add_batch_iter(imm.delete_range_iter()); @@ -322,7 +321,7 @@ pub async fn merge_imms_in_memory( let mut largest_table_key = Bound::Included(Bytes::new()); let mut imm_iters = Vec::with_capacity(imms.len()); - let mut del_iter = ForwardMergeRangeIterator::new(MAX_EPOCH); + let mut del_iter = ForwardMergeRangeIterator::new(HummockEpoch::MAX); for imm in imms { assert!( imm.kv_count() > 0 || imm.has_range_tombstone(), @@ -394,14 +393,14 @@ pub async fn merge_imms_in_memory( let mut versions: Vec<(EpochWithGap, HummockValue)> = Vec::new(); - let mut pivot_last_delete_epoch = MAX_EPOCH; + let mut pivot_last_delete_epoch = HummockEpoch::MAX; for ((key, value), epoch) in items { assert!(key >= pivot, "key should be in ascending order"); if key != pivot { merged_payload.push((pivot, versions)); pivot = key; - pivot_last_delete_epoch = MAX_EPOCH; + pivot_last_delete_epoch = HummockEpoch::MAX; versions = vec![]; let target_extended_user_key = PointRange::from_user_key(UserKey::new(table_id, TableKey(pivot.as_ref())), false); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index ce26d1cf1b66a..d293f03e570e1 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -21,7 +21,6 @@ use arc_swap::ArcSwap; use await_tree::InstrumentAwait; use parking_lot::RwLock; use prometheus::core::{AtomicU64, GenericGauge}; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt; use risingwave_hummock_sdk::{info_in_release, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::version_update_payload::Payload; @@ -396,7 +395,7 @@ impl HummockEventHandler { } self.sstable_object_id_manager - .remove_watermark_object_id(TrackerId::Epoch(MAX_EPOCH)); + .remove_watermark_object_id(TrackerId::Epoch(HummockEpoch::MAX)); // Notify completion of the Clear event. let _ = notifier.send(()).inspect_err(|e| { diff --git a/src/storage/src/hummock/iterator/backward_user.rs b/src/storage/src/hummock/iterator/backward_user.rs index 16f6984be809a..481ebd7fb4748 100644 --- a/src/storage/src/hummock/iterator/backward_user.rs +++ b/src/storage/src/hummock/iterator/backward_user.rs @@ -277,13 +277,7 @@ impl> BackwardUserIterator { impl> BackwardUserIterator { /// Creates [`BackwardUserIterator`] with maximum epoch. pub(crate) fn for_test(iterator: I, key_range: UserKeyRange) -> Self { - Self::with_epoch( - iterator, - key_range, - risingwave_common::util::epoch::MAX_EPOCH, - 0, - None, - ) + Self::with_epoch(iterator, key_range, HummockEpoch::MAX, 0, None) } /// Creates [`BackwardUserIterator`] with maximum epoch. @@ -292,13 +286,7 @@ impl> BackwardUserIterator { key_range: UserKeyRange, min_epoch: HummockEpoch, ) -> Self { - Self::with_epoch( - iterator, - key_range, - risingwave_common::util::epoch::MAX_EPOCH, - min_epoch, - None, - ) + Self::with_epoch(iterator, key_range, HummockEpoch::MAX, min_epoch, None) } } diff --git a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs index b0e1c75e8e4ae..3c6ab7ded6791 100644 --- a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs +++ b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs @@ -185,7 +185,7 @@ mod tests { use bytes::Bytes; use risingwave_common::catalog::TableId; - use risingwave_common::util::epoch::MAX_EPOCH; + use risingwave_common::util::epoch::is_max_epoch; use super::*; use crate::hummock::iterator::test_utils::mock_sstable_store; @@ -252,7 +252,7 @@ mod tests { sstable_store, ); concat_iterator.rewind().await.unwrap(); - assert_eq!(concat_iterator.current_epoch(), MAX_EPOCH); + assert!(is_max_epoch(concat_iterator.current_epoch())); assert_eq!( concat_iterator.next_extended_user_key().left_user_key, test_user_key(b"aaaa").as_ref() diff --git a/src/storage/src/hummock/iterator/delete_range_iterator.rs b/src/storage/src/hummock/iterator/delete_range_iterator.rs index e62c2f53d600e..81e0272eb9904 100644 --- a/src/storage/src/hummock/iterator/delete_range_iterator.rs +++ b/src/storage/src/hummock/iterator/delete_range_iterator.rs @@ -15,7 +15,7 @@ use std::collections::{BTreeSet, BinaryHeap}; use std::future::Future; -use risingwave_common::util::epoch::MAX_EPOCH; +use risingwave_common::util::epoch::is_max_epoch; use risingwave_hummock_sdk::key::{PointRange, UserKey}; use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::hummock::SstableInfo; @@ -54,7 +54,7 @@ pub trait DeleteRangeIterator { /// Retrieves the epoch of the current range delete. /// It returns the epoch between the previous `next_user_key` (inclusive) and the current /// `next_user_key` (not inclusive). When there is no range deletes, it will return - /// `MAX_EPOCH`. + /// `HummockEpoch::MAX`. /// /// Note: /// - Before calling this function, makes sure the iterator `is_valid`. @@ -233,7 +233,7 @@ pub struct ForwardMergeRangeIterator { impl Default for ForwardMergeRangeIterator { fn default() -> Self { - ForwardMergeRangeIterator::new(MAX_EPOCH) + ForwardMergeRangeIterator::new(HummockEpoch::MAX) } } @@ -281,13 +281,13 @@ impl ForwardMergeRangeIterator { self.current_epochs .range(epoch..) .next() - .map_or(MAX_EPOCH, |ret| *ret) + .map_or(HummockEpoch::MAX, |ret| *ret) } pub fn earliest_epoch(&self) -> HummockEpoch { self.current_epochs .first() - .map_or(MAX_EPOCH, |epoch| *epoch) + .map_or(HummockEpoch::MAX, |epoch| *epoch) } } @@ -319,7 +319,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { } for node in &self.tmp_buffer { let epoch = node.current_epoch(); - if epoch != MAX_EPOCH { + if !is_max_epoch(epoch) { self.current_epochs.remove(&epoch); } } @@ -328,7 +328,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.next().await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != MAX_EPOCH { + if !is_max_epoch(epoch) { self.current_epochs.insert(epoch); } self.heap.push(node); @@ -349,7 +349,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.rewind().await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != MAX_EPOCH { + if !is_max_epoch(epoch) { self.current_epochs.insert(epoch); } self.heap.push(node); @@ -368,7 +368,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.seek(target_user_key).await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != MAX_EPOCH { + if !is_max_epoch(epoch) { self.current_epochs.insert(epoch); } self.heap.push(node); diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index e26675dc11979..630a0a8dda6a7 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -15,7 +15,7 @@ use std::ops::Bound::*; use bytes::Bytes; -use risingwave_common::util::epoch::{MAX_EPOCH, MAX_SPILL_TIMES}; +use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; @@ -84,7 +84,7 @@ impl> UserIterator { /// Create [`UserIterator`] with maximum epoch. pub fn for_test(iterator: I, key_range: UserKeyRange) -> Self { - let read_epoch = MAX_EPOCH; + let read_epoch = HummockEpoch::MAX; Self::new( iterator, key_range, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index a1e3a77f00c6e..0dc7d67bc6175 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -25,7 +25,6 @@ use bytes::{Bytes, BytesMut}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, TableKeyRange, UserKey}; use risingwave_hummock_sdk::EpochWithGap; @@ -169,7 +168,7 @@ impl SharedBufferBatchInner { }); monotonic_tombstone_events.push(MonotonicDeleteEvent { event_key: end_point_range, - new_epoch: MAX_EPOCH, + new_epoch: HummockEpoch::MAX, }); } @@ -328,7 +327,7 @@ impl SharedBufferBatchInner { }, ); if idx == 0 { - MAX_EPOCH + HummockEpoch::MAX } else { self.monotonic_tombstone_events[idx - 1].new_epoch } @@ -857,7 +856,7 @@ impl DeleteRangeIterator for SharedBufferDeleteRangeIterator { if self.next_idx > 0 { self.inner.monotonic_tombstone_events[self.next_idx - 1].new_epoch } else { - MAX_EPOCH + HummockEpoch::MAX } } @@ -897,7 +896,6 @@ mod tests { use std::ops::Bound::{Excluded, Included}; use risingwave_common::must_match; - use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::map_table_key_range; use super::*; @@ -1192,7 +1190,7 @@ mod tests { .get_min_delete_range_epoch(UserKey::new(Default::default(), TableKey(b"aaa"),)) ); assert_eq!( - MAX_EPOCH, + HummockEpoch::MAX, shared_buffer_batch .get_min_delete_range_epoch(UserKey::new(Default::default(), TableKey(b"bbb"),)) ); @@ -1202,7 +1200,7 @@ mod tests { .get_min_delete_range_epoch(UserKey::new(Default::default(), TableKey(b"ddd"),)) ); assert_eq!( - MAX_EPOCH, + HummockEpoch::MAX, shared_buffer_batch .get_min_delete_range_epoch(UserKey::new(Default::default(), TableKey(b"eee"),)) ); diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 537d25539c957..2d165638c074b 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -17,7 +17,7 @@ use std::collections::BTreeSet; use std::sync::Arc; use bytes::{Bytes, BytesMut}; -use risingwave_common::util::epoch::MAX_EPOCH; +use risingwave_common::util::epoch::{is_compatibility_max_epoch, is_max_epoch}; use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN}; use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap}; use risingwave_hummock_sdk::{HummockEpoch, KeyComparator, LocalSstableInfo}; @@ -114,7 +114,7 @@ pub struct SstableBuilder { /// epoch1 to epoch2, thus the `new epoch` is epoch2. epoch2 will be used from the event /// key wmk1 (5) and till the next event key wmk2 (7) (not inclusive). /// If there is no range deletes between current event key and next event key, `new_epoch` will - /// be `MAX_EPOCH`. + /// be `HummockEpoch::MAX`. monotonic_deletes: Vec, /// `table_id` of added keys. table_ids: BTreeSet, @@ -193,14 +193,14 @@ impl SstableBuilder { } /// Add kv pair to sstable. - pub fn add_monotonic_delete(&mut self, event: MonotonicDeleteEvent) { + pub fn add_monotonic_delete(&mut self, mut event: MonotonicDeleteEvent) { let table_id = event.event_key.left_user_key.table_id.table_id(); if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id { self.table_ids.insert(table_id); } - if event.new_epoch == MAX_EPOCH + if is_max_epoch(event.new_epoch) && self.monotonic_deletes.last().map_or(true, |last| { - last.new_epoch == MAX_EPOCH + is_max_epoch(last.new_epoch) && last.event_key.left_user_key.table_id == event.event_key.left_user_key.table_id }) @@ -208,9 +208,14 @@ impl SstableBuilder { // This range would never delete any key so we can merge it with last range. return; } - if event.new_epoch != MAX_EPOCH { + if !is_max_epoch(event.new_epoch) { self.epoch_set.insert(event.new_epoch); } + if is_compatibility_max_epoch(event.new_epoch) { + // It is dangerous to mix two different max value in data, so rewrite it to keep same format with main branch. + // See bug description in https://github.com/risingwavelabs/risingwave/issues/13717 + event.new_epoch = HummockEpoch::MAX; + } self.range_tombstone_size += event.encoded_size(); self.monotonic_deletes.push(event); } @@ -218,7 +223,7 @@ impl SstableBuilder { pub fn last_range_tombstone_epoch(&self) -> HummockEpoch { self.monotonic_deletes .last() - .map_or(MAX_EPOCH, |delete| delete.new_epoch) + .map_or(HummockEpoch::MAX, |delete| delete.new_epoch) } pub fn last_range_tombstone(&self) -> Option<&MonotonicDeleteEvent> { @@ -399,7 +404,7 @@ impl SstableBuilder { assert!(self.monotonic_deletes.is_empty() || self.monotonic_deletes.len() > 1); if let Some(monotonic_delete) = self.monotonic_deletes.last() { - assert_eq!(monotonic_delete.new_epoch, MAX_EPOCH); + assert!(is_max_epoch(monotonic_delete.new_epoch)); if monotonic_delete.event_key.is_exclude_left_key { if largest_key.is_empty() || !KeyComparator::encoded_greater_than_unencoded( @@ -420,18 +425,18 @@ impl SstableBuilder { ) { // use MAX as epoch because the last monotonic delete must be - // `MAX_EPOCH`, so we can not include any version of + // `HummockEpoch::MAX`, so we can not include any version of // this key. largest_key = FullKey::from_user_key( monotonic_delete.event_key.left_user_key.clone(), - MAX_EPOCH, + HummockEpoch::MAX, ) .encode(); right_exclusive = true; } } if let Some(monotonic_delete) = self.monotonic_deletes.first() { - assert_ne!(monotonic_delete.new_epoch, MAX_EPOCH); + assert!(!is_max_epoch(monotonic_delete.new_epoch)); if smallest_key.is_empty() || !KeyComparator::encoded_less_than_unencoded( user_key(&smallest_key), @@ -440,7 +445,7 @@ impl SstableBuilder { { smallest_key = FullKey::from_user_key( monotonic_delete.event_key.left_user_key.clone(), - MAX_EPOCH, + HummockEpoch::MAX, ) .encode(); } @@ -491,11 +496,11 @@ impl SstableBuilder { // Expand the epoch of the whole sst by tombstone epoch let (tombstone_min_epoch, tombstone_max_epoch) = { - let mut tombstone_min_epoch = MAX_EPOCH; + let mut tombstone_min_epoch = HummockEpoch::MAX; let mut tombstone_max_epoch = u64::MIN; for monotonic_delete in &meta.monotonic_tombstone_events { - if monotonic_delete.new_epoch != MAX_EPOCH { + if !is_max_epoch(monotonic_delete.new_epoch) { tombstone_min_epoch = cmp::min(tombstone_min_epoch, monotonic_delete.new_epoch); tombstone_max_epoch = cmp::max(tombstone_max_epoch, monotonic_delete.new_epoch); } @@ -537,7 +542,7 @@ impl SstableBuilder { let (min_epoch, max_epoch) = { if self.epoch_set.is_empty() { - (MAX_EPOCH, u64::MIN) + (HummockEpoch::MAX, u64::MIN) } else { ( *self.epoch_set.first().unwrap(), @@ -693,7 +698,7 @@ pub(super) mod tests { let mut b = SstableBuilder::for_test(0, mock_sst_writer(&opt), opt); b.add_monotonic_deletes(vec![ MonotonicDeleteEvent::new(table_id, b"abcd".to_vec(), 0), - MonotonicDeleteEvent::new(table_id, b"eeee".to_vec(), MAX_EPOCH), + MonotonicDeleteEvent::new(table_id, b"eeee".to_vec(), HummockEpoch::MAX), ]); let s = b.finish().await.unwrap().sst_info; let key_range = s.sst_info.key_range.unwrap(); diff --git a/src/storage/src/hummock/sstable/delete_range_aggregator.rs b/src/storage/src/hummock/sstable/delete_range_aggregator.rs index 6056a38de698b..1118ad7f90da1 100644 --- a/src/storage/src/hummock/sstable/delete_range_aggregator.rs +++ b/src/storage/src/hummock/sstable/delete_range_aggregator.rs @@ -15,7 +15,8 @@ use std::cmp::Ordering; use std::future::Future; -use risingwave_common::util::epoch::MAX_EPOCH; +#[cfg(test)] +use risingwave_common::util::epoch::is_max_epoch; use risingwave_hummock_sdk::key::{PointRange, UserKey}; use risingwave_hummock_sdk::HummockEpoch; @@ -75,7 +76,7 @@ impl CompactionDeleteRangeIterator { let extended_smallest_user_key = PointRange::from_user_key(smallest_user_key, false); let extended_largest_user_key = PointRange::from_user_key(largest_user_key, false); let mut monotonic_events = vec![]; - if iter.earliest_epoch() < MAX_EPOCH { + if !is_max_epoch(iter.earliest_epoch()) { monotonic_events.push(MonotonicDeleteEvent { event_key: extended_smallest_user_key.to_vec(), new_epoch: iter.earliest_epoch(), @@ -87,7 +88,7 @@ impl CompactionDeleteRangeIterator { if !monotonic_events.is_empty() { monotonic_events.push(MonotonicDeleteEvent { event_key: extended_largest_user_key.to_vec(), - new_epoch: MAX_EPOCH, + new_epoch: HummockEpoch::MAX, }); } break; @@ -107,8 +108,8 @@ impl CompactionDeleteRangeIterator { && a.new_epoch == b.new_epoch }); if !monotonic_events.is_empty() { - assert_ne!(monotonic_events.first().unwrap().new_epoch, MAX_EPOCH); - assert_eq!(monotonic_events.last().unwrap().new_epoch, MAX_EPOCH); + assert!(!is_max_epoch(monotonic_events.first().unwrap().new_epoch)); + assert!(is_max_epoch(monotonic_events.last().unwrap().new_epoch)); } Ok(monotonic_events) } @@ -198,7 +199,7 @@ impl DeleteRangeIterator for SstableDeleteRangeIterator { if self.next_idx > 0 { self.table.value().meta.monotonic_tombstone_events[self.next_idx - 1].new_epoch } else { - MAX_EPOCH + HummockEpoch::MAX } } @@ -245,7 +246,7 @@ pub fn get_min_delete_range_epoch_from_sstable( |MonotonicDeleteEvent { event_key, .. }| event_key.as_ref().le(&query_extended_user_key), ); if idx == 0 { - MAX_EPOCH + HummockEpoch::MAX } else { table.meta.monotonic_tombstone_events[idx - 1].new_epoch } @@ -257,6 +258,7 @@ mod tests { use bytes::Bytes; use risingwave_common::catalog::TableId; + use risingwave_common::util::epoch::is_max_epoch; use super::*; use crate::hummock::iterator::test_utils::{ @@ -328,7 +330,7 @@ mod tests { iter.earliest_delete_which_can_see_key(test_user_key(b"bbb").as_ref(), 13) .await .unwrap(), - MAX_EPOCH, + HummockEpoch::MAX, ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"bbb").as_ref(), 11) @@ -359,20 +361,20 @@ mod tests { iter.earliest_delete_which_can_see_key(test_user_key(b"bbbddd").as_ref(), 8) .await .unwrap(), - MAX_EPOCH, + HummockEpoch::MAX, ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"bbbeee").as_ref(), 8) .await .unwrap(), - MAX_EPOCH, + HummockEpoch::MAX, ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"bbbeef").as_ref(), 10) .await .unwrap(), - MAX_EPOCH, + HummockEpoch::MAX, ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"eeeeee").as_ref(), 8) @@ -390,7 +392,7 @@ mod tests { iter.earliest_delete_which_can_see_key(test_user_key(b"hhhhhh").as_ref(), 6) .await .unwrap(), - MAX_EPOCH, + HummockEpoch::MAX, ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"iiiiii").as_ref(), 6) @@ -510,6 +512,6 @@ mod tests { sstable.value(), iterator_test_user_key_of(8).as_ref(), ); - assert_eq!(ret, MAX_EPOCH); + assert!(is_max_epoch(ret)); } } diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index dca63369aeda2..9d6d944bb5c4b 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -145,7 +145,7 @@ impl DeleteRangeTombstone { /// thus the `new epoch` is epoch2. epoch2 will be used from the event key wmk1 (5) and till the /// next event key wmk2 (7) (not inclusive). /// If there is no range deletes between current event key and next event key, `new_epoch` will be -/// `MAX_EPOCH`. +/// `HummockEpoch::MAX`. #[derive(Clone, Debug, PartialEq, Eq)] pub struct MonotonicDeleteEvent { pub event_key: PointRange>, @@ -356,7 +356,7 @@ pub struct SstableMeta { /// epoch1 to epoch2, thus the `new epoch` is epoch2. epoch2 will be used from the event /// key wmk1 (5) and till the next event key wmk2 (7) (not inclusive). /// If there is no range deletes between current event key and next event key, `new_epoch` will - /// be `MAX_EPOCH`. + /// be `HummockEpoch::MAX`. pub monotonic_tombstone_events: Vec, /// Format version, for further compatibility. pub version: u32, diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index c2e20684eb520..f2fee142c13d2 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -19,9 +19,9 @@ use std::sync::Arc; use bytes::Bytes; use num_integer::Integer; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::MAX_EPOCH; +use risingwave_common::util::epoch::is_max_epoch; use risingwave_hummock_sdk::key::{FullKey, PointRange, UserKey}; -use risingwave_hummock_sdk::LocalSstableInfo; +use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use tokio::task::JoinHandle; use super::MonotonicDeleteEvent; @@ -200,7 +200,7 @@ where // the captured reference to `current_builder` is also required to be `Send`, and then // `current_builder` itself is required to be `Sync`, which is unnecessary. let mut need_seal_current = false; - let mut last_range_tombstone_epoch = MAX_EPOCH; + let mut last_range_tombstone_epoch = HummockEpoch::MAX; if let Some(builder) = self.current_builder.as_mut() { if is_new_user_key { if switch_builder { @@ -215,7 +215,7 @@ where } if need_seal_current && let Some(event) = builder.last_range_tombstone() - && event.new_epoch != MAX_EPOCH + && !is_max_epoch(event.new_epoch) { last_range_tombstone_epoch = event.new_epoch; if event @@ -229,7 +229,7 @@ where } else { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: PointRange::from_user_key(full_key.user_key.to_vec(), false), - new_epoch: MAX_EPOCH, + new_epoch: HummockEpoch::MAX, }); } } @@ -248,7 +248,7 @@ where let mut builder = self.builder_factory.open_builder().await?; // If last_range_tombstone_epoch is not MAX, it means that we cut one range-tombstone to // two half and add the right half as a new range to next sstable. - if need_seal_current && last_range_tombstone_epoch != MAX_EPOCH { + if need_seal_current && !is_max_epoch(last_range_tombstone_epoch) { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: PointRange::from_user_key(full_key.user_key.to_vec(), false), new_epoch: last_range_tombstone_epoch, @@ -306,19 +306,19 @@ where pub async fn add_monotonic_delete(&mut self, event: MonotonicDeleteEvent) -> HummockResult<()> { if let Some(builder) = self.current_builder.as_mut() && builder.reach_capacity() - && event.new_epoch != MAX_EPOCH + && !is_max_epoch(event.new_epoch) { - if builder.last_range_tombstone_epoch() != MAX_EPOCH { + if !is_max_epoch(builder.last_range_tombstone_epoch()) { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: event.event_key.clone(), - new_epoch: MAX_EPOCH, + new_epoch: HummockEpoch::MAX, }); } self.seal_current().await?; } if self.current_builder.is_none() { - if event.new_epoch == MAX_EPOCH { + if is_max_epoch(event.new_epoch) { return Ok(()); } @@ -650,7 +650,7 @@ mod tests { FullKey::for_test( table_id, &[VirtualNode::ZERO.to_be_bytes().as_slice(), b"aaa"].concat(), - MAX_EPOCH, + HummockEpoch::MAX, ) .encode() ); @@ -659,7 +659,7 @@ mod tests { FullKey::for_test( table_id, &[VirtualNode::ZERO.to_be_bytes().as_slice(), b"kkk"].concat(), - MAX_EPOCH + HummockEpoch::MAX ) .encode() ); @@ -704,7 +704,7 @@ mod tests { 0, ); del_iter.rewind().await.unwrap(); - assert_eq!(del_iter.earliest_epoch(), MAX_EPOCH); + assert!(is_max_epoch(del_iter.earliest_epoch())); while del_iter.is_valid() { let event_key = del_iter.key().to_vec(); del_iter.next().await.unwrap(); @@ -803,7 +803,7 @@ mod tests { UserKey::for_test(table_id, b"gggg".to_vec()), false, ), - new_epoch: MAX_EPOCH, + new_epoch: HummockEpoch::MAX, }) .await .unwrap(); @@ -818,18 +818,22 @@ mod tests { let key_range = ssts[0].key_range.as_ref().unwrap(); let expected_left = - FullKey::from_user_key(UserKey::for_test(table_id, b"aaaa"), MAX_EPOCH).encode(); + FullKey::from_user_key(UserKey::for_test(table_id, b"aaaa"), HummockEpoch::MAX) + .encode(); let expected_right = - FullKey::from_user_key(UserKey::for_test(table_id, b"eeee"), MAX_EPOCH).encode(); + FullKey::from_user_key(UserKey::for_test(table_id, b"eeee"), HummockEpoch::MAX) + .encode(); assert_eq!(key_range.left, expected_left); assert_eq!(key_range.right, expected_right); assert!(key_range.right_exclusive); let key_range = ssts[1].key_range.as_ref().unwrap(); let expected_left = - FullKey::from_user_key(UserKey::for_test(table_id, b"eeee"), MAX_EPOCH).encode(); + FullKey::from_user_key(UserKey::for_test(table_id, b"eeee"), HummockEpoch::MAX) + .encode(); let expected_right = - FullKey::from_user_key(UserKey::for_test(table_id, b"gggg"), MAX_EPOCH).encode(); + FullKey::from_user_key(UserKey::for_test(table_id, b"gggg"), HummockEpoch::MAX) + .encode(); assert_eq!(key_range.left, expected_left); assert_eq!(key_range.right, expected_right); assert!(key_range.right_exclusive); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index c018ce167bff9..207ad2cfc85d5 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -23,7 +23,7 @@ use bytes::Bytes; use itertools::Itertools; use more_asserts::assert_gt; use risingwave_common::catalog::TableId; -use risingwave_common::util::epoch::MAX_EPOCH; +use risingwave_common::util::epoch::is_max_epoch; use risingwave_common_service::observer_manager::{NotificationClient, ObserverManager}; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -404,7 +404,7 @@ impl StateStore for HummockStorage { self.validate_read_epoch(wait_epoch)?; let wait_epoch = match wait_epoch { HummockReadEpoch::Committed(epoch) => { - assert_ne!(epoch, MAX_EPOCH, "epoch should not be MAX_EPOCH"); + assert!(!is_max_epoch(epoch), "epoch should not be MAX EPOCH"); epoch } _ => return Ok(()), @@ -431,7 +431,7 @@ impl StateStore for HummockStorage { if is_checkpoint { let _ = self.min_current_epoch.compare_exchange( - MAX_EPOCH, + HummockEpoch::MAX, epoch, MemOrdering::SeqCst, MemOrdering::SeqCst, @@ -454,7 +454,8 @@ impl StateStore for HummockStorage { rx.await.expect("should wait success"); let epoch = self.pinned_version.load().max_committed_epoch(); - self.min_current_epoch.store(MAX_EPOCH, MemOrdering::SeqCst); + self.min_current_epoch + .store(HummockEpoch::MAX, MemOrdering::SeqCst); self.seal_epoch.store(epoch, MemOrdering::SeqCst); Ok(()) @@ -466,9 +467,9 @@ impl StateStore for HummockStorage { fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { if let HummockReadEpoch::Current(read_current_epoch) = epoch { - assert_ne!( - read_current_epoch, MAX_EPOCH, - "epoch should not be MAX_EPOCH" + assert!( + !is_max_epoch(read_current_epoch), + "epoch should not be MAX EPOCH" ); let sealed_epoch = self.seal_epoch.load(MemOrdering::SeqCst); if read_current_epoch > sealed_epoch { diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index a54692e43fac5..eea4c9045fcf9 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -20,7 +20,7 @@ use await_tree::InstrumentAwait; use bytes::Bytes; use parking_lot::RwLock; use risingwave_common::catalog::{TableId, TableOption}; -use risingwave_common::util::epoch::{MAX_EPOCH, MAX_SPILL_TIMES}; +use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; use tokio::sync::mpsc; @@ -187,7 +187,7 @@ impl LocalHummockStorage { } let read_snapshot = read_filter_for_local( - MAX_EPOCH, // Use MAX epoch to make sure we read from latest + HummockEpoch::MAX, // Use MAX epoch to make sure we read from latest read_options.table_id, &key_range, self.read_version.clone(), diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index a1fad498c06bb..1b4988eadc124 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -23,7 +23,6 @@ use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::must_match; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, UserKey}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_pb::hummock::{KeyRange, SstableInfo}; @@ -228,17 +227,17 @@ pub async fn gen_test_sstable_impl + Clone + Default + Eq, F: Fil ); let mut last_key = FullKey::::default(); - let mut user_key_last_delete = MAX_EPOCH; + let mut user_key_last_delete = HummockEpoch::MAX; for (mut key, value) in kv_iter { let is_new_user_key = last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref(); let epoch = key.epoch_with_gap.pure_epoch(); if is_new_user_key { last_key = key.clone(); - user_key_last_delete = MAX_EPOCH; + user_key_last_delete = HummockEpoch::MAX; } - let mut earliest_delete_epoch = MAX_EPOCH; + let mut earliest_delete_epoch = HummockEpoch::MAX; let extended_user_key = PointRange::from_user_key(key.user_key.as_ref(), false); for range_tombstone in &range_tombstones { if range_tombstone @@ -486,7 +485,7 @@ pub mod delete_range { apply_event(&mut epochs, &event); monotonic_tombstone_events.push(MonotonicDeleteEvent { event_key: event.0, - new_epoch: epochs.first().map_or(MAX_EPOCH, |epoch| *epoch), + new_epoch: epochs.first().map_or(HummockEpoch::MAX, |epoch| *epoch), }); } monotonic_tombstone_events.dedup_by(|a, b| { @@ -527,7 +526,7 @@ pub mod delete_range { /// `<1, +epoch1> <3, +epoch2> <5, -epoch1> <7, -epoch2> <10, +epoch3> <12, -epoch3>` /// We rely on the fact that keys met in compaction are in order. /// When user key 0 comes, no events have happened yet so no range delete epoch. (will be - /// represented as range delete epoch `MAX_EPOCH`) + /// represented as range delete epoch MAX EPOCH) /// When user key 1 comes, event `<1, +epoch1>` happens so there is currently one range delete /// epoch: epoch1. /// When user key 2 comes, no more events happen so the set remains `{epoch1}`. diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 38f243b20d3f3..8dfce8a7ae1b9 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -20,7 +20,6 @@ use std::sync::{Arc, LazyLock}; use bytes::Bytes; use parking_lot::RwLock; use risingwave_common::catalog::TableId; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey}; use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; @@ -450,7 +449,7 @@ where Included(k) => Included(FullKey::new( table_id, TableKey(Bytes::from(k.as_ref().to_vec())), - MAX_EPOCH, + HummockEpoch::MAX, )), Excluded(k) => Excluded(FullKey::new( table_id, @@ -460,7 +459,7 @@ where Unbounded => Included(FullKey::new( table_id, TableKey(Bytes::from(b"".to_vec())), - MAX_EPOCH, + HummockEpoch::MAX, )), }; let end = match table_key_range.end_bound() { @@ -472,14 +471,14 @@ where Excluded(k) => Excluded(FullKey::new( table_id, TableKey(Bytes::from(k.as_ref().to_vec())), - MAX_EPOCH, + HummockEpoch::MAX, )), Unbounded => { if let Some(next_table_id) = table_id.table_id().checked_add(1) { Excluded(FullKey::new( next_table_id.into(), TableKey(Bytes::from(b"".to_vec())), - MAX_EPOCH, + HummockEpoch::MAX, )) } else { Unbounded diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index c0a759c77746b..571abbf3252be 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -24,11 +24,11 @@ use risingwave_common::array::StreamChunk; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_connector::sink::log_store::{ ChunkId, LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset, }; use risingwave_hummock_sdk::key::TableKey; +use risingwave_hummock_sdk::HummockEpoch; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::store::{PrefetchOptions, ReadOptions}; use risingwave_storage::StateStore; @@ -121,7 +121,7 @@ impl LogReader for KvLogStoreReader { state_store .iter( (Included(range_start), Excluded(range_end)), - MAX_EPOCH, + HummockEpoch::MAX, ReadOptions { prefetch_options: PrefetchOptions::default(), cache_policy: CachePolicy::Fill(CachePriority::Low), @@ -225,14 +225,14 @@ impl LogReader for KvLogStoreReader { serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id)); let state_store = &state_store; - // Use MAX_EPOCH here because the epoch to consume may be below the safe + // Use MAX EPOCH here because the epoch to consume may be below the safe // epoch async move { Ok::<_, anyhow::Error>(Box::pin( state_store .iter( (Included(range_start), Included(range_end)), - MAX_EPOCH, + HummockEpoch::MAX, ReadOptions { prefetch_options: PrefetchOptions::default(), cache_policy: CachePolicy::Fill(CachePriority::Low), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 1ca0edc89be50..3c4f1cb8046c3 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -34,7 +34,6 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::{ @@ -42,6 +41,7 @@ use risingwave_common::util::value_encoding::{ }; use risingwave_connector::sink::log_store::LogStoreResult; use risingwave_hummock_sdk::key::{next_key, TableKey}; +use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::catalog::Table; use risingwave_storage::error::StorageError; use risingwave_storage::row_serde::row_serde_util::serialize_pk_with_vnode; @@ -580,7 +580,7 @@ impl LogStoreRowOpStream { // sorted by epoch descending. Earlier epoch at the end self.not_started_streams - .sort_by_key(|(epoch, _)| MAX_EPOCH - *epoch); + .sort_by_key(|(epoch, _)| HummockEpoch::MAX - *epoch); let (epoch, stream) = self .not_started_streams