diff --git a/src/ctl/src/cmd_impl/hummock/list_version.rs b/src/ctl/src/cmd_impl/hummock/list_version.rs index 0f88d881ddb6..460dd88621eb 100644 --- a/src/ctl/src/cmd_impl/hummock/list_version.rs +++ b/src/ctl/src/cmd_impl/hummock/list_version.rs @@ -51,7 +51,8 @@ pub async fn list_version( } else { println!( "Version {} max_committed_epoch {}", - version.id, version.max_committed_epoch + version.id, + version.visible_table_committed_epoch() ); for (cg, levels) in &version.levels { diff --git a/src/ctl/src/cmd_impl/hummock/pause_resume.rs b/src/ctl/src/cmd_impl/hummock/pause_resume.rs index 85791267bfd8..9fdb9bc0cab3 100644 --- a/src/ctl/src/cmd_impl/hummock/pause_resume.rs +++ b/src/ctl/src/cmd_impl/hummock/pause_resume.rs @@ -23,7 +23,8 @@ pub async fn disable_commit_epoch(context: &CtlContext) -> anyhow::Result<()> { println!( "Disabled.\ Current version: id {}, max_committed_epoch {}", - version.id, version.max_committed_epoch + version.id, + version.visible_table_committed_epoch() ); Ok(()) } diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index fb786a505d16..a6c488922022 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -741,7 +741,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an .map(|vd| hummock_version_delta::ActiveModel { id: Set(vd.id.to_u64() as _), prev_id: Set(vd.prev_id.to_u64() as _), - max_committed_epoch: Set(vd.max_committed_epoch as _), + max_committed_epoch: Set(vd.visible_table_committed_epoch() as _), safe_epoch: Set(vd.visible_table_safe_epoch() as _), trivial_move: Set(vd.trivial_move), full_version_delta: Set((&vd.to_protobuf()).into()), diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs index 0689e315f74c..e5bba170bf97 100644 --- a/src/ctl/src/cmd_impl/table/scan.rs +++ b/src/ctl/src/cmd_impl/table/scan.rs @@ -124,7 +124,10 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore Vec Result, current_version: &HummockVersion, @@ -221,11 +221,12 @@ impl HummockManager { } } - if epoch <= current_version.max_committed_epoch { + // TODO: allow equal when supporting partial checkpoint + if max_committed_epoch <= current_version.visible_table_committed_epoch() { return Err(anyhow::anyhow!( "Epoch {} <= max_committed_epoch {}", - epoch, - current_version.max_committed_epoch + max_committed_epoch, + current_version.visible_table_committed_epoch() ) .into()); } @@ -252,7 +253,7 @@ impl HummockManager { .send_event(ResponseEvent::ValidationTask(ValidationTask { sst_infos: sst_infos.into_iter().map(|sst| sst.into()).collect_vec(), sst_id_to_worker_id: sst_to_context.clone(), - epoch, + epoch: max_committed_epoch, })) .is_err() { @@ -427,7 +428,7 @@ impl HummockManager { let _timer = start_measure_real_process_timer!(self, "unpin_snapshot_before"); // Use the max_committed_epoch in storage as the snapshot ts so only committed changes are // visible in the snapshot. - let max_committed_epoch = versioning.current_version.max_committed_epoch; + let max_committed_epoch = versioning.current_version.visible_table_committed_epoch(); // Ensure the unpin will not clean the latest one. let snapshot_committed_epoch = hummock_snapshot.committed_epoch; #[cfg(not(test))] diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 3ab2e02f026d..5a3c1ca9ba26 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -433,8 +433,8 @@ impl HummockManager { self.latest_snapshot.store( HummockSnapshot { - committed_epoch: redo_state.max_committed_epoch, - current_epoch: redo_state.max_committed_epoch, + committed_epoch: redo_state.visible_table_committed_epoch(), + current_epoch: redo_state.visible_table_committed_epoch(), } .into(), ); diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index a35dc5c4b077..5fbec419cc7f 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -318,7 +318,10 @@ async fn test_hummock_transaction() { .await; // Get tables before committing epoch1. No tables should be returned. let current_version = hummock_manager.get_current_version().await; - assert_eq!(current_version.max_committed_epoch, INVALID_EPOCH); + assert_eq!( + current_version.visible_table_committed_epoch(), + INVALID_EPOCH + ); assert!(get_sorted_committed_object_ids(¤t_version).is_empty()); // Commit epoch1 @@ -333,7 +336,7 @@ async fn test_hummock_transaction() { // Get tables after committing epoch1. All tables committed in epoch1 should be returned let current_version = hummock_manager.get_current_version().await; - assert_eq!(current_version.max_committed_epoch, epoch1); + assert_eq!(current_version.visible_table_committed_epoch(), epoch1); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version) @@ -356,7 +359,7 @@ async fn test_hummock_transaction() { // Get tables before committing epoch2. tables_in_epoch1 should be returned and // tables_in_epoch2 should be invisible. let current_version = hummock_manager.get_current_version().await; - assert_eq!(current_version.max_committed_epoch, epoch1); + assert_eq!(current_version.visible_table_committed_epoch(), epoch1); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version) @@ -375,7 +378,7 @@ async fn test_hummock_transaction() { // Get tables after committing epoch2. tables_in_epoch1 and tables_in_epoch2 should be // returned let current_version = hummock_manager.get_current_version().await; - assert_eq!(current_version.max_committed_epoch, epoch2); + assert_eq!(current_version.visible_table_committed_epoch(), epoch2); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version) @@ -1148,7 +1151,7 @@ async fn test_extend_objects_to_delete() { ); let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); - let new_epoch = pinned_version2.max_committed_epoch.next_epoch(); + let new_epoch = pinned_version2.visible_table_committed_epoch().next_epoch(); hummock_manager .commit_epoch_for_test( new_epoch, @@ -1158,7 +1161,7 @@ async fn test_extend_objects_to_delete() { .await .unwrap(); let pinned_version3: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap(); - assert_eq!(new_epoch, pinned_version3.max_committed_epoch); + assert_eq!(new_epoch, pinned_version3.visible_table_committed_epoch()); hummock_manager .unpin_version_before(context_id, pinned_version3.id) .await diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 70035e70aa43..61bcdfe7e8de 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -388,7 +388,7 @@ impl HummockManager { Ok(count) } - let epoch = delta.max_committed_epoch; + let epoch = delta.visible_table_committed_epoch(); let version_id: u64 = delta.id.to_u64(); let m = hummock_epoch_to_version::ActiveModel { epoch: Set(epoch.try_into().unwrap()), @@ -483,14 +483,14 @@ fn replay_archive( deltas: impl Iterator, ) -> HummockVersion { let mut last_version = HummockVersion::from_persisted_protobuf(&version); - let mut mce = last_version.max_committed_epoch; + let mut mce = last_version.visible_table_committed_epoch(); for d in deltas { let d = HummockVersionDelta::from_persisted_protobuf(&d); assert!( - d.max_committed_epoch > mce, + d.visible_table_committed_epoch() > mce, "time travel expects delta from commit_epoch only" ); - mce = d.max_committed_epoch; + mce = d.visible_table_committed_epoch(); // Need to work around the assertion in `apply_version_delta`. // Because compaction deltas are not included in time travel archive. while last_version.id < d.prev_id { diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index f14aa070d586..32a6c7b9c1a0 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -43,7 +43,7 @@ fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) { fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) { metrics .max_committed_epoch - .set(current_version.max_committed_epoch as i64); + .set(current_version.visible_table_committed_epoch() as i64); metrics .version_size .set(current_version.estimated_encode_len() as i64); @@ -129,7 +129,7 @@ impl<'a> HummockVersionTransaction<'a> { )>, ) -> HummockVersionDelta { let mut new_version_delta = self.new_delta(); - new_version_delta.max_committed_epoch = max_committed_epoch; + new_version_delta.set_max_committed_epoch(max_committed_epoch); new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 11e22522837a..30c8b09a69d0 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -178,7 +178,7 @@ impl HummockManager { .hummock_version_deltas .range(start_id..) .map(|(_id, delta)| delta) - .filter(|delta| delta.max_committed_epoch <= committed_epoch_limit) + .filter(|delta| delta.visible_table_committed_epoch() <= committed_epoch_limit) .take(num_limit as _) .cloned() .collect(); diff --git a/src/meta/src/hummock/model/ext/hummock.rs b/src/meta/src/hummock/model/ext/hummock.rs index 37dae37218fa..562ea1016af1 100644 --- a/src/meta/src/hummock/model/ext/hummock.rs +++ b/src/meta/src/hummock/model/ext/hummock.rs @@ -223,7 +223,7 @@ impl Transactional for HummockVersionDelta { let m = hummock_version_delta::ActiveModel { id: Set(self.id.to_u64().try_into().unwrap()), prev_id: Set(self.prev_id.to_u64().try_into().unwrap()), - max_committed_epoch: Set(self.max_committed_epoch.try_into().unwrap()), + max_committed_epoch: Set(self.visible_table_committed_epoch().try_into().unwrap()), safe_epoch: Set(self.visible_table_safe_epoch().try_into().unwrap()), trivial_move: Set(self.trivial_move), full_version_delta: Set(FullVersionDelta::from(&self.into())), diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index b49a625111d3..2886c4e4e23f 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -106,7 +106,7 @@ fiemap = "0.1.1" [features] # rocksdb-local = ["rocksdb"] # tikv = ["tikv-client"] -test = [] +test = ["risingwave_hummock_sdk/test"] failpoints = ["fail/failpoints"] bpf = [] hm-trace = [] diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 91d3f6c77ca5..4c53af38eef6 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -75,7 +75,7 @@ impl MetaSnapshotMetadata { id, hummock_version_id: v.id, ssts: v.get_object_ids(), - max_committed_epoch: v.max_committed_epoch, + max_committed_epoch: v.visible_table_committed_epoch(), safe_epoch: v.visible_table_safe_epoch(), format_version, remarks, diff --git a/src/storage/backup/src/meta_snapshot_v2.rs b/src/storage/backup/src/meta_snapshot_v2.rs index bec07a80cf19..e7dbc92eae23 100644 --- a/src/storage/backup/src/meta_snapshot_v2.rs +++ b/src/storage/backup/src/meta_snapshot_v2.rs @@ -128,7 +128,8 @@ impl Display for MetadataV2 { writeln!( f, "Hummock version: id {}, max_committed_epoch: {}", - self.hummock_version.id, self.hummock_version.max_committed_epoch + self.hummock_version.id, + self.hummock_version.visible_table_committed_epoch() )?; // optionally dump other metadata Ok(()) diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index 96fda8462965..4a9e1c5edda0 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -28,8 +28,8 @@ use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo}; -use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId}; -use risingwave_pb::hummock::StateTableInfoDelta; +use risingwave_hummock_sdk::HummockEpoch; +use risingwave_pb::hummock::{PbHummockVersion, StateTableInfoDelta}; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use spin::Mutex; use tokio::sync::mpsc::unbounded_channel; @@ -115,10 +115,12 @@ fn gen_version( new_epoch_idx, vnode_part_count, )); - let mut version = HummockVersion::default(); let committed_epoch = test_epoch(new_epoch_idx as _); - version.id = HummockVersionId::new(new_epoch_idx as _); - version.max_committed_epoch = committed_epoch; + let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { + id: new_epoch_idx as _, + max_committed_epoch: committed_epoch, + ..Default::default() + }); version.table_watermarks = (0..table_count) .map(|table_id| (TableId::new(table_id as _), table_watermarks.clone())) .collect(); diff --git a/src/storage/hummock_sdk/Cargo.toml b/src/storage/hummock_sdk/Cargo.toml index fa080b90b746..79d596bb3121 100644 --- a/src/storage/hummock_sdk/Cargo.toml +++ b/src/storage/hummock_sdk/Cargo.toml @@ -30,5 +30,8 @@ tracing = "0.1" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../../workspace-hack" } +[features] +test = [] + [lints] workspace = true diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 3720afc12ff2..d194f4355aa2 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -556,7 +556,7 @@ impl HummockVersion { pub fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) { assert_eq!(self.id, version_delta.prev_id); - let changed_table_info = self.state_table_info.apply_delta( + let (changed_table_info, is_commit_epoch) = self.state_table_info.apply_delta( &version_delta.state_table_info_delta, &version_delta.removed_table_ids, ); @@ -630,6 +630,7 @@ impl HummockVersion { .append(&mut moving_tables); } let has_destroy = summary.group_destroy.is_some(); + let visible_table_committed_epoch = self.visible_table_committed_epoch(); let levels = self .levels .get_mut(compaction_group_id) @@ -647,12 +648,12 @@ impl HummockVersion { } assert!( - self.max_committed_epoch <= version_delta.max_committed_epoch, + visible_table_committed_epoch <= version_delta.visible_table_committed_epoch(), "new max commit epoch {} is older than the current max commit epoch {}", - version_delta.max_committed_epoch, - self.max_committed_epoch + version_delta.visible_table_committed_epoch(), + visible_table_committed_epoch ); - if self.max_committed_epoch < version_delta.max_committed_epoch { + if is_commit_epoch { // `max_committed_epoch` increases. It must be a `commit_epoch` let GroupDeltasSummary { delete_sst_levels, @@ -700,7 +701,7 @@ impl HummockVersion { } } self.id = version_delta.id; - self.max_committed_epoch = version_delta.max_committed_epoch; + self.set_max_committed_epoch(version_delta.visible_table_committed_epoch()); self.set_safe_epoch(version_delta.visible_table_safe_epoch()); // apply to table watermark @@ -790,7 +791,7 @@ impl HummockVersion { if !contains { warn!( ?table_id, - max_committed_epoch = version_delta.max_committed_epoch, + max_committed_epoch = version_delta.visible_table_committed_epoch(), "table change log dropped due to no further change log at newly committed epoch", ); } @@ -1223,11 +1224,11 @@ pub fn validate_version(version: &HummockVersion) -> Vec { let mut res = Vec::new(); // Ensure safe_epoch <= max_committed_epoch - if version.visible_table_safe_epoch() > version.max_committed_epoch { + if version.visible_table_safe_epoch() > version.visible_table_committed_epoch() { res.push(format!( "VERSION: safe_epoch {} > max_committed_epoch {}", version.visible_table_safe_epoch(), - version.max_committed_epoch + version.visible_table_committed_epoch() )); } diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 2de3fd1b6942..250e9014a1d3 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -1155,7 +1155,7 @@ mod tests { ); let mut version = HummockVersion::default(); - version.max_committed_epoch = EPOCH1; + version.set_max_committed_epoch(EPOCH1); let test_table_id = TableId::from(233); version.table_watermarks.insert( test_table_id, diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 1756353de74a..1fbd26ed3485 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -36,7 +36,7 @@ use crate::{CompactionGroupId, HummockSstableId, HummockVersionId}; pub struct IncompleteHummockVersion { pub id: HummockVersionId, pub levels: HashMap, - pub max_committed_epoch: u64, + max_committed_epoch: u64, safe_epoch: u64, pub table_watermarks: HashMap>, pub table_change_log: HashMap, @@ -211,7 +211,7 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV } }) .collect(), - max_committed_epoch: version.max_committed_epoch, + max_committed_epoch: version.visible_table_committed_epoch(), safe_epoch: version.visible_table_safe_epoch(), table_watermarks: version.table_watermarks.clone(), // TODO: optimization: strip table change log @@ -294,7 +294,7 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHum } }) .collect(), - max_committed_epoch: delta.max_committed_epoch, + max_committed_epoch: delta.visible_table_committed_epoch(), safe_epoch: delta.visible_table_safe_epoch(), trivial_move: delta.trivial_move, new_table_watermarks: delta.new_table_watermarks.clone(), diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 5e6ea2f3fe4a..4ce5b89138f8 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -98,8 +98,9 @@ impl HummockVersionStateTableInfo { &mut self, delta: &HashMap, removed_table_id: &HashSet, - ) -> HashMap> { + ) -> (HashMap>, bool) { let mut changed_table = HashMap::new(); + let mut has_bumped_committed_epoch = false; fn remove_table_from_compaction_group( compaction_group_member_tables: &mut HashMap>, compaction_group_id: CompactionGroupId, @@ -150,6 +151,9 @@ impl HummockVersionStateTableInfo { prev_info, new_info ); + if new_info.committed_epoch > prev_info.committed_epoch { + has_bumped_committed_epoch = true; + } if prev_info.compaction_group_id != new_info.compaction_group_id { // table moved to another compaction group remove_table_from_compaction_group( @@ -172,6 +176,7 @@ impl HummockVersionStateTableInfo { .entry(new_info.compaction_group_id) .or_default() .insert(*table_id)); + has_bumped_committed_epoch = true; entry.insert(new_info); changed_table.insert(*table_id, None); } @@ -181,7 +186,7 @@ impl HummockVersionStateTableInfo { self.compaction_group_member_tables, Self::build_compaction_group_member_tables(&self.state_table_info) ); - changed_table + (changed_table, has_bumped_committed_epoch) } pub fn info(&self) -> &HashMap { @@ -207,7 +212,7 @@ impl HummockVersionStateTableInfo { pub struct HummockVersion { pub id: HummockVersionId, pub levels: HashMap, - pub max_committed_epoch: u64, + max_committed_epoch: u64, safe_epoch: u64, pub table_watermarks: HashMap>, pub table_change_log: HashMap, @@ -396,6 +401,19 @@ impl HummockVersion { self.safe_epoch } + pub(crate) fn set_max_committed_epoch(&mut self, max_committed_epoch: u64) { + self.max_committed_epoch = max_committed_epoch; + } + + #[cfg(any(test, feature = "test"))] + pub fn max_committed_epoch(&self) -> u64 { + self.max_committed_epoch + } + + pub fn visible_table_committed_epoch(&self) -> u64 { + self.max_committed_epoch + } + pub fn create_init_version(default_compaction_config: Arc) -> HummockVersion { let mut init_version = HummockVersion { id: FIRST_VERSION_ID, @@ -439,7 +457,7 @@ pub struct HummockVersionDelta { pub id: HummockVersionId, pub prev_id: HummockVersionId, pub group_deltas: HashMap, - pub max_committed_epoch: u64, + max_committed_epoch: u64, safe_epoch: u64, pub trivial_move: bool, pub new_table_watermarks: HashMap, @@ -570,6 +588,14 @@ impl HummockVersionDelta { pub fn set_safe_epoch(&mut self, safe_epoch: u64) { self.safe_epoch = safe_epoch; } + + pub fn visible_table_committed_epoch(&self) -> u64 { + self.max_committed_epoch + } + + pub fn set_max_committed_epoch(&mut self, max_committed_epoch: u64) { + self.max_committed_epoch = max_committed_epoch; + } } impl From<&PbHummockVersionDelta> for HummockVersionDelta { diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 38ef095969a1..79b00d0f9b8f 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -977,7 +977,7 @@ pub(crate) mod tests { compact_task.current_epoch_time = hummock_manager_ref .get_current_version() .await - .max_committed_epoch; + .max_committed_epoch(); // assert compact_task assert_eq!( @@ -1179,7 +1179,7 @@ pub(crate) mod tests { compact_task.current_epoch_time = hummock_manager_ref .get_current_version() .await - .max_committed_epoch; + .max_committed_epoch(); // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index b3e304305660..caae996d62e4 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2496,7 +2496,7 @@ async fn test_commit_multi_epoch() { .manager .get_current_version() .await - .max_committed_epoch; + .max_committed_epoch(); let epoch1 = initial_epoch.next_epoch(); let sst1_epoch1 = SstableInfo { diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 2d2f1165fefc..ee87177923e9 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -317,7 +317,7 @@ impl HummockEventHandler { let (hummock_event_tx, hummock_event_rx) = event_channel(state_store_metrics.event_handler_pending_event.clone()); let (version_update_notifier_tx, _) = - tokio::sync::watch::channel(pinned_version.max_committed_epoch()); + tokio::sync::watch::channel(pinned_version.visible_table_committed_epoch()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); let read_version_mapping = Arc::new(RwLock::new(HashMap::default())); let buffer_tracker = BufferTracker::from_storage_opts( @@ -511,8 +511,8 @@ impl HummockEventHandler { info!( ?version_id, - current_mce = current_version.max_committed_epoch(), - refiller_mce = new_pinned_version.max_committed_epoch(), + current_mce = current_version.visible_table_committed_epoch(), + refiller_mce = new_pinned_version.visible_table_committed_epoch(), "refiller is clear in recovery" ); @@ -643,8 +643,8 @@ impl HummockEventHandler { ); } - let prev_max_committed_epoch = pinned_version.max_committed_epoch(); - let max_committed_epoch = new_pinned_version.max_committed_epoch(); + let prev_max_committed_epoch = pinned_version.visible_table_committed_epoch(); + let max_committed_epoch = new_pinned_version.visible_table_committed_epoch(); // only notify local_version_manager when MCE change self.version_update_notifier_tx.send_if_modified(|state| { @@ -661,16 +661,17 @@ impl HummockEventHandler { conflict_detector.set_watermark(max_committed_epoch); } + // TODO: should we change the logic when supporting partial ckpt? if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager { sstable_object_id_manager.remove_watermark_object_id(TrackerId::Epoch( - self.pinned_version.load().max_committed_epoch(), + self.pinned_version.load().visible_table_committed_epoch(), )); } debug!( "update to hummock version: {}, epoch: {}", new_pinned_version.id(), - new_pinned_version.max_committed_epoch() + new_pinned_version.visible_table_committed_epoch() ); self.uploader.update_pinned_version(new_pinned_version); diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index ed9503be1462..88351c34b621 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -27,7 +27,7 @@ use std::task::{ready, Context, Poll}; use futures::FutureExt; use itertools::Itertools; -use more_asserts::{assert_ge, assert_gt}; +use more_asserts::assert_gt; use prometheus::core::{AtomicU64, GenericGauge}; use prometheus::{HistogramTimer, IntGauge}; use risingwave_common::bitmap::BitmapBuilder; @@ -1247,10 +1247,6 @@ impl HummockUploader { } pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) { - assert_ge!( - pinned_version.max_committed_epoch(), - self.context.pinned_version.max_committed_epoch() - ); if let UploaderState::Working(data) = &mut self.state { // TODO: may only `ack_committed` on table whose `committed_epoch` is changed. for (table_id, info) in pinned_version.version().state_table_info.info() { diff --git a/src/storage/src/hummock/event_handler/uploader/test_utils.rs b/src/storage/src/hummock/event_handler/uploader/test_utils.rs index 07a306a2a8df..ca3a38db2b94 100644 --- a/src/storage/src/hummock/event_handler/uploader/test_utils.rs +++ b/src/storage/src/hummock/event_handler/uploader/test_utils.rs @@ -34,8 +34,8 @@ use risingwave_hummock_sdk::key::{FullKey, TableKey}; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId, LocalSstableInfo}; -use risingwave_pb::hummock::StateTableInfoDelta; +use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use risingwave_pb::hummock::{PbHummockVersion, StateTableInfoDelta}; use spin::Mutex; use tokio::spawn; use tokio::sync::mpsc::unbounded_channel; @@ -89,9 +89,11 @@ impl HummockUploader { } pub(super) fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { - let mut version = HummockVersion::default(); - version.id = HummockVersionId::new(epoch); - version.max_committed_epoch = epoch; + let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { + id: epoch, + max_committed_epoch: epoch, + ..Default::default() + }); version.state_table_info.apply_delta( &HashMap::from_iter([( TEST_TABLE_ID, diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 8fc6475d0ed2..2a552ffbbf31 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -149,8 +149,13 @@ impl PinnedVersion { } } + #[cfg(any(test, feature = "test"))] pub fn max_committed_epoch(&self) -> u64 { - self.version.max_committed_epoch + self.version.max_committed_epoch() + } + + pub fn visible_table_committed_epoch(&self) -> u64 { + self.version.visible_table_committed_epoch() } /// ret value can't be used as `HummockVersion`. it must be modified with delta diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index e0362d26e6b5..6753131c402f 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -132,8 +132,17 @@ pub fn get_committed_read_version_tuple( epoch: HummockEpoch, ) -> (TableKeyRange, ReadVersionTuple) { if let Some(table_watermarks) = version.version().table_watermarks.get(&table_id) { - TableWatermarksIndex::new_committed(table_watermarks.clone(), version.max_committed_epoch()) - .rewrite_range_with_table_watermark(epoch, &mut key_range) + TableWatermarksIndex::new_committed( + table_watermarks.clone(), + version + .version() + .state_table_info + .info() + .get(&table_id) + .expect("should exist when having table watermark") + .committed_epoch, + ) + .rewrite_range_with_table_watermark(epoch, &mut key_range) } (key_range, (vec![], vec![], version)) } @@ -202,8 +211,12 @@ impl HummockStorage { await_tree_reg.clone(), ); - let seal_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); - let min_current_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); + let seal_epoch = Arc::new(AtomicU64::new( + pinned_version.visible_table_committed_epoch(), + )); + let min_current_epoch = Arc::new(AtomicU64::new( + pinned_version.visible_table_committed_epoch(), + )); let hummock_event_handler = HummockEventHandler::new( version_update_rx, pinned_version, @@ -388,9 +401,17 @@ impl HummockStorage { ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { let pinned_version = self.pinned_version.load(); validate_safe_epoch(pinned_version.version(), table_id, epoch)?; + let table_committed_epoch = pinned_version + .version() + .state_table_info + .info() + .get(&table_id) + .map(|info| info.committed_epoch); // check epoch if lower mce - let ret = if epoch <= pinned_version.max_committed_epoch() { + let ret = if let Some(table_committed_epoch) = table_committed_epoch + && epoch <= table_committed_epoch + { // read committed_version directly without build snapshot get_committed_read_version_tuple((**pinned_version).clone(), table_id, key_range, epoch) } else { @@ -427,20 +448,20 @@ impl HummockStorage { if read_version_vec.is_empty() { if matched_replicated_read_version_cnt > 0 { tracing::warn!( - "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={})", + "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})", table_id, vnode.to_index(), epoch, matched_replicated_read_version_cnt, - pinned_version.max_committed_epoch() + table_committed_epoch, ); } else { tracing::debug!( - "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={})", + "No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})", table_id, vnode.to_index(), epoch, - pinned_version.max_committed_epoch() + table_committed_epoch ); } get_committed_read_version_tuple( @@ -505,7 +526,7 @@ impl HummockStorage { .expect("should send success"); rx.await.expect("should wait success"); - let epoch = self.pinned_version.load().max_committed_epoch(); + let epoch = self.pinned_version.load().visible_table_committed_epoch(); self.min_current_epoch .store(HummockEpoch::MAX, MemOrdering::SeqCst); self.seal_epoch.store(epoch, MemOrdering::SeqCst); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 6856d56ea900..63ee6762cfb3 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -507,7 +507,7 @@ mod tests { .storage .get_pinned_version() .version() - .max_committed_epoch + .max_committed_epoch() .next_epoch(); test_env .storage @@ -614,7 +614,7 @@ mod tests { .storage .get_pinned_version() .version() - .max_committed_epoch + .max_committed_epoch() .next_epoch(); test_env .storage @@ -806,7 +806,7 @@ mod tests { .storage .get_pinned_version() .version() - .max_committed_epoch + .max_committed_epoch() .next_epoch(); test_env .storage @@ -1035,7 +1035,7 @@ mod tests { .storage .get_pinned_version() .version() - .max_committed_epoch + .max_committed_epoch() .next_epoch(); test_env .storage @@ -1231,7 +1231,7 @@ mod tests { .storage .get_pinned_version() .version() - .max_committed_epoch + .max_committed_epoch() .next_epoch(); test_env .storage @@ -1371,7 +1371,7 @@ mod tests { .storage .get_pinned_version() .version() - .max_committed_epoch + .max_committed_epoch() .next_epoch(); test_env .storage @@ -1701,7 +1701,7 @@ mod tests { .storage .get_pinned_version() .version() - .max_committed_epoch + .max_committed_epoch() .next_epoch(); test_env .storage diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 3651dcc44d5e..fe58c4b3d911 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -598,7 +598,7 @@ impl PartialGraphManagedBarrierState { "ignore sealing data for the first barrier" ); if let Some(hummock) = self.state_store.as_hummock() { - let mce = hummock.get_pinned_version().max_committed_epoch(); + let mce = hummock.get_pinned_version().visible_table_committed_epoch(); assert_eq!( mce, prev_epoch, "first epoch should match with the current version", diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 6e1ca43be811..0aa7d1d83c8d 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -375,7 +375,7 @@ async fn start_replay( for delta in version_delta_logs { let (current_version, compaction_groups) = meta_client.replay_version_delta(delta).await?; let (version_id, max_committed_epoch) = - (current_version.id, current_version.max_committed_epoch); + (current_version.id, current_version.max_committed_epoch()); tracing::info!( "Replayed version delta version_id: {}, max_committed_epoch: {}, compaction_groups: {:?}", version_id, @@ -464,7 +464,7 @@ async fn start_replay( ); let (new_version_id, new_committed_epoch) = - (new_version.id, new_version.max_committed_epoch); + (new_version.id, new_version.max_committed_epoch()); assert!( new_version_id >= version_id, "new_version_id: {}, epoch: {}",