diff --git a/Cargo.lock b/Cargo.lock index 693a2b9a36f4d..4c8bf0942c393 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8178,6 +8178,7 @@ dependencies = [ "risingwave_e2e_extended_mode_test", "risingwave_expr_impl", "risingwave_frontend", + "risingwave_hummock_sdk", "risingwave_meta_node", "risingwave_pb", "risingwave_rpc_client", diff --git a/e2e_test/streaming/bug_fixes/issue_12299.slt b/e2e_test/streaming/bug_fixes/issue_12299.slt index 7be47038f15cf..3259881e926ae 100644 --- a/e2e_test/streaming/bug_fixes/issue_12299.slt +++ b/e2e_test/streaming/bug_fixes/issue_12299.slt @@ -1,6 +1,9 @@ # https://github.com/risingwavelabs/risingwave/issues/12299 # TL;DR When upstream's stream key is not pk and the stream scan does not contain whole pk. +statement ok +SET RW_IMPLICIT_FLUSH TO true; + statement ok create table t1( id bigint primary key, diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index fef154450a563..60141e1638c28 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -63,6 +63,7 @@ workspace-hack = { path = "../workspace-hack" } criterion = { workspace = true, features = ["async_tokio", "async"] } rand = "0.8" risingwave_expr_impl = { workspace = true } +risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } tempfile = "3" tikv-jemallocator = { workspace = true } diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index c0a46f087298c..85381e932d493 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -269,6 +269,7 @@ mod test { connector_rpc_sink_payload_format: None, config_path: "src/config/test.toml", total_memory_bytes: 34359738368, + mem_table_spill_threshold: 4194304, parallelism: 10, role: Both, metrics_level: None, diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 709ceae4b2799..ae92218803503 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -582,6 +582,10 @@ pub struct StorageConfig { pub enable_fast_compaction: bool, #[serde(default, flatten)] pub unrecognized: Unrecognized, + + /// The spill threshold for mem table. + #[serde(default = "default::storage::mem_table_spill_threshold")] + pub mem_table_spill_threshold: usize, } #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)] @@ -1090,6 +1094,10 @@ pub mod default { pub fn enable_fast_compaction() -> bool { true } + + pub fn mem_table_spill_threshold() -> usize { + 4 << 20 + } } pub mod streaming { diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index 86f72406b0945..d43f0b4a2a8c3 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
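Note: the new `mem_table_spill_threshold` defaults above all agree: `default::storage::mem_table_spill_threshold()` returns `4 << 20` bytes, which is the literal `4194304` in the standalone-mode test (and in `example.toml` further down). A standalone sketch of how such a byte threshold is typically applied; `should_spill` is a hypothetical helper for illustration, not the trigger logic of this patch:

```rust
// The default spill threshold: 4 << 20 bytes == 4194304 == 4 MiB.
const MEM_TABLE_SPILL_THRESHOLD: usize = 4 << 20;

// Hypothetical helper: spill once the mem table's estimated size reaches
// the configured limit (a threshold of 0 could mean "disabled").
fn should_spill(mem_table_size: usize, threshold: usize) -> bool {
    threshold != 0 && mem_table_size >= threshold
}

fn main() {
    assert_eq!(MEM_TABLE_SPILL_THRESHOLD, 4_194_304);
    assert!(should_spill(5 << 20, MEM_TABLE_SPILL_THRESHOLD));
    assert!(!should_spill(1 << 20, MEM_TABLE_SPILL_THRESHOLD));
}
```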
-use std::cmp::Ordering;
 use std::sync::LazyLock;
 use std::time::{Duration, SystemTime};
@@ -42,24 +41,23 @@ impl Epoch {
     #[must_use]
     pub fn next(self) -> Self {
-        let physical_now = Epoch::physical_now();
+        let mut physical_now = Epoch::physical_now();
         let prev_physical_time = self.physical_time();
 
-        let next_epoch = match physical_now.cmp(&prev_physical_time) {
-            Ordering::Greater => Self::from_physical_time(physical_now),
-            Ordering::Equal => {
-                tracing::warn!("New generate epoch is too close to the previous one.");
-                Epoch(self.0 + 1)
+        loop {
+            if physical_now > prev_physical_time {
+                break;
             }
-            Ordering::Less => {
-                tracing::warn!(
-                    "Clock goes backwards when calling Epoch::next(): prev={}, curr={}",
-                    prev_physical_time,
-                    physical_now
-                );
-                Epoch(self.0 + 1)
-            }
-        };
+            physical_now = Epoch::physical_now();
+
+            #[cfg(madsim)]
+            tokio::time::advance(std::time::Duration::from_micros(10));
+            #[cfg(not(madsim))]
+            std::hint::spin_loop();
+        }
+        // The low 16 bits of an epoch are reserved: values in (prev_epoch, prev_epoch + 65536)
+        // serve as gap epochs when a mem table spill occurs.
+        let next_epoch = Self::from_physical_time(physical_now);
 
         assert!(next_epoch.0 > self.0);
         next_epoch
@@ -117,7 +115,7 @@ impl Epoch {
 }
 
 pub const EPOCH_AVAILABLE_BITS: u64 = 16;
-pub const MAX_SPILL_TIMES: u64 = 1 << EPOCH_AVAILABLE_BITS;
+pub const MAX_SPILL_TIMES: u16 = ((1 << EPOCH_AVAILABLE_BITS) - 1) as u16;
 pub const EPOCH_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;
 pub const MAX_EPOCH: u64 = u64::MAX & !EPOCH_MASK;
 impl From<u64> for Epoch {
@@ -207,8 +205,8 @@ mod tests {
         assert_eq!(risingwave_st, *UNIX_RISINGWAVE_DATE_EPOCH);
     }
 
-    #[test]
-    fn test_epoch_generate() {
+    #[tokio::test]
+    async fn test_epoch_generate() {
         let mut prev_epoch = Epoch::now();
         for _ in 0..1000 {
             let epoch = prev_epoch.next();
diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml
index 624e901c6b7ae..5dc1b90f14ba1 100644
--- a/src/compute/Cargo.toml
+++ b/src/compute/Cargo.toml
@@ -62,6 +62,7 @@ workspace-hack = { path = "../workspace-hack" }
 [dev-dependencies]
 futures-async-stream = { workspace = true }
 rand = "0.8"
+risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
 tempfile = "3"
 
 [lints]
diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs
index fc5ae9ff19854..8bc9093274333 100644
--- a/src/compute/src/lib.rs
+++ b/src/compute/src/lib.rs
@@ -92,6 +92,10 @@ pub struct ComputeNodeOpts {
     #[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
     pub total_memory_bytes: usize,
 
+    /// Spill threshold for mem table.
+    #[clap(long, env = "RW_MEM_TABLE_SPILL_THRESHOLD", default_value_t = default_mem_table_spill_threshold())]
+    pub mem_table_spill_threshold: usize,
+
     /// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())] #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)] @@ -230,6 +234,10 @@ fn default_total_memory_bytes() -> usize { (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize } +fn default_mem_table_spill_threshold() -> usize { + (4 << 20) as usize +} + fn default_parallelism() -> usize { total_cpu_available().ceil() as usize } diff --git a/src/config/example.toml b/src/config/example.toml index f381fce00ab09..93d6a0820102b 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -110,7 +110,7 @@ compactor_max_sst_key_count = 2097152 compact_iter_recreate_timeout_ms = 600000 compactor_max_sst_size = 536870912 enable_fast_compaction = true - +mem_table_spill_threshold = 4194304 [storage.data_file_cache] dir = "" diff --git a/src/ctl/Cargo.toml b/src/ctl/Cargo.toml index 061783b3d6dfb..4c9fe916d29b9 100644 --- a/src/ctl/Cargo.toml +++ b/src/ctl/Cargo.toml @@ -53,5 +53,8 @@ uuid = { version = "1", features = ["v4"] } [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } +[dev-dependencies] +risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } + [lints] workspace = true diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index c7acc5f753ee3..15e59b23a2451 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -15,6 +15,7 @@ use core::ops::Bound::Unbounded; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreReadExt}; @@ -30,8 +31,8 @@ pub async fn list_kv( let hummock = context .hummock_store(HummockServiceOpts::from_env(data_dir)?) .await?; - if epoch == u64::MAX { - tracing::info!("using u64::MAX as epoch"); + if epoch == MAX_EPOCH { + tracing::info!("using MAX_EPOCH as epoch"); } let scan_result = { let range = (Unbounded, Unbounded); diff --git a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs index a80ca3985e5e1..d82385568e36f 100644 --- a/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs +++ b/src/ctl/src/cmd_impl/hummock/list_version_deltas.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::util::epoch::MAX_EPOCH; + use crate::CtlContext; pub async fn list_version_deltas( @@ -21,7 +23,7 @@ pub async fn list_version_deltas( ) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; let resp = meta_client - .list_version_deltas(start_id, num_epochs, u64::MAX) + .list_version_deltas(start_id, num_epochs, MAX_EPOCH) .await?; println!("{:#?}", resp.version_deltas); Ok(()) diff --git a/src/ctl/src/cmd_impl/hummock/pause_resume.rs b/src/ctl/src/cmd_impl/hummock/pause_resume.rs index d599ce2327861..30ad94df4c06e 100644 --- a/src/ctl/src/cmd_impl/hummock/pause_resume.rs +++ b/src/ctl/src/cmd_impl/hummock/pause_resume.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
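Note: from here on the patch replaces `u64::MAX` with `MAX_EPOCH` at call sites that mean "read at the highest epoch". The distinction matters once the low 16 bits of an epoch carry a spill offset: `MAX_EPOCH = u64::MAX & !EPOCH_MASK` is the largest *pure* epoch. A standalone check of the constants from `epoch.rs` above:

```rust
// Values copied from the epoch.rs hunk above.
const EPOCH_AVAILABLE_BITS: u64 = 16;
const EPOCH_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;
const MAX_EPOCH: u64 = u64::MAX & !EPOCH_MASK;

fn main() {
    // MAX_EPOCH is u64::MAX with the low 16 bits cleared, so it is itself
    // a valid pure epoch (spill offset 0), unlike u64::MAX.
    assert_eq!(MAX_EPOCH & EPOCH_MASK, 0);
    assert_eq!(MAX_EPOCH, 0xffff_ffff_ffff_0000);
    assert!(MAX_EPOCH < u64::MAX);
}
```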
+use risingwave_common::util::epoch::MAX_EPOCH;
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
-use risingwave_hummock_sdk::HummockEpoch;
 
 use crate::CtlContext;
 
@@ -62,7 +62,7 @@ pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
     let mut current_delta_id = base_version.id + 1;
     loop {
         let deltas = meta_client
-            .list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX)
+            .list_version_deltas(current_delta_id, delta_fetch_size, MAX_EPOCH)
             .await
             .unwrap();
         if deltas.version_deltas.is_empty() {
diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs
index 4f957cd508862..7ed529ec02834 100644
--- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs
+++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -320,13 +320,17 @@ fn print_kv_pairs(
             let full_val = block_iter.value();
             let humm_val = HummockValue::from_slice(full_val)?;
 
-            let epoch = Epoch::from(full_key.epoch);
+            let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
             let date_time = DateTime::<Utc>::from(epoch.as_system_time());
 
             println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
             println!("\t\t value: {:?}, len={}", humm_val, humm_val.encoded_len());
-            println!("\t\t epoch: {} ({})", epoch, date_time);
-
+            println!(
+                "\t\t epoch: {} offset = {} ({})",
+                epoch,
+                full_key.epoch_with_gap.offset(),
+                date_time
+            );
             if args.print_table {
                 print_table_column(full_key, humm_val, table_data)?;
             }
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index cedcf4922e404..fd579a1ea7522 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -19,6 +19,7 @@ use anyhow::Result;
 use clap::{Parser, Subcommand};
 use cmd_impl::bench::BenchCommands;
 use cmd_impl::hummock::SstDumpArgs;
+use risingwave_common::util::epoch::MAX_EPOCH;
 use risingwave_meta::backup_restore::RestoreOpts;
 use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
 
@@ -156,7 +157,7 @@ enum HummockCommands {
     DisableCommitEpoch,
     /// list all Hummock key-value pairs
     ListKv {
-        #[clap(short, long = "epoch", default_value_t = u64::MAX)]
+        #[clap(short, long = "epoch", default_value_t = MAX_EPOCH)]
         epoch: u64,
 
         #[clap(short, long = "table-id")]
diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs
index d37c5dec127f1..00c1e2c04f257 100644
--- a/src/frontend/src/meta_client.rs
+++ b/src/frontend/src/meta_client.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 
 use risingwave_common::system_param::reader::SystemParamsReader;
+use risingwave_common::util::epoch::MAX_EPOCH;
 use risingwave_pb::backup_service::MetaSnapshotMetadata;
 use risingwave_pb::catalog::Table;
 use risingwave_pb::ddl_service::DdlProgress;
@@ -219,7 +220,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
     async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>> {
         // FIXME #8612: there can be lots of version deltas, so better to fetch them by pages and refactor `SysRowSeqScanExecutor` to yield multiple chunks.
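Note: the `sst_dump` hunk above now prints the pure epoch and the spill offset separately. A sketch of why the offset is cleared before deriving a wall-clock time, assuming the layout used throughout this patch (physical milliseconds in the high 48 bits, spill offset in the low 16):

```rust
const EPOCH_AVAILABLE_BITS: u64 = 16;
const EPOCH_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;

// Pure epoch = combined value with the spill offset cleared; the physical
// timestamp in milliseconds sits in the bits above the offset.
fn physical_ms(epoch_with_gap: u64) -> u64 {
    (epoch_with_gap & !EPOCH_MASK) >> EPOCH_AVAILABLE_BITS
}

fn main() {
    let pure = 1_000u64 << EPOCH_AVAILABLE_BITS; // 1000 ms after the base epoch
    // A spill offset never shifts the derived wall-clock time:
    assert_eq!(physical_ms(pure + 7), physical_ms(pure));
    assert_eq!(physical_ms(pure), 1_000);
}
```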
self.0 - .list_version_deltas(0, u32::MAX, u64::MAX) + .list_version_deltas(0, u32::MAX, MAX_EPOCH) .await .map(|v| v.version_deltas) } diff --git a/src/jni_core/Cargo.toml b/src/jni_core/Cargo.toml index 6d6b1684096a7..c1bdde44b43d9 100644 --- a/src/jni_core/Cargo.toml +++ b/src/jni_core/Cargo.toml @@ -40,6 +40,7 @@ tracing = "0.1" [dev-dependencies] expect-test = "1" risingwave_expr = { workspace = true } +risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } [lints] workspace = true diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index f37c909546594..e0b58defa9b57 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -92,6 +92,7 @@ workspace-hack = { path = "../workspace-hack" } assert_matches = "1" maplit = "1.0.2" rand = "0.8" +risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } risingwave_test_runner = { workspace = true } [features] diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index aa4d16a4e1dcf..dfcc3076c7793 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -17,6 +17,7 @@ pub mod compaction_config; mod overlap_strategy; use risingwave_common::catalog::TableOption; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; @@ -28,9 +29,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use picker::{LevelCompactionPicker, TierCompactionPicker}; -use risingwave_hummock_sdk::{ - can_concat, CompactionGroupId, HummockCompactionTaskId, HummockEpoch, -}; +use risingwave_hummock_sdk::{can_concat, CompactionGroupId, HummockCompactionTaskId}; use risingwave_pb::hummock::compaction_config::CompactionMode; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactTask, CompactionConfig, KeyRange, LevelType}; @@ -131,7 +130,7 @@ impl CompactStatus { let compact_task = CompactTask { input_ssts: ret.input.input_levels, splits: vec![KeyRange::inf()], - watermark: HummockEpoch::MAX, + watermark: MAX_EPOCH, sorted_output_ssts: vec![], task_id, target_level: target_level_id as u32, diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index 6818b7f68570e..8b8598e10b93b 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -19,11 +19,12 @@ use std::time::{SystemTime, UNIX_EPOCH}; use itertools::{enumerate, Itertools}; use prost::Message; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ object_size_map, BranchedSstInfo, HummockVersionExt, }; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, + CompactionGroupId, HummockContextId, HummockSstableObjectId, HummockVersionId, }; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::write_limits::WriteLimit; @@ -344,7 +345,7 @@ pub fn trigger_pin_unpin_snapshot_state( { metrics.min_pinned_epoch.set(m as i64); } else { - metrics.min_pinned_epoch.set(HummockEpoch::MAX as _); + metrics.min_pinned_epoch.set(MAX_EPOCH as _); } } diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index f340837bf5d65..b23d7a9405bbd 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -44,6 +44,9 @@ tower = "0.4" tracing = "0.1" url = "2.4.1" +[dev-dependencies] 
+risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
+
 [target.'cfg(not(madsim))'.dependencies]
 workspace-hack = { path = "../workspace-hack" }
 
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index 9b06310bde62c..9e6b8486fa30d 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -88,6 +88,7 @@ workspace-hack = { path = "../workspace-hack" }
 [dev-dependencies]
 criterion = { workspace = true, features = ["async_futures"] }
 moka = { version = "0.12", features = ["future"] }
+risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
 risingwave_test_runner = { workspace = true }
 uuid = { version = "1", features = ["v4"] }
 
diff --git a/src/storage/backup/Cargo.toml b/src/storage/backup/Cargo.toml
index 61e98f584eb91..9e98e6c9076a8 100644
--- a/src/storage/backup/Cargo.toml
+++ b/src/storage/backup/Cargo.toml
@@ -31,5 +31,8 @@ serde_json = "1"
 thiserror = "1"
 twox-hash = "1"
 
+[dev-dependencies]
+risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
+
 [lints]
 workspace = true
diff --git a/src/storage/hummock_sdk/Cargo.toml b/src/storage/hummock_sdk/Cargo.toml
index a4773c0cd0e74..4e8e47a019c2b 100644
--- a/src/storage/hummock_sdk/Cargo.toml
+++ b/src/storage/hummock_sdk/Cargo.toml
@@ -27,3 +27,7 @@ workspace-hack = { path = "../../workspace-hack" }
 
 [lints]
 workspace = true
+
+[features]
+enable_test_epoch = []
+
diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs
index d635bb4518a4d..bbaa972cadf6d 100644
--- a/src/storage/hummock_sdk/src/key.rs
+++ b/src/storage/hummock_sdk/src/key.rs
@@ -23,7 +23,7 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::estimate_size::EstimateSize;
 use risingwave_common::hash::VirtualNode;
 
-use crate::HummockEpoch;
+use crate::{EpochWithGap, HummockEpoch};
 
 pub const EPOCH_LEN: usize = std::mem::size_of::<HummockEpoch>();
 pub const TABLE_PREFIX_LEN: usize = std::mem::size_of::<u32>();
@@ -575,12 +575,18 @@ impl UserKey<Vec<u8>> {
 #[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
 pub struct FullKey<T: AsRef<[u8]>> {
     pub user_key: UserKey<T>,
-    pub epoch: HummockEpoch,
+    pub epoch_with_gap: EpochWithGap,
 }
 
 impl<T: AsRef<[u8]>> Debug for FullKey<T> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "FullKey {{ {:?}, {} }}", self.user_key, self.epoch)
+        write!(
+            f,
+            "FullKey {{ {:?}, epoch: {}, epoch_with_gap: {}}}",
+            self.user_key,
+            self.epoch_with_gap.pure_epoch(),
+            self.epoch_with_gap.as_u64()
+        )
     }
 }
 
@@ -588,26 +594,40 @@ impl<T: AsRef<[u8]>> FullKey<T> {
     pub fn new(table_id: TableId, table_key: TableKey<T>, epoch: HummockEpoch) -> Self {
         Self {
             user_key: UserKey::new(table_id, table_key),
-            epoch,
+            epoch_with_gap: EpochWithGap::new(epoch, 0),
+        }
+    }
+
+    pub fn new_with_gap_epoch(
+        table_id: TableId,
+        table_key: TableKey<T>,
+        epoch_with_gap: EpochWithGap,
+    ) -> Self {
+        Self {
+            user_key: UserKey::new(table_id, table_key),
+            epoch_with_gap,
         }
     }
 
     pub fn from_user_key(user_key: UserKey<T>, epoch: HummockEpoch) -> Self {
-        Self { user_key, epoch }
+        Self {
+            user_key,
+            epoch_with_gap: EpochWithGap::new_from_epoch(epoch),
+        }
     }
 
     /// Pass the inner type of `table_key` to make the code less verbose.
     pub fn for_test(table_id: TableId, table_key: T, epoch: HummockEpoch) -> Self {
         Self {
             user_key: UserKey::for_test(table_id, table_key),
-            epoch,
+            epoch_with_gap: EpochWithGap::new(epoch, 0),
         }
     }
 
     /// Encode into a buffer.
     pub fn encode_into(&self, buf: &mut impl BufMut) {
         self.user_key.encode_into(buf);
-        buf.put_u64(self.epoch);
+        buf.put_u64(self.epoch_with_gap.as_u64());
     }
 
     pub fn encode(&self) -> Vec<u8> {
@@ -621,7 +641,7 @@ impl<T: AsRef<[u8]>> FullKey<T> {
     // Encode into a buffer.
     pub fn encode_into_without_table_id(&self, buf: &mut impl BufMut) {
         self.user_key.encode_table_key_into(buf);
-        buf.put_u64(self.epoch);
+        buf.put_u64(self.epoch_with_gap.as_u64());
     }
 
     pub fn encode_reverse_epoch(&self) -> Vec<u8> {
         let mut buf = Vec::with_capacity(
             TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN,
         );
         self.user_key.encode_into(&mut buf);
-        buf.put_u64(u64::MAX - self.epoch);
+        buf.put_u64(u64::MAX - self.epoch_with_gap.as_u64());
         buf
     }
 
@@ -651,7 +671,7 @@ impl<'a> FullKey<&'a [u8]> {
 
         Self {
             user_key: UserKey::decode(&slice[..epoch_pos]),
-            epoch,
+            epoch_with_gap: EpochWithGap::from_u64(epoch),
         }
     }
 
@@ -665,7 +685,7 @@
 
         Self {
             user_key: UserKey::new(table_id, TableKey(&slice_without_table_id[..epoch_pos])),
-            epoch,
+            epoch_with_gap: EpochWithGap::from_u64(epoch),
         }
     }
 
@@ -676,7 +696,7 @@
 
         Self {
             user_key: UserKey::decode(&slice[..epoch_pos]),
-            epoch: u64::MAX - epoch,
+            epoch_with_gap: EpochWithGap::from_u64(u64::MAX - epoch),
         }
     }
 
@@ -687,7 +707,7 @@
     pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(self) -> FullKey<T> {
         FullKey {
             user_key: self.user_key.copy_into(),
-            epoch: self.epoch,
+            epoch_with_gap: self.epoch_with_gap,
         }
     }
 }
@@ -697,7 +717,7 @@ impl FullKey<Vec<u8>> {
     /// `Bytes`
     pub fn into_bytes(self) -> FullKey<Bytes> {
         FullKey {
-            epoch: self.epoch,
+            epoch_with_gap: self.epoch_with_gap,
             user_key: self.user_key.into_bytes(),
         }
     }
@@ -707,7 +727,7 @@ impl<T: AsRef<[u8]>> FullKey<T> {
     pub fn to_ref(&self) -> FullKey<&[u8]> {
         FullKey {
             user_key: self.user_key.as_ref(),
-            epoch: self.epoch,
+            epoch_with_gap: self.epoch_with_gap,
         }
     }
 }
@@ -717,7 +737,7 @@ impl FullKey<Vec<u8>> {
     /// table key without reallocating a new `FullKey` object.
     pub fn set(&mut self, other: FullKey<&[u8]>) {
         self.user_key.set(other.user_key);
-        self.epoch = other.epoch;
+        self.epoch_with_gap = other.epoch_with_gap;
     }
 }
 
@@ -726,7 +746,7 @@ impl<T: AsRef<[u8]> + Ord + Eq> Ord for FullKey<T> {
         // When `user_key` is the same, greater epoch comes first.
         self.user_key
             .cmp(&other.user_key)
-            .then_with(|| other.epoch.cmp(&self.epoch))
+            .then_with(|| other.epoch_with_gap.cmp(&self.epoch_with_gap))
     }
 }
diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs
index 0fc6735571e4d..c7d643f972b3c 100644
--- a/src/storage/hummock_sdk/src/lib.rs
+++ b/src/storage/hummock_sdk/src/lib.rs
@@ -26,6 +26,7 @@ mod key_cmp;
 use std::cmp::Ordering;
 
 pub use key_cmp::*;
+use risingwave_common::util::epoch::{EPOCH_MASK, MAX_EPOCH};
 use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
 use risingwave_pb::hummock::SstableInfo;
 
@@ -266,3 +267,65 @@ pub fn version_checkpoint_path(root_dir: &str) -> String {
 pub fn version_checkpoint_dir(checkpoint_path: &str) -> String {
     checkpoint_path.trim_end_matches(|c| c != '/').to_string()
 }
+
+/// Represents an epoch with a gap.
+///
+/// When a spill of the mem table occurs between two epochs, `EpochWithGap` generates an offset.
+/// The offset is encoded into the full key; when the epoch is returned to the upper-level
+/// interface, it is a pure epoch with the lower 16 bits set to 0.
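Note: a standalone sketch of the arithmetic behind the `EpochWithGap` type defined next, mirroring its non-`enable_test_epoch` path:

```rust
const EPOCH_MASK: u64 = (1 << 16) - 1;

// Minimal re-implementation for illustration; the real type lives in
// risingwave_hummock_sdk (see the hunk below).
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct EpochWithGap(u64);

impl EpochWithGap {
    fn new(epoch: u64, spill_offset: u16) -> Self {
        debug_assert_eq!(epoch & EPOCH_MASK, 0); // expects a pure epoch
        EpochWithGap(epoch + spill_offset as u64)
    }
    fn pure_epoch(&self) -> u64 {
        self.0 & !EPOCH_MASK // what upper layers get back
    }
    fn offset(&self) -> u64 {
        self.0 & EPOCH_MASK // spill sequence within the epoch
    }
}

fn main() {
    let e = EpochWithGap::new(7 << 16, 5);
    assert_eq!(e.pure_epoch(), 7 << 16);
    assert_eq!(e.offset(), 5);
    // A later spill in the same epoch orders after an earlier one:
    assert!(EpochWithGap::new(7 << 16, 6) > e);
}
```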
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
+pub struct EpochWithGap(u64);
+
+impl EpochWithGap {
+    #[allow(unused_variables)]
+    pub fn new(epoch: u64, spill_offset: u16) -> Self {
+        #[cfg(not(feature = "enable_test_epoch"))]
+        {
+            debug_assert_eq!(epoch & EPOCH_MASK, 0);
+            let epoch_with_gap = epoch + spill_offset as u64;
+            EpochWithGap(epoch_with_gap)
+        }
+        #[cfg(feature = "enable_test_epoch")]
+        {
+            EpochWithGap(epoch)
+        }
+    }
+
+    pub fn new_from_epoch(epoch: u64) -> Self {
+        EpochWithGap::new(epoch, 0)
+    }
+
+    pub fn new_min_epoch() -> Self {
+        EpochWithGap(0)
+    }
+
+    pub fn new_max_epoch() -> Self {
+        EpochWithGap(MAX_EPOCH)
+    }
+
+    // return the combined epoch_with_gap (epoch + spill_offset)
+    pub(crate) fn as_u64(&self) -> HummockEpoch {
+        self.0
+    }
+
+    // construct from a combined epoch_with_gap (epoch + spill_offset) value
+    pub(crate) fn from_u64(epoch_with_gap: u64) -> Self {
+        EpochWithGap(epoch_with_gap)
+    }
+
+    // return the pure epoch without the spill offset
+    pub fn pure_epoch(&self) -> HummockEpoch {
+        #[cfg(not(feature = "enable_test_epoch"))]
+        {
+            self.0 & !EPOCH_MASK
+        }
+        #[cfg(feature = "enable_test_epoch")]
+        {
+            self.0
+        }
+    }
+
+    pub fn offset(&self) -> u64 {
+        self.0 & EPOCH_MASK
+    }
+}
diff --git a/src/storage/hummock_test/Cargo.toml b/src/storage/hummock_test/Cargo.toml
index 8abf2f45e6855..c3482f142d46b 100644
--- a/src/storage/hummock_test/Cargo.toml
+++ b/src/storage/hummock_test/Cargo.toml
@@ -48,6 +48,7 @@ futures = { version = "0.3", default-features = false, features = [
 ] }
 futures-async-stream = "0.2.9"
+risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
 risingwave_test_runner = { workspace = true }
 serial_test = "2.0"
 sync-point = { path = "../../utils/sync-point" }
diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs
index 1c6294fc672be..abac4d9b57c06 100644
--- a/src/storage/hummock_test/benches/bench_hummock_iter.rs
+++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs
@@ -19,6 +19,7 @@ use bytes::Bytes;
 use criterion::{criterion_group, criterion_main, Criterion};
 use futures::{pin_mut, TryStreamExt};
 use risingwave_common::cache::CachePriority;
+use risingwave_common::util::epoch::MAX_EPOCH;
 use risingwave_hummock_sdk::key::TableKey;
 use risingwave_hummock_test::get_notification_client_for_test;
 use risingwave_hummock_test::local_state_store_test_utils::LocalStateStoreTestExt;
@@ -98,7 +99,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         ))
         .unwrap();
     }
-    hummock_storage.seal_current_epoch(u64::MAX);
+    hummock_storage.seal_current_epoch(MAX_EPOCH);
 
     c.bench_function("bench-hummock-iter", move |b| {
         b.iter(|| {
diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs
index c041c07b44218..89ee5bb826f3f 100644
--- a/src/storage/hummock_test/src/hummock_read_version_tests.rs
+++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs
@@ -54,6 +54,7 @@ async fn test_read_version_basic() {
         let size = SharedBufferBatch::measure_batch_size(&sorted_items);
         let imm = SharedBufferBatch::build_shared_buffer_batch(
             epoch,
+            0,
             sorted_items,
             size,
             vec![],
@@ -92,6 +93,7 @@ async fn test_read_version_basic() {
             let size = SharedBufferBatch::measure_batch_size(&sorted_items);
             let imm = SharedBufferBatch::build_shared_buffer_batch(
                 epoch,
+                0,
                 sorted_items,
                 size,
                 vec![],
@@ -275,6 +277,7 @@ async fn test_read_filter_basic() {
         let size =
SharedBufferBatch::measure_batch_size(&sorted_items); let imm = SharedBufferBatch::build_shared_buffer_batch( epoch, + 0, sorted_items, size, vec![], diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 3b9752c6c6fe4..6f38c51f89c69 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1446,13 +1446,13 @@ async fn test_replicated_local_hummock_storage() { [ Ok( ( - FullKey { UserKey { 233, TableKey { 000061616161 } }, 1 }, + FullKey { UserKey { 233, TableKey { 000061616161 } }, epoch: 1, epoch_with_gap: 1}, b"1111", ), ), Ok( ( - FullKey { UserKey { 233, TableKey { 000062626262 } }, 1 }, + FullKey { UserKey { 233, TableKey { 000062626262 } }, epoch: 1, epoch_with_gap: 1}, b"2222", ), ), @@ -1513,13 +1513,13 @@ async fn test_replicated_local_hummock_storage() { [ Ok( ( - FullKey { UserKey { 233, TableKey { 000063636363 } }, 2 }, + FullKey { UserKey { 233, TableKey { 000063636363 } }, epoch: 2, epoch_with_gap: 2}, b"3333", ), ), Ok( ( - FullKey { UserKey { 233, TableKey { 000064646464 } }, 2 }, + FullKey { UserKey { 233, TableKey { 000064646464 } }, epoch: 2, epoch_with_gap: 2}, b"4444", ), ), diff --git a/src/storage/hummock_trace/Cargo.toml b/src/storage/hummock_trace/Cargo.toml index 150b35b79cda0..357bc121c0296 100644 --- a/src/storage/hummock_trace/Cargo.toml +++ b/src/storage/hummock_trace/Cargo.toml @@ -27,6 +27,7 @@ tracing = "0.1" [dev-dependencies] itertools = "0.10.5" mockall = "0.11.4" +risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } [lints] workspace = true diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index c6bd3a7093f80..aaa18205a90e6 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -316,6 +316,10 @@ impl TraceSpan { Self::new_global_op(Operation::Flush(delete_range), storage_type) } + pub fn new_try_flush_span(storage_type: StorageType) -> MayTraceSpan { + Self::new_global_op(Operation::TryFlush, storage_type) + } + pub fn new_meta_message_span(resp: SubscribeResponse) -> MayTraceSpan { Self::new_global_op( Operation::MetaMessage(Box::new(TracedSubResp::from(resp))), diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index 5d209635e967e..ace14b0c0412c 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -182,6 +182,8 @@ pub enum Operation { LocalStorageIsDirty, + TryFlush, + Flush(Vec<(Bound, Bound)>), /// Finish operation of Hummock. 
Finish, @@ -287,6 +289,7 @@ pub enum OperationResult { Get(TraceResult>), Insert(TraceResult<()>), Delete(TraceResult<()>), + TryFlush(TraceResult<()>), Flush(TraceResult), Iter(TraceResult<()>), IterNext(TraceResult>), diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index e5c2b168d6e0a..a4d99707d3516 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -410,6 +410,23 @@ impl ReplayWorker { panic!("wrong flush result, expect flush result, but got {:?}", res); } } + Operation::TryFlush => { + assert_ne!(storage_type, StorageType::Global); + let local_storage = local_storages.get_mut(&storage_type).unwrap(); + let res = res_rx.recv().await.expect("recv result failed"); + let delete_range = vec![]; + if let OperationResult::TryFlush(_) = res { + let _ = local_storage.flush(delete_range).await; + // todo(wcy-fdu): unify try_flush and flush interface, do not return usize. + // assert_eq!(TraceResult::from(actual), expected, "try flush wrong"); + } else { + panic!( + "wrong try flush result, expect flush result, but got {:?}", + res + ); + } + } + Operation::Finish => unreachable!(), Operation::Result(_) => unreachable!(), } diff --git a/src/storage/src/hummock/compactor/compaction_filter.rs b/src/storage/src/hummock/compactor/compaction_filter.rs index b388d62c60564..32e6b7e3909a4 100644 --- a/src/storage/src/hummock/compactor/compaction_filter.rs +++ b/src/storage/src/hummock/compactor/compaction_filter.rs @@ -71,7 +71,7 @@ impl CompactionFilter for TtlCompactionFilter { fn should_delete(&mut self, key: FullKey<&[u8]>) -> bool { pub use risingwave_common::util::epoch::Epoch; let table_id = key.user_key.table_id.table_id(); - let epoch = key.epoch; + let epoch = key.epoch_with_gap.pure_epoch(); if let Some((last_table_id, ttl_mill)) = self.last_table_and_ttl.as_ref() { if *last_table_id == table_id { let min_epoch = Epoch(self.expire_epoch).subtract_ms(*ttl_mill); diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 3c1332d09317c..d0cd96a831d80 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -23,7 +23,7 @@ use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_hummock_sdk::table_stats::TableStatsMap; -use risingwave_hummock_sdk::{HummockEpoch, KeyComparator}; +use risingwave_hummock_sdk::{EpochWithGap, KeyComparator}; use risingwave_pb::hummock::{compact_task, CompactTask, KeyRange as KeyRange_vec, SstableInfo}; use tokio::time::Instant; @@ -183,14 +183,14 @@ fn generate_splits_fast( indexes.push( FullKey { user_key: FullKey::decode(&key_range.left).user_key, - epoch: HummockEpoch::MAX, + epoch_with_gap: EpochWithGap::new_max_epoch(), } .encode(), ); indexes.push( FullKey { user_key: FullKey::decode(&key_range.right).user_key, - epoch: HummockEpoch::MAX, + epoch_with_gap: EpochWithGap::new_max_epoch(), } .encode(), ); @@ -241,7 +241,7 @@ pub async fn generate_splits( let data_size = block.len; let full_key = FullKey { user_key: FullKey::decode(&block.smallest_key).user_key, - epoch: HummockEpoch::MAX, + epoch_with_gap: EpochWithGap::new_max_epoch(), } .encode(); (data_size as u64, full_key) diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs 
index e46965b16e5fc..67b18ce392429 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -19,13 +19,14 @@ use await_tree::InstrumentAwait; use bytes::Bytes; use futures::{stream, FutureExt, StreamExt}; use itertools::Itertools; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compact::{ compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task, }; use risingwave_hummock_sdk::key::{FullKey, PointRange}; use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon}; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; -use risingwave_hummock_sdk::{can_concat, HummockEpoch}; +use risingwave_hummock_sdk::{can_concat, EpochWithGap}; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType, SstableInfo}; use tokio::sync::oneshot::Receiver; @@ -158,7 +159,7 @@ impl CompactorRunner { tombstone.event_key.left_user_key.as_ref(), tombstone.new_epoch, )) { - tombstone.new_epoch = HummockEpoch::MAX; + tombstone.new_epoch = MAX_EPOCH; } }); builder.add_delete_events(range_tombstone_list); @@ -665,7 +666,7 @@ where del_iter.seek(full_key.user_key); if !task_config.gc_delete_keys && del_iter.is_valid() - && del_iter.earliest_epoch() != HummockEpoch::MAX + && del_iter.earliest_epoch() != MAX_EPOCH { sst_builder .add_monotonic_delete(MonotonicDeleteEvent { @@ -688,7 +689,7 @@ where let mut last_key = FullKey::default(); let mut watermark_can_see_last_key = false; - let mut user_key_last_delete_epoch = HummockEpoch::MAX; + let mut user_key_last_delete_epoch = MAX_EPOCH; let mut local_stats = StoreLocalStatistic::default(); // Keep table stats changes due to dropping KV. @@ -715,7 +716,8 @@ where last_key.is_empty() || iter_key.user_key != last_key.user_key.as_ref(); let mut drop = false; - let epoch = iter_key.epoch; + + let epoch = iter_key.epoch_with_gap.pure_epoch(); let value = iter.value(); if is_new_user_key { if !max_key.is_empty() && iter_key >= max_key { @@ -723,7 +725,7 @@ where } last_key.set(iter_key); watermark_can_see_last_key = false; - user_key_last_delete_epoch = HummockEpoch::MAX; + user_key_last_delete_epoch = MAX_EPOCH; if value.is_delete() { local_stats.skip_delete_key_count += 1; } @@ -808,7 +810,7 @@ where user_key_last_delete_epoch = epoch; } else if earliest_range_delete_which_can_see_iter_key < user_key_last_delete_epoch { debug_assert!( - iter_key.epoch < earliest_range_delete_which_can_see_iter_key + iter_key.epoch_with_gap.pure_epoch() < earliest_range_delete_which_can_see_iter_key && earliest_range_delete_which_can_see_iter_key < user_key_last_delete_epoch ); user_key_last_delete_epoch = earliest_range_delete_which_can_see_iter_key; @@ -818,7 +820,8 @@ where // information about whether a key is deleted by a delete range in // the same SST. Therefore we need to construct a corresponding // delete key to represent this. 
- iter_key.epoch = earliest_range_delete_which_can_see_iter_key; + iter_key.epoch_with_gap = + EpochWithGap::new_from_epoch(earliest_range_delete_which_can_see_iter_key); sst_builder .add_full_key(iter_key, HummockValue::Delete, is_new_user_key) .verbose_instrument_await("add_full_key_delete") @@ -826,7 +829,7 @@ where last_table_stats.total_key_count += 1; last_table_stats.total_key_size += iter_key.encoded_len() as i64; last_table_stats.total_value_size += 1; - iter_key.epoch = epoch; + iter_key.epoch_with_gap = EpochWithGap::new_from_epoch(epoch); is_new_user_key = false; } @@ -848,7 +851,7 @@ where sst_builder .add_monotonic_delete(MonotonicDeleteEvent { event_key: extended_largest_user_key, - new_epoch: HummockEpoch::MAX, + new_epoch: MAX_EPOCH, }) .await?; break; diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index c3184fc3e5f76..feea5cb1124ea 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -25,7 +25,7 @@ use itertools::Itertools; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::table_stats::TableStats; -use risingwave_hummock_sdk::{can_concat, HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{can_concat, EpochWithGap, LocalSstableInfo}; use risingwave_pb::hummock::{CompactTask, SstableInfo}; use crate::filter_key_extractor::FilterKeyExtractorImpl; @@ -132,7 +132,7 @@ impl BlockStreamIterator { .as_ref(), ); // do not include this key because it is the smallest key of next block. - largest_key.epoch = HummockEpoch::MAX; + largest_key.epoch_with_gap = EpochWithGap::new_max_epoch(); largest_key.encode() } else { self.sstable.value().meta.largest_key.clone() @@ -584,7 +584,7 @@ impl CompactTaskExecutor { self.may_report_process_key(1); let mut drop = false; - let epoch = iter.key().epoch; + let epoch = iter.key().epoch_with_gap.pure_epoch(); let value = HummockValue::from_slice(iter.value()).unwrap(); if is_new_user_key || self.last_key.is_empty() { self.last_key.set(iter.key()); diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 428361237c0ac..0aae5dbbd9310 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -23,10 +23,11 @@ use itertools::Itertools; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::key_range::KeyRange; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, LocalSstableInfo}; use risingwave_pb::hummock::compact_task; use tracing::error; @@ -194,7 +195,7 @@ async fn compact_shared_buffer( key_split_append( &FullKey { user_key, - epoch: HummockEpoch::MAX, + epoch_with_gap: EpochWithGap::new_max_epoch(), } .encode() .into(), @@ -379,31 +380,31 @@ pub async fn merge_imms_in_memory( .unwrap_or_default(); del_iter.earliest_delete_which_can_see_key( UserKey::new(table_id, TableKey(pivot.as_ref())), - HummockEpoch::MAX, + MAX_EPOCH, ); - let mut versions: Vec<(HummockEpoch, 
HummockValue)> = Vec::new(); + let mut versions: Vec<(EpochWithGap, HummockValue)> = Vec::new(); - let mut pivot_last_delete_epoch = HummockEpoch::MAX; + let mut pivot_last_delete_epoch = MAX_EPOCH; for ((key, value), epoch) in items { assert!(key >= pivot, "key should be in ascending order"); let earliest_range_delete_which_can_see_key = if key == pivot { - del_iter.earliest_delete_since(epoch) + del_iter.earliest_delete_since(epoch.pure_epoch()) } else { merged_payload.push((pivot, versions)); pivot = key; - pivot_last_delete_epoch = HummockEpoch::MAX; + pivot_last_delete_epoch = MAX_EPOCH; versions = vec![]; del_iter.earliest_delete_which_can_see_key( UserKey::new(table_id, TableKey(pivot.as_ref())), - epoch, + epoch.pure_epoch(), ) }; if value.is_delete() { - pivot_last_delete_epoch = epoch; + pivot_last_delete_epoch = epoch.pure_epoch(); } else if earliest_range_delete_which_can_see_key < pivot_last_delete_epoch { debug_assert!( - epoch < earliest_range_delete_which_can_see_key + epoch.pure_epoch() < earliest_range_delete_which_can_see_key && earliest_range_delete_which_can_see_key < pivot_last_delete_epoch ); pivot_last_delete_epoch = earliest_range_delete_which_can_see_key; @@ -413,7 +414,7 @@ pub async fn merge_imms_in_memory( // a delete range in the merged imm which it belongs to. Therefore we need // to construct a corresponding delete key to represent this. versions.push(( - earliest_range_delete_which_can_see_key, + EpochWithGap::new_from_epoch(earliest_range_delete_which_can_see_key), HummockValue::Delete, )); } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index c55b73e6af6b0..1e0dde692bed0 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -21,6 +21,7 @@ use arc_swap::ArcSwap; use await_tree::InstrumentAwait; use parking_lot::RwLock; use prometheus::core::{AtomicU64, GenericGauge}; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt; use risingwave_hummock_sdk::{info_in_release, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::version_update_payload::Payload; @@ -395,7 +396,7 @@ impl HummockEventHandler { } self.sstable_object_id_manager - .remove_watermark_object_id(TrackerId::Epoch(HummockEpoch::MAX)); + .remove_watermark_object_id(TrackerId::Epoch(MAX_EPOCH)); // Notify completion of the Clear event. 
let _ = notifier.send(()).inspect_err(|e| { diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 3b9e9dc587fe1..b3a96f109d233 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -1091,6 +1091,7 @@ mod tests { }; SharedBufferBatch::build_shared_buffer_batch( epoch, + 0, sorted_items, size, vec![], diff --git a/src/storage/src/hummock/iterator/backward_user.rs b/src/storage/src/hummock/iterator/backward_user.rs index 3b71ce4a113c2..f99cf8bc06c96 100644 --- a/src/storage/src/hummock/iterator/backward_user.rs +++ b/src/storage/src/hummock/iterator/backward_user.rs @@ -16,7 +16,7 @@ use std::ops::Bound::*; use bytes::Bytes; use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange}; -use risingwave_hummock_sdk::HummockEpoch; +use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; use crate::hummock::iterator::{Backward, HummockIterator}; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -136,7 +136,7 @@ impl> BackwardUserIterator { while self.iterator.is_valid() { let full_key = self.iterator.key(); - let epoch = full_key.epoch; + let epoch = full_key.epoch_with_gap.pure_epoch(); let key = &full_key.user_key; if epoch > self.min_epoch && epoch <= self.read_epoch { @@ -216,7 +216,7 @@ impl> BackwardUserIterator { Included(end_key) => { let full_key = FullKey { user_key: end_key.clone(), - epoch: 0, + epoch_with_gap: EpochWithGap::new_min_epoch(), }; self.iterator.seek(full_key.to_ref()).await?; } @@ -245,7 +245,10 @@ impl> BackwardUserIterator { Excluded(_) => unimplemented!("excluded begin key is not supported"), Unbounded => user_key, }; - let full_key = FullKey { user_key, epoch: 0 }; + let full_key = FullKey { + user_key, + epoch_with_gap: EpochWithGap::new_min_epoch(), + }; self.iterator.seek(full_key).await?; // Handle multi-version @@ -274,7 +277,13 @@ impl> BackwardUserIterator { impl> BackwardUserIterator { /// Creates [`BackwardUserIterator`] with maximum epoch. pub(crate) fn for_test(iterator: I, key_range: UserKeyRange) -> Self { - Self::with_epoch(iterator, key_range, HummockEpoch::MAX, 0, None) + Self::with_epoch( + iterator, + key_range, + risingwave_common::util::epoch::MAX_EPOCH, + 0, + None, + ) } /// Creates [`BackwardUserIterator`] with maximum epoch. 
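Note: the seek changes here hinge on `FullKey`'s ordering (user key ascending, then epoch descending). That is why the backward iterator above seeks an `Included` end bound with `EpochWithGap::new_min_epoch()`, the last entry for that user key, while the forward `UserIterator` below seeks with `EpochWithGap::new(read_epoch, MAX_SPILL_TIMES)`, which sorts at or before every version written at `read_epoch`. A standalone illustration of that ordering:

```rust
use std::cmp::Ordering;

// Mirrors FullKey::cmp: user key ascending, then epoch_with_gap descending.
fn cmp_full_key(a: (&[u8], u64), b: (&[u8], u64)) -> Ordering {
    a.0.cmp(&b.0).then_with(|| b.1.cmp(&a.1))
}

fn main() {
    let newer = (b"k".as_slice(), 6u64 << 16);
    let older = (b"k".as_slice(), 5u64 << 16);
    // The newer version of the same user key sorts first:
    assert_eq!(cmp_full_key(newer, older), Ordering::Less);
    // Seeking with the maximum spill offset (0xffff = MAX_SPILL_TIMES)
    // positions at or before every version written at that pure epoch:
    let seek = (b"k".as_slice(), (5u64 << 16) + 0xffff);
    assert_eq!(cmp_full_key(seek, older), Ordering::Less);
}
```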
@@ -283,7 +292,13 @@ impl> BackwardUserIterator { key_range: UserKeyRange, min_epoch: HummockEpoch, ) -> Self { - Self::with_epoch(iterator, key_range, HummockEpoch::MAX, min_epoch, None) + Self::with_epoch( + iterator, + key_range, + risingwave_common::util::epoch::MAX_EPOCH, + min_epoch, + None, + ) } } @@ -299,6 +314,7 @@ mod tests { use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::prev_key; + use risingwave_hummock_sdk::EpochWithGap; use super::*; use crate::hummock::iterator::test_utils::{ @@ -1020,7 +1036,7 @@ mod tests { inserts.iter().map(|(time, value)| { let full_key = FullKey { user_key: key.clone(), - epoch: time.0, + epoch_with_gap: EpochWithGap::new_from_epoch(time.0), }; (full_key, value.clone()) }) @@ -1190,7 +1206,7 @@ mod tests { let mut i = 0; while ui.is_valid() { let key = ui.key(); - let key_epoch = key.epoch; + let key_epoch = key.epoch_with_gap.pure_epoch(); assert!(key_epoch > min_epoch); i += 1; diff --git a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs index 7b661ccdfd98d..82921da511909 100644 --- a/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs +++ b/src/storage/src/hummock/iterator/concat_delete_range_iterator.rs @@ -182,6 +182,7 @@ impl DeleteRangeIterator for ConcatDeleteRangeIterator { #[cfg(test)] mod tests { use risingwave_common::catalog::TableId; + use risingwave_common::util::epoch::MAX_EPOCH; use super::*; use crate::hummock::iterator::test_utils::mock_sstable_store; @@ -246,7 +247,7 @@ mod tests { sstable_store, ); concat_iterator.rewind().await.unwrap(); - assert_eq!(concat_iterator.current_epoch(), HummockEpoch::MAX); + assert_eq!(concat_iterator.current_epoch(), MAX_EPOCH); assert_eq!( concat_iterator.next_extended_user_key().left_user_key, test_user_key(b"aaaa").as_ref() diff --git a/src/storage/src/hummock/iterator/delete_range_iterator.rs b/src/storage/src/hummock/iterator/delete_range_iterator.rs index 4d943ce63d3fc..7f72a025943c1 100644 --- a/src/storage/src/hummock/iterator/delete_range_iterator.rs +++ b/src/storage/src/hummock/iterator/delete_range_iterator.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeSet, BinaryHeap}; use std::future::Future; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{PointRange, UserKey}; use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::hummock::SstableInfo; @@ -53,7 +54,7 @@ pub trait DeleteRangeIterator { /// Retrieves the epoch of the current range delete. /// It returns the epoch between the previous `next_user_key` (inclusive) and the current /// `next_user_key` (not inclusive). When there is no range deletes, it will return - /// `HummockEpoch::MAX`. + /// `MAX_EPOCH`. /// /// Note: /// - Before calling this function, makes sure the iterator `is_valid`. 
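Note: as the doc comment above now states, `current_epoch()` returns `MAX_EPOCH` when no range delete covers the span, and the merge iterator below skips that sentinel when tracking epochs. A minimal sketch of the convention:

```rust
use std::collections::BTreeSet;

const MAX_EPOCH: u64 = u64::MAX & !((1 << 16) - 1);

// Mirrors the pattern in ForwardMergeRangeIterator below: only real
// range-delete epochs are tracked, never the "no delete" sentinel.
fn track(current_epochs: &mut BTreeSet<u64>, epoch: u64) {
    if epoch != MAX_EPOCH {
        current_epochs.insert(epoch);
    }
}

fn main() {
    let mut epochs = BTreeSet::new();
    track(&mut epochs, 5 << 16);   // a real range-delete epoch
    track(&mut epochs, MAX_EPOCH); // sentinel: nothing to track
    assert_eq!(epochs.len(), 1);
}
```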
@@ -299,7 +300,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { } for node in &self.tmp_buffer { let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { + if epoch != MAX_EPOCH { self.current_epochs.remove(&epoch); } } @@ -308,7 +309,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.next().await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { + if epoch != MAX_EPOCH { self.current_epochs.insert(epoch); } self.heap.push(node); @@ -329,7 +330,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.rewind().await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { + if epoch != MAX_EPOCH { self.current_epochs.insert(epoch); } self.heap.push(node); @@ -348,7 +349,7 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { node.seek(target_user_key).await?; if node.is_valid() { let epoch = node.current_epoch(); - if epoch != HummockEpoch::MAX { + if epoch != MAX_EPOCH { self.current_epochs.insert(epoch); } self.heap.push(node); diff --git a/src/storage/src/hummock/iterator/forward_merge.rs b/src/storage/src/hummock/iterator/forward_merge.rs index df31dcb1b3d24..deffb1472c483 100644 --- a/src/storage/src/hummock/iterator/forward_merge.rs +++ b/src/storage/src/hummock/iterator/forward_merge.rs @@ -22,6 +22,7 @@ mod test { use futures::{pin_mut, FutureExt}; use risingwave_common::cache::CachePriority; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; + use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::iterator::test_utils::{ default_builder_opt_for_test, gen_iterator_test_sstable_base, @@ -349,7 +350,7 @@ mod test { table_id: Default::default(), table_key: TableKey(&b"test_key"[..]), }, - epoch: 0, + epoch_with_gap: EpochWithGap::new_from_epoch(0), } } diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index 78da3628a3588..47e58e2516253 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -15,8 +15,9 @@ use std::ops::Bound::*; use bytes::Bytes; +use risingwave_common::util::epoch::{MAX_EPOCH, MAX_SPILL_TIMES}; use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange}; -use risingwave_hummock_sdk::HummockEpoch; +use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; use super::DeleteRangeIterator; use crate::hummock::iterator::{Forward, ForwardMergeRangeIterator, HummockIterator}; @@ -83,7 +84,7 @@ impl> UserIterator { /// Create [`UserIterator`] with maximum epoch. 
pub fn for_test(iterator: I, key_range: UserKeyRange) -> Self { - let read_epoch = HummockEpoch::MAX; + let read_epoch = MAX_EPOCH; Self::new( iterator, key_range, @@ -103,7 +104,7 @@ impl> UserIterator { pub async fn next(&mut self) -> HummockResult<()> { while self.iterator.is_valid() { let full_key = self.iterator.key(); - let epoch = full_key.epoch; + let epoch = full_key.epoch_with_gap.pure_epoch(); // handle multi-version if epoch < self.min_epoch || epoch > self.read_epoch { @@ -183,7 +184,7 @@ impl> UserIterator { Included(begin_key) => { let full_key = FullKey { user_key: begin_key.clone(), - epoch: self.read_epoch, + epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES), }; self.iterator.seek(full_key.to_ref()).await?; if !self.iterator.is_valid() { @@ -235,7 +236,7 @@ impl> UserIterator { let full_key = FullKey { user_key, - epoch: self.read_epoch, + epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES), }; self.iterator.seek(full_key).await?; if !self.iterator.is_valid() { @@ -930,7 +931,7 @@ mod tests { let mut i = 0; while ui.is_valid() { let key = ui.key(); - let key_epoch = key.epoch; + let key_epoch = key.epoch_with_gap.pure_epoch(); assert!(key_epoch >= min_epoch); i += 1; diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index 8a4e29ae58633..e3c2df4e51067 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -19,7 +19,7 @@ use std::ops::{Deref, DerefMut}; use bytes::Bytes; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; -use risingwave_hummock_sdk::HummockEpoch; +use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::iterator::{DirectionEnum, Forward, HummockIterator, HummockIteratorDirection}; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator; @@ -137,14 +137,14 @@ impl OrderedMergeIteratorInner { impl OrderedMergeIteratorInner> { /// Used in `merge_imms_in_memory` to merge immutable memtables. 
- pub fn current_item(&self) -> (TableKey, (HummockEpoch, HummockValue)) { + pub fn current_item(&self) -> (TableKey, (EpochWithGap, HummockValue)) { let item = self .heap .peek() .expect("no inner iter for imm merge") .iter .current_item(); - (item.0.clone(), item.1.clone()) + (item.0.clone(), (item.1 .0, item.1 .1.clone())) } } @@ -300,7 +300,7 @@ impl MergeIteratorNext for OrderedMergeIteratorInner { table_id: top_key.user_key.table_id, table_key: TableKey(self.last_table_key.as_slice()), }, - epoch: top_key.epoch, + epoch_with_gap: top_key.epoch_with_gap, } }; loop { diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index 4845d7b43a0e4..18ce9791797de 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; -use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId}; +use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_object_store::object::{ InMemObjectStore, ObjectStore, ObjectStoreImpl, ObjectStoreRef, }; @@ -90,7 +90,7 @@ pub fn iterator_test_bytes_user_key_of(idx: usize) -> UserKey { pub fn iterator_test_key_of(idx: usize) -> FullKey> { FullKey { user_key: iterator_test_user_key_of(idx), - epoch: 233, + epoch_with_gap: EpochWithGap::new_from_epoch(233), } } @@ -103,7 +103,7 @@ pub fn iterator_test_bytes_key_of(idx: usize) -> FullKey { pub fn iterator_test_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey> { FullKey { user_key: iterator_test_user_key_of(idx), - epoch, + epoch_with_gap: EpochWithGap::new_from_epoch(epoch), } } diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 11af5e7deaea2..319f35d18f9bc 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -71,7 +71,7 @@ pub async fn get_from_sstable_info( read_options: &ReadOptions, dist_key_hash: Option, local_stats: &mut StoreLocalStatistic, -) -> HummockResult, HummockEpoch)>> { +) -> HummockResult, EpochWithGap)>> { let sstable = sstable_store_ref.sstable(sstable_info, local_stats).await?; // Bloom filter key is the distribution key, which is no need to be the prefix of pk, and do not @@ -92,8 +92,8 @@ pub async fn get_from_sstable_info( sstable.value().as_ref(), full_key.user_key, ); - if delete_epoch <= full_key.epoch { - return Ok(Some((HummockValue::Delete, delete_epoch))); + if delete_epoch <= full_key.epoch_with_gap.pure_epoch() { + return Ok(Some((HummockValue::Delete, EpochWithGap::new_from_epoch(delete_epoch)))); } } @@ -115,8 +115,11 @@ pub async fn get_from_sstable_info( iter.sst().value().as_ref(), full_key.user_key, ); - if delete_epoch <= full_key.epoch { - return Ok(Some((HummockValue::Delete, delete_epoch))); + if delete_epoch <= full_key.epoch_with_gap.pure_epoch() { + return Ok(Some(( + HummockValue::Delete, + EpochWithGap::new_from_epoch(delete_epoch), + ))); } } @@ -126,12 +129,15 @@ pub async fn get_from_sstable_info( // Iterator gets us the key, we tell if it's the key we want // or key next to it. 
let value = if iter.key().user_key == full_key.user_key { - Some((iter.value().to_bytes(), iter.key().epoch)) + Some((iter.value().to_bytes(), iter.key().epoch_with_gap)) } else if !read_options.ignore_range_tombstone { let delete_epoch = get_min_delete_range_epoch_from_sstable(iter.sst().value().as_ref(), full_key.user_key); - if delete_epoch <= full_key.epoch { - Some((HummockValue::Delete, delete_epoch)) + if delete_epoch <= full_key.epoch_with_gap.pure_epoch() { + Some(( + HummockValue::Delete, + EpochWithGap::new_from_epoch(delete_epoch), + )) } else { None } @@ -165,7 +171,7 @@ pub fn get_from_batch( read_epoch: HummockEpoch, read_options: &ReadOptions, local_stats: &mut StoreLocalStatistic, -) -> Option<(HummockValue, HummockEpoch)> { +) -> Option<(HummockValue, EpochWithGap)> { imm.get(table_key, read_epoch, read_options).map(|v| { local_stats.get_shared_buffer_hit_counts += 1; v diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 8a3ab574ef3d5..5c517a1155150 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -25,7 +25,9 @@ use bytes::{Bytes, BytesMut}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, TableKeyRange, UserKey}; +use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::iterator::{ @@ -57,7 +59,7 @@ pub type SharedBufferBatchId = u64; /// A shared buffer may contain data from multiple epochs, /// there are multiple versions for a given key (`table_key`), we put those versions into a vector /// and sort them in descending order, aka newest to oldest. 
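Note: a standalone sketch of the lookup this entry type enables. With versions sorted newest to oldest, `get_value` below takes the first version whose *pure* epoch is visible at `read_epoch`, so spills within one epoch stay ordered while visibility is still decided per logical epoch:

```rust
const EPOCH_MASK: u64 = (1 << 16) - 1;

// Simplified stand-in for the versioned-entry lookup; versions are
// (epoch_with_gap, value) pairs, newest first.
fn get<'a>(versions: &'a [(u64, &'a str)], read_epoch: u64) -> Option<&'a str> {
    versions
        .iter()
        .find(|(e, _)| (e & !EPOCH_MASK) <= read_epoch) // pure_epoch visible?
        .map(|(_, v)| *v)
}

fn main() {
    // Two spills in epoch 6 plus one value from epoch 5, newest first:
    let versions = [((6u64 << 16) + 1, "b2"), (6 << 16, "b1"), (5 << 16, "a")];
    assert_eq!(get(&versions, 6 << 16), Some("b2")); // newest spill wins
    assert_eq!(get(&versions, 5 << 16), Some("a"));  // epoch 6 not visible
    assert_eq!(get(&versions, 4 << 16), None);
}
```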
-pub type SharedBufferVersionedEntry = (TableKey, Vec<(HummockEpoch, HummockValue)>); +pub type SharedBufferVersionedEntry = (TableKey, Vec<(EpochWithGap, HummockValue)>); type PointRangePair = (PointRange>, PointRange>); struct SharedBufferDeleteRangeMeta { @@ -92,6 +94,7 @@ impl SharedBufferBatchInner { pub(crate) fn new( table_id: TableId, epoch: HummockEpoch, + spill_offset: u16, payload: Vec, delete_ranges: Vec<(Bound, Bound)>, size: usize, @@ -152,9 +155,10 @@ impl SharedBufferBatchInner { } } let kv_count = payload.len(); + let epoch_with_gap = EpochWithGap::new(epoch, spill_offset); let items = payload .into_iter() - .map(|(k, v)| (k, vec![(epoch, v)])) + .map(|(k, v)| (k, vec![(epoch_with_gap, v)])) .collect_vec(); let mut monotonic_tombstone_events = Vec::with_capacity(point_range_pairs.len() * 2); @@ -165,7 +169,7 @@ impl SharedBufferBatchInner { }); monotonic_tombstone_events.push(MonotonicDeleteEvent { event_key: end_point_range, - new_epoch: HummockEpoch::MAX, + new_epoch: MAX_EPOCH, }); } @@ -285,7 +289,7 @@ impl SharedBufferBatchInner { table_key: TableKey<&[u8]>, read_epoch: HummockEpoch, read_options: &ReadOptions, - ) -> Option<(HummockValue, HummockEpoch)> { + ) -> Option<(HummockValue, EpochWithGap)> { // Perform binary search on table key to find the corresponding entry if let Ok(i) = self.payload.binary_search_by(|m| (m.0[..]).cmp(*table_key)) { let item = &self.payload[i]; @@ -293,7 +297,7 @@ impl SharedBufferBatchInner { // Scan to find the first version <= epoch for (e, v) in &item.1 { // skip invisible versions - if read_epoch < *e { + if read_epoch < e.pure_epoch() { continue; } return Some((v.clone(), *e)); @@ -304,7 +308,10 @@ impl SharedBufferBatchInner { if !read_options.ignore_range_tombstone { let delete_epoch = self.get_min_delete_range_epoch(UserKey::new(table_id, table_key)); if delete_epoch <= read_epoch { - Some((HummockValue::Delete, delete_epoch)) + Some(( + HummockValue::Delete, + EpochWithGap::new_from_epoch(delete_epoch), + )) } else { None } @@ -321,7 +328,7 @@ impl SharedBufferBatchInner { }, ); if idx == 0 { - HummockEpoch::MAX + MAX_EPOCH } else { self.monotonic_tombstone_events[idx - 1].new_epoch } @@ -365,6 +372,7 @@ impl SharedBufferBatch { inner: Arc::new(SharedBufferBatchInner::new( table_id, epoch, + 0, sorted_items, vec![], size, @@ -459,7 +467,7 @@ impl SharedBufferBatch { table_key: TableKey<&[u8]>, read_epoch: HummockEpoch, read_options: &ReadOptions, - ) -> Option<(HummockValue, HummockEpoch)> { + ) -> Option<(HummockValue, EpochWithGap)> { self.inner .get_value(self.table_id, table_key, read_epoch, read_options) } @@ -572,6 +580,7 @@ impl SharedBufferBatch { pub fn build_shared_buffer_batch( epoch: HummockEpoch, + spill_offset: u16, sorted_items: Vec, size: usize, delete_ranges: Vec<(Bound, Bound)>, @@ -582,6 +591,7 @@ impl SharedBufferBatch { let inner = SharedBufferBatchInner::new( table_id, epoch, + spill_offset, sorted_items, delete_ranges, size, @@ -658,7 +668,7 @@ impl SharedBufferBatchIterator { } /// Return all values of the current key - pub(crate) fn current_versions(&self) -> &Vec<(HummockEpoch, HummockValue)> { + pub(crate) fn current_versions(&self) -> &Vec<(EpochWithGap, HummockValue)> { debug_assert!(self.current_idx < self.inner.len()); let idx = match D::direction() { DirectionEnum::Forward => self.current_idx, @@ -675,7 +685,7 @@ impl SharedBufferBatchIterator { } } - pub(crate) fn current_item(&self) -> (&TableKey, &(HummockEpoch, HummockValue)) { + pub(crate) fn current_item(&self) -> (&TableKey, 
&(EpochWithGap, HummockValue)) { assert!(self.is_valid(), "iterator is not valid"); let (idx, version_idx) = match D::direction() { DirectionEnum::Forward => (self.current_idx, self.current_version_idx), @@ -724,8 +734,8 @@ impl HummockIterator for SharedBufferBatchIterator< } fn key(&self) -> FullKey<&[u8]> { - let (key, (epoch, _)) = self.current_item(); - FullKey::new(self.table_id, TableKey(key), *epoch) + let (key, (epoch_with_gap, _)) = self.current_item(); + FullKey::new_with_gap_epoch(self.table_id, TableKey(key), *epoch_with_gap) } fn value(&self) -> HummockValue<&[u8]> { @@ -765,15 +775,15 @@ impl HummockIterator for SharedBufferBatchIterator< let partition_point = self .inner .binary_search_by(|probe| probe.0[..].cmp(*key.user_key.table_key)); - let seek_key_epoch = key.epoch; + let seek_key_epoch = key.epoch_with_gap; match D::direction() { DirectionEnum::Forward => match partition_point { Ok(i) => { self.current_idx = i; // seek to the first version that is <= the seek key epoch let mut idx: i32 = 0; - for (epoch, _) in self.current_versions() { - if *epoch <= seek_key_epoch { + for (epoch_with_gap, _) in self.current_versions() { + if epoch_with_gap <= &seek_key_epoch { break; } idx += 1; @@ -800,8 +810,8 @@ impl HummockIterator for SharedBufferBatchIterator< // seek from back to the first version that is >= seek_key_epoch let values = self.current_versions(); let mut idx: i32 = (values.len() - 1) as i32; - for (epoch, _) in values.iter().rev() { - if *epoch >= seek_key_epoch { + for (epoch_with_gap, _) in values.iter().rev() { + if epoch_with_gap >= &seek_key_epoch { break; } idx -= 1; @@ -857,7 +867,7 @@ impl DeleteRangeIterator for SharedBufferDeleteRangeIterator { if self.next_idx > 0 { self.inner.monotonic_tombstone_events[self.next_idx - 1].new_epoch } else { - HummockEpoch::MAX + MAX_EPOCH } } @@ -897,6 +907,7 @@ mod tests { use std::ops::Bound::{Excluded, Included}; use risingwave_common::must_match; + use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::map_table_key_range; use super::*; @@ -994,6 +1005,7 @@ mod tests { let batch = SharedBufferBatch::build_shared_buffer_batch( epoch, + 0, vec![], 1, vec![ @@ -1176,6 +1188,7 @@ mod tests { ]; let shared_buffer_batch = SharedBufferBatch::build_shared_buffer_batch( epoch, + 0, vec![], 0, delete_ranges, @@ -1189,7 +1202,7 @@ mod tests { .get_min_delete_range_epoch(UserKey::new(Default::default(), TableKey(b"aaa"),)) ); assert_eq!( - HummockEpoch::MAX, + MAX_EPOCH, shared_buffer_batch .get_min_delete_range_epoch(UserKey::new(Default::default(), TableKey(b"bbb"),)) ); @@ -1199,7 +1212,7 @@ mod tests { .get_min_delete_range_epoch(UserKey::new(Default::default(), TableKey(b"ddd"),)) ); assert_eq!( - HummockEpoch::MAX, + MAX_EPOCH, shared_buffer_batch .get_min_delete_range_epoch(UserKey::new(Default::default(), TableKey(b"eee"),)) ); @@ -1379,7 +1392,7 @@ mod tests { iter.rewind().await.unwrap(); let mut output = vec![]; while iter.is_valid() { - let epoch = iter.key().epoch; + let epoch = iter.key().epoch_with_gap.pure_epoch(); if snapshot_epoch == epoch { output.push(( iter.key().user_key.table_key.to_vec(), @@ -1468,6 +1481,7 @@ mod tests { let size = SharedBufferBatch::measure_batch_size(&sorted_items1); let imm1 = SharedBufferBatch::build_shared_buffer_batch( epoch, + 0, sorted_items1, size, delete_ranges, @@ -1513,6 +1527,7 @@ mod tests { let size = SharedBufferBatch::measure_batch_size(&sorted_items2); let imm2 = SharedBufferBatch::build_shared_buffer_batch( epoch, + 0, sorted_items2, size, 
delete_ranges, diff --git a/src/storage/src/hummock/sstable/block.rs b/src/storage/src/hummock/sstable/block.rs index 809f797bb11e8..1c73d0ca00088 100644 --- a/src/storage/src/hummock/sstable/block.rs +++ b/src/storage/src/hummock/sstable/block.rs @@ -474,7 +474,7 @@ impl BlockBuilder { KeyComparator::compare_encoded_full_key(&self.last_key[..], &key[..]), Ordering::Less, "epoch: {}, table key: {}", - full_key.epoch, + full_key.epoch_with_gap.pure_epoch(), u64::from_be_bytes( full_key.user_key.table_key.as_ref()[0..8] .try_into() diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index bb6d190468f8c..537d25539c957 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -17,6 +17,7 @@ use std::collections::BTreeSet; use std::sync::Arc; use bytes::{Bytes, BytesMut}; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN}; use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap}; use risingwave_hummock_sdk::{HummockEpoch, KeyComparator, LocalSstableInfo}; @@ -113,7 +114,7 @@ pub struct SstableBuilder { /// epoch1 to epoch2, thus the `new epoch` is epoch2. epoch2 will be used from the event /// key wmk1 (5) and till the next event key wmk2 (7) (not inclusive). /// If there is no range deletes between current event key and next event key, `new_epoch` will - /// be `HummockEpoch::MAX`. + /// be `MAX_EPOCH`. monotonic_deletes: Vec, /// `table_id` of added keys. table_ids: BTreeSet, @@ -197,9 +198,9 @@ impl SstableBuilder { if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id { self.table_ids.insert(table_id); } - if event.new_epoch == HummockEpoch::MAX + if event.new_epoch == MAX_EPOCH && self.monotonic_deletes.last().map_or(true, |last| { - last.new_epoch == HummockEpoch::MAX + last.new_epoch == MAX_EPOCH && last.event_key.left_user_key.table_id == event.event_key.left_user_key.table_id }) @@ -207,7 +208,7 @@ impl SstableBuilder { // This range would never delete any key so we can merge it with last range. return; } - if event.new_epoch != HummockEpoch::MAX { + if event.new_epoch != MAX_EPOCH { self.epoch_set.insert(event.new_epoch); } self.range_tombstone_size += event.encoded_size(); @@ -217,7 +218,7 @@ impl SstableBuilder { pub fn last_range_tombstone_epoch(&self) -> HummockEpoch { self.monotonic_deletes .last() - .map_or(HummockEpoch::MAX, |delete| delete.new_epoch) + .map_or(MAX_EPOCH, |delete| delete.new_epoch) } pub fn last_range_tombstone(&self) -> Option<&MonotonicDeleteEvent> { @@ -305,11 +306,12 @@ impl SstableBuilder { { let table_id = full_key.user_key.table_id.table_id(); tracing::warn!( - "A large key/value (table_id={}, key len={}, value len={}, epoch={}) is added to block", + "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block", table_id, table_key_len, table_value_len, - full_key.epoch + full_key.epoch_with_gap.pure_epoch(), + full_key.epoch_with_gap.offset(), ); } @@ -335,7 +337,7 @@ impl SstableBuilder { self.build_block().await?; } self.last_table_stats.total_key_count += 1; - self.epoch_set.insert(full_key.epoch); + self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch()); // Rotate block builder if the previous one has been built. 
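Note in the builder hunk above that `epoch_set` records `pure_epoch()` rather than the raw key epoch: per-SST epoch bounds must ignore spill offsets, or a spilled key would inflate them past any real epoch. A sketch of the bound computation under that rule (simplified from the builder code that continues below):

```rust
// Sketch: epoch bounds over pure epochs, with MAX_EPOCH (not u64::MAX) as the
// sentinel for "no keys observed", matching this PR's sentinel change.
use std::collections::BTreeSet;

const EPOCH_MASK: u64 = (1u64 << 16) - 1;
const MAX_EPOCH: u64 = u64::MAX & !EPOCH_MASK;

fn epoch_bounds(epoch_set: &BTreeSet<u64>) -> (u64, u64) {
    if epoch_set.is_empty() {
        (MAX_EPOCH, u64::MIN)
    } else {
        (*epoch_set.first().unwrap(), *epoch_set.last().unwrap())
    }
}
```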
if self.block_builder.is_empty() { @@ -397,7 +399,7 @@ impl SstableBuilder { assert!(self.monotonic_deletes.is_empty() || self.monotonic_deletes.len() > 1); if let Some(monotonic_delete) = self.monotonic_deletes.last() { - assert_eq!(monotonic_delete.new_epoch, HummockEpoch::MAX); + assert_eq!(monotonic_delete.new_epoch, MAX_EPOCH); if monotonic_delete.event_key.is_exclude_left_key { if largest_key.is_empty() || !KeyComparator::encoded_greater_than_unencoded( @@ -418,18 +420,18 @@ impl SstableBuilder { ) { // use MAX as epoch because the last monotonic delete must be - // `HummockEpoch::MAX`, so we can not include any version of + // `MAX_EPOCH`, so we can not include any version of // this key. largest_key = FullKey::from_user_key( monotonic_delete.event_key.left_user_key.clone(), - HummockEpoch::MAX, + MAX_EPOCH, ) .encode(); right_exclusive = true; } } if let Some(monotonic_delete) = self.monotonic_deletes.first() { - assert_ne!(monotonic_delete.new_epoch, HummockEpoch::MAX); + assert_ne!(monotonic_delete.new_epoch, MAX_EPOCH); if smallest_key.is_empty() || !KeyComparator::encoded_less_than_unencoded( user_key(&smallest_key), @@ -438,7 +440,7 @@ impl SstableBuilder { { smallest_key = FullKey::from_user_key( monotonic_delete.event_key.left_user_key.clone(), - HummockEpoch::MAX, + MAX_EPOCH, ) .encode(); } @@ -489,11 +491,11 @@ impl SstableBuilder { // Expand the epoch of the whole sst by tombstone epoch let (tombstone_min_epoch, tombstone_max_epoch) = { - let mut tombstone_min_epoch = u64::MAX; + let mut tombstone_min_epoch = MAX_EPOCH; let mut tombstone_max_epoch = u64::MIN; for monotonic_delete in &meta.monotonic_tombstone_events { - if monotonic_delete.new_epoch != HummockEpoch::MAX { + if monotonic_delete.new_epoch != MAX_EPOCH { tombstone_min_epoch = cmp::min(tombstone_min_epoch, monotonic_delete.new_epoch); tombstone_max_epoch = cmp::max(tombstone_max_epoch, monotonic_delete.new_epoch); } @@ -535,7 +537,7 @@ impl SstableBuilder { let (min_epoch, max_epoch) = { if self.epoch_set.is_empty() { - (u64::MAX, u64::MIN) + (MAX_EPOCH, u64::MIN) } else { ( *self.epoch_set.first().unwrap(), @@ -691,7 +693,7 @@ pub(super) mod tests { let mut b = SstableBuilder::for_test(0, mock_sst_writer(&opt), opt); b.add_monotonic_deletes(vec![ MonotonicDeleteEvent::new(table_id, b"abcd".to_vec(), 0), - MonotonicDeleteEvent::new(table_id, b"eeee".to_vec(), HummockEpoch::MAX), + MonotonicDeleteEvent::new(table_id, b"eeee".to_vec(), MAX_EPOCH), ]); let s = b.finish().await.unwrap().sst_info; let key_range = s.sst_info.key_range.unwrap(); diff --git a/src/storage/src/hummock/sstable/delete_range_aggregator.rs b/src/storage/src/hummock/sstable/delete_range_aggregator.rs index 2b82e1f3c17f9..953f5b16b28a5 100644 --- a/src/storage/src/hummock/sstable/delete_range_aggregator.rs +++ b/src/storage/src/hummock/sstable/delete_range_aggregator.rs @@ -18,6 +18,7 @@ use std::future::Future; use std::sync::Arc; use itertools::Itertools; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{PointRange, UserKey}; use risingwave_hummock_sdk::HummockEpoch; @@ -89,7 +90,7 @@ pub(crate) type CompactionDeleteRangeEvent = ( /// `<1, +epoch1> <3, +epoch2> <5, -epoch1> <7, -epoch2> <10, +epoch3> <12, -epoch3>` /// We rely on the fact that keys met in compaction are in order. /// When user key 0 comes, no events have happened yet so no range delete epoch. 
(will be -/// represented as range delete epoch `HummockEpoch::MAX`) +/// represented as range delete epoch `MAX_EPOCH`) /// When user key 1 comes, event `<1, +epoch1>` happens so there is currently one range delete /// epoch: epoch1. /// When user key 2 comes, no more events happen so the set remains `{epoch1}`. @@ -170,15 +171,15 @@ impl CompactionDeleteRangesBuilder { (Vec, Vec), >::default(); for monotonic_deletes in self.events { - let mut last_exit_epoch = HummockEpoch::MAX; + let mut last_exit_epoch = MAX_EPOCH; for delete_event in monotonic_deletes { - if last_exit_epoch != HummockEpoch::MAX { + if last_exit_epoch != MAX_EPOCH { let entry = ret.entry(delete_event.event_key.clone()).or_default(); entry.0.push(TombstoneEnterExitEvent { tombstone_epoch: last_exit_epoch, }); } - if delete_event.new_epoch != HummockEpoch::MAX { + if delete_event.new_epoch != MAX_EPOCH { let entry = ret.entry(delete_event.event_key).or_default(); entry.1.push(TombstoneEnterExitEvent { tombstone_epoch: delete_event.new_epoch, @@ -242,7 +243,7 @@ impl CompactionDeleteRanges { if !monotonic_events.is_empty() { monotonic_events.push(MonotonicDeleteEvent { event_key: extended_largest_user_key.to_vec(), - new_epoch: HummockEpoch::MAX, + new_epoch: MAX_EPOCH, }); } break; @@ -250,7 +251,7 @@ impl CompactionDeleteRanges { apply_event(&mut epochs, &self.events[idx]); monotonic_events.push(MonotonicDeleteEvent { event_key: self.events[idx].0.clone(), - new_epoch: epochs.first().map_or(HummockEpoch::MAX, |epoch| *epoch), + new_epoch: epochs.first().map_or(MAX_EPOCH, |epoch| *epoch), }); idx += 1; } @@ -259,14 +260,8 @@ impl CompactionDeleteRanges { && a.new_epoch == b.new_epoch }); if !monotonic_events.is_empty() { - assert_ne!( - monotonic_events.first().unwrap().new_epoch, - HummockEpoch::MAX - ); - assert_eq!( - monotonic_events.last().unwrap().new_epoch, - HummockEpoch::MAX - ); + assert_ne!(monotonic_events.first().unwrap().new_epoch, MAX_EPOCH); + assert_eq!(monotonic_events.last().unwrap().new_epoch, MAX_EPOCH); } monotonic_events } @@ -323,16 +318,14 @@ impl CompactionDeleteRangeIterator { } pub(crate) fn earliest_epoch(&self) -> HummockEpoch { - self.epochs - .first() - .map_or(HummockEpoch::MAX, |epoch| *epoch) + self.epochs.first().map_or(MAX_EPOCH, |epoch| *epoch) } pub(crate) fn earliest_delete_since(&self, epoch: HummockEpoch) -> HummockEpoch { self.epochs .range(epoch..) .next() - .map_or(HummockEpoch::MAX, |ret| *ret) + .map_or(MAX_EPOCH, |ret| *ret) } pub(crate) fn seek<'a>(&'a mut self, target_user_key: UserKey<&'a [u8]>) { @@ -394,7 +387,7 @@ impl DeleteRangeIterator for SstableDeleteRangeIterator { if self.next_idx > 0 { self.table.value().meta.monotonic_tombstone_events[self.next_idx - 1].new_epoch } else { - HummockEpoch::MAX + MAX_EPOCH } } @@ -441,7 +434,7 @@ pub fn get_min_delete_range_epoch_from_sstable( |MonotonicDeleteEvent { event_key, .. 
}| event_key.as_ref().le(&query_extended_user_key), ); if idx == 0 { - HummockEpoch::MAX + MAX_EPOCH } else { table.meta.monotonic_tombstone_events[idx - 1].new_epoch } @@ -508,7 +501,7 @@ mod tests { assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"bbb").as_ref(), 13), - HummockEpoch::MAX + MAX_EPOCH ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"bbb").as_ref(), 11), @@ -529,16 +522,16 @@ mod tests { assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"bbbddd").as_ref(), 8), - HummockEpoch::MAX + MAX_EPOCH ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"bbbeee").as_ref(), 8), - HummockEpoch::MAX + MAX_EPOCH ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"bbbeef").as_ref(), 10), - HummockEpoch::MAX + MAX_EPOCH ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"eeeeee").as_ref(), 8), @@ -550,7 +543,7 @@ mod tests { ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"hhhhhh").as_ref(), 6), - HummockEpoch::MAX + MAX_EPOCH ); assert_eq!( iter.earliest_delete_which_can_see_key(test_user_key(b"iiiiii").as_ref(), 6), @@ -652,6 +645,6 @@ mod tests { &sstable, iterator_test_user_key_of(8).as_ref(), ); - assert_eq!(ret, HummockEpoch::MAX); + assert_eq!(ret, MAX_EPOCH); } } diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index dfa4a5c0a0095..1c6b5aeedea5a 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -34,6 +34,7 @@ pub mod builder; pub use builder::*; pub mod writer; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::MAX_EPOCH; pub use writer::*; mod forward_sstable_iterator; pub mod multi_builder; @@ -149,7 +150,7 @@ impl DeleteRangeTombstone { /// thus the `new epoch` is epoch2. epoch2 will be used from the event key wmk1 (5) and till the /// next event key wmk2 (7) (not inclusive). /// If there is no range deletes between current event key and next event key, `new_epoch` will be -/// `HummockEpoch::MAX`. +/// `MAX_EPOCH`. #[derive(Clone, Debug, PartialEq, Eq)] pub struct MonotonicDeleteEvent { pub event_key: PointRange>, @@ -208,7 +209,7 @@ pub(crate) fn create_monotonic_events_from_compaction_delete_events( apply_event(&mut epochs, &event); monotonic_tombstone_events.push(MonotonicDeleteEvent { event_key: event.0, - new_epoch: epochs.first().map_or(HummockEpoch::MAX, |epoch| *epoch), + new_epoch: epochs.first().map_or(MAX_EPOCH, |epoch| *epoch), }); } monotonic_tombstone_events.dedup_by(|a, b| { @@ -406,7 +407,7 @@ pub struct SstableMeta { /// epoch1 to epoch2, thus the `new epoch` is epoch2. epoch2 will be used from the event /// key wmk1 (5) and till the next event key wmk2 (7) (not inclusive). /// If there is no range deletes between current event key and next event key, `new_epoch` will - /// be `HummockEpoch::MAX`. + /// be `MAX_EPOCH`. pub monotonic_tombstone_events: Vec, /// Format version, for further compatibility. 
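The tombstone lookups above all share one shape: monotonic delete events are sorted by event key, the epoch covering a key is the `new_epoch` of the last event at or before it, and `MAX_EPOCH` is the "no tombstone" sentinel. A self-contained sketch, with byte-slice keys standing in for the encoded user keys:

```rust
// Sketch of the partition_point pattern used by
// get_min_delete_range_epoch_from_sstable and its shared-buffer counterpart.
const MAX_EPOCH: u64 = u64::MAX & !((1u64 << 16) - 1);

struct MonotonicDeleteEvent {
    event_key: Vec<u8>,
    new_epoch: u64,
}

fn min_delete_range_epoch(events: &[MonotonicDeleteEvent], key: &[u8]) -> u64 {
    // Index of the first event strictly after `key`.
    let idx = events.partition_point(|e| e.event_key.as_slice() <= key);
    if idx == 0 {
        MAX_EPOCH // before the first event, no range tombstone is active
    } else {
        events[idx - 1].new_epoch
    }
}
```

A result with `delete_epoch <= read_epoch` is then reported as a visible `HummockValue::Delete`, as in the read paths earlier in this diff.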
pub version: u32, diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 4baabb4fdafe6..b6d181b48894a 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -19,8 +19,9 @@ use std::sync::Arc; use bytes::Bytes; use num_integer::Integer; use risingwave_common::hash::VirtualNode; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{FullKey, PointRange, UserKey}; -use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use risingwave_hummock_sdk::LocalSstableInfo; use tokio::task::JoinHandle; use super::MonotonicDeleteEvent; @@ -199,7 +200,7 @@ where // the captured reference to `current_builder` is also required to be `Send`, and then // `current_builder` itself is required to be `Sync`, which is unnecessary. let mut need_seal_current = false; - let mut last_range_tombstone_epoch = HummockEpoch::MAX; + let mut last_range_tombstone_epoch = MAX_EPOCH; if let Some(builder) = self.current_builder.as_mut() { if is_new_user_key { if switch_builder { @@ -214,7 +215,7 @@ where } if need_seal_current && let Some(event) = builder.last_range_tombstone() - && event.new_epoch != HummockEpoch::MAX + && event.new_epoch != MAX_EPOCH { last_range_tombstone_epoch = event.new_epoch; if event @@ -228,7 +229,7 @@ where } else { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: PointRange::from_user_key(full_key.user_key.to_vec(), false), - new_epoch: HummockEpoch::MAX, + new_epoch: MAX_EPOCH, }); } } @@ -247,7 +248,7 @@ where let mut builder = self.builder_factory.open_builder().await?; // If last_range_tombstone_epoch is not MAX, it means that we cut one range-tombstone to // two half and add the right half as a new range to next sstable. 
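The comment above describes the trickiest case, continued in the hunk below: sealing an SST while a range tombstone is still open. A condensed sketch of the split, under simplified event types:

```rust
// Condensed sketch; Event stands in for MonotonicDeleteEvent with encoded keys.
const MAX_EPOCH: u64 = u64::MAX & !((1u64 << 16) - 1);

struct Event {
    key: Vec<u8>,
    new_epoch: u64,
}

/// Close an open tombstone (epoch != MAX_EPOCH) at `split_key` in the SST
/// being sealed, and reopen its right half in the next builder.
fn split_open_tombstone(
    open_epoch: u64,
    split_key: &[u8],
    sealed: &mut Vec<Event>,
    next: &mut Vec<Event>,
) {
    if open_epoch != MAX_EPOCH {
        sealed.push(Event { key: split_key.to_vec(), new_epoch: MAX_EPOCH });
        next.push(Event { key: split_key.to_vec(), new_epoch: open_epoch });
    }
}
```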
- if need_seal_current && last_range_tombstone_epoch != HummockEpoch::MAX { + if need_seal_current && last_range_tombstone_epoch != MAX_EPOCH { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: PointRange::from_user_key(full_key.user_key.to_vec(), false), new_epoch: last_range_tombstone_epoch, @@ -305,19 +306,19 @@ where pub async fn add_monotonic_delete(&mut self, event: MonotonicDeleteEvent) -> HummockResult<()> { if let Some(builder) = self.current_builder.as_mut() && builder.reach_capacity() - && event.new_epoch != HummockEpoch::MAX + && event.new_epoch != MAX_EPOCH { - if builder.last_range_tombstone_epoch() != HummockEpoch::MAX { + if builder.last_range_tombstone_epoch() != MAX_EPOCH { builder.add_monotonic_delete(MonotonicDeleteEvent { event_key: event.event_key.clone(), - new_epoch: HummockEpoch::MAX, + new_epoch: MAX_EPOCH, }); } self.seal_current().await?; } if self.current_builder.is_none() { - if event.new_epoch == HummockEpoch::MAX { + if event.new_epoch == MAX_EPOCH { return Ok(()); } @@ -652,7 +653,7 @@ mod tests { FullKey::for_test( table_id, &[VirtualNode::ZERO.to_be_bytes().as_slice(), b"aaa"].concat(), - u64::MAX, + MAX_EPOCH, ) .encode() ); @@ -661,7 +662,7 @@ mod tests { FullKey::for_test( table_id, &[VirtualNode::ZERO.to_be_bytes().as_slice(), b"kkk"].concat(), - u64::MAX + MAX_EPOCH ) .encode() ); @@ -694,7 +695,7 @@ mod tests { false, 0, ); - assert_eq!(del_iter.earliest_epoch(), HummockEpoch::MAX); + assert_eq!(del_iter.earliest_epoch(), MAX_EPOCH); while del_iter.is_valid() { del_iter.update_range(); builder @@ -793,7 +794,7 @@ mod tests { UserKey::for_test(table_id, b"gggg".to_vec()), false, ), - new_epoch: HummockEpoch::MAX, + new_epoch: MAX_EPOCH, }) .await .unwrap(); @@ -808,22 +809,18 @@ mod tests { let key_range = ssts[0].key_range.as_ref().unwrap(); let expected_left = - FullKey::from_user_key(UserKey::for_test(table_id, b"aaaa"), HummockEpoch::MAX) - .encode(); + FullKey::from_user_key(UserKey::for_test(table_id, b"aaaa"), MAX_EPOCH).encode(); let expected_right = - FullKey::from_user_key(UserKey::for_test(table_id, b"eeee"), HummockEpoch::MAX) - .encode(); + FullKey::from_user_key(UserKey::for_test(table_id, b"eeee"), MAX_EPOCH).encode(); assert_eq!(key_range.left, expected_left); assert_eq!(key_range.right, expected_right); assert!(key_range.right_exclusive); let key_range = ssts[1].key_range.as_ref().unwrap(); let expected_left = - FullKey::from_user_key(UserKey::for_test(table_id, b"eeee"), HummockEpoch::MAX) - .encode(); + FullKey::from_user_key(UserKey::for_test(table_id, b"eeee"), MAX_EPOCH).encode(); let expected_right = - FullKey::from_user_key(UserKey::for_test(table_id, b"gggg"), HummockEpoch::MAX) - .encode(); + FullKey::from_user_key(UserKey::for_test(table_id, b"gggg"), MAX_EPOCH).encode(); assert_eq!(key_range.left, expected_left); assert_eq!(key_range.right, expected_right); assert!(key_range.right_exclusive); diff --git a/src/storage/src/hummock/sstable/xor_filter.rs b/src/storage/src/hummock/sstable/xor_filter.rs index 8d0564ebbf92d..fea7d0c42c8e1 100644 --- a/src/storage/src/hummock/sstable/xor_filter.rs +++ b/src/storage/src/hummock/sstable/xor_filter.rs @@ -444,6 +444,7 @@ impl Clone for XorFilterReader { mod tests { use rand::RngCore; use risingwave_common::cache::CachePriority; + use risingwave_hummock_sdk::EpochWithGap; use super::*; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FullKeyFilterKeyExtractor}; @@ -488,7 +489,7 @@ mod tests { let epoch = 20 - j; let k = FullKey { user_key: test_user_key_of(i), - 
epoch, + epoch_with_gap: EpochWithGap::new_from_epoch(epoch), }; let v = HummockValue::put(test_value_of(i)); builder.add(k.to_ref(), v.as_slice()).await.unwrap(); diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index 13947802c4d9b..8dfb33fe2ea17 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -181,7 +181,7 @@ impl StateStore for HummockStorage { self.validate_read_epoch(wait_epoch)?; let wait_epoch = match wait_epoch { HummockReadEpoch::Committed(epoch) => { - assert_ne!(epoch, HummockEpoch::MAX, "epoch should not be u64::MAX"); + assert_ne!(epoch, MAX_EPOCH, "epoch should not be MAX_EPOCH"); epoch } _ => return Ok(()), @@ -208,7 +208,7 @@ impl StateStore for HummockStorage { if is_checkpoint { let _ = self.min_current_epoch.compare_exchange( - HummockEpoch::MAX, + MAX_EPOCH, epoch, MemOrdering::SeqCst, MemOrdering::SeqCst, @@ -232,7 +232,7 @@ impl StateStore for HummockStorage { let epoch = self.pinned_version.load().max_committed_epoch(); self.min_current_epoch - .store(HummockEpoch::MAX, MemOrdering::SeqCst); + .store(MAX_EPOCH, MemOrdering::SeqCst); self.seal_epoch.store(epoch, MemOrdering::SeqCst); Ok(()) @@ -246,8 +246,8 @@ impl StateStore for HummockStorage { if let HummockReadEpoch::Current(read_current_epoch) = epoch { assert_ne!( read_current_epoch, - HummockEpoch::MAX, - "epoch should not be u64::MAX" + MAX_EPOCH, + "epoch should not be MAX_EPOCH" ); let sealed_epoch = self.seal_epoch.load(MemOrdering::SeqCst); if read_current_epoch > sealed_epoch { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 5e51fa1170b12..e0125263df0a5 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -23,6 +23,7 @@ use bytes::Bytes; use itertools::Itertools; use more_asserts::assert_gt; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_common_service::observer_manager::{NotificationClient, ObserverManager}; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -341,6 +342,7 @@ impl HummockStorage { self.write_limiter.clone(), option, version_update_notifier_tx, + self.context.storage_opts.mem_table_spill_threshold, ) } @@ -400,7 +402,7 @@ impl StateStore for HummockStorage { self.validate_read_epoch(wait_epoch)?; let wait_epoch = match wait_epoch { HummockReadEpoch::Committed(epoch) => { - assert_ne!(epoch, HummockEpoch::MAX, "epoch should not be u64::MAX"); + assert_ne!(epoch, MAX_EPOCH, "epoch should not be MAX_EPOCH"); epoch } _ => return Ok(()), @@ -427,7 +429,7 @@ impl StateStore for HummockStorage { if is_checkpoint { let _ = self.min_current_epoch.compare_exchange( - HummockEpoch::MAX, + MAX_EPOCH, epoch, MemOrdering::SeqCst, MemOrdering::SeqCst, @@ -450,8 +452,7 @@ impl StateStore for HummockStorage { rx.await.expect("should wait success"); let epoch = self.pinned_version.load().max_committed_epoch(); - self.min_current_epoch - .store(HummockEpoch::MAX, MemOrdering::SeqCst); + self.min_current_epoch.store(MAX_EPOCH, MemOrdering::SeqCst); self.seal_epoch.store(epoch, MemOrdering::SeqCst); Ok(()) @@ -464,9 +465,8 @@ impl StateStore for HummockStorage { fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { if let HummockReadEpoch::Current(read_current_epoch) = epoch { assert_ne!( - read_current_epoch, - HummockEpoch::MAX, - "epoch 
should not be u64::MAX" + read_current_epoch, MAX_EPOCH, + "epoch should not be MAX_EPOCH" ); let sealed_epoch = self.seal_epoch.load(MemOrdering::SeqCst); if read_current_epoch > sealed_epoch { diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 94b536bf6919f..0c92013d20d0c 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -21,6 +21,7 @@ use bytes::Bytes; use parking_lot::RwLock; use prometheus::IntGauge; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_common::util::epoch::{MAX_EPOCH, MAX_SPILL_TIMES}; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockEpoch; use tokio::sync::mpsc; @@ -51,9 +52,11 @@ use crate::StateStoreIter; /// `LocalHummockStorage` is a handle for a state table shard to access data from and write data to /// the hummock state backend. It is created via `HummockStorage::new_local`. + pub struct LocalHummockStorage { mem_table: MemTable, + spill_offset: u16, epoch: Option, table_id: TableId, @@ -93,6 +96,8 @@ pub struct LocalHummockStorage { mem_table_size: IntGauge, mem_table_item_count: IntGauge, + + mem_table_spill_threshold: usize, } impl LocalHummockStorage { @@ -156,7 +161,7 @@ impl LocalHummockStorage { } let read_snapshot = read_filter_for_local( - HummockEpoch::MAX, // Use MAX epoch to make sure we read from latest + MAX_EPOCH, // Use MAX epoch to make sure we read from latest read_options.table_id, &key_range, self.read_version.clone(), @@ -337,6 +342,25 @@ impl LocalStateStore for LocalHummockStorage { .await } + async fn try_flush(&mut self) -> StorageResult<()> { + if self.mem_table.kv_size.size() > self.mem_table_spill_threshold { + tracing::info!( + "mem table size {} MB exceeds the spill threshold {} MB, spilling mem table. 
table_id {}", + self.mem_table.kv_size.size() >> 20, + self.mem_table_spill_threshold >> 20, + self.table_id.table_id() + ); + + if self.spill_offset < MAX_SPILL_TIMES { + self.flush(vec![]).await?; + } else { + tracing::warn!("No mem table spill occurs, the gap epoch exceeds available range."); + } + } + + Ok(()) + } + fn epoch(&self) -> u64 { self.epoch.expect("should have set the epoch") } @@ -355,6 +379,7 @@ impl LocalStateStore for LocalHummockStorage { "local state store of table id {:?} is init for more than once", self.table_id ); + Ok(()) } @@ -364,6 +389,7 @@ impl LocalStateStore for LocalHummockStorage { .epoch .replace(next_epoch) .expect("should have init epoch before seal the first epoch"); + self.spill_offset = 0; assert!( next_epoch > prev_epoch, "new epoch {} should be greater than current epoch: {}", @@ -426,6 +452,7 @@ impl LocalHummockStorage { let instance_id = self.instance_guard.instance_id; let imm = SharedBufferBatch::build_shared_buffer_batch( epoch, + self.spill_offset, sorted_items, size, delete_ranges, @@ -433,6 +460,7 @@ impl LocalHummockStorage { Some(instance_id), Some(tracker), ); + self.spill_offset += 1; let imm_size = imm.size(); self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); @@ -468,6 +496,7 @@ impl LocalHummockStorage { write_limiter: WriteLimiterRef, option: NewLocalOptions, version_update_notifier_tx: Arc>, + mem_table_spill_threshold: usize, ) -> Self { let stats = hummock_version_reader.stats().clone(); let mem_table_size = stats.mem_table_memory_size.with_label_values(&[ @@ -480,6 +509,7 @@ impl LocalHummockStorage { ]); Self { mem_table: MemTable::new(option.is_consistent_op), + spill_offset: 0, epoch: None, table_id: option.table_id, is_consistent_op: option.is_consistent_op, @@ -495,6 +525,7 @@ impl LocalHummockStorage { version_update_notifier_tx, mem_table_size, mem_table_item_count, + mem_table_spill_threshold, } } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index f2d5eca81b171..c2dba80ee6cfe 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -568,7 +568,7 @@ impl HummockVersionReader { &read_options, local_stats, ) { - return Ok(if data_epoch < min_epoch { + return Ok(if data_epoch.pure_epoch() < min_epoch { None } else { data.into_user_value() @@ -594,7 +594,7 @@ impl HummockVersionReader { ) .await? { - return Ok(if data_epoch < min_epoch { + return Ok(if data_epoch.pure_epoch() < min_epoch { None } else { data.into_user_value() @@ -631,7 +631,7 @@ impl HummockVersionReader { ) .await? { - return Ok(if data_epoch < min_epoch { + return Ok(if data_epoch.pure_epoch() < min_epoch { None } else { data.into_user_value() @@ -668,7 +668,7 @@ impl HummockVersionReader { ) .await? 
{ - return Ok(if data_epoch < min_epoch { + return Ok(if data_epoch.pure_epoch() < min_epoch { None } else { data.into_user_value() diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 424ca7d1a2131..a5a00413c8d20 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -21,8 +21,9 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::must_match; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, UserKey}; -use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId}; +use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId}; use risingwave_pb::hummock::{KeyRange, SstableInfo}; use super::iterator::test_utils::iterator_test_table_key_of; @@ -223,17 +224,17 @@ pub async fn gen_test_sstable_impl + Clone + Default + Eq, F: Fil ); let mut last_key = FullKey::::default(); - let mut user_key_last_delete = HummockEpoch::MAX; + let mut user_key_last_delete = MAX_EPOCH; for (mut key, value) in kv_iter { let is_new_user_key = last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref(); - let epoch = key.epoch; + let epoch = key.epoch_with_gap.pure_epoch(); if is_new_user_key { last_key = key.clone(); - user_key_last_delete = HummockEpoch::MAX; + user_key_last_delete = MAX_EPOCH; } - let mut earliest_delete_epoch = HummockEpoch::MAX; + let mut earliest_delete_epoch = MAX_EPOCH; let extended_user_key = PointRange::from_user_key(key.user_key.as_ref(), false); for range_tombstone in &range_tombstones { if range_tombstone @@ -241,7 +242,7 @@ pub async fn gen_test_sstable_impl + Clone + Default + Eq, F: Fil .as_ref() .le(&extended_user_key) && range_tombstone.end_user_key.as_ref().gt(&extended_user_key) - && range_tombstone.sequence >= key.epoch + && range_tombstone.sequence >= key.epoch_with_gap.pure_epoch() && range_tombstone.sequence < earliest_delete_epoch { earliest_delete_epoch = range_tombstone.sequence; @@ -253,9 +254,9 @@ pub async fn gen_test_sstable_impl + Clone + Default + Eq, F: Fil } else if earliest_delete_epoch < user_key_last_delete { user_key_last_delete = earliest_delete_epoch; - key.epoch = earliest_delete_epoch; + key.epoch_with_gap = EpochWithGap::new_from_epoch(earliest_delete_epoch); b.add(key.to_ref(), HummockValue::Delete).await.unwrap(); - key.epoch = epoch; + key.epoch_with_gap = EpochWithGap::new_from_epoch(epoch); } b.add(key.to_ref(), value.as_slice()).await.unwrap(); @@ -347,7 +348,7 @@ pub fn test_user_key_of(idx: usize) -> UserKey> { pub fn test_key_of(idx: usize) -> FullKey> { FullKey { user_key: test_user_key_of(idx), - epoch: 233, + epoch_with_gap: EpochWithGap::new_from_epoch(233), } } diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 34508a730a5fb..52c8f0b0c200d 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -35,7 +35,6 @@ use crate::hummock::utils::{ use crate::row_serde::value_serde::ValueRowSerde; use crate::storage_value::StorageValue; use crate::store::*; - pub type ImmutableMemtable = SharedBufferBatch; pub type ImmId = SharedBufferBatchId; @@ -575,6 +574,10 @@ impl LocalStateStore for MemtableLocalState prev_epoch ); } + + async fn try_flush(&mut self) -> StorageResult<()> { + Ok(()) + } } #[cfg(test)] diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index c454ad94339dc..38f243b20d3f3 100644 --- 
a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -20,6 +20,7 @@ use std::sync::{Arc, LazyLock}; use bytes::Bytes; use parking_lot::RwLock; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey}; use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; @@ -198,6 +199,7 @@ pub mod sled { use bytes::Bytes; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; + use risingwave_hummock_sdk::EpochWithGap; use crate::memory::sled::SledRangeKv; use crate::memory::RangeKv; @@ -216,7 +218,7 @@ pub mod sled { table_id, table_key: TableKey(Bytes::from(table_key.to_vec())), }, - epoch, + epoch_with_gap: EpochWithGap::new_from_epoch(epoch), }; let left_full_key = to_full_key(&left_table_key[..]); @@ -319,10 +321,10 @@ mod batched_iter { .collect_vec(); if let Some((last_key, _)) = batch.last() { - let full_key = FullKey::new( + let full_key = FullKey::new_with_gap_epoch( last_key.user_key.table_id, TableKey(last_key.user_key.table_key.0.clone()), - last_key.epoch, + last_key.epoch_with_gap, ); self.range.0 = Bound::Excluded(full_key); } @@ -448,7 +450,7 @@ where Included(k) => Included(FullKey::new( table_id, TableKey(Bytes::from(k.as_ref().to_vec())), - HummockEpoch::MAX, + MAX_EPOCH, )), Excluded(k) => Excluded(FullKey::new( table_id, @@ -458,7 +460,7 @@ where Unbounded => Included(FullKey::new( table_id, TableKey(Bytes::from(b"".to_vec())), - HummockEpoch::MAX, + MAX_EPOCH, )), }; let end = match table_key_range.end_bound() { @@ -470,14 +472,14 @@ where Excluded(k) => Excluded(FullKey::new( table_id, TableKey(Bytes::from(k.as_ref().to_vec())), - HummockEpoch::MAX, + MAX_EPOCH, )), Unbounded => { if let Some(next_table_id) = table_id.table_id().checked_add(1) { Excluded(FullKey::new( next_table_id.into(), TableKey(Bytes::from(b"".to_vec())), - HummockEpoch::MAX, + MAX_EPOCH, )) } else { Unbounded @@ -515,7 +517,7 @@ impl RangeKvStateStore { .inner .range(to_full_key_range(table_id, key_range), None)? { - if key.epoch > epoch { + if key.epoch_with_gap.pure_epoch() > epoch { continue; } if Some(&key.user_key) != last_user_key.as_ref() { @@ -695,7 +697,7 @@ impl StateStoreIter for RangeKvStateStoreIter { impl RangeKvStateStoreIter { fn next_inner(&mut self) -> StorageResult> { while let Some((key, value)) = self.inner.next()? 
{ - if key.epoch > self.epoch { + if key.epoch_with_gap.pure_epoch() > self.epoch { continue; } if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) { diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index fd3e235201eb4..024946b20bbc3 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -263,7 +263,6 @@ impl LocalStateStore for MonitoredStateStore { &mut self, delete_ranges: Vec<(Bound, Bound)>, ) -> impl Future> + Send + '_ { - // TODO: collect metrics self.inner .flush(delete_ranges) .verbose_instrument_await("store_flush") @@ -285,6 +284,12 @@ impl LocalStateStore for MonitoredStateStore { // TODO: may collect metrics self.inner.seal_current_epoch(next_epoch) } + + fn try_flush(&mut self) -> impl Future> + Send + '_ { + self.inner + .try_flush() + .verbose_instrument_await("store_try_flush") + } } impl StateStore for MonitoredStateStore { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index e92b5974844e2..2a7c87a738174 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -212,6 +212,13 @@ impl LocalStateStore for TracedStateStore { let _span = TraceSpan::new_seal_current_epoch_span(next_epoch, self.storage_type); self.inner.seal_current_epoch(next_epoch) } + + async fn try_flush(&mut self) -> StorageResult<()> { + let span = TraceSpan::new_try_flush_span(self.storage_type); + let res = self.inner.try_flush().await; + span.may_send_result(OperationResult::TryFlush(res.as_ref().map(|o| *o).into())); + res + } } impl StateStore for TracedStateStore { diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index c850761a9df40..16112d1ae9f24 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -125,6 +125,8 @@ pub struct StorageOpts { pub compactor_max_sst_size: u64, /// enable FastCompactorRunner. 
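`StorageOpts` below gains the `mem_table_spill_threshold` knob that drives `try_flush`. Pulling the pieces from the hunks above together, a condensed sketch of the spill lifecycle (field names are simplified stand-ins, not the real `LocalHummockStorage`):

```rust
// Condensed, hypothetical sketch of the spill lifecycle added in this PR.
const MAX_SPILL_TIMES: u16 = ((1u32 << 16) - 1) as u16;

struct SpillState {
    mem_table_bytes: usize,
    threshold: usize, // StorageOpts::mem_table_spill_threshold
    spill_offset: u16,
}

impl SpillState {
    /// Called per chunk: spill early if the mem-table outgrew the threshold
    /// and the current epoch still has gap slots left.
    fn try_flush(&mut self) {
        if self.mem_table_bytes > self.threshold && self.spill_offset < MAX_SPILL_TIMES {
            // The spilled imm is tagged EpochWithGap::new(epoch, spill_offset),
            // then one gap slot is consumed.
            self.spill_offset += 1;
            self.mem_table_bytes = 0;
        }
    }

    /// Sealing the epoch hands the full gap range back to the next epoch.
    fn seal_epoch(&mut self) {
        self.spill_offset = 0;
    }
}
```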
pub enable_fast_compaction: bool, + + pub mem_table_spill_threshold: usize, } impl Default for StorageOpts { @@ -237,6 +239,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt compactor_max_task_multiplier: c.storage.compactor_max_task_multiplier, compactor_max_sst_size: c.storage.compactor_max_sst_size, enable_fast_compaction: c.storage.enable_fast_compaction, + mem_table_spill_threshold: c.storage.mem_table_spill_threshold, } } } diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index de4e9f38032d7..0457bd1828c2c 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -133,6 +133,11 @@ impl LocalStateStore for PanicStateStore { fn seal_current_epoch(&mut self, _next_epoch: u64) { panic!("should not operate on the panic state store!") } + + #[allow(clippy::unused_async)] + async fn try_flush(&mut self) -> StorageResult<()> { + panic!("should not operate on the panic state store!"); + } } impl StateStore for PanicStateStore { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 2cef99ab4e651..f47e5644a388b 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -234,6 +234,7 @@ pub trait LocalStateStore: StaticSendSync { delete_ranges: Vec<(Bound, Bound)>, ) -> impl Future> + Send + '_; + fn try_flush(&mut self) -> impl Future> + Send + '_; fn epoch(&self) -> u64; fn is_dirty(&self) -> bool; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 06305bb208df2..46afb0711cdae 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -430,6 +430,13 @@ pub mod verify { self.actual.flush(delete_ranges).await } + async fn try_flush(&mut self) -> StorageResult<()> { + if let Some(expected) = &mut self.expected { + expected.try_flush().await?; + } + self.actual.try_flush().await + } + async fn init(&mut self, options: InitOptions) -> StorageResult<()> { self.actual.init(options.clone()).await?; if let Some(expected) = &mut self.expected { @@ -784,6 +791,8 @@ pub mod boxed_state_store { delete_ranges: Vec<(Bound, Bound)>, ) -> StorageResult; + async fn try_flush(&mut self) -> StorageResult<()>; + fn epoch(&self) -> u64; fn is_dirty(&self) -> bool; @@ -839,6 +848,10 @@ pub mod boxed_state_store { self.flush(delete_ranges).await } + async fn try_flush(&mut self) -> StorageResult<()> { + self.try_flush().await + } + fn epoch(&self) -> u64 { self.epoch() } @@ -905,6 +918,10 @@ pub mod boxed_state_store { self.deref_mut().flush(delete_ranges) } + fn try_flush(&mut self) -> impl Future> + Send + '_ { + self.deref_mut().try_flush() + } + fn epoch(&self) -> u64 { self.deref().epoch() } diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 655effee51cfd..d85914a87b0df 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -81,6 +81,7 @@ assert_matches = "1" criterion = { workspace = true, features = ["async_tokio", "async"] } expect-test = "1" risingwave_expr_impl = { workspace = true } +risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } risingwave_hummock_test = { path = "../storage/hummock_test", features = [ "test", ] } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 4a7f2e20eadf9..d4161da6b581d 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -225,14 +225,14 @@ impl LogReader for KvLogStoreReader { 
serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id)); let state_store = &state_store; - // Use u64::MAX here because the epoch to consume may be below the safe + // Use MAX_EPOCH here because the epoch to consume may be below the safe // epoch async move { Ok::<_, anyhow::Error>(Box::pin( state_store .iter( (Included(range_start), Included(range_end)), - u64::MAX, + MAX_EPOCH, ReadOptions { prefetch_options: PrefetchOptions::new_for_exhaust_iter(), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index ba69209887b67..1ca0edc89be50 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -34,6 +34,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; +use risingwave_common::util::epoch::MAX_EPOCH; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::{ @@ -579,7 +580,7 @@ impl LogStoreRowOpStream { // sorted by epoch descending. Earlier epoch at the end self.not_started_streams - .sort_by_key(|(epoch, _)| u64::MAX - *epoch); + .sort_by_key(|(epoch, _)| MAX_EPOCH - *epoch); let (epoch, stream) = self .not_started_streams diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index f0ef04ab7ec91..83c722d8683f9 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1090,6 +1090,11 @@ where self.local_store.seal_current_epoch(next_epoch); Ok(()) } + + pub async fn try_flush(&mut self) -> StreamExecutorResult<()> { + self.local_store.try_flush().await?; + Ok(()) + } } // Iterator functions diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index ccb55b75c24fc..36418a20724c3 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -179,7 +179,7 @@ impl DynamicFilterExecutor HashJoinExecutor { let mut right_time = Duration::from_nanos(0); @@ -792,6 +793,7 @@ impl HashJoinExecutor { let barrier_start_time = Instant::now(); @@ -839,6 +841,14 @@ impl HashJoinExecutor StreamExecutorResult<()> { + // All changes to the state have been buffered in the mem-table of the state table. + // `try_flush` gives the state store a chance to spill them early if the mem-table grows too large. + self.side_l.ht.try_flush().await?; + self.side_r.ht.try_flush().await?; + Ok(()) + } + // We need to manually evict the cache. 
fn evict_cache( side_update: &mut JoinSide, diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs index d8ad231c677c7..05f8c1ffde9b3 100644 --- a/src/stream/src/executor/managed_state/join/mod.rs +++ b/src/stream/src/executor/managed_state/join/mod.rs @@ -479,6 +479,12 @@ impl JoinHashMap { Ok(()) } + pub async fn try_flush(&mut self) -> StreamExecutorResult<()> { + self.state.table.try_flush().await?; + self.degree_state.table.try_flush().await?; + Ok(()) + } + /// Insert a join row #[allow(clippy::unused_async)] pub async fn insert(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index cfb02ec34c481..c068a6eba0de1 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -169,6 +169,7 @@ impl MaterializeExecutor { match generate_output(fixed_changes, data_types.clone())? { Some(output_chunk) => { self.state_table.write_chunk(output_chunk.clone()); + self.state_table.try_flush().await?; Message::Chunk(output_chunk) } None => continue, @@ -177,6 +178,7 @@ impl MaterializeExecutor { ConflictBehavior::NoCheck => { self.state_table.write_chunk(chunk.clone()); + self.state_table.try_flush().await?; Message::Chunk(chunk) } } diff --git a/src/tests/compaction_test/Cargo.toml b/src/tests/compaction_test/Cargo.toml index 87ad5946b26d5..8825e6692d6dd 100644 --- a/src/tests/compaction_test/Cargo.toml +++ b/src/tests/compaction_test/Cargo.toml @@ -44,6 +44,9 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ] } tracing = "0.1" +[dev-dependencies] +risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } + [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../../workspace-hack" } diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 1bf59ada48586..f5b9d48eb0d20 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -37,6 +37,7 @@ risingwave_ctl = { workspace = true } risingwave_e2e_extended_mode_test = { path = "../e2e_extended_mode" } risingwave_expr_impl = { workspace = true } risingwave_frontend = { workspace = true } +risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } risingwave_meta_node = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true }
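Finally, the executor-side pattern adopted by the materialize and join hunks above is uniform: buffer a chunk into the state table, then give the store a chance to spill. A hedged sketch with stand-in types (only the call pattern mirrors the PR; the real API is `StateTable::write_chunk` plus the new `StateTable::try_flush`):

```rust
// Stand-in types for illustration; not the real stream executor definitions.
struct StateTable;
struct StreamChunk;
type StreamExecutorResult<T> = Result<T, ()>;

impl StateTable {
    fn write_chunk(&mut self, _chunk: StreamChunk) {}

    // Delegates to LocalStateStore::try_flush; a no-op while the mem-table
    // stays under mem_table_spill_threshold.
    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        Ok(())
    }
}

async fn on_chunk(table: &mut StateTable, chunk: StreamChunk) -> StreamExecutorResult<()> {
    table.write_chunk(chunk);
    table.try_flush().await?;
    Ok(())
}
```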