From c882bdaa8acd73a6d3a20ff4809dda89aead6fb8 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Wed, 11 Sep 2024 20:54:44 +0800 Subject: [PATCH] refactor(storage): cache recent versions and unify batch query and time travel (#18477) --- e2e_test/time_travel/syntax.slt | 4 + src/common/src/config.rs | 3 + src/config/docs.md | 1 + src/storage/Cargo.toml | 2 +- src/storage/benches/bench_table_watermarks.rs | 2 +- src/storage/hummock_trace/src/opts.rs | 4 +- .../event_handler/hummock_event_handler.rs | 70 ++-- .../src/hummock/event_handler/uploader/mod.rs | 29 +- src/storage/src/hummock/local_version/mod.rs | 1 + .../hummock/local_version/pinned_version.rs | 9 +- .../hummock/local_version/recent_versions.rs | 321 ++++++++++++++++++ .../src/hummock/store/hummock_storage.rs | 147 ++++---- src/storage/src/hummock/utils.rs | 21 +- src/storage/src/opts.rs | 6 + src/storage/src/store.rs | 6 +- .../src/table/batch_table/storage_table.rs | 14 +- 16 files changed, 500 insertions(+), 140 deletions(-) create mode 100644 src/storage/src/hummock/local_version/recent_versions.rs diff --git a/e2e_test/time_travel/syntax.slt b/e2e_test/time_travel/syntax.slt index 6c3408a276763..5895f6d9b9e8b 100644 --- a/e2e_test/time_travel/syntax.slt +++ b/e2e_test/time_travel/syntax.slt @@ -7,6 +7,10 @@ SET QUERY_MODE TO local; statement ok CREATE TABLE t (k INT); +query I +SELECT * FROM t; +---- + query error SELECT * FROM t FOR SYSTEM_TIME AS OF 963716300; ---- diff --git a/src/common/src/config.rs b/src/common/src/config.rs index d78fdbe51fa9b..e2b4dd7b0f97c 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -699,6 +699,9 @@ pub struct StorageConfig { #[serde(default)] pub prefetch_buffer_capacity_mb: Option, + #[serde(default)] + pub max_cached_recent_versions_number: Option, + /// max prefetch block number #[serde(default = "default::storage::max_prefetch_block_number")] pub max_prefetch_block_number: usize, diff --git a/src/config/docs.md b/src/config/docs.md index 47905d71e5e0c..bcce61d8bb456 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -119,6 +119,7 @@ This page is automatically generated by `./risedev generate-example-config` | enable_fast_compaction | | true | | high_priority_ratio_in_percent | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_eviction.high_priority_ratio_in_percent` with `storage.cache.block_cache_eviction.algorithm = "Lru"` instead. | | | imm_merge_threshold | The threshold for the number of immutable memtables to merge to a new imm. | 0 | +| max_cached_recent_versions_number | | | | max_concurrent_compaction_task_number | | 16 | | max_prefetch_block_number | max prefetch block number | 16 | | max_preload_io_retry_times | | 3 | diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 2886c4e4e23f7..6a6bde4b146e0 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -96,7 +96,7 @@ workspace-hack = { path = "../workspace-hack" } bincode = "1" criterion = { workspace = true, features = ["async_futures", "async_tokio"] } expect-test = "1" -risingwave_hummock_sdk = { workspace = true } +risingwave_hummock_sdk = { workspace = true, features = ["test"] } risingwave_test_runner = { workspace = true } uuid = { version = "1", features = ["v4"] } diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index 4a9e1c5edda0b..5153dd0f9fe38 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -166,7 +166,7 @@ fn bench_table_watermarks(c: &mut Criterion) { let mut pinned_version = PinnedVersion::new(versions.pop_front().unwrap(), unbounded_channel().0); while let Some(version) = versions.pop_front() { - pinned_version = pinned_version.new_pin_version(version); + pinned_version = pinned_version.new_pin_version(version).unwrap(); } }, BatchSize::SmallInput, diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 5d480cca96b58..ff8b43c15c458 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -109,7 +109,7 @@ pub struct TracedReadOptions { pub retention_seconds: Option, pub table_id: TracedTableId, pub read_version_from_backup: bool, - pub read_version_from_time_travel: bool, + pub read_committed: bool, } impl TracedReadOptions { @@ -125,7 +125,7 @@ impl TracedReadOptions { retention_seconds: None, table_id: TracedTableId { table_id }, read_version_from_backup: false, - read_version_from_time_travel: false, + read_committed: false, } } } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index f2aa2ea7fd88d..1c8abc78ddffc 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -50,6 +50,7 @@ use crate::hummock::event_handler::{ ReadOnlyRwLockRef, }; use crate::hummock::local_version::pinned_version::PinnedVersion; +use crate::hummock::local_version::recent_versions::RecentVersions; use crate::hummock::store::version::{ HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate, }; @@ -197,7 +198,7 @@ pub struct HummockEventHandler { local_read_version_mapping: HashMap, version_update_notifier_tx: Arc>, - pinned_version: Arc>, + recent_versions: Arc>, write_conflict_detector: Option>, uploader: HummockUploader, @@ -355,7 +356,10 @@ impl HummockEventHandler { hummock_event_rx, version_update_rx, version_update_notifier_tx, - pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)), + recent_versions: Arc::new(ArcSwap::from_pointee(RecentVersions::new( + pinned_version, + storage_opts.max_cached_recent_versions_number, + ))), write_conflict_detector, read_version_mapping, local_read_version_mapping: Default::default(), @@ -371,8 +375,8 @@ impl HummockEventHandler { self.version_update_notifier_tx.clone() } - pub fn pinned_version(&self) -> Arc> { - self.pinned_version.clone() + pub fn recent_versions(&self) -> Arc> { + self.recent_versions.clone() } pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping { @@ -529,17 +533,18 @@ impl HummockEventHandler { .await .expect("should not be empty"); let prev_version_id = latest_version_ref.id(); - let new_version = Self::resolve_version_update_info( + if let Some(new_version) = Self::resolve_version_update_info( latest_version_ref.clone(), version_update, None, - ); - info!( - ?prev_version_id, - new_version_id = ?new_version.id(), - "recv new version" - ); - latest_version = Some(new_version); + ) { + info!( + ?prev_version_id, + new_version_id = ?new_version.id(), + "recv new version" + ); + latest_version = Some(new_version); + } } self.apply_version_update( @@ -582,21 +587,21 @@ impl HummockEventHandler { .unwrap_or_else(|| self.uploader.hummock_version().clone()); let mut sst_delta_infos = vec![]; - let new_pinned_version = Self::resolve_version_update_info( + if let Some(new_pinned_version) = Self::resolve_version_update_info( pinned_version.clone(), version_payload, Some(&mut sst_delta_infos), - ); - - self.refiller - .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); + ) { + self.refiller + .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); + } } fn resolve_version_update_info( pinned_version: PinnedVersion, version_payload: HummockVersionUpdate, mut sst_delta_infos: Option<&mut Vec>, - ) -> PinnedVersion { + ) -> Option { let newly_pinned_version = match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { let mut version_to_apply = pinned_version.version().clone(); @@ -629,8 +634,9 @@ impl HummockEventHandler { .metrics .event_handler_on_apply_version_update .start_timer(); - self.pinned_version - .store(Arc::new(new_pinned_version.clone())); + self.recent_versions.rcu(|prev_recent_versions| { + prev_recent_versions.with_new_version(new_pinned_version.clone()) + }); { self.for_each_read_version( @@ -663,7 +669,10 @@ impl HummockEventHandler { // TODO: should we change the logic when supporting partial ckpt? if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager { sstable_object_id_manager.remove_watermark_object_id(TrackerId::Epoch( - self.pinned_version.load().visible_table_committed_epoch(), + self.recent_versions + .load() + .latest_version() + .visible_table_committed_epoch(), )); } @@ -789,13 +798,13 @@ impl HummockEventHandler { is_replicated, vnodes, } => { - let pinned_version = self.pinned_version.load(); + let pinned_version = self.recent_versions.load().latest_version().clone(); let instance_id = self.generate_instance_id(); let basic_read_version = Arc::new(RwLock::new( HummockReadVersion::new_with_replication_option( table_id, instance_id, - (**pinned_version).clone(), + pinned_version, is_replicated, vnodes, ), @@ -992,7 +1001,7 @@ mod tests { ); let event_tx = event_handler.event_sender(); - let latest_version = event_handler.pinned_version.clone(); + let latest_version = event_handler.recent_versions.clone(); let latest_version_update_tx = event_handler.version_update_notifier_tx.clone(); let send_clear = |version_id| { @@ -1018,12 +1027,15 @@ mod tests { let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap(); assert_eq!(old_version.version(), initial_version.version()); assert_eq!(new_version.version(), &version1); - assert_eq!(latest_version.load().version(), initial_version.version()); + assert_eq!( + latest_version.load().latest_version().version(), + initial_version.version() + ); let mut changed = latest_version_update_tx.subscribe(); refill_finish_tx.send(()).unwrap(); changed.changed().await.unwrap(); - assert_eq!(latest_version.load().version(), &version1); + assert_eq!(latest_version.load().latest_version().version(), &version1); } // test recovery with pending refill task @@ -1050,11 +1062,11 @@ mod tests { refill_task_rx.recv().await.unwrap(); assert_eq!(old_version3.version(), &version2); assert_eq!(new_version3.version(), &version3); - assert_eq!(latest_version.load().version(), &version1); + assert_eq!(latest_version.load().latest_version().version(), &version1); let rx = send_clear(version3.id); rx.await.unwrap(); - assert_eq!(latest_version.load().version(), &version3); + assert_eq!(latest_version.load().latest_version().version(), &version3); } async fn assert_pending(fut: &mut (impl Future + Unpin)) { @@ -1081,7 +1093,7 @@ mod tests { ))) .unwrap(); rx.await.unwrap(); - assert_eq!(latest_version.load().version(), &version5); + assert_eq!(latest_version.load().latest_version().version(), &version5); } } diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 4494049d93b0b..90e6a9306930a 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -1643,7 +1643,8 @@ pub(crate) mod tests { let new_pinned_version = uploader .context .pinned_version - .new_pin_version(test_hummock_version(epoch1)); + .new_pin_version(test_hummock_version(epoch1)) + .unwrap(); uploader.update_pinned_version(new_pinned_version); assert_eq!(epoch1, uploader.max_committed_epoch()); } @@ -1672,7 +1673,8 @@ pub(crate) mod tests { let new_pinned_version = uploader .context .pinned_version - .new_pin_version(test_hummock_version(epoch1)); + .new_pin_version(test_hummock_version(epoch1)) + .unwrap(); uploader.update_pinned_version(new_pinned_version); assert!(uploader.data().syncing_data.is_empty()); assert_eq!(epoch1, uploader.max_committed_epoch()); @@ -1706,7 +1708,8 @@ pub(crate) mod tests { let new_pinned_version = uploader .context .pinned_version - .new_pin_version(test_hummock_version(epoch1)); + .new_pin_version(test_hummock_version(epoch1)) + .unwrap(); uploader.update_pinned_version(new_pinned_version); assert!(uploader.data().syncing_data.is_empty()); assert_eq!(epoch1, uploader.max_committed_epoch()); @@ -1730,11 +1733,21 @@ pub(crate) mod tests { let epoch4 = epoch3.next_epoch(); let epoch5 = epoch4.next_epoch(); let epoch6 = epoch5.next_epoch(); - let version1 = initial_pinned_version.new_pin_version(test_hummock_version(epoch1)); - let version2 = initial_pinned_version.new_pin_version(test_hummock_version(epoch2)); - let version3 = initial_pinned_version.new_pin_version(test_hummock_version(epoch3)); - let version4 = initial_pinned_version.new_pin_version(test_hummock_version(epoch4)); - let version5 = initial_pinned_version.new_pin_version(test_hummock_version(epoch5)); + let version1 = initial_pinned_version + .new_pin_version(test_hummock_version(epoch1)) + .unwrap(); + let version2 = initial_pinned_version + .new_pin_version(test_hummock_version(epoch2)) + .unwrap(); + let version3 = initial_pinned_version + .new_pin_version(test_hummock_version(epoch3)) + .unwrap(); + let version4 = initial_pinned_version + .new_pin_version(test_hummock_version(epoch4)) + .unwrap(); + let version5 = initial_pinned_version + .new_pin_version(test_hummock_version(epoch5)) + .unwrap(); uploader.start_epochs_for_test([epoch6]); uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6); diff --git a/src/storage/src/hummock/local_version/mod.rs b/src/storage/src/hummock/local_version/mod.rs index 578e123c6574e..4a45c8dc9075c 100644 --- a/src/storage/src/hummock/local_version/mod.rs +++ b/src/storage/src/hummock/local_version/mod.rs @@ -13,3 +13,4 @@ // limitations under the License. pub mod pinned_version; +pub mod recent_versions; diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 5ef53edcd26ef..afaafdf7cbe8a 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -92,22 +92,25 @@ impl PinnedVersion { } } - pub fn new_pin_version(&self, version: HummockVersion) -> Self { + pub fn new_pin_version(&self, version: HummockVersion) -> Option { assert!( version.id >= self.version.id, "pinning a older version {}. Current is {}", version.id, self.version.id ); + if version.id == self.version.id { + return None; + } let version_id = version.id; - PinnedVersion { + Some(PinnedVersion { version: Arc::new(version), guard: Arc::new(PinnedVersionGuard::new( version_id, self.guard.pinned_version_manager_tx.clone(), )), - } + }) } pub fn id(&self) -> HummockVersionId { diff --git a/src/storage/src/hummock/local_version/recent_versions.rs b/src/storage/src/hummock/local_version/recent_versions.rs new file mode 100644 index 0000000000000..8d3f1a015ad6a --- /dev/null +++ b/src/storage/src/hummock/local_version/recent_versions.rs @@ -0,0 +1,321 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; + +use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::HummockEpoch; + +use crate::hummock::local_version::pinned_version::PinnedVersion; + +pub struct RecentVersions { + latest_version: PinnedVersion, + is_latest_committed: bool, + recent_versions: Vec, // earlier version at the front + max_version_num: usize, +} + +impl RecentVersions { + pub fn new(version: PinnedVersion, max_version_num: usize) -> Self { + assert!(max_version_num > 0); + Self { + latest_version: version, + is_latest_committed: true, // The first version is always treated as committed epochs + recent_versions: Vec::new(), + max_version_num, + } + } + + fn has_table_committed(&self, new_version: &PinnedVersion) -> bool { + let mut has_table_committed = false; + for (table_id, info) in new_version.version().state_table_info.info() { + if let Some(prev_info) = self + .latest_version + .version() + .state_table_info + .info() + .get(table_id) + { + match info.committed_epoch.cmp(&prev_info.committed_epoch) { + Ordering::Less => { + unreachable!( + "table {} has regress committed epoch {}, prev committed epoch {}", + table_id, info.committed_epoch, prev_info.committed_epoch + ); + } + Ordering::Equal => {} + Ordering::Greater => { + has_table_committed = true; + } + } + } else { + has_table_committed = true; + } + } + has_table_committed + } + + #[must_use] + pub fn with_new_version(&self, version: PinnedVersion) -> Self { + assert!(version.version().id > self.latest_version.version().id); + let is_committed = self.has_table_committed(&version); + let recent_versions = if self.is_latest_committed { + let prev_recent_versions = if self.recent_versions.len() >= self.max_version_num { + assert_eq!(self.recent_versions.len(), self.max_version_num); + &self.recent_versions[1..] + } else { + &self.recent_versions[..] + }; + let mut recent_versions = Vec::with_capacity(prev_recent_versions.len() + 1); + recent_versions.extend(prev_recent_versions.iter().cloned()); + recent_versions.push(self.latest_version.clone()); + recent_versions + } else { + self.recent_versions.clone() + }; + Self { + latest_version: version, + is_latest_committed: is_committed, + recent_versions, + max_version_num: self.max_version_num, + } + } + + pub fn latest_version(&self) -> &PinnedVersion { + &self.latest_version + } + + /// Return the latest version that is safe to read `epoch` on `table_id`. + /// + /// `safe to read` means that the `committed_epoch` of the `table_id` in the version won't be greater than the given `epoch` + pub fn get_safe_version( + &self, + table_id: TableId, + epoch: HummockEpoch, + ) -> Option { + if let Some(info) = self + .latest_version + .version() + .state_table_info + .info() + .get(&table_id) + { + if info.committed_epoch <= epoch { + Some(self.latest_version.clone()) + } else { + self.get_safe_version_from_recent(table_id, epoch) + } + } else { + None + } + } + + fn get_safe_version_from_recent( + &self, + table_id: TableId, + epoch: HummockEpoch, + ) -> Option { + if cfg!(debug_assertions) { + assert!( + epoch + < self + .latest_version + .version() + .state_table_info + .info() + .get(&table_id) + .expect("should exist") + .committed_epoch + ); + } + let result = self.recent_versions.binary_search_by(|version| { + let committed_epoch = version + .version() + .state_table_info + .info() + .get(&table_id) + .map(|info| info.committed_epoch); + if let Some(committed_epoch) = committed_epoch { + committed_epoch.cmp(&epoch) + } else { + // We have ensured that the table_id exists in the latest version, so if the table_id does not exist in a + // previous version, the table must have not created yet, and therefore has less ordering. + Ordering::Less + } + }); + match result { + Ok(index) => Some(self.recent_versions[index].clone()), + Err(index) => { + // `index` is index of the first version that has `committed_epoch` greater than `epoch` + // or `index` equals `recent_version.len()` when `epoch` is greater than all `committed_epoch` + let version = if index >= self.recent_versions.len() { + assert_eq!(index, self.recent_versions.len()); + self.recent_versions.last().cloned() + } else if index == 0 { + // The earliest version has a higher committed epoch + None + } else { + self.recent_versions.get(index - 1).cloned() + }; + version.and_then(|version| { + if version + .version() + .state_table_info + .info() + .contains_key(&table_id) + { + Some(version) + } else { + // if the table does not exist in the version, return `None` to try get a time travel version + None + } + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use risingwave_common::catalog::TableId; + use risingwave_hummock_sdk::version::HummockVersion; + use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo}; + use tokio::sync::mpsc::unbounded_channel; + + use crate::hummock::local_version::pinned_version::PinnedVersion; + use crate::hummock::local_version::recent_versions::RecentVersions; + + const TEST_TABLE_ID1: TableId = TableId::new(233); + const TEST_TABLE_ID2: TableId = TableId::new(234); + + fn gen_pin_version( + version_id: u64, + table_committed_epoch: impl IntoIterator, + ) -> PinnedVersion { + PinnedVersion::new( + HummockVersion::from_rpc_protobuf(&PbHummockVersion { + id: version_id, + state_table_info: HashMap::from_iter(table_committed_epoch.into_iter().map( + |(table_id, committed_epoch)| { + ( + table_id.table_id, + StateTableInfo { + committed_epoch, + safe_epoch: 0, + compaction_group_id: 0, + }, + ) + }, + )), + ..Default::default() + }), + unbounded_channel().0, + ) + } + + fn assert_query_equal( + recent_version: &RecentVersions, + expected: &[(TableId, u64, Option<&PinnedVersion>)], + ) { + for (table_id, epoch, expected_version) in expected + .iter() + .cloned() + .chain([(TEST_TABLE_ID1, 0, None), (TEST_TABLE_ID2, 0, None)]) + { + let version = recent_version.get_safe_version(table_id, epoch); + assert_eq!( + version.as_ref().map(|version| version.id()), + expected_version.map(|version| version.id()) + ); + } + } + + #[test] + fn test_basic() { + let epoch1 = 233; + let epoch0 = epoch1 - 1; + let epoch2 = epoch1 + 1; + let epoch3 = epoch2 + 1; + let epoch4 = epoch3 + 1; + let version1 = gen_pin_version(1, [(TEST_TABLE_ID1, epoch1)]); + // with at most 2 historical versions + let recent_versions = RecentVersions::new(version1.clone(), 2); + assert!(recent_versions.recent_versions.is_empty()); + assert!(recent_versions.is_latest_committed); + assert_query_equal( + &recent_versions, + &[ + (TEST_TABLE_ID1, epoch0, None), + (TEST_TABLE_ID1, epoch1, Some(&version1)), + (TEST_TABLE_ID1, epoch2, Some(&version1)), + ], + ); + + let recent_versions = + recent_versions.with_new_version(gen_pin_version(2, [(TEST_TABLE_ID1, epoch1)])); + assert_eq!(recent_versions.recent_versions.len(), 1); + assert!(!recent_versions.is_latest_committed); + + let version3 = gen_pin_version(3, [(TEST_TABLE_ID1, epoch2)]); + let recent_versions = recent_versions.with_new_version(version3.clone()); + assert_eq!(recent_versions.recent_versions.len(), 1); + assert!(recent_versions.is_latest_committed); + assert_query_equal( + &recent_versions, + &[ + (TEST_TABLE_ID1, epoch0, None), + (TEST_TABLE_ID1, epoch1, Some(&version1)), + (TEST_TABLE_ID1, epoch2, Some(&version3)), + (TEST_TABLE_ID1, epoch3, Some(&version3)), + ], + ); + + let version4 = gen_pin_version(4, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch2)]); + let recent_versions = recent_versions.with_new_version(version4.clone()); + assert_eq!(recent_versions.recent_versions.len(), 2); + assert!(recent_versions.is_latest_committed); + assert_query_equal( + &recent_versions, + &[ + (TEST_TABLE_ID1, epoch0, None), + (TEST_TABLE_ID1, epoch1, Some(&version1)), + (TEST_TABLE_ID1, epoch2, Some(&version4)), + (TEST_TABLE_ID1, epoch3, Some(&version4)), + (TEST_TABLE_ID2, epoch0, None), + (TEST_TABLE_ID2, epoch1, Some(&version4)), + (TEST_TABLE_ID2, epoch2, Some(&version4)), + ], + ); + + let version5 = gen_pin_version(5, [(TEST_TABLE_ID2, epoch1), (TEST_TABLE_ID1, epoch3)]); + let recent_versions = recent_versions.with_new_version(version5.clone()); + assert_eq!(recent_versions.recent_versions.len(), 2); + assert!(recent_versions.is_latest_committed); + assert_query_equal( + &recent_versions, + &[ + (TEST_TABLE_ID1, epoch0, None), + (TEST_TABLE_ID1, epoch1, None), + (TEST_TABLE_ID1, epoch2, Some(&version4)), + (TEST_TABLE_ID1, epoch3, Some(&version5)), + (TEST_TABLE_ID1, epoch4, Some(&version5)), + (TEST_TABLE_ID2, epoch0, None), + (TEST_TABLE_ID2, epoch1, Some(&version5)), + (TEST_TABLE_ID2, epoch2, Some(&version5)), + ], + ); + } +} diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 82b98c5f4fb39..b4924a5dca60f 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -14,7 +14,7 @@ use std::collections::HashSet; use std::future::Future; -use std::ops::{Bound, Deref}; +use std::ops::Bound; use std::sync::Arc; use arc_swap::ArcSwap; @@ -50,9 +50,10 @@ use crate::hummock::event_handler::{ }; use crate::hummock::iterator::change_log::ChangeLogIterator; use crate::hummock::local_version::pinned_version::{start_pinned_version_worker, PinnedVersion}; +use crate::hummock::local_version::recent_versions::RecentVersions; use crate::hummock::observer_manager::HummockObserverNode; use crate::hummock::time_travel_version_cache::SimpleTimeTravelVersionCache; -use crate::hummock::utils::{validate_safe_epoch, wait_for_epoch}; +use crate::hummock::utils::wait_for_epoch; use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef}; use crate::hummock::{ HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator, @@ -97,7 +98,7 @@ pub struct HummockStorage { version_update_notifier_tx: Arc>, - pinned_version: Arc>, + recent_versions: Arc>, hummock_version_reader: HummockVersionReader, @@ -223,7 +224,7 @@ impl HummockStorage { version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(), hummock_event_sender: event_tx.clone(), _version_update_sender: version_update_tx, - pinned_version: hummock_event_handler.pinned_version(), + recent_versions: hummock_event_handler.recent_versions(), hummock_version_reader: HummockVersionReader::new( sstable_store, state_store_metrics.clone(), @@ -260,15 +261,9 @@ impl HummockStorage { ) -> StorageResult> { let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone())); - let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { - self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) - .await? - } else if read_options.read_version_from_backup { - self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) - .await? - } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range)? - }; + let (key_range, read_version_tuple) = self + .build_read_version_tuple(epoch, key_range, &read_options) + .await?; if is_empty_key_range(&key_range) { return Ok(None); @@ -285,15 +280,9 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { - self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) - .await? - } else if read_options.read_version_from_backup { - self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) - .await? - } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range)? - }; + let (key_range, read_version_tuple) = self + .build_read_version_tuple(epoch, key_range, &read_options) + .await?; self.hummock_version_reader .iter(key_range, epoch, read_options, read_version_tuple) @@ -306,27 +295,20 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { - self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) - .await? - } else if read_options.read_version_from_backup { - self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) - .await? - } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range)? - }; + let (key_range, read_version_tuple) = self + .build_read_version_tuple(epoch, key_range, &read_options) + .await?; self.hummock_version_reader .rev_iter(key_range, epoch, read_options, read_version_tuple, None) .await } - async fn build_read_version_by_time_travel( + async fn get_time_travel_version( &self, epoch: u64, table_id: TableId, - key_range: TableKeyRange, - ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { + ) -> StorageResult { let fetch = async { let pb_version = self .hummock_meta_client @@ -335,7 +317,6 @@ impl HummockStorage { .inspect_err(|e| tracing::error!("{}", e.to_report_string())) .map_err(|e| HummockError::meta_error(e.to_report_string()))?; let version = HummockVersion::from_rpc_protobuf(&pb_version); - validate_safe_epoch(&version, table_id, epoch)?; let (tx, _rx) = unbounded_channel(); Ok(PinnedVersion::new(version, tx)) }; @@ -343,9 +324,24 @@ impl HummockStorage { .simple_time_travel_version_cache .get_or_insert(epoch, fetch) .await?; - Ok(get_committed_read_version_tuple( - version, table_id, key_range, epoch, - )) + Ok(version) + } + + async fn build_read_version_tuple( + &self, + epoch: u64, + key_range: TableKeyRange, + read_options: &ReadOptions, + ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { + if read_options.read_version_from_backup { + self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) + .await + } else if read_options.read_committed { + self.build_read_version_tuple_from_committed(epoch, read_options.table_id, key_range) + .await + } else { + self.build_read_version_tuple_from_all(epoch, read_options.table_id, key_range) + } } async fn build_read_version_tuple_from_backup( @@ -359,16 +355,12 @@ impl HummockStorage { .try_get_hummock_version(table_id, epoch) .await { - Ok(Some(backup_version)) => { - validate_safe_epoch(backup_version.version(), table_id, epoch)?; - - Ok(get_committed_read_version_tuple( - backup_version, - table_id, - key_range, - epoch, - )) - } + Ok(Some(backup_version)) => Ok(get_committed_read_version_tuple( + backup_version, + table_id, + key_range, + epoch, + )), Ok(None) => Err(HummockError::read_backup_error(format!( "backup include epoch {} not found", epoch @@ -378,27 +370,47 @@ impl HummockStorage { } } - fn build_read_version_tuple( + async fn build_read_version_tuple_from_committed( &self, epoch: u64, table_id: TableId, key_range: TableKeyRange, ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { - let pinned_version = self.pinned_version.load(); - validate_safe_epoch(pinned_version.version(), table_id, epoch)?; - let table_committed_epoch = pinned_version + let version = match self + .recent_versions + .load() + .get_safe_version(table_id, epoch) + { + Some(version) => version, + None => self.get_time_travel_version(epoch, table_id).await?, + }; + Ok(get_committed_read_version_tuple( + version, table_id, key_range, epoch, + )) + } + + fn build_read_version_tuple_from_all( + &self, + epoch: u64, + table_id: TableId, + key_range: TableKeyRange, + ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { + let pinned_version = self.recent_versions.load().latest_version().clone(); + let info = pinned_version .version() .state_table_info .info() - .get(&table_id) - .map(|info| info.committed_epoch); + .get(&table_id); // check epoch if lower mce - let ret = if let Some(table_committed_epoch) = table_committed_epoch - && epoch <= table_committed_epoch + let ret = if let Some(info) = info + && epoch <= info.committed_epoch { + if epoch < info.safe_epoch { + return Err(HummockError::expired_epoch(table_id, info.safe_epoch, epoch).into()); + } // read committed_version directly without build snapshot - get_committed_read_version_tuple((**pinned_version).clone(), table_id, key_range, epoch) + get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch) } else { let vnode = vnode(&key_range); let mut matched_replicated_read_version_cnt = 0; @@ -431,6 +443,7 @@ impl HummockStorage { // When the system has just started and no state has been created, the memory state // may be empty if read_version_vec.is_empty() { + let table_committed_epoch = info.map(|info| info.committed_epoch); if matched_replicated_read_version_cnt > 0 { tracing::warn!( "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})", @@ -449,12 +462,7 @@ impl HummockStorage { table_committed_epoch ); } - get_committed_read_version_tuple( - (**pinned_version).clone(), - table_id, - key_range, - epoch, - ) + get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch) } else { if read_version_vec.len() != 1 { let read_version_vnodes = read_version_vec @@ -538,7 +546,7 @@ impl HummockStorage { } pub fn get_pinned_version(&self) -> PinnedVersion { - self.pinned_version.load().deref().deref().clone() + self.recent_versions.load().latest_version().clone() } pub fn backup_reader(&self) -> BackupReaderRef { @@ -604,7 +612,7 @@ impl StateStoreRead for HummockStorage { key_range: TableKeyRange, options: ReadLogOptions, ) -> StorageResult { - let version = (**self.pinned_version.load()).clone(); + let version = self.recent_versions.load().latest_version().clone(); let iter = self .hummock_version_reader .iter_log(version, epoch_range, key_range, options) @@ -655,8 +663,9 @@ impl HummockStorage { epoch: u64, ) -> StorageResult { let table_ids = self - .pinned_version + .recent_versions .load() + .latest_version() .version() .state_table_info .info() @@ -675,7 +684,7 @@ impl HummockStorage { .send(HummockVersionUpdate::PinnedVersion(Box::new(version))) .unwrap(); loop { - if self.pinned_version.load().id() >= version_id { + if self.recent_versions.load().latest_version().id() >= version_id { break; } @@ -686,7 +695,7 @@ impl HummockStorage { pub async fn wait_version(&self, version: HummockVersion) { use tokio::task::yield_now; loop { - if self.pinned_version.load().id() >= version.id { + if self.recent_versions.load().latest_version().id() >= version.id { break; } @@ -736,7 +745,7 @@ impl HummockStorage { pub async fn wait_version_update(&self, old_id: HummockVersionId) -> HummockVersionId { use tokio::task::yield_now; loop { - let cur_id = self.pinned_version.load().id(); + let cur_id = self.recent_versions.load().latest_version().id(); if cur_id > old_id { return cur_id; } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 3f2d1f989f529..c2f6cbafed79b 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -30,11 +30,10 @@ use risingwave_hummock_sdk::key::{ bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, }; use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use tokio::sync::oneshot::{channel, Receiver, Sender}; -use super::{HummockError, HummockResult, SstableStoreRef}; +use super::{HummockError, SstableStoreRef}; use crate::error::StorageResult; use crate::hummock::CachePolicy; use crate::mem_table::{KeyOp, MemTableError}; @@ -72,24 +71,6 @@ where !too_left && !too_right } -pub fn validate_safe_epoch( - version: &HummockVersion, - table_id: TableId, - epoch: u64, -) -> HummockResult<()> { - if let Some(info) = version.state_table_info.info().get(&table_id) - && epoch < info.safe_epoch - { - return Err(HummockError::expired_epoch( - table_id, - info.safe_epoch, - epoch, - )); - } - - Ok(()) -} - pub fn filter_single_sst(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool where R: RangeBounds>, diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index f6d6f31fb3a4f..a3a787f55c97d 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -63,6 +63,8 @@ pub struct StorageOpts { /// max memory usage for large query. pub prefetch_buffer_capacity_mb: usize, + pub max_cached_recent_versions_number: usize, + pub max_prefetch_block_number: usize, pub disable_remote_compactor: bool, @@ -170,6 +172,10 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt meta_cache_shard_num: s.meta_cache_shard_num, meta_cache_eviction_config: s.meta_cache_eviction_config.clone(), prefetch_buffer_capacity_mb: s.prefetch_buffer_capacity_mb, + max_cached_recent_versions_number: c + .storage + .max_cached_recent_versions_number + .unwrap_or(60), max_prefetch_block_number: c.storage.max_prefetch_block_number, disable_remote_compactor: c.storage.disable_remote_compactor, share_buffer_upload_concurrency: c.storage.share_buffer_upload_concurrency, diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 91f79231f6939..ab80f712570ca 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -502,7 +502,7 @@ pub struct ReadOptions { /// Read from historical hummock version of meta snapshot backup. /// It should only be used by `StorageTable` for batch query. pub read_version_from_backup: bool, - pub read_version_from_time_travel: bool, + pub read_committed: bool, } impl From for ReadOptions { @@ -515,7 +515,7 @@ impl From for ReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, - read_version_from_time_travel: value.read_version_from_time_travel, + read_committed: value.read_committed, } } } @@ -530,7 +530,7 @@ impl From for TracedReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, - read_version_from_time_travel: value.read_version_from_time_travel, + read_committed: value.read_committed, } } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 7a0ad76cce4a5..8c5f432f46c57 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -361,7 +361,10 @@ impl StorageTableInner { ) -> StorageResult> { let epoch = wait_epoch.get_epoch(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); - let read_time_travel = matches!(wait_epoch, HummockReadEpoch::TimeTravel(_)); + let read_committed = matches!( + wait_epoch, + HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_) + ); self.store.try_wait_epoch(wait_epoch).await?; let serialized_pk = serialize_pk_with_vnode( &pk, @@ -382,7 +385,7 @@ impl StorageTableInner { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, - read_version_from_time_travel: read_time_travel, + read_committed, cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; @@ -487,14 +490,17 @@ impl StorageTableInner { let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| { let prefix_hint = prefix_hint.clone(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); - let read_time_travel = matches!(wait_epoch, HummockReadEpoch::TimeTravel(_)); + let read_committed = matches!( + wait_epoch, + HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_) + ); async move { let read_options = ReadOptions { prefix_hint, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, - read_version_from_time_travel: read_time_travel, + read_committed, prefetch_options, cache_policy, ..Default::default()