diff --git a/proto/backup_service.proto b/proto/backup_service.proto index 5c9f1364582f0..fbbbee2c4a6ee 100644 --- a/proto/backup_service.proto +++ b/proto/backup_service.proto @@ -47,7 +47,8 @@ message MetaSnapshotManifest { message MetaSnapshotMetadata { uint64 id = 1; uint64 hummock_version_id = 2; - uint64 max_committed_epoch = 3; + reserved 3; + reserved 'max_committed_epoch'; reserved 4; reserved 'safe_epoch'; optional uint32 format_version = 5; diff --git a/proto/hummock.proto b/proto/hummock.proto index a746d1675e835..0fab515e546e6 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -183,7 +183,7 @@ message HummockVersion { uint64 id = 1; // Levels of each compaction group map levels = 2; - uint64 max_committed_epoch = 3; + uint64 max_committed_epoch = 3 [deprecated = true]; reserved 4; reserved 'safe_epoch'; map table_watermarks = 5; @@ -199,7 +199,7 @@ message HummockVersionDelta { uint64 prev_id = 2; // Levels of each compaction group map group_deltas = 3; - uint64 max_committed_epoch = 4; + uint64 max_committed_epoch = 4 [deprecated = true]; reserved 5; reserved 'safe_epoch'; bool trivial_move = 6; diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index a7f37bf505910..ef4852a665f72 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -103,7 +103,7 @@ workspace-hack = { path = "../workspace-hack" } assert_matches = "1" expect-test = "1.5" rand = { workspace = true } -risingwave_hummock_sdk = { workspace = true } +risingwave_hummock_sdk = { workspace = true, features = ["test"] } risingwave_test_runner = { workspace = true } [features] diff --git a/src/meta/model_v2/src/hummock_version_delta.rs b/src/meta/model_v2/src/hummock_version_delta.rs index 550310e3d6b3f..44f4d24c71949 100644 --- a/src/meta/model_v2/src/hummock_version_delta.rs +++ b/src/meta/model_v2/src/hummock_version_delta.rs @@ -41,7 +41,6 @@ impl From for PbHummockVersionDelta { let ret = value.full_version_delta.to_protobuf(); assert_eq!(value.id, ret.id as i64); assert_eq!(value.prev_id, ret.prev_id as i64); - assert_eq!(value.max_committed_epoch, ret.max_committed_epoch as i64); assert_eq!(value.trivial_move, ret.trivial_move); ret } diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 06a390380abfb..d0e2ab020f488 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -81,6 +81,10 @@ impl InflightGraphInfo { } } + pub fn is_empty(&self) -> bool { + self.fragment_infos.is_empty() + } + /// Update worker nodes snapshot. We need to support incremental updates for it in the future. pub fn on_new_worker_node_map(&self, node_map: &HashMap) { for (node_id, actors) in &self.actor_map { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index bdfb5a30ad53e..463de3f6febe4 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -29,7 +29,6 @@ use prometheus::HistogramTimer; use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; -use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_common::{bail, must_match}; use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -597,7 +596,7 @@ impl GlobalBarrierManager { let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; let initial_invalid_state = BarrierManagerState::new( - TracedEpoch::new(Epoch(INVALID_EPOCH)), + None, InflightGraphInfo::default(), InflightSubscriptionInfo::default(), None, @@ -949,7 +948,14 @@ impl GlobalBarrierManager { } } - let (prev_epoch, curr_epoch) = self.state.next_epoch_pair(); + let Some((prev_epoch, curr_epoch)) = self.state.next_epoch_pair(&command) else { + // skip the command when there is nothing to do with the barrier + for mut notifier in notifiers { + notifier.notify_started(); + notifier.notify_collected(); + } + return Ok(()); + }; // Insert newly added creating job if let Command::CreateStreamingJob { @@ -1175,7 +1181,6 @@ impl GlobalBarrierManagerContext { change_log_delta: Default::default(), committed_epoch: epoch, tables_to_commit, - is_visible_table_committed_epoch: false, }; self.hummock_manager.commit_epoch(info).await?; Ok(()) @@ -1771,6 +1776,5 @@ fn collect_commit_epoch_info( change_log_delta: table_new_change_log, committed_epoch: epoch, tables_to_commit, - is_visible_table_committed_epoch: true, } } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index efe164af77979..266e280ca48f4 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -326,16 +326,33 @@ impl GlobalBarrierManager { .context .hummock_manager .on_current_version(|version| { - let max_committed_epoch = version.max_committed_epoch_for_meta(); - for (table_id, info) in version.state_table_info.info() { - assert_eq!( - info.committed_epoch, max_committed_epoch, - "table {} with invisible epoch is not purged", - table_id + let state_table_info = version.state_table_info.info(); + let committed_epoch = state_table_info + .values() + .map(|info| info.committed_epoch) + .next(); + let existing_table_ids = info.existing_table_ids(); + for table_id in existing_table_ids { + assert!( + state_table_info.contains_key(&table_id), + "table id {table_id} not registered to hummock but in recovered job {:?}. hummock table info{:?}", + info.existing_table_ids().collect_vec(), + state_table_info ); } + if let Some(committed_epoch) = committed_epoch { + for (table_id, info) in version.state_table_info.info() { + assert_eq!( + info.committed_epoch, committed_epoch, + "table {} with invisible epoch is not purged", + table_id + ); + } + } ( - TracedEpoch::new(Epoch::from(max_committed_epoch)), + committed_epoch.map(|committed_epoch| { + TracedEpoch::new(Epoch::from(committed_epoch)) + }), version.id, ) }) @@ -388,30 +405,36 @@ impl GlobalBarrierManager { subscriptions_to_add: Default::default(), }); - // Use a different `curr_epoch` for each recovery attempt. - let new_epoch = prev_epoch.next(); - - let mut node_to_collect = control_stream_manager.inject_barrier( - None, - Some(mutation), - (&new_epoch, &prev_epoch), - &BarrierKind::Initial, - &info, - Some(&info), - Some(node_actors), - vec![], - vec![], - )?; - debug!(?node_to_collect, "inject initial barrier"); - while !node_to_collect.is_empty() { - let (worker_id, result) = control_stream_manager - .next_complete_barrier_response() - .await; - let resp = result?; - assert_eq!(resp.epoch, prev_epoch.value().0); - assert!(node_to_collect.remove(&worker_id)); - } - debug!("collected initial barrier"); + let new_epoch = if let Some(prev_epoch) = &prev_epoch { + // Use a different `curr_epoch` for each recovery attempt. + let new_epoch = prev_epoch.next(); + + let mut node_to_collect = control_stream_manager.inject_barrier( + None, + Some(mutation), + (&new_epoch, prev_epoch), + &BarrierKind::Initial, + &info, + Some(&info), + Some(node_actors), + vec![], + vec![], + )?; + debug!(?node_to_collect, "inject initial barrier"); + while !node_to_collect.is_empty() { + let (worker_id, result) = control_stream_manager + .next_complete_barrier_response() + .await; + let resp = result?; + assert_eq!(resp.epoch, prev_epoch.value().0); + assert!(node_to_collect.remove(&worker_id)); + } + debug!("collected initial barrier"); + Some(new_epoch) + } else { + assert!(info.is_empty()); + None + }; ( BarrierManagerState::new(new_epoch, info, subscription_info, paused_reason), @@ -446,7 +469,7 @@ impl GlobalBarrierManager { CheckpointControl::new(self.context.clone(), create_mview_tracker).await; tracing::info!( - epoch = self.state.in_flight_prev_epoch().value().0, + epoch = self.state.in_flight_prev_epoch().map(|epoch| epoch.value().0), paused = ?self.state.paused_reason(), "recovery success" ); diff --git a/src/meta/src/barrier/state.rs b/src/meta/src/barrier/state.rs index fa2ead1b1df05..db2ded5629d7a 100644 --- a/src/meta/src/barrier/state.rs +++ b/src/meta/src/barrier/state.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::util::epoch::Epoch; use risingwave_pb::meta::PausedReason; use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo}; @@ -23,7 +24,7 @@ pub struct BarrierManagerState { /// /// There's no need to persist this field. On recovery, we will restore this from the latest /// committed snapshot in `HummockManager`. - in_flight_prev_epoch: TracedEpoch, + in_flight_prev_epoch: Option, /// Inflight running actors info. pub(crate) inflight_graph_info: InflightGraphInfo, @@ -36,7 +37,7 @@ pub struct BarrierManagerState { impl BarrierManagerState { pub fn new( - in_flight_prev_epoch: TracedEpoch, + in_flight_prev_epoch: Option, inflight_graph_info: InflightGraphInfo, inflight_subscription_info: InflightSubscriptionInfo, paused_reason: Option, @@ -60,16 +61,24 @@ impl BarrierManagerState { } } - pub fn in_flight_prev_epoch(&self) -> &TracedEpoch { - &self.in_flight_prev_epoch + pub fn in_flight_prev_epoch(&self) -> Option<&TracedEpoch> { + self.in_flight_prev_epoch.as_ref() } /// Returns the epoch pair for the next barrier, and updates the state. - pub fn next_epoch_pair(&mut self) -> (TracedEpoch, TracedEpoch) { - let prev_epoch = self.in_flight_prev_epoch.clone(); + pub fn next_epoch_pair(&mut self, command: &Command) -> Option<(TracedEpoch, TracedEpoch)> { + if self.inflight_graph_info.is_empty() + && !matches!(&command, Command::CreateStreamingJob { .. }) + { + return None; + }; + let in_flight_prev_epoch = self + .in_flight_prev_epoch + .get_or_insert_with(|| TracedEpoch::new(Epoch::now())); + let prev_epoch = in_flight_prev_epoch.clone(); let next_epoch = prev_epoch.next(); - self.in_flight_prev_epoch = next_epoch.clone(); - (prev_epoch, next_epoch) + *in_flight_prev_epoch = next_epoch.clone(); + Some((prev_epoch, next_epoch)) } /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 619a5d24426db..e444f2c263bb3 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -65,11 +65,11 @@ pub struct CommitEpochInfo { pub change_log_delta: HashMap, pub committed_epoch: u64, pub tables_to_commit: HashSet, - pub is_visible_table_committed_epoch: bool, } impl HummockManager { - /// Caller should ensure `epoch` > `max_committed_epoch` + /// Caller should ensure `epoch` > `committed_epoch` of `tables_to_commit` + /// if tables are not newly added via `new_table_fragment_info` pub async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> Result<()> { let CommitEpochInfo { mut sstables, @@ -79,7 +79,6 @@ impl HummockManager { change_log_delta, committed_epoch, tables_to_commit, - is_visible_table_committed_epoch, } = commit_info; let mut versioning_guard = self.versioning.write().await; let _timer = start_measure_real_process_timer!(self, "commit_epoch"); @@ -88,11 +87,12 @@ impl HummockManager { return Ok(()); } + assert!(!tables_to_commit.is_empty()); + let versioning: &mut Versioning = &mut versioning_guard; self.commit_epoch_sanity_check( committed_epoch, &tables_to_commit, - is_visible_table_committed_epoch, &sstables, &sst_to_context, &versioning.current_version, @@ -194,7 +194,6 @@ impl HummockManager { let time_travel_delta = version.pre_commit_epoch( committed_epoch, &tables_to_commit, - is_visible_table_committed_epoch, new_compaction_group, commit_sstables, &new_table_ids, diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index a9823768925cf..8bdd1561142a9 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -218,7 +218,16 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); + + let committed_epoch = new_version_delta + .latest_version() + .state_table_info + .info() + .values() + .map(|info| info.committed_epoch) + .max() + .unwrap_or(INVALID_EPOCH); for (table_id, raw_group_id) in pairs { let mut group_id = *raw_group_id; @@ -262,7 +271,7 @@ impl HummockManager { .insert( TableId::new(*table_id), PbStateTableInfoDelta { - committed_epoch: INVALID_EPOCH, + committed_epoch, compaction_group_id: *raw_group_id, } ) @@ -290,7 +299,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); let mut modified_groups: HashMap = HashMap::new(); // Remove member tables diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 863d3e532ece4..9eae55e622b0f 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -161,7 +161,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); let target_compaction_group_id = { // merge right_group_id to left_group_id and remove right_group_id @@ -448,7 +448,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); let split_sst_count = new_version_delta .latest_version() diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index c4e35ddb921e2..98e4216df0327 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -144,7 +144,7 @@ fn init_selectors() -> HashMap HummockVersionTransaction<'a> { fn apply_compact_task(&mut self, compact_task: &CompactTask) { - let mut version_delta = self.new_delta(None); + let mut version_delta = self.new_delta(); let trivial_move = CompactStatus::is_trivial_move_task(compact_task); version_delta.trivial_move = trivial_move; diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 230cb29608bc0..b42dd9e54d0ed 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -190,7 +190,6 @@ impl HummockManager { &self, committed_epoch: HummockEpoch, tables_to_commit: &HashSet, - is_visible_table_committed_epoch: bool, sstables: &[LocalSstableInfo], sst_to_context: &HashMap, current_version: &HummockVersion, @@ -215,17 +214,6 @@ impl HummockManager { } } - if is_visible_table_committed_epoch - && committed_epoch <= current_version.max_committed_epoch_for_meta() - { - return Err(anyhow::anyhow!( - "Epoch {} <= max_committed_epoch {}", - committed_epoch, - current_version.max_committed_epoch_for_meta() - ) - .into()); - } - // sanity check on monotonically increasing table committed epoch for table_id in tables_to_commit { if let Some(info) = current_version.state_table_info.info().get(table_id) { diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 82b5dda90dd69..74668e67a27c2 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -22,7 +22,7 @@ use itertools::Itertools; use prometheus::Registry; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH}; +use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::compact::compact_task_to_string; use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ssts; @@ -52,6 +52,26 @@ use crate::manager::{MetaSrvEnv, MetaStoreImpl}; use crate::model::MetadataModel; use crate::rpc::metrics::MetaMetrics; +pub fn version_max_committed_epoch(version: &HummockVersion) -> u64 { + let committed_epoch = version + .state_table_info + .info() + .values() + .next() + .unwrap() + .committed_epoch; + assert!( + version + .state_table_info + .info() + .values() + .all(|info| info.committed_epoch == committed_epoch), + "info: {:?}", + version.state_table_info.info() + ); + committed_epoch +} + fn pin_versions_sum(pin_versions: &[HummockPinnedVersion]) -> usize { pin_versions.iter().len() } @@ -270,10 +290,6 @@ async fn test_hummock_transaction() { .await; // Get tables before committing epoch1. No tables should be returned. let current_version = hummock_manager.get_current_version().await; - assert_eq!( - current_version.max_committed_epoch_for_test(), - INVALID_EPOCH - ); let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); assert!(get_sorted_committed_object_ids(¤t_version, compaction_group_id).is_empty()); @@ -294,7 +310,7 @@ async fn test_hummock_transaction() { // Get tables after committing epoch1. All tables committed in epoch1 should be returned let current_version = hummock_manager.get_current_version().await; let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); - assert_eq!(current_version.max_committed_epoch_for_test(), epoch1); + assert_eq!(version_max_committed_epoch(¤t_version), epoch1); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version, compaction_group_id) @@ -318,7 +334,7 @@ async fn test_hummock_transaction() { // tables_in_epoch2 should be invisible. let current_version = hummock_manager.get_current_version().await; let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); - assert_eq!(current_version.max_committed_epoch_for_test(), epoch1); + assert_eq!(version_max_committed_epoch(¤t_version), epoch1); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version, compaction_group_id) @@ -342,7 +358,7 @@ async fn test_hummock_transaction() { // returned let current_version = hummock_manager.get_current_version().await; let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); - assert_eq!(current_version.max_committed_epoch_for_test(), epoch2); + assert_eq!(version_max_committed_epoch(¤t_version), epoch2); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version, compaction_group_id) @@ -623,7 +639,7 @@ async fn test_pin_snapshot_response_lost() { // Pin a snapshot with smallest last_pin // [ e0 ] -> [ e0:pinned ] let mut epoch_recorded_in_frontend = hummock_manager - .on_current_version(|version| version.max_committed_epoch_for_test()) + .on_current_version(version_max_committed_epoch) .await; let prev_epoch = epoch.prev_epoch(); assert_eq!(epoch_recorded_in_frontend, prev_epoch); @@ -652,7 +668,7 @@ async fn test_pin_snapshot_response_lost() { // Assume the response of the previous rpc is lost. // [ e0:pinned, e1 ] -> [ e0, e1:pinned ] epoch_recorded_in_frontend = hummock_manager - .on_current_version(|version| version.max_committed_epoch_for_test()) + .on_current_version(version_max_committed_epoch) .await; let prev_epoch = epoch.prev_epoch(); assert_eq!(epoch_recorded_in_frontend, prev_epoch); @@ -660,7 +676,7 @@ async fn test_pin_snapshot_response_lost() { // Assume the response of the previous rpc is lost. // [ e0, e1:pinned ] -> [ e0, e1:pinned ] epoch_recorded_in_frontend = hummock_manager - .on_current_version(|version| version.max_committed_epoch_for_test()) + .on_current_version(version_max_committed_epoch) .await; assert_eq!(epoch_recorded_in_frontend, epoch.prev_epoch()); @@ -688,7 +704,7 @@ async fn test_pin_snapshot_response_lost() { // Use correct snapshot id. // [ e0, e1:pinned, e2 ] -> [ e0, e1:pinned, e2:pinned ] epoch_recorded_in_frontend = hummock_manager - .on_current_version(|version| version.max_committed_epoch_for_test()) + .on_current_version(version_max_committed_epoch) .await; assert_eq!(epoch_recorded_in_frontend, epoch.prev_epoch()); @@ -716,7 +732,7 @@ async fn test_pin_snapshot_response_lost() { // Use u64::MAX as epoch to pin greatest snapshot // [ e0, e1:pinned, e2:pinned, e3 ] -> [ e0, e1:pinned, e2:pinned, e3::pinned ] epoch_recorded_in_frontend = hummock_manager - .on_current_version(|version| version.max_committed_epoch_for_test()) + .on_current_version(version_max_committed_epoch) .await; assert_eq!(epoch_recorded_in_frontend, epoch.prev_epoch()); } @@ -1223,7 +1239,7 @@ async fn test_extend_objects_to_delete() { ); let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); - let new_epoch = pinned_version2.max_committed_epoch_for_test().next_epoch(); + let new_epoch = version_max_committed_epoch(&pinned_version2).next_epoch(); hummock_meta_client .commit_epoch( new_epoch, @@ -1236,7 +1252,7 @@ async fn test_extend_objects_to_delete() { .await .unwrap(); let pinned_version3: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap(); - assert_eq!(new_epoch, pinned_version3.max_committed_epoch_for_test()); + assert_eq!(new_epoch, version_max_committed_epoch(&pinned_version3)); hummock_manager .unpin_version_before(context_id, pinned_version3.id) .await diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index c664c76b400fc..b8e9335a161b6 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -43,9 +43,6 @@ fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) { } fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) { - metrics - .max_committed_epoch - .set(current_version.max_committed_epoch_for_meta() as i64); metrics .version_size .set(current_version.estimated_encode_len() as i64); @@ -97,13 +94,8 @@ impl<'a> HummockVersionTransaction<'a> { } } - pub(super) fn new_delta<'b>( - &'b mut self, - max_committed_epoch: Option, - ) -> SingleDeltaTransaction<'a, 'b> { - let delta = self - .latest_version() - .version_delta_after(max_committed_epoch); + pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> { + let delta = self.latest_version().version_delta_after(); SingleDeltaTransaction { version_txn: self, delta: Some(delta), @@ -123,19 +115,13 @@ impl<'a> HummockVersionTransaction<'a> { &mut self, committed_epoch: HummockEpoch, tables_to_commit: &HashSet, - is_visible_table_committed_epoch: bool, new_compaction_group: Option<(CompactionGroupId, CompactionConfig)>, commit_sstables: BTreeMap>, new_table_ids: &HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, ) -> HummockVersionDelta { - let new_max_committed_epoch = if is_visible_table_committed_epoch { - Some(committed_epoch) - } else { - None - }; - let mut new_version_delta = self.new_delta(new_max_committed_epoch); + let mut new_version_delta = self.new_delta(); new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; @@ -163,12 +149,23 @@ impl<'a> HummockVersionTransaction<'a> { // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { + let l0_sub_level_id = new_version_delta + .latest_version() + .levels + .get(&compaction_group_id) + .and_then(|levels| { + levels + .l0 + .sub_levels + .last() + .map(|level| level.sub_level_id + 1) + }) + .unwrap_or(committed_epoch); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) .or_default() .group_deltas; - let l0_sub_level_id = committed_epoch; let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( 0, l0_sub_level_id, diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index e396a0123b9b1..725d838711e9e 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -255,7 +255,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(None); + let mut new_version_delta = version.new_delta(); new_version_delta.with_latest_version(|version, delta| { version.may_fill_backward_compatible_state_table_info_delta(delta) }); diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index bc87348c09308..ba54bc64969ad 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -162,6 +162,13 @@ impl HummockMetaClient for MockHummockMetaClient { .iter() .flat_map(|sstable| sstable.sst_info.table_ids.clone()) }) + .chain( + sync_result + .table_watermarks + .keys() + .map(|table_id| table_id.table_id), + ) + .chain(table_ids.iter().cloned()) .collect::>(); let new_table_fragment_info = if commit_table_ids @@ -216,7 +223,6 @@ impl HummockMetaClient for MockHummockMetaClient { .cloned() .map(TableId::from) .collect(), - is_visible_table_committed_epoch: true, }) .await .map_err(mock_err)?; diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 18d619c1e504f..37fa28f4fe260 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -53,7 +53,6 @@ pub struct MetaSnapshotMetadata { pub id: MetaSnapshotId, pub hummock_version_id: HummockVersionId, pub ssts: HashSet, - pub max_committed_epoch: u64, #[serde(default)] pub format_version: u32, pub remarks: Option, @@ -73,7 +72,6 @@ impl MetaSnapshotMetadata { id, hummock_version_id: v.id, ssts: v.get_object_ids(), - max_committed_epoch: v.max_committed_epoch_for_meta(), format_version, remarks, state_table_info: v @@ -116,7 +114,6 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata { Self { id: m.id, hummock_version_id: m.hummock_version_id.to_u64(), - max_committed_epoch: m.max_committed_epoch, format_version: Some(m.format_version), remarks: m.remarks.clone(), state_table_info: m diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index b314d665c313d..a99dd765fd593 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -118,7 +118,6 @@ fn gen_version( let committed_epoch = test_epoch(new_epoch_idx as _); let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { id: new_epoch_idx as _, - max_committed_epoch: committed_epoch, ..Default::default() }); version.table_watermarks = (0..table_count) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 2ce4eeda1dc7d..338c18aa23690 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -575,9 +575,14 @@ impl HummockVersion { &version_delta.removed_table_ids, ); - if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch { - is_commit_epoch = true; - tracing::trace!("max committed epoch bumped but no table committed epoch is changed"); + #[expect(deprecated)] + { + if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch { + is_commit_epoch = true; + tracing::trace!( + "max committed epoch bumped but no table committed epoch is changed" + ); + } } // apply to `levels`, which is different compaction groups @@ -638,20 +643,12 @@ impl HummockVersion { ); self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) } - let max_committed_epoch = self.max_committed_epoch; let group_destroy = summary.group_destroy; let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { panic!("compaction group {} does not exist", compaction_group_id) }); - assert!( - max_committed_epoch <= version_delta.max_committed_epoch, - "new max commit epoch {} is older than the current max commit epoch {}", - version_delta.max_committed_epoch, - max_committed_epoch - ); if is_commit_epoch { - // `max_committed_epoch` increases. It must be a `commit_epoch` let GroupDeltasSummary { delete_sst_levels, delete_sst_ids_set, @@ -687,7 +684,7 @@ impl HummockVersion { } } } else { - // `max_committed_epoch` is not changed. The delta is caused by compaction. + // The delta is caused by compaction. levels.apply_compact_ssts( summary, self.state_table_info @@ -699,7 +696,10 @@ impl HummockVersion { } } self.id = version_delta.id; - self.max_committed_epoch = version_delta.max_committed_epoch; + #[expect(deprecated)] + { + self.max_committed_epoch = version_delta.max_committed_epoch; + } // apply to table watermark diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs index 254e151f7ec06..fd010e1c3e6ff 100644 --- a/src/storage/hummock_sdk/src/frontend_version.rs +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -62,6 +62,7 @@ impl FrontendHummockVersion { } pub fn to_protobuf(&self) -> PbHummockVersion { + #[expect(deprecated)] PbHummockVersion { id: self.id.0, levels: Default::default(), @@ -174,6 +175,7 @@ impl FrontendHummockVersionDelta { } pub fn to_protobuf(&self) -> PbHummockVersionDelta { + #[expect(deprecated)] PbHummockVersionDelta { id: self.id.to_u64(), prev_id: self.prev_id.to_u64(), diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index e2d5a675f3e99..324e8a91cf4a3 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -726,8 +726,9 @@ mod tests { use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; - use risingwave_pb::hummock::PbHummockVersion; + use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo}; + use crate::compaction_group::StaticCompactionGroupId; use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange}; use crate::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark, @@ -1184,11 +1185,18 @@ mod tests { watermark3.clone(), ); + let test_table_id = TableId::from(233); + let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion { - max_committed_epoch: EPOCH1, + state_table_info: HashMap::from_iter([( + test_table_id.table_id, + StateTableInfo { + committed_epoch: EPOCH1, + compaction_group_id: StaticCompactionGroupId::StateDefault as _, + }, + )]), ..Default::default() }); - let test_table_id = TableId::from(233); version.table_watermarks.insert( test_table_id, TableWatermarks { diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 7c6b368300329..3a58a7daa760c 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -91,6 +91,7 @@ fn refill_sstable_info( impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { fn from(p: (&HummockVersion, &HashSet)) -> Self { let (version, select_group) = p; + #[expect(deprecated)] Self { id: version.id, levels: version @@ -136,6 +137,7 @@ pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon)> for IncompleteHummockVersionDelta { fn from(p: (&HummockVersionDelta, &HashSet)) -> Self { let (delta, select_group) = p; + #[expect(deprecated)] Self { id: delta.id, prev_id: delta.prev_id, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 46807a5138d09..dbd927e3d724a 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -219,6 +219,7 @@ impl HummockVersionStateTableInfo { pub struct HummockVersionCommon { pub id: HummockVersionId, pub levels: HashMap>, + #[deprecated] pub(crate) max_committed_epoch: u64, pub table_watermarks: HashMap>, pub table_change_log: HashMap>, @@ -277,6 +278,7 @@ where T: for<'a> From<&'a PbSstableInfo>, { fn from(pb_version: &PbHummockVersion) -> Self { + #[expect(deprecated)] Self { id: HummockVersionId(pb_version.id), levels: pb_version @@ -319,6 +321,7 @@ where PbSstableInfo: for<'a> From<&'a T>, { fn from(version: &HummockVersionCommon) -> Self { + #[expect(deprecated)] Self { id: version.id.0, levels: version @@ -348,6 +351,7 @@ where PbSstableInfo: for<'a> From<&'a T>, { fn from(version: HummockVersionCommon) -> Self { + #[expect(deprecated)] Self { id: version.id.0, levels: version @@ -413,15 +417,6 @@ impl HummockVersion { } } - pub fn max_committed_epoch_for_meta(&self) -> u64 { - self.max_committed_epoch - } - - #[cfg(any(test, feature = "test"))] - pub fn max_committed_epoch_for_test(&self) -> u64 { - self.max_committed_epoch - } - pub fn table_committed_epoch(&self, table_id: TableId) -> Option { self.state_table_info .info() @@ -430,6 +425,7 @@ impl HummockVersion { } pub fn create_init_version(default_compaction_config: Arc) -> HummockVersion { + #[expect(deprecated)] let mut init_version = HummockVersion { id: FIRST_VERSION_ID, levels: Default::default(), @@ -450,19 +446,13 @@ impl HummockVersion { init_version } - pub fn version_delta_after(&self, max_committed_epoch: Option) -> HummockVersionDelta { - let max_committed_epoch = max_committed_epoch.unwrap_or(self.max_committed_epoch); - assert!( - max_committed_epoch >= self.max_committed_epoch, - "new max_committed_epoch {} less than prev max_committed_epoch: {}", - max_committed_epoch, - self.max_committed_epoch - ); + pub fn version_delta_after(&self) -> HummockVersionDelta { + #[expect(deprecated)] HummockVersionDelta { id: self.next_version_id(), prev_id: self.id, trivial_move: false, - max_committed_epoch, + max_committed_epoch: self.max_committed_epoch, group_deltas: Default::default(), new_table_watermarks: HashMap::new(), removed_table_ids: HashSet::new(), @@ -477,6 +467,7 @@ pub struct HummockVersionDeltaCommon { pub id: HummockVersionId, pub prev_id: HummockVersionId, pub group_deltas: HashMap>, + #[deprecated] pub(crate) max_committed_epoch: u64, pub trivial_move: bool, pub new_table_watermarks: HashMap, @@ -600,6 +591,7 @@ impl HummockVersionDelta { })) } + #[expect(deprecated)] pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch { self.max_committed_epoch } @@ -610,6 +602,7 @@ where T: for<'a> From<&'a PbSstableInfo>, { fn from(pb_version_delta: &PbHummockVersionDelta) -> Self { + #[expect(deprecated)] Self { id: HummockVersionId(pb_version_delta.id), prev_id: HummockVersionId(pb_version_delta.prev_id), @@ -665,6 +658,7 @@ where PbSstableInfo: for<'a> From<&'a T>, { fn from(version_delta: &HummockVersionDeltaCommon) -> Self { + #[expect(deprecated)] Self { id: version_delta.id.0, prev_id: version_delta.prev_id.0, @@ -704,6 +698,7 @@ where PbSstableInfo: From, { fn from(version_delta: HummockVersionDeltaCommon) -> Self { + #[expect(deprecated)] Self { id: version_delta.id.0, prev_id: version_delta.prev_id.0, @@ -743,6 +738,7 @@ where T: From, { fn from(pb_version_delta: PbHummockVersionDelta) -> Self { + #[expect(deprecated)] Self { id: HummockVersionId(pb_version_delta.id), prev_id: HummockVersionId(pb_version_delta.prev_id), diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 038621af70b8b..402d902d27e8c 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -808,10 +808,7 @@ pub(crate) mod tests { retention_seconds: Some(retention_seconds_expire_second), }, )]); - compact_task.current_epoch_time = hummock_manager_ref - .get_current_version() - .await - .max_committed_epoch_for_test(); + compact_task.current_epoch_time = Epoch::now().0; // assert compact_task assert_eq!( @@ -1014,10 +1011,7 @@ pub(crate) mod tests { let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.current_epoch_time = hummock_manager_ref - .get_current_version() - .await - .max_committed_epoch_for_test(); + compact_task.current_epoch_time = Epoch::now().0; // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); @@ -2001,10 +1995,7 @@ pub(crate) mod tests { let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.current_epoch_time = hummock_manager_ref - .get_current_version() - .await - .max_committed_epoch_for_test(); + compact_task.current_epoch_time = Epoch::now().0; // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); @@ -2228,10 +2219,7 @@ pub(crate) mod tests { let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.current_epoch_time = hummock_manager_ref - .get_current_version() - .await - .max_committed_epoch_for_test(); + compact_task.current_epoch_time = Epoch::now().0; // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 0e49b9de872e8..c1e1276025f36 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -335,11 +335,8 @@ async fn test_read_filter_basic() { assert_eq!(1, hummock_read_snapshot.0.len()); assert_eq!(0, hummock_read_snapshot.1.len()); assert_eq!( - read_version - .read() - .committed() - .max_committed_epoch_for_test(), - hummock_read_snapshot.2.max_committed_epoch_for_test() + read_version.read().committed().id, + hummock_read_snapshot.2.id, ); } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 0e45aeb7e833d..7c70721f04d82 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -25,7 +25,7 @@ use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::range::RangeBoundsExt; -use risingwave_common::util::epoch::{test_epoch, EpochExt}; +use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH}; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, @@ -474,7 +474,8 @@ async fn test_state_store_sync() { let base_epoch = read_version .read() .committed() - .max_committed_epoch_for_test(); + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); let epoch1 = test_epoch(base_epoch.next_epoch()); test_env .storage @@ -845,7 +846,8 @@ async fn test_delete_get() { .read_version() .read() .committed() - .max_committed_epoch_for_test(); + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); let epoch1 = initial_epoch.next_epoch(); test_env @@ -947,7 +949,8 @@ async fn test_multiple_epoch_sync() { .read_version() .read() .committed() - .max_committed_epoch_for_test(); + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(); let epoch1 = initial_epoch.next_epoch(); test_env @@ -1658,8 +1661,12 @@ async fn test_hummock_version_reader() { .read_version() .read() .committed() - .max_committed_epoch_for_test(), - read_snapshot.2.max_committed_epoch_for_test() + .table_committed_epoch(TEST_TABLE_ID) + .unwrap(), + read_snapshot + .2 + .table_committed_epoch(TEST_TABLE_ID) + .unwrap() ); let iter = hummock_version_reader @@ -2420,7 +2427,7 @@ async fn test_table_watermark() { .get(&TEST_TABLE_ID) .unwrap() .clone(), - version.max_committed_epoch_for_test(), + epoch, ); assert_eq!(WatermarkDirection::Ascending, table_watermarks.direction()); assert_eq!( @@ -2575,51 +2582,43 @@ async fn test_commit_multi_epoch() { HashMap::from_iter(object_ids.iter().map(|object_id| (*object_id, context_id))) }; let existing_table_id = TableId::new(1); - let initial_epoch = test_env - .manager - .get_current_version() - .await - .max_committed_epoch_for_test(); - - let commit_epoch = |epoch, - sst: SstableInfo, + let initial_epoch = INVALID_EPOCH; + + let commit_epoch = + |epoch, sst: SstableInfo, new_table_fragment_info, tables_to_commit: &[TableId]| { + let manager = &test_env.manager; + let tables_to_commit = tables_to_commit.iter().cloned().collect(); + async move { + manager + .commit_epoch(CommitEpochInfo { + new_table_watermarks: Default::default(), + sst_to_context: context_id_map(&[sst.object_id]), + sstables: vec![LocalSstableInfo { + table_stats: sst + .table_ids + .iter() + .map(|&table_id| { + ( + table_id, + TableStats { + total_compressed_size: 10, + ..Default::default() + }, + ) + }) + .collect(), + sst_info: sst, + created_at: u64::MAX, + }], new_table_fragment_info, - tables_to_commit: &[TableId], - is_visible_table_committed_epoch| { - let manager = &test_env.manager; - let tables_to_commit = tables_to_commit.iter().cloned().collect(); - async move { - manager - .commit_epoch(CommitEpochInfo { - new_table_watermarks: Default::default(), - sst_to_context: context_id_map(&[sst.object_id]), - sstables: vec![LocalSstableInfo { - table_stats: sst - .table_ids - .iter() - .map(|&table_id| { - ( - table_id, - TableStats { - total_compressed_size: 10, - ..Default::default() - }, - ) - }) - .collect(), - sst_info: sst, - created_at: u64::MAX, - }], - new_table_fragment_info, - change_log_delta: Default::default(), - committed_epoch: epoch, - tables_to_commit, - is_visible_table_committed_epoch, - }) - .await - .unwrap(); - } - }; + change_log_delta: Default::default(), + committed_epoch: epoch, + tables_to_commit, + }) + .await + .unwrap(); + } + }; let epoch1 = initial_epoch.next_epoch(); let sst1_epoch1 = SstableInfo { @@ -2639,7 +2638,6 @@ async fn test_commit_multi_epoch() { internal_table_ids: vec![existing_table_id], }, &[existing_table_id], - true, ) .await; @@ -2652,12 +2650,9 @@ async fn test_commit_multi_epoch() { let sub_levels = &cg.l0.sub_levels; assert_eq!(sub_levels.len(), 1); let sub_level = &sub_levels[0]; - assert_eq!(sub_level.sub_level_id, epoch1); assert_eq!(sub_level.table_infos.len(), 1); assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch1.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch1); - let info = version .state_table_info .info() @@ -2688,7 +2683,6 @@ async fn test_commit_multi_epoch() { sst1_epoch2.clone(), NewTableFragmentInfo::None, &[existing_table_id], - true, ) .await; @@ -2701,16 +2695,12 @@ async fn test_commit_multi_epoch() { let sub_levels = &cg.l0.sub_levels; assert_eq!(sub_levels.len(), 2); let sub_level = &sub_levels[0]; - assert_eq!(sub_level.sub_level_id, epoch1); assert_eq!(sub_level.table_infos.len(), 1); assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch1.object_id); let sub_level = &sub_levels[1]; - assert_eq!(sub_level.sub_level_id, epoch2); assert_eq!(sub_level.table_infos.len(), 1); assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch2.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch2); - let info = version .state_table_info .info() @@ -2741,7 +2731,6 @@ async fn test_commit_multi_epoch() { table_ids: HashSet::from_iter([new_table_id]), }, &[new_table_id], - false, ) .await; @@ -2756,12 +2745,9 @@ async fn test_commit_multi_epoch() { let sub_levels = &new_cg.l0.sub_levels; assert_eq!(sub_levels.len(), 1); let sub_level1 = &sub_levels[0]; - assert_eq!(sub_level1.sub_level_id, epoch1); assert_eq!(sub_level1.table_infos.len(), 1); assert_eq!(sub_level1.table_infos[0].object_id, sst2_epoch1.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch2); - let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch1); assert_eq!(info.compaction_group_id, new_cg_id); @@ -2783,7 +2769,6 @@ async fn test_commit_multi_epoch() { sst2_epoch2.clone(), NewTableFragmentInfo::None, &[new_table_id], - false, ) .await; @@ -2794,16 +2779,12 @@ async fn test_commit_multi_epoch() { let sub_levels = &new_cg.l0.sub_levels; assert_eq!(sub_levels.len(), 2); let sub_level1 = &sub_levels[0]; - assert_eq!(sub_level1.sub_level_id, epoch1); assert_eq!(sub_level1.table_infos.len(), 1); assert_eq!(sub_level1.table_infos[0].object_id, sst2_epoch1.object_id); let sub_level2 = &sub_levels[1]; - assert_eq!(sub_level2.sub_level_id, epoch2); assert_eq!(sub_level2.table_infos.len(), 1); assert_eq!(sub_level2.table_infos[0].object_id, sst2_epoch2.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch2); - let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch2); assert_eq!(info.compaction_group_id, new_cg_id); @@ -2825,7 +2806,6 @@ async fn test_commit_multi_epoch() { sst_epoch3.clone(), NewTableFragmentInfo::None, &[existing_table_id, new_table_id], - true, ) .await; @@ -2838,15 +2818,12 @@ async fn test_commit_multi_epoch() { let sub_levels = &old_cg.l0.sub_levels; assert_eq!(sub_levels.len(), 3); let sub_level1 = &sub_levels[0]; - assert_eq!(sub_level1.sub_level_id, epoch1); assert_eq!(sub_level1.table_infos.len(), 1); assert_eq!(sub_level1.table_infos[0].object_id, sst1_epoch1.object_id); let sub_level2 = &sub_levels[1]; - assert_eq!(sub_level2.sub_level_id, epoch2); assert_eq!(sub_level2.table_infos.len(), 1); assert_eq!(sub_level2.table_infos[0].object_id, sst1_epoch2.object_id); let sub_level3 = &sub_levels[2]; - assert_eq!(sub_level3.sub_level_id, epoch3); assert_eq!(sub_level3.table_infos.len(), 1); assert_eq!(sub_level3.table_infos[0].object_id, sst_epoch3.object_id); @@ -2854,20 +2831,15 @@ async fn test_commit_multi_epoch() { let sub_levels = &new_cg.l0.sub_levels; assert_eq!(sub_levels.len(), 3); let sub_level1 = &sub_levels[0]; - assert_eq!(sub_level1.sub_level_id, epoch1); assert_eq!(sub_level1.table_infos.len(), 1); assert_eq!(sub_level1.table_infos[0].object_id, sst2_epoch1.object_id); let sub_level2 = &sub_levels[1]; - assert_eq!(sub_level2.sub_level_id, epoch2); assert_eq!(sub_level2.table_infos.len(), 1); assert_eq!(sub_level2.table_infos[0].object_id, sst2_epoch2.object_id); let sub_level3 = &sub_levels[1]; - assert_eq!(sub_level3.sub_level_id, epoch2); assert_eq!(sub_level3.table_infos.len(), 1); assert_eq!(sub_level3.table_infos[0].object_id, sst2_epoch2.object_id); - assert_eq!(version.max_committed_epoch_for_test(), epoch3); - let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch3); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index ab38ccf33fb5c..7b2c5b5e60649 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -25,7 +25,7 @@ use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; -use risingwave_common::util::epoch::{test_epoch, EpochExt, MAX_EPOCH}; +use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH, MAX_EPOCH}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; use risingwave_hummock_sdk::{ HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, @@ -423,10 +423,7 @@ async fn test_basic_v2() { async fn test_state_store_sync_v2() { let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; - let mut epoch = hummock_storage - .get_pinned_version() - .max_committed_epoch_for_test() - .next_epoch(); + let mut epoch = INVALID_EPOCH.next_epoch(); // ingest 16B batch let mut batch1 = vec![ @@ -1040,9 +1037,7 @@ async fn test_reload_storage() { async fn test_delete_get_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - let initial_epoch = hummock_storage - .get_pinned_version() - .max_committed_epoch_for_test(); + let initial_epoch = INVALID_EPOCH; let epoch1 = initial_epoch.next_epoch(); hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); let batch1 = vec![ @@ -1132,9 +1127,7 @@ async fn test_delete_get_v2() { async fn test_multiple_epoch_sync_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - let initial_epoch = hummock_storage - .get_pinned_version() - .max_committed_epoch_for_test(); + let initial_epoch = INVALID_EPOCH; let epoch1 = initial_epoch.next_epoch(); let batch1 = vec![ ( @@ -1310,9 +1303,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .await; let table_id_set = HashSet::from_iter([local_hummock_storage.table_id()]); - let initial_epoch = hummock_storage - .get_pinned_version() - .max_committed_epoch_for_test(); + let initial_epoch = INVALID_EPOCH; let epoch1 = initial_epoch.next_epoch(); hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); local_hummock_storage.init_for_test(epoch1).await.unwrap(); @@ -1401,11 +1392,11 @@ async fn test_gc_watermark_and_clear_shared_buffer() { async fn test_replicated_local_hummock_storage() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; - let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; + let (hummock_storage, meta_client) = with_hummock_storage_v2(TEST_TABLE_ID).await; let epoch0 = meta_client .hummock_manager_ref() - .on_current_version(|version| version.max_committed_epoch_for_test()) + .on_current_version(|version| version.table_committed_epoch(TEST_TABLE_ID).unwrap()) .await; let epoch0 = epoch0.next_epoch(); @@ -1416,13 +1407,10 @@ async fn test_replicated_local_hummock_storage() { sstables: vec![], new_table_watermarks: Default::default(), sst_to_context: Default::default(), - new_table_fragment_info: NewTableFragmentInfo::NewCompactionGroup { - table_ids: HashSet::from_iter([TEST_TABLE_ID]), - }, + new_table_fragment_info: NewTableFragmentInfo::None, change_log_delta: Default::default(), committed_epoch: epoch0, - tables_to_commit: Default::default(), - is_visible_table_committed_epoch: true, + tables_to_commit: HashSet::from_iter([TEST_TABLE_ID]), }) .await .unwrap(); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 15f4c74369fdf..7744b102761de 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -878,7 +878,7 @@ impl SyncedData { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use std::future::{poll_fn, Future}; use std::sync::Arc; use std::task::Poll; @@ -888,8 +888,9 @@ mod tests { use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; + use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::version::HummockVersion; - use risingwave_pb::hummock::PbHummockVersion; + use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo}; use tokio::spawn; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot; @@ -908,19 +909,17 @@ mod tests { #[tokio::test] async fn test_clear_shared_buffer() { - let epoch0 = test_epoch(233); let mut next_version_id = 1; - let mut make_new_version = |max_committed_epoch| { + let mut make_new_version = || { let id = next_version_id; next_version_id += 1; HummockVersion::from_rpc_protobuf(&PbHummockVersion { id, - max_committed_epoch, ..Default::default() }) }; - let initial_version = PinnedVersion::new(make_new_version(epoch0), unbounded_channel().0); + let initial_version = PinnedVersion::new(make_new_version(), unbounded_channel().0); let (version_update_tx, version_update_rx) = unbounded_channel(); let (refill_task_tx, mut refill_task_rx) = unbounded_channel(); @@ -961,8 +960,7 @@ mod tests { send_clear(initial_version.id()).await.unwrap(); // test normal refill finish - let epoch1 = epoch0 + 1; - let version1 = make_new_version(epoch1); + let version1 = make_new_version(); { version_update_tx .send(HummockVersionUpdate::PinnedVersion(Box::new( @@ -981,10 +979,8 @@ mod tests { } // test recovery with pending refill task - let epoch2 = epoch1 + 1; - let version2 = make_new_version(epoch2); - let epoch3 = epoch2 + 1; - let version3 = make_new_version(epoch3); + let version2 = make_new_version(); + let version3 = make_new_version(); { version_update_tx .send(HummockVersionUpdate::PinnedVersion(Box::new( @@ -1016,10 +1012,8 @@ mod tests { } // test recovery with later arriving version update - let epoch4 = epoch3 + 1; - let version4 = make_new_version(epoch4); - let epoch5 = epoch4 + 1; - let version5 = make_new_version(epoch5); + let version4 = make_new_version(); + let version5 = make_new_version(); { let mut rx = send_clear(version5.id); assert_pending(&mut rx).await; @@ -1046,7 +1040,13 @@ mod tests { let initial_version = PinnedVersion::new( HummockVersion::from_rpc_protobuf(&PbHummockVersion { id: 1, - max_committed_epoch: epoch0, + state_table_info: HashMap::from_iter([( + TEST_TABLE_ID.table_id, + StateTableInfo { + committed_epoch: epoch0, + compaction_group_id: StaticCompactionGroupId::StateDefault as _, + }, + )]), ..Default::default() }), unbounded_channel().0, diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 964c5f641520a..cad5708aa831e 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -1647,7 +1647,8 @@ pub(crate) mod tests { uploader .context .pinned_version - .max_committed_epoch_for_test() + .table_committed_epoch(TEST_TABLE_ID) + .unwrap() ); } @@ -1684,7 +1685,8 @@ pub(crate) mod tests { uploader .context .pinned_version - .max_committed_epoch_for_test() + .table_committed_epoch(TEST_TABLE_ID) + .unwrap() ); } @@ -1725,7 +1727,8 @@ pub(crate) mod tests { uploader .context .pinned_version - .max_committed_epoch_for_test() + .table_committed_epoch(TEST_TABLE_ID) + .unwrap() ); } diff --git a/src/storage/src/hummock/event_handler/uploader/test_utils.rs b/src/storage/src/hummock/event_handler/uploader/test_utils.rs index 05bd5200b3486..2f711ede7cefc 100644 --- a/src/storage/src/hummock/event_handler/uploader/test_utils.rs +++ b/src/storage/src/hummock/event_handler/uploader/test_utils.rs @@ -91,7 +91,6 @@ impl HummockUploader { pub(super) fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { id: epoch, - max_committed_epoch: epoch, ..Default::default() }); version.state_table_info.apply_delta( diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 0dfdc1aab466a..49ca6bbc17d5e 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -507,7 +507,8 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .max_committed_epoch_for_test() + .table_committed_epoch(TableId::new(table.id)) + .unwrap() .next_epoch(); test_env .storage @@ -616,7 +617,8 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .max_committed_epoch_for_test() + .table_committed_epoch(TableId::new(table.id)) + .unwrap() .next_epoch(); test_env .storage @@ -808,7 +810,8 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .max_committed_epoch_for_test() + .table_committed_epoch(TableId::new(table.id)) + .unwrap() .next_epoch(); test_env .storage @@ -1039,7 +1042,8 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .max_committed_epoch_for_test() + .table_committed_epoch(TableId::new(table.id)) + .unwrap() .next_epoch(); test_env .storage @@ -1236,7 +1240,8 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .max_committed_epoch_for_test() + .table_committed_epoch(TableId::new(table.id)) + .unwrap() .next_epoch(); test_env .storage @@ -1375,7 +1380,8 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .max_committed_epoch_for_test() + .table_committed_epoch(TableId::new(table.id)) + .unwrap() .next_epoch(); test_env .storage @@ -1714,7 +1720,8 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .max_committed_epoch_for_test() + .table_committed_epoch(TableId::new(table.id)) + .unwrap() .next_epoch(); test_env .storage