diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index fe436bdde652..e2a1b24794c6 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1342,5 +1342,6 @@ fn collect_commit_epoch_info( InflightActorInfo::existing_table_ids(&command_ctx.info.fragment_infos).collect(), )]), epoch, + vec![], ) } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 025c7f503e89..2054bd4dbb5d 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -32,6 +32,7 @@ use risingwave_pb::hummock::HummockSnapshot; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; +use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; use crate::hummock::manager::transaction::{ HummockVersionStatsTransaction, HummockVersionTransaction, }; @@ -39,7 +40,8 @@ use crate::hummock::manager::versioning::Versioning; use crate::hummock::metrics_utils::{ get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat, }; -use crate::hummock::sequence::next_sstable_object_id; +use crate::hummock::model::CompactionGroup; +use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; use crate::hummock::{ commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager, @@ -52,6 +54,11 @@ pub struct NewTableFragmentInfo { pub internal_table_ids: Vec, } +pub struct BatchCommitForNewCg { + pub epoch_to_ssts: BTreeMap>, + pub table_ids: Vec, +} + pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, @@ -60,6 +67,9 @@ pub struct CommitEpochInfo { pub change_log_delta: HashMap, pub table_committed_epoch: BTreeMap>, pub max_committed_epoch: HummockEpoch, + + // commit multi Epoch and SSTs for new compaction group + pub batch_commit_for_new_cg: Vec, } impl CommitEpochInfo { @@ -71,6 +81,7 @@ impl CommitEpochInfo { change_log_delta: HashMap, table_committed_epoch: BTreeMap>, max_committed_epoch: HummockEpoch, + batch_commit_for_new_cg: Vec, ) -> Self { Self { sstables, @@ -80,6 +91,7 @@ impl CommitEpochInfo { change_log_delta, table_committed_epoch, max_committed_epoch, + batch_commit_for_new_cg, } } } @@ -110,6 +122,7 @@ impl HummockManager { HashMap::new(), BTreeMap::from_iter([(epoch, tables)]), epoch, + vec![], ); self.commit_epoch(info).await?; Ok(()) @@ -128,6 +141,7 @@ impl HummockManager { change_log_delta, table_committed_epoch, max_committed_epoch: epoch, + batch_commit_for_new_cg, } = commit_info; let mut versioning_guard = self.versioning.write().await; let _timer = start_measure_real_process_timer!(self, "commit_epoch"); @@ -169,6 +183,7 @@ impl HummockManager { ); let state_table_info = &version.latest_version().state_table_info; + let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); let mut new_table_ids = HashMap::new(); @@ -195,6 +210,48 @@ impl HummockManager { } } + let (batch_commit_for_new_cg, compaction_group_manager_txn) = + if !batch_commit_for_new_cg.is_empty() { + let compaction_group_manager_guard = self.compaction_group_manager.write().await; + let compaction_group_config = + compaction_group_manager_guard.default_compaction_config(); + let mut compaction_group_manager = + CompactionGroupManager::start_owned_compaction_groups_txn( + compaction_group_manager_guard, + ); + let mut batch_commit_info = HashMap::new(); + for BatchCommitForNewCg { + epoch_to_ssts, + table_ids, + } in batch_commit_for_new_cg + { + let new_compaction_group_id = next_compaction_group_id(&self.env).await?; + compaction_group_manager.insert( + new_compaction_group_id, + CompactionGroup { + group_id: new_compaction_group_id, + compaction_config: compaction_group_config.clone(), + }, + ); + + on_handle_add_new_table( + state_table_info, + &table_ids, + new_compaction_group_id, + &mut table_compaction_group_mapping, + &mut new_table_ids, + )?; + + batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts); + } + ( + Some((batch_commit_info, (*compaction_group_config).clone())), + Some(compaction_group_manager), + ) + } else { + (None, None) + }; + let commit_sstables = self .correct_commit_ssts(sstables, &table_compaction_group_mapping) .await?; @@ -207,6 +264,7 @@ impl HummockManager { new_table_ids, new_table_watermarks, change_log_delta, + batch_commit_for_new_cg, ); // TODO: remove the sanity check when supporting partial checkpoint @@ -287,12 +345,22 @@ impl HummockManager { &versioning.last_time_travel_snapshot_sst_ids, ) .await?; - commit_multi_var_with_provided_txn!(txn, version, version_stats)?; + commit_multi_var_with_provided_txn!( + txn, + version, + version_stats, + compaction_group_manager_txn + )?; if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; } } else { - commit_multi_var!(self.meta_store_ref(), version, version_stats)?; + commit_multi_var!( + self.meta_store_ref(), + version, + version_stats, + compaction_group_manager_txn + )?; } let snapshot = HummockSnapshot { diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 61191722f800..d585c23e19ee 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -46,7 +46,10 @@ use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; use crate::manager::{MetaSrvEnv, MetaStoreImpl}; -use crate::model::{BTreeMapTransaction, MetadataModel, MetadataModelError}; +use crate::model::{ + BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModel, + MetadataModelError, +}; type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>; @@ -693,6 +696,26 @@ impl CompactionGroupManager { CompactionGroupTransaction::new(&mut self.compaction_groups) } + pub fn start_owned_compaction_groups_txn>( + inner: P, + ) -> BTreeMapTransactionInner< + CompactionGroupId, + CompactionGroup, + DerefMutForward< + Self, + BTreeMap, + P, + impl Fn(&Self) -> &BTreeMap, + impl Fn(&mut Self) -> &mut BTreeMap, + >, + > { + BTreeMapTransactionInner::new(DerefMutForward::new( + inner, + |mgr| &mgr.compaction_groups, + |mgr| &mut mgr.compaction_groups, + )) + } + /// Tries to get compaction group config for `compaction_group_id`. pub(super) fn try_get_compaction_group_config( &self, diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 82ce16cf1034..3d7fd1ee58ee 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -68,6 +68,8 @@ mod utils; mod worker; pub(crate) use commit_epoch::*; +#[cfg(any(test, feature = "test"))] +pub use commit_epoch::{BatchCommitForNewCg, CommitEpochInfo}; use compaction::*; pub use compaction::{check_cg_write_limit, WriteLimitType}; pub(crate) use utils::*; diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index b6a41e8ffa8a..9833dbea6e3a 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -17,13 +17,17 @@ use std::ops::{Deref, DerefMut}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{ GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId}; -use risingwave_pb::hummock::{HummockVersionStats, StateTableInfoDelta}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId, LocalSstableInfo}; +use risingwave_pb::hummock::{ + CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionStats, + StateTableInfoDelta, +}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use crate::manager::NotificationManager; @@ -109,18 +113,58 @@ impl<'a> HummockVersionTransaction<'a> { } /// Returns a duplicate delta, used by time travel. + #[expect(clippy::type_complexity)] pub(super) fn pre_commit_epoch( &mut self, - epoch: HummockEpoch, + max_committed_epoch: HummockEpoch, commit_sstables: BTreeMap>, new_table_ids: HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, + batch_commit_for_new_cg: Option<( + HashMap>>, + CompactionConfig, + )>, ) -> HummockVersionDelta { let mut new_version_delta = self.new_delta(); - new_version_delta.max_committed_epoch = epoch; + new_version_delta.max_committed_epoch = max_committed_epoch; new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; + + if let Some((batch_commit_for_new_cg, compaction_group_config)) = batch_commit_for_new_cg { + for (compaction_group_id, batch_commit_sst) in batch_commit_for_new_cg { + let group_deltas = &mut new_version_delta + .group_deltas + .entry(compaction_group_id) + .or_default() + .group_deltas; + + #[expect(deprecated)] + group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { + group_config: Some(compaction_group_config.clone()), + group_id: compaction_group_id, + parent_group_id: StaticCompactionGroupId::NewCompactionGroup + as CompactionGroupId, + new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` + table_ids: vec![], + version: CompatibilityVersion::NoMemberTableIds as i32, + })); + + for (epoch, insert_ssts) in batch_commit_sst { + assert!(epoch < max_committed_epoch); + let l0_sub_level_id = epoch; + let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( + 0, + l0_sub_level_id, + vec![], // default + insert_ssts.into_iter().map(|s| s.sst_info).collect(), + 0, // default + )); + group_deltas.push(group_delta); + } + } + } + // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { let group_deltas = &mut new_version_delta @@ -128,7 +172,7 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compaction_group_id) .or_default() .group_deltas; - let l0_sub_level_id = epoch; + let l0_sub_level_id = max_committed_epoch; let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( 0, l0_sub_level_id, @@ -143,32 +187,33 @@ impl<'a> HummockVersionTransaction<'a> { // update state table info new_version_delta.with_latest_version(|version, delta| { for (table_id, cg_id) in new_table_ids { + assert!( + !version.state_table_info.info().contains_key(&table_id), + "newly added table exists previously: {:?}", + table_id + ); delta.state_table_info_delta.insert( table_id, StateTableInfoDelta { - committed_epoch: epoch, - safe_epoch: epoch, + committed_epoch: max_committed_epoch, + safe_epoch: max_committed_epoch, compaction_group_id: cg_id, }, ); } for (table_id, info) in version.state_table_info.info() { - assert!( - delta - .state_table_info_delta - .insert( - *table_id, - StateTableInfoDelta { - committed_epoch: epoch, - safe_epoch: info.safe_epoch, - compaction_group_id: info.compaction_group_id, - } - ) - .is_none(), - "newly added table exists previously: {:?}", - table_id - ); + assert!(delta + .state_table_info_delta + .insert( + *table_id, + StateTableInfoDelta { + committed_epoch: max_committed_epoch, + safe_epoch: info.safe_epoch, + compaction_group_id: info.compaction_group_id, + } + ) + .is_none(),); } }); diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 8499605e04fb..c821e4c911dc 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -95,6 +95,10 @@ impl MockHummockMetaClient { .await .unwrap_or(None) } + + pub fn context_id(&self) -> HummockContextId { + self.context_id + } } fn mock_err(error: super::error::Error) -> RpcError { @@ -193,6 +197,7 @@ impl HummockMetaClient for MockHummockMetaClient { version.state_table_info.info().keys().cloned().collect(), )]), epoch, + vec![], )) .await .map_err(mock_err)?; diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 9a6ab6d19799..9a764f871cd6 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -41,6 +41,7 @@ use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; use crate::hummock::compaction::selector::{default_compaction_selector, LocalSelectorStatistic}; use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext}; use crate::hummock::level_handler::LevelHandler; +pub use crate::hummock::manager::CommitEpochInfo; use crate::hummock::model::CompactionGroup; use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef}; use crate::manager::{ diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index 1ca22854c5de..6b5f629a7c04 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -462,16 +462,20 @@ enum BTreeMapOp { /// are stored in `staging`. On `commit`, it will apply the changes stored in `staging` to the in /// memory btree map. When serve `get` and `get_mut`, it merges the value stored in `staging` and /// `tree_ref`. -pub struct BTreeMapTransaction<'a, K: Ord, V> { +pub struct BTreeMapTransactionInner>> { /// A reference to the original `BTreeMap`. All access to this field should be immutable, /// except when we commit the staging changes to the original map. - tree_ref: &'a mut BTreeMap, + tree_ref: P, /// Store all the staging changes that will be applied to the original map on commit staging: BTreeMap>, } -impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { - pub fn new(tree_ref: &'a mut BTreeMap) -> BTreeMapTransaction<'a, K, V> { +pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner>; + +impl>> + BTreeMapTransactionInner +{ + pub fn new(tree_ref: P) -> BTreeMapTransactionInner { Self { tree_ref, staging: BTreeMap::default(), @@ -481,7 +485,7 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { /// Start a `BTreeMapEntryTransaction` when the `key` exists #[allow(dead_code)] pub fn new_entry_txn(&mut self, key: K) -> Option> { - BTreeMapEntryTransaction::new(self.tree_ref, key, None) + BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None) } /// Start a `BTreeMapEntryTransaction`. If the `key` does not exist, the the `default_val` will @@ -492,17 +496,17 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { key: K, default_val: V, ) -> BTreeMapEntryTransaction<'_, K, V> { - BTreeMapEntryTransaction::new(self.tree_ref, key, Some(default_val)) + BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val)) .expect("default value is provided and should return `Some`") } /// Start a `BTreeMapEntryTransaction` that inserts the `val` into `key`. pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> { - BTreeMapEntryTransaction::new_insert(self.tree_ref, key, val) + BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val) } pub fn tree_ref(&self) -> &BTreeMap { - self.tree_ref + &self.tree_ref } /// Get the value of the provided key by merging the staging value and the original value @@ -578,7 +582,7 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { } } - pub fn commit_memory(self) { + pub fn commit_memory(mut self) { // Apply each op stored in the staging to original tree. for (k, op) in self.staging { match op { @@ -593,14 +597,16 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { } } -impl<'a, K: Ord + Debug, V: Clone> InMemValTransaction for BTreeMapTransaction<'a, K, V> { +impl>> InMemValTransaction + for BTreeMapTransactionInner +{ fn commit(self) { self.commit_memory(); } } -impl<'a, K: Ord + Debug, V: Transactional + Clone, TXN> ValTransaction - for BTreeMapTransaction<'a, K, V> +impl + Clone, P: DerefMut>, TXN> + ValTransaction for BTreeMapTransactionInner { async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> { // Add the staging operation to txn @@ -696,6 +702,76 @@ impl<'a, K: Ord, V: PartialEq + Transactional, TXN> ValTransaction } } +impl InMemValTransaction for Option { + fn commit(self) { + if let Some(inner) = self { + inner.commit(); + } + } +} + +impl, TXN> ValTransaction for Option { + async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> { + if let Some(inner) = &self { + inner.apply_to_txn(txn).await?; + } + Ok(()) + } +} + +pub struct DerefMutForward< + Inner, + Target, + P: DerefMut, + F: Fn(&Inner) -> &Target, + FMut: Fn(&mut Inner) -> &mut Target, +> { + ptr: P, + f: F, + f_mut: FMut, +} + +impl< + Inner, + Target, + P: DerefMut, + F: Fn(&Inner) -> &Target, + FMut: Fn(&mut Inner) -> &mut Target, + > DerefMutForward +{ + pub fn new(ptr: P, f: F, f_mut: FMut) -> Self { + Self { ptr, f, f_mut } + } +} + +impl< + Inner, + Target, + P: DerefMut, + F: Fn(&Inner) -> &Target, + FMut: Fn(&mut Inner) -> &mut Target, + > Deref for DerefMutForward +{ + type Target = Target; + + fn deref(&self) -> &Self::Target { + (self.f)(&self.ptr) + } +} + +impl< + Inner, + Target, + P: DerefMut, + F: Fn(&Inner) -> &Target, + FMut: Fn(&mut Inner) -> &mut Target, + > DerefMut for DerefMutForward +{ + fn deref_mut(&mut self) -> &mut Self::Target { + (self.f_mut)(&mut self.ptr) + } +} + #[cfg(test)] mod tests { use itertools::Itertools; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 3af0586e143a..0fe019d9afcd 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -36,6 +36,7 @@ use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo, + IntraLevelDelta, }; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; @@ -357,9 +358,24 @@ impl HummockVersion { new_sst_start_id: u64, ) { let mut new_sst_id = new_sst_start_id; - if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId - || !self.levels.contains_key(&parent_group_id) - { + if !self.levels.contains_key(&parent_group_id) { + warn!(parent_group_id, "non-existing parent group id to init from"); + return; + } + if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId { + if new_sst_start_id != 0 { + if cfg!(debug_assertions) { + panic!( + "non-zero sst start id {} for NewCompactionGroup", + new_sst_start_id + ); + } else { + warn!( + new_sst_start_id, + "non-zero sst start id for NewCompactionGroup" + ); + } + } return; } let [parent_levels, cur_levels] = self @@ -641,29 +657,35 @@ impl HummockVersion { let GroupDeltasSummary { delete_sst_levels, delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, .. } = summary; - assert!( - insert_sst_level_id == 0 || insert_table_infos.is_empty(), - "we should only add to L0 when we commit an epoch. Inserting into {} {:?}", - insert_sst_level_id, - insert_table_infos - ); + assert!( delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() || has_destroy, "no sst should be deleted when committing an epoch" ); - if !insert_table_infos.is_empty() { - insert_new_sub_level( - &mut levels.l0, - insert_sub_level_id, - PbLevelType::Overlapping, - insert_table_infos, - None, - ); + for group_delta in &group_deltas.group_deltas { + if let GroupDelta::IntraLevel(IntraLevelDelta { + level_idx, + l0_sub_level_id, + inserted_table_infos, + .. + }) = group_delta + { + assert_eq!( + *level_idx, 0, + "we should only add to L0 when we commit an epoch." + ); + if !inserted_table_infos.is_empty() { + insert_new_sub_level( + &mut levels.l0, + *l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + } + } } } else { // `max_committed_epoch` is not changed. The delta is caused by compaction. diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index b9e576b547d7..b3e304305660 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap, HashSet}; // Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,13 +25,16 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::range::RangeBoundsExt; use risingwave_common::util::epoch::{test_epoch, EpochExt}; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, }; +use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::{ TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::EpochWithGap; +use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo}; +use risingwave_meta::hummock::{BatchCommitForNewCg, CommitEpochInfo}; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::store::version::read_filter_for_version; @@ -2479,3 +2482,151 @@ async fn test_table_watermark() { test_global_read(test_env.storage.clone(), epoch3).await; } + +#[tokio::test] +async fn test_commit_multi_epoch() { + let test_env = prepare_hummock_test_env().await; + let context_id = test_env.meta_client.context_id(); + let context_id_map = |object_ids: &[_]| { + HashMap::from_iter(object_ids.iter().map(|object_id| (*object_id, context_id))) + }; + let existing_table_id = TableId::new(1); + test_env.register_table_id(existing_table_id).await; + let initial_epoch = test_env + .manager + .get_current_version() + .await + .max_committed_epoch; + + let epoch1 = initial_epoch.next_epoch(); + let sst1_epoch1 = SstableInfo { + sst_id: 11, + object_id: 1, + table_ids: vec![existing_table_id.table_id], + ..Default::default() + }; + test_env + .manager + .commit_epoch(CommitEpochInfo { + sstables: vec![LocalSstableInfo { + sst_info: sst1_epoch1.clone(), + table_stats: Default::default(), + }], + new_table_watermarks: Default::default(), + sst_to_context: context_id_map(&[sst1_epoch1.object_id]), + new_table_fragment_info: None, + change_log_delta: Default::default(), + table_committed_epoch: BTreeMap::from_iter([( + epoch1, + HashSet::from_iter([existing_table_id]), + )]), + max_committed_epoch: epoch1, + batch_commit_for_new_cg: vec![], + }) + .await + .unwrap(); + + let old_cg_id_set: HashSet<_> = { + let old_version = test_env.manager.get_current_version().await; + let cg = old_version + .levels + .get(&(StaticCompactionGroupId::StateDefault as _)) + .unwrap(); + let sub_levels = &cg.l0.sub_levels; + assert_eq!(sub_levels.len(), 1); + let sub_level = &sub_levels[0]; + assert_eq!(sub_level.sub_level_id, epoch1); + assert_eq!(sub_level.table_infos.len(), 1); + assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch1.object_id); + old_version.levels.keys().cloned().collect() + }; + + let epoch2 = epoch1.next_epoch(); + + let batch_commit_table_id = TableId::new(2); + + let sst_epoch2 = SstableInfo { + sst_id: 22, + object_id: 2, + table_ids: vec![existing_table_id.table_id, batch_commit_table_id.table_id], + ..Default::default() + }; + + let sst2_epoch1 = SstableInfo { + sst_id: 33, + object_id: 3, + table_ids: vec![batch_commit_table_id.table_id], + ..Default::default() + }; + + test_env + .manager + .commit_epoch(CommitEpochInfo { + sstables: vec![LocalSstableInfo { + sst_info: sst_epoch2.clone(), + table_stats: Default::default(), + }], + new_table_watermarks: Default::default(), + sst_to_context: context_id_map(&[sst_epoch2.object_id, sst2_epoch1.object_id]), + new_table_fragment_info: None, + change_log_delta: Default::default(), + table_committed_epoch: BTreeMap::from_iter([( + epoch2, + HashSet::from_iter([existing_table_id, batch_commit_table_id]), + )]), + max_committed_epoch: epoch2, + batch_commit_for_new_cg: vec![BatchCommitForNewCg { + epoch_to_ssts: BTreeMap::from_iter([( + epoch1, + vec![LocalSstableInfo { + sst_info: sst2_epoch1.clone(), + table_stats: Default::default(), + }], + )]), + table_ids: vec![batch_commit_table_id], + }], + }) + .await + .unwrap(); + + let new_version = test_env.manager.get_current_version().await; + let old_cg = new_version + .levels + .get(&(StaticCompactionGroupId::StateDefault as _)) + .unwrap(); + let sub_levels = &old_cg.l0.sub_levels; + assert_eq!(sub_levels.len(), 2); + let sub_level1 = &sub_levels[0]; + assert_eq!(sub_level1.sub_level_id, epoch1); + assert_eq!(sub_level1.table_infos.len(), 1); + assert_eq!(sub_level1.table_infos[0].object_id, sst1_epoch1.object_id); + let sub_level2 = &sub_levels[1]; + assert_eq!(sub_level2.sub_level_id, epoch2); + assert_eq!(sub_level2.table_infos.len(), 1); + assert_eq!(sub_level2.table_infos[0].object_id, sst_epoch2.object_id); + + let new_cg_id_set: HashSet<_> = new_version.levels.keys().cloned().collect(); + let added_cg_id_set = &new_cg_id_set - &old_cg_id_set; + assert_eq!(added_cg_id_set.len(), 1); + let new_cg_id = added_cg_id_set.into_iter().next().unwrap(); + + let new_cg = new_version.levels.get(&new_cg_id).unwrap(); + let sub_levels = &new_cg.l0.sub_levels; + assert_eq!(sub_levels.len(), 2); + let sub_level1 = &sub_levels[0]; + assert_eq!(sub_level1.sub_level_id, epoch1); + assert_eq!(sub_level1.table_infos.len(), 1); + assert_eq!(sub_level1.table_infos[0].object_id, sst2_epoch1.object_id); + let sub_level2 = &sub_levels[1]; + assert_eq!(sub_level2.sub_level_id, epoch2); + assert_eq!(sub_level2.table_infos.len(), 1); + assert_eq!(sub_level2.table_infos[0].object_id, sst_epoch2.object_id); + + let info = new_version + .state_table_info + .info() + .get(&batch_commit_table_id) + .unwrap(); + assert_eq!(info.committed_epoch, epoch2); + assert_eq!(info.compaction_group_id, new_cg_id); +}