From 0d71708972cad7fc4c630e6d0f208604b8bec3d4 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 18 Jul 2024 22:11:19 +0800 Subject: [PATCH 1/8] feat(storage): support commit multi epoch for new cg --- src/meta/src/hummock/manager/commit_epoch.rs | 115 +++++++++++++--- src/meta/src/hummock/manager/transaction.rs | 49 ++++++- .../src/hummock/mock_hummock_meta_client.rs | 58 ++++++++ src/rpc_client/src/hummock_meta_client.rs | 13 +- src/rpc_client/src/meta_client.rs | 15 +- .../hummock_test/src/hummock_storage_tests.rs | 129 +++++++++++++++++- .../src/hummock/event_handler/uploader/mod.rs | 1 + .../src/hummock/hummock_meta_client.rs | 15 +- .../hummock/store/local_hummock_storage.rs | 2 + 9 files changed, 368 insertions(+), 29 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index cae89ab34d9d4..6f78d8508a06d 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -27,7 +27,7 @@ use risingwave_hummock_sdk::{ }; use risingwave_pb::hummock::compact_task::{self}; use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta; -use risingwave_pb::hummock::{HummockSnapshot, SstableInfo}; +use risingwave_pb::hummock::{CompactionConfig, HummockSnapshot, SstableInfo}; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; @@ -39,7 +39,7 @@ use crate::hummock::manager::versioning::Versioning; use crate::hummock::metrics_utils::{ get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat, }; -use crate::hummock::sequence::next_sstable_object_id; +use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; use crate::hummock::{ commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager, @@ -52,6 +52,12 @@ pub struct NewTableFragmentInfo { pub internal_table_ids: Vec, } +pub struct BatchCommitForNewCg { + pub epoch_to_ssts: BTreeMap>, + pub table_ids: Vec, + pub compaction_group: Option<(CompactionGroupId, CompactionConfig)>, +} + pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, @@ -60,6 +66,9 @@ pub struct CommitEpochInfo { pub change_log_delta: HashMap, pub table_committed_epoch: BTreeMap>, pub max_committed_epoch: HummockEpoch, + + // commit multi Epoch and SSTs for new compaction group + pub batch_commit_for_new_cg: Vec, } impl CommitEpochInfo { @@ -71,6 +80,28 @@ impl CommitEpochInfo { change_log_delta: HashMap, table_committed_epoch: BTreeMap>, max_committed_epoch: HummockEpoch, + ) -> Self { + Self::new_for_multi_epoch( + sstables, + new_table_watermarks, + sst_to_context, + new_table_fragment_info, + change_log_delta, + table_committed_epoch, + max_committed_epoch, + Vec::default(), + ) + } + + pub fn new_for_multi_epoch( + sstables: Vec, + new_table_watermarks: HashMap, + sst_to_context: HashMap, + new_table_fragment_info: Option, + change_log_delta: HashMap, + table_committed_epoch: BTreeMap>, + max_committed_epoch: HummockEpoch, + batch_commit_for_new_cg: Vec, ) -> Self { Self { sstables, @@ -80,6 +111,7 @@ impl CommitEpochInfo { change_log_delta, table_committed_epoch, max_committed_epoch, + batch_commit_for_new_cg, } } } @@ -126,8 +158,9 @@ impl HummockManager { sst_to_context, new_table_fragment_info, change_log_delta, - table_committed_epoch, + table_committed_epoch: _, max_committed_epoch: epoch, + mut batch_commit_for_new_cg, } = commit_info; let mut versioning_guard = self.versioning.write().await; let _timer = start_measure_real_process_timer!(self, "commit_epoch"); @@ -161,15 +194,15 @@ impl HummockManager { &self.metrics, ); - let state_table_info = &version.latest_version().state_table_info; - let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); + let state_table_info = version.latest_version().state_table_info.clone(); + let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); let mut new_table_ids = HashMap::new(); // Add new table if let Some(new_fragment_table_info) = new_table_fragment_info { if !new_fragment_table_info.internal_table_ids.is_empty() { on_handle_add_new_table( - state_table_info, + &state_table_info, &new_fragment_table_info.internal_table_ids, StaticCompactionGroupId::StateDefault as u64, &mut table_compaction_group_mapping, @@ -179,7 +212,7 @@ impl HummockManager { if let Some(mv_table_id) = new_fragment_table_info.mv_table_id { on_handle_add_new_table( - state_table_info, + &state_table_info, &[mv_table_id], StaticCompactionGroupId::MaterializedView as u64, &mut table_compaction_group_mapping, @@ -188,6 +221,45 @@ impl HummockManager { } } + let mut new_id_count = 0; + let batch_commit_for_new_cg = if !batch_commit_for_new_cg.is_empty() { + // The new config will be persisted later. + let config = self + .compaction_group_manager + .read() + .await + .default_compaction_config() + .as_ref() + .clone(); + + for BatchCommitForNewCg { + epoch_to_ssts, + table_ids, + compaction_group, + } in &mut batch_commit_for_new_cg + { + let new_compaction_group_id = next_compaction_group_id(&self.env).await?; + *compaction_group = Some((new_compaction_group_id, config.clone())); + + new_id_count += epoch_to_ssts + .iter() + .map(|(_, ssts)| ssts.len()) + .sum::(); + + on_handle_add_new_table( + &state_table_info, + &table_ids, + new_compaction_group_id as u64, + &mut table_compaction_group_mapping, + &mut new_table_ids, + )?; + } + let start_sst_id = next_sstable_object_id(&self.env, new_id_count).await?; + Some((batch_commit_for_new_cg, start_sst_id)) + } else { + None + }; + let commit_sstables = self .correct_commit_ssts(sstables, &table_compaction_group_mapping) .await?; @@ -200,23 +272,24 @@ impl HummockManager { new_table_ids, new_table_watermarks, change_log_delta, + batch_commit_for_new_cg, ); // TODO: remove the sanity check when supporting partial checkpoint - assert_eq!(1, table_committed_epoch.len()); - assert_eq!( - table_committed_epoch.iter().next().expect("non-empty"), - ( - &epoch, - &version - .latest_version() - .state_table_info - .info() - .keys() - .cloned() - .collect() - ) - ); + // assert_eq!(1, table_committed_epoch.len()); + // assert_eq!( + // table_committed_epoch.iter().next().expect("non-empty"), + // ( + // &epoch, + // &version + // .latest_version() + // .state_table_info + // .info() + // .keys() + // .cloned() + // .collect() + // ) + // ); // Apply stats changes. let mut version_stats = HummockVersionStatsTransaction::new( diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 878870a6d8d26..dd355353628d1 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -16,16 +16,19 @@ use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId}; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta; use risingwave_pb::hummock::{ - GroupDelta, HummockVersionStats, IntraLevelDelta, SstableInfo, StateTableInfoDelta, + CompatibilityVersion, GroupConstruct, GroupDelta, HummockVersionStats, IntraLevelDelta, + SstableInfo, StateTableInfoDelta, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use super::BatchCommitForNewCg; use crate::manager::NotificationManager; use crate::model::{ InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction, @@ -116,6 +119,7 @@ impl<'a> HummockVersionTransaction<'a> { new_table_ids: HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, + batch_commit_for_new_cg: Option<(Vec, u64)>, ) -> HummockVersionDelta { let mut new_version_delta = self.new_delta(); new_version_delta.max_committed_epoch = epoch; @@ -141,6 +145,49 @@ impl<'a> HummockVersionTransaction<'a> { group_deltas.push(group_delta); } + if let Some((batch_commit_for_new_cg, start_sst_id)) = batch_commit_for_new_cg { + let mut start_sst_id = start_sst_id; + + for commit_for_new_cg in batch_commit_for_new_cg { + let (compaction_group_id, compaction_group_config) = + commit_for_new_cg.compaction_group.unwrap(); + let group_deltas = &mut new_version_delta + .group_deltas + .entry(compaction_group_id) + .or_default() + .group_deltas; + + #[expect(deprecated)] + group_deltas.push(GroupDelta { + delta_type: Some(DeltaType::GroupConstruct(GroupConstruct { + group_config: Some(compaction_group_config), + group_id: compaction_group_id, + parent_group_id: StaticCompactionGroupId::StateDefault as u64, + new_sst_start_id: start_sst_id, + table_ids: vec![], + version: CompatibilityVersion::NoMemberTableIds as i32, + })), + }); + + for (epoch, insert_ssts) in commit_for_new_cg.epoch_to_ssts { + start_sst_id += insert_ssts.len() as u64; + let l0_sub_level_id = epoch; + let group_delta = GroupDelta { + delta_type: Some(DeltaType::IntraLevel(IntraLevelDelta { + level_idx: 0, + inserted_table_infos: insert_ssts + .into_iter() + .map(|s| s.sst_info) + .collect(), + l0_sub_level_id, + ..Default::default() + })), + }; + group_deltas.push(group_delta); + } + } + } + // update state table info new_version_delta.with_latest_version(|version, delta| { for (table_id, cg_id) in new_table_ids { diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 95e5ad9a368cb..d20a87db72425 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -22,6 +22,8 @@ use async_trait::async_trait; use fail::fail_point; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; +use itertools::Itertools; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::version::HummockVersion; @@ -44,6 +46,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; +use super::BatchCommitForNewCg; use crate::hummock::compaction::selector::{ default_compaction_selector, CompactionSelector, SpaceReclaimCompactionSelector, }; @@ -196,6 +199,61 @@ impl HummockMetaClient for MockHummockMetaClient { Ok(()) } + async fn commit_multi_epoch( + &self, + epoch: HummockEpoch, + sync_result: SyncResult, + batch_commit: Vec<(BTreeMap>, Vec)>, + ) -> Result<()> { + let version: HummockVersion = self.hummock_manager.get_current_version().await; + let sst_to_worker = sync_result + .uncommitted_ssts + .iter() + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id)) + .collect(); + let new_table_watermark = sync_result.table_watermarks; + let table_change_log = build_table_change_log_delta( + sync_result + .old_value_ssts + .into_iter() + .map(|sst| sst.sst_info), + sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info), + &vec![epoch], + version + .state_table_info + .info() + .keys() + .map(|table_id| (table_id.table_id, 0)), + ); + + let batch_commit_for_new_cg = batch_commit + .into_iter() + .map(|(epoch_to_ssts, table_ids)| BatchCommitForNewCg { + epoch_to_ssts, + table_ids, + compaction_group: None, + }) + .collect_vec(); + + self.hummock_manager + .commit_epoch(CommitEpochInfo::new_for_multi_epoch( + sync_result.uncommitted_ssts, + new_table_watermark, + sst_to_worker, + None, + table_change_log, + BTreeMap::from_iter([( + epoch, + version.state_table_info.info().keys().cloned().collect(), + )]), + epoch, + batch_commit_for_new_cg, + )) + .await + .map_err(mock_err)?; + Ok(()) + } + async fn update_current_epoch(&self, epoch: HummockEpoch) -> Result<()> { self.hummock_manager.update_current_epoch(epoch); Ok(()) diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index 5c25a59afa7f8..daa668df7ca6e 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; + use async_trait::async_trait; use futures::stream::BoxStream; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ - HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, SyncResult, + HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, SstObjectIdRange, + SyncResult, }; use risingwave_pb::hummock::{ HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, @@ -68,4 +72,11 @@ pub trait HummockMetaClient: Send + Sync + 'static { )>; async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result; + + async fn commit_multi_epoch( + &self, + epoch: HummockEpoch, + sync_result: SyncResult, + batch_commit: Vec<(BTreeMap>, Vec)>, + ) -> Result<()>; } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index db24711ba2377..d1c3beb6183d7 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::fmt::{Debug, Display}; use std::sync::Arc; use std::thread; @@ -37,8 +37,8 @@ use risingwave_common::RW_VERSION; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, - SyncResult, + CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, + SstObjectIdRange, SyncResult, }; use risingwave_pb::backup_service::backup_service_client::BackupServiceClient; use risingwave_pb::backup_service::*; @@ -1561,6 +1561,15 @@ impl HummockMetaClient for MetaClient { async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { self.get_version_by_epoch(epoch).await } + + async fn commit_multi_epoch( + &self, + _epoch: HummockEpoch, + _sync_result: SyncResult, + _batch_commit: Vec<(BTreeMap>, Vec)>, + ) -> Result<()> { + panic!("Only meta service can commit_multi_epoch in production.") + } } #[async_trait] diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index b9e576b547d79..b526924fd6bd3 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap, HashSet}; // Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -31,7 +31,8 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::table_watermark::{ TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::EpochWithGap; +use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, SyncResult}; +use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::store::version::read_filter_for_version; @@ -2479,3 +2480,127 @@ async fn test_table_watermark() { test_global_read(test_env.storage.clone(), epoch3).await; } + +#[tokio::test] +async fn test_commit_multi_epoch() { + let test_env = prepare_hummock_test_env().await; + let sr = SyncResult { + sync_size: 100, + uncommitted_ssts: vec![LocalSstableInfo { + sst_info: SstableInfo { + sst_id: 1, + object_id: 1, + ..Default::default() + }, + table_stats: Default::default(), + }], + table_watermarks: HashMap::new(), + old_value_ssts: vec![], + }; + + let epoch_1 = test_epoch(1); + let epoch_to_ssts = vec![( + BTreeMap::from_iter([( + epoch_1, + vec![LocalSstableInfo { + sst_info: SstableInfo { + sst_id: 2, + object_id: 2, + ..Default::default() + }, + table_stats: Default::default(), + }], + )]), + vec![TableId::from(2)], + )]; + + test_env + .meta_client + .commit_multi_epoch(epoch_1, sr, epoch_to_ssts) + .await + .unwrap(); + + let v = test_env + .meta_client + .hummock_manager_ref() + .get_current_version() + .await; + + let levels = &v.levels; + assert_eq!(3, levels.len()); + let new_cg_id = 5; + let ssts = &levels + .get(&new_cg_id) + .unwrap() + .get_l0() + .unwrap() + .get_sub_levels()[0] + .table_infos; + assert_eq!(1, ssts.len()); + let sst = &ssts[0]; + assert_eq!(2, sst.get_sst_id()); + assert!(v + .state_table_info + .compaction_group_member_table_ids(new_cg_id) + .contains(&TableId::from(2))); + + let epoch_2 = epoch_1.next_epoch(); + + let sr = SyncResult { + sync_size: 100, + uncommitted_ssts: vec![LocalSstableInfo { + sst_info: SstableInfo { + sst_id: 3, + object_id: 3, + ..Default::default() + }, + table_stats: Default::default(), + }], + table_watermarks: HashMap::new(), + old_value_ssts: vec![], + }; + let epoch_to_ssts = vec![( + BTreeMap::from_iter([( + epoch_2, + vec![LocalSstableInfo { + sst_info: SstableInfo { + sst_id: 4, + object_id: 4, + ..Default::default() + }, + table_stats: Default::default(), + }], + )]), + vec![TableId::from(5)], + )]; + + test_env + .meta_client + .commit_multi_epoch(epoch_2, sr, epoch_to_ssts) + .await + .unwrap(); + + let v = test_env + .meta_client + .hummock_manager_ref() + .get_current_version() + .await; + + let levels = &v.levels; + assert_eq!(4, levels.len()); + let new_cg_id = 6; + let ssts = &levels + .get(&new_cg_id) + .unwrap() + .get_l0() + .unwrap() + .get_sub_levels()[0] + .table_infos; + assert_eq!(1, ssts.len()); + let sst = &ssts[0]; + assert_eq!(4, sst.get_sst_id()); + assert!(v + .state_table_info + .compaction_group_member_table_ids(new_cg_id) + .contains(&TableId::from(5))); +} diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 101f54541fede..c99f9988635f3 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -567,6 +567,7 @@ impl LocalInstanceUnsyncData { // start syncing the imm inclusively before the `epoch` // returning data with newer data coming first fn sync(&mut self, epoch: HummockEpoch) -> Vec { + println!("sync epoch {:?}", epoch); // firstly added from old to new let mut ret = Vec::new(); while let Some(epoch_data) = self.sealed_data.back() diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 9663a7787c474..67fb2394ad410 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange, SyncResult}; +use risingwave_hummock_sdk::{ + HummockSstableObjectId, LocalSstableInfo, SstObjectIdRange, SyncResult, +}; use risingwave_pb::hummock::{ HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, VacuumTask, }; @@ -137,4 +141,13 @@ impl HummockMetaClient for MonitoredHummockMetaClient { async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { self.meta_client.get_version_by_epoch(epoch).await } + + async fn commit_multi_epoch( + &self, + _epoch: HummockEpoch, + _sync_result: SyncResult, + _batch_commit: Vec<(BTreeMap>, Vec)>, + ) -> Result<()> { + panic!("Only meta service can commit_multi_epoch in production.") + } } diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 0a96f45de80a5..28c1ca2c0efce 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -496,6 +496,7 @@ impl LocalStateStore for LocalHummockStorage { } fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) { + println!("seal_current_epoch next_epoch {}", next_epoch); assert!(!self.is_dirty()); if let Some(new_level) = &opts.switch_op_consistency_level { self.mem_table.op_consistency_level.update(new_level); @@ -552,6 +553,7 @@ impl LocalHummockStorage { ) -> StorageResult { let epoch = write_options.epoch; let table_id = write_options.table_id; + println!("flush_inner table_id {:?}", table_id); let table_id_label = table_id.to_string(); self.stats From 9c47322eaba383197c6b435adbdaccf5f1f5c1b7 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 31 Jul 2024 01:50:17 +0800 Subject: [PATCH 2/8] refine and fix test --- src/meta/src/barrier/mod.rs | 1 + src/meta/src/hummock/manager/commit_epoch.rs | 108 +++--- src/meta/src/hummock/manager/mod.rs | 2 + src/meta/src/hummock/manager/transaction.rs | 65 ++-- .../src/hummock/mock_hummock_meta_client.rs | 62 +--- src/meta/src/hummock/test_utils.rs | 1 + src/rpc_client/src/hummock_meta_client.rs | 13 +- src/rpc_client/src/meta_client.rs | 15 +- .../compaction_group/hummock_version_ext.rs | 41 ++- .../hummock_test/src/hummock_storage_tests.rs | 311 ++++++++++++------ .../src/hummock/event_handler/uploader/mod.rs | 1 - .../src/hummock/hummock_meta_client.rs | 15 +- .../hummock/store/local_hummock_storage.rs | 2 - 13 files changed, 340 insertions(+), 297 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index a168bd5c68dab..29ff4182fe338 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1318,5 +1318,6 @@ fn collect_commit_epoch_info( table_new_change_log, BTreeMap::from_iter([(epoch, command_ctx.info.existing_table_ids())]), epoch, + vec![], ) } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index ecea54986045c..88f2017367f74 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -28,7 +28,7 @@ use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, }; use risingwave_pb::hummock::compact_task::{self}; -use risingwave_pb::hummock::{CompactionConfig, HummockSnapshot}; +use risingwave_pb::hummock::HummockSnapshot; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; @@ -56,7 +56,6 @@ pub struct NewTableFragmentInfo { pub struct BatchCommitForNewCg { pub epoch_to_ssts: BTreeMap>, pub table_ids: Vec, - pub compaction_group: Option<(CompactionGroupId, CompactionConfig)>, } pub struct CommitEpochInfo { @@ -81,27 +80,6 @@ impl CommitEpochInfo { change_log_delta: HashMap, table_committed_epoch: BTreeMap>, max_committed_epoch: HummockEpoch, - ) -> Self { - Self::new_for_multi_epoch( - sstables, - new_table_watermarks, - sst_to_context, - new_table_fragment_info, - change_log_delta, - table_committed_epoch, - max_committed_epoch, - Vec::default(), - ) - } - - pub fn new_for_multi_epoch( - sstables: Vec, - new_table_watermarks: HashMap, - sst_to_context: HashMap, - new_table_fragment_info: Option, - change_log_delta: HashMap, - table_committed_epoch: BTreeMap>, - max_committed_epoch: HummockEpoch, batch_commit_for_new_cg: Vec, ) -> Self { Self { @@ -119,11 +97,12 @@ impl CommitEpochInfo { impl HummockManager { #[cfg(any(test, feature = "test"))] - pub async fn commit_epoch_for_test( + pub async fn commit_epoch_with_batch_cg_for_test( &self, epoch: HummockEpoch, sstables: Vec>, sst_to_context: HashMap, + batch_commit_for_new_cg: Vec, ) -> Result<()> { let tables = self .versioning @@ -143,11 +122,23 @@ impl HummockManager { HashMap::new(), BTreeMap::from_iter([(epoch, tables)]), epoch, + batch_commit_for_new_cg, ); self.commit_epoch(info).await?; Ok(()) } + #[cfg(any(test, feature = "test"))] + pub async fn commit_epoch_for_test( + &self, + epoch: HummockEpoch, + sstables: Vec>, + sst_to_context: HashMap, + ) -> Result<()> { + self.commit_epoch_with_batch_cg_for_test(epoch, sstables, sst_to_context, vec![]) + .await + } + /// Caller should ensure `epoch` > `max_committed_epoch` pub async fn commit_epoch( &self, @@ -159,9 +150,9 @@ impl HummockManager { sst_to_context, new_table_fragment_info, change_log_delta, - table_committed_epoch: _, + table_committed_epoch, max_committed_epoch: epoch, - mut batch_commit_for_new_cg, + batch_commit_for_new_cg, } = commit_info; let mut versioning_guard = self.versioning.write().await; let _timer = start_measure_real_process_timer!(self, "commit_epoch"); @@ -198,6 +189,7 @@ impl HummockManager { let state_table_info = version.latest_version().state_table_info.clone(); let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); + let mut new_table_ids = HashMap::new(); // Add new table if let Some(new_fragment_table_info) = new_table_fragment_info { @@ -222,41 +214,39 @@ impl HummockManager { } } - let mut new_id_count = 0; let batch_commit_for_new_cg = if !batch_commit_for_new_cg.is_empty() { - // The new config will be persisted later. - let config = self - .compaction_group_manager - .read() - .await - .default_compaction_config() - .as_ref() - .clone(); - + let mut new_id_count = 0; + let mut batch_commit_info = HashMap::new(); for BatchCommitForNewCg { epoch_to_ssts, table_ids, - compaction_group, - } in &mut batch_commit_for_new_cg + } in batch_commit_for_new_cg { let new_compaction_group_id = next_compaction_group_id(&self.env).await?; - *compaction_group = Some((new_compaction_group_id, config.clone())); - new_id_count += epoch_to_ssts - .iter() - .map(|(_, ssts)| ssts.len()) - .sum::(); + new_id_count += epoch_to_ssts.values().map(|ssts| ssts.len()).sum::(); on_handle_add_new_table( &state_table_info, - table_ids, + &table_ids, new_compaction_group_id, &mut table_compaction_group_mapping, &mut new_table_ids, )?; + + batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts); } let start_sst_id = next_sstable_object_id(&self.env, new_id_count).await?; - Some((batch_commit_for_new_cg, start_sst_id)) + Some(( + batch_commit_info, + start_sst_id, + (*self + .compaction_group_manager + .read() + .await + .default_compaction_config()) + .clone(), + )) } else { None }; @@ -277,20 +267,20 @@ impl HummockManager { ); // TODO: remove the sanity check when supporting partial checkpoint - // assert_eq!(1, table_committed_epoch.len()); - // assert_eq!( - // table_committed_epoch.iter().next().expect("non-empty"), - // ( - // &epoch, - // &version - // .latest_version() - // .state_table_info - // .info() - // .keys() - // .cloned() - // .collect() - // ) - // ); + assert_eq!(1, table_committed_epoch.len()); + assert_eq!( + table_committed_epoch.iter().next().expect("non-empty"), + ( + &epoch, + &version + .latest_version() + .state_table_info + .info() + .keys() + .cloned() + .collect() + ) + ); // Apply stats changes. let mut version_stats = HummockVersionStatsTransaction::new( diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 3bb4d5152adb9..e73744537ec91 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -68,6 +68,8 @@ mod utils; mod worker; pub(crate) use commit_epoch::*; +#[cfg(any(test, feature = "test"))] +pub use commit_epoch::{BatchCommitForNewCg, CommitEpochInfo}; use compaction::*; pub use compaction::{check_cg_write_limit, WriteLimitType}; pub(crate) use utils::*; diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index d4933e3ca903f..7191a0d18e7b2 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -23,13 +23,13 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{ GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId, LocalSstableInfo}; use risingwave_pb::hummock::{ - CompatibilityVersion, GroupConstruct, HummockVersionStats, StateTableInfoDelta, + CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionStats, + StateTableInfoDelta, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use super::BatchCommitForNewCg; use crate::manager::NotificationManager; use crate::model::{ InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction, @@ -113,6 +113,7 @@ impl<'a> HummockVersionTransaction<'a> { } /// Returns a duplicate delta, used by time travel. + #[expect(clippy::type_complexity)] pub(super) fn pre_commit_epoch( &mut self, epoch: HummockEpoch, @@ -120,37 +121,23 @@ impl<'a> HummockVersionTransaction<'a> { new_table_ids: HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, - batch_commit_for_new_cg: Option<(Vec, u64)>, + batch_commit_for_new_cg: Option<( + HashMap>>, + u64, + CompactionConfig, + )>, ) -> HummockVersionDelta { let mut new_version_delta = self.new_delta(); new_version_delta.max_committed_epoch = epoch; new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - // Append SSTs to a new version. - for (compaction_group_id, inserted_table_infos) in commit_sstables { - let group_deltas = &mut new_version_delta - .group_deltas - .entry(compaction_group_id) - .or_default() - .group_deltas; - let l0_sub_level_id = epoch; - let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( - 0, - l0_sub_level_id, - vec![], // default - inserted_table_infos, - 0, // default - )); - group_deltas.push(group_delta); - } - - if let Some((batch_commit_for_new_cg, start_sst_id)) = batch_commit_for_new_cg { + if let Some((batch_commit_for_new_cg, start_sst_id, compaction_group_config)) = + batch_commit_for_new_cg + { let mut start_sst_id = start_sst_id; - for commit_for_new_cg in batch_commit_for_new_cg { - let (compaction_group_id, compaction_group_config) = - commit_for_new_cg.compaction_group.unwrap(); + for (compaction_group_id, batch_commit_sst) in batch_commit_for_new_cg { let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) @@ -159,15 +146,16 @@ impl<'a> HummockVersionTransaction<'a> { #[expect(deprecated)] group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some(compaction_group_config), + group_config: Some(compaction_group_config.clone()), group_id: compaction_group_id, - parent_group_id: StaticCompactionGroupId::StateDefault as u64, + parent_group_id: StaticCompactionGroupId::NewCompactionGroup + as CompactionGroupId, new_sst_start_id: start_sst_id, table_ids: vec![], version: CompatibilityVersion::NoMemberTableIds as i32, })); - for (epoch, insert_ssts) in commit_for_new_cg.epoch_to_ssts { + for (epoch, insert_ssts) in batch_commit_sst { start_sst_id += insert_ssts.len() as u64; let l0_sub_level_id = epoch; let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( @@ -182,6 +170,25 @@ impl<'a> HummockVersionTransaction<'a> { } } + // Append SSTs to a new version. + for (compaction_group_id, inserted_table_infos) in commit_sstables { + let group_deltas = &mut new_version_delta + .group_deltas + .entry(compaction_group_id) + .or_default() + .group_deltas; + let l0_sub_level_id = epoch; + let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( + 0, + l0_sub_level_id, + vec![], // default + inserted_table_infos, + 0, // default + )); + + group_deltas.push(group_delta); + } + // update state table info new_version_delta.with_latest_version(|version, delta| { for (table_id, cg_id) in new_table_ids { diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 727329395a826..c821e4c911dc3 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -23,7 +23,6 @@ use fail::fail_point; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; use itertools::Itertools; -use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; @@ -48,7 +47,6 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; -use super::BatchCommitForNewCg; use crate::hummock::compaction::selector::{ default_compaction_selector, CompactionSelector, SpaceReclaimCompactionSelector, }; @@ -97,6 +95,10 @@ impl MockHummockMetaClient { .await .unwrap_or(None) } + + pub fn context_id(&self) -> HummockContextId { + self.context_id + } } fn mock_err(error: super::error::Error) -> RpcError { @@ -195,61 +197,7 @@ impl HummockMetaClient for MockHummockMetaClient { version.state_table_info.info().keys().cloned().collect(), )]), epoch, - )) - .await - .map_err(mock_err)?; - Ok(()) - } - - async fn commit_multi_epoch( - &self, - epoch: HummockEpoch, - sync_result: SyncResult, - batch_commit: Vec<(BTreeMap>, Vec)>, - ) -> Result<()> { - let version: HummockVersion = self.hummock_manager.get_current_version().await; - let sst_to_worker = sync_result - .uncommitted_ssts - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id)) - .collect(); - let new_table_watermark = sync_result.table_watermarks; - let table_change_log = build_table_change_log_delta( - sync_result - .old_value_ssts - .into_iter() - .map(|sst| sst.sst_info), - sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info), - &vec![epoch], - version - .state_table_info - .info() - .keys() - .map(|table_id| (table_id.table_id, 0)), - ); - - let batch_commit_for_new_cg = batch_commit - .into_iter() - .map(|(epoch_to_ssts, table_ids)| BatchCommitForNewCg { - epoch_to_ssts, - table_ids, - compaction_group: None, - }) - .collect_vec(); - - self.hummock_manager - .commit_epoch(CommitEpochInfo::new_for_multi_epoch( - sync_result.uncommitted_ssts, - new_table_watermark, - sst_to_worker, - None, - table_change_log, - BTreeMap::from_iter([( - epoch, - version.state_table_info.info().keys().cloned().collect(), - )]), - epoch, - batch_commit_for_new_cg, + vec![], )) .await .map_err(mock_err)?; diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 84c19cfd4d936..1793ae4c5dc29 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -41,6 +41,7 @@ use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; use crate::hummock::compaction::selector::{default_compaction_selector, LocalSelectorStatistic}; use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext}; use crate::hummock::level_handler::LevelHandler; +pub use crate::hummock::manager::CommitEpochInfo; use crate::hummock::model::CompactionGroup; use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef}; use crate::manager::{ diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index daa668df7ca6e..5c25a59afa7f8 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -12,15 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; - use async_trait::async_trait; use futures::stream::BoxStream; -use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ - HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, SstObjectIdRange, - SyncResult, + HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, SyncResult, }; use risingwave_pb::hummock::{ HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, @@ -72,11 +68,4 @@ pub trait HummockMetaClient: Send + Sync + 'static { )>; async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result; - - async fn commit_multi_epoch( - &self, - epoch: HummockEpoch, - sync_result: SyncResult, - batch_commit: Vec<(BTreeMap>, Vec)>, - ) -> Result<()>; } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 75a2c4aa84cbd..951ed56f64fed 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; @@ -39,8 +39,8 @@ use risingwave_common::RW_VERSION; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, - SstObjectIdRange, SyncResult, + CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, + SyncResult, }; use risingwave_pb::backup_service::backup_service_client::BackupServiceClient; use risingwave_pb::backup_service::*; @@ -1572,15 +1572,6 @@ impl HummockMetaClient for MetaClient { async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { self.get_version_by_epoch(epoch).await } - - async fn commit_multi_epoch( - &self, - _epoch: HummockEpoch, - _sync_result: SyncResult, - _batch_commit: Vec<(BTreeMap>, Vec)>, - ) -> Result<()> { - panic!("Only meta service can commit_multi_epoch in production.") - } } #[async_trait] diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index b98e2d1abd786..03f0be2c57dda 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -36,6 +36,7 @@ use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo, + IntraLevelDelta, }; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; @@ -656,29 +657,35 @@ impl HummockVersion { let GroupDeltasSummary { delete_sst_levels, delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, .. } = summary; - assert!( - insert_sst_level_id == 0 || insert_table_infos.is_empty(), - "we should only add to L0 when we commit an epoch. Inserting into {} {:?}", - insert_sst_level_id, - insert_table_infos - ); + assert!( delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() || has_destroy, "no sst should be deleted when committing an epoch" ); - if !insert_table_infos.is_empty() { - insert_new_sub_level( - levels.l0.as_mut().unwrap(), - insert_sub_level_id, - PbLevelType::Overlapping, - insert_table_infos, - None, - ); + for group_delta in &group_deltas.group_deltas { + if let GroupDelta::IntraLevel(IntraLevelDelta { + level_idx, + l0_sub_level_id, + inserted_table_infos, + .. + }) = group_delta + { + assert_eq!( + *level_idx, 0, + "we should only add to L0 when we commit an epoch." + ); + if !inserted_table_infos.is_empty() { + insert_new_sub_level( + levels.l0.as_mut().unwrap(), + *l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + } + } } } else { // `max_committed_epoch` is not changed. The delta is caused by compaction. diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index b3fc32fe910fc..527b92e6a6b94 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -25,6 +25,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::range::RangeBoundsExt; use risingwave_common::util::epoch::{test_epoch, EpochExt}; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, }; @@ -32,7 +33,8 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::{ TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo, SyncResult}; +use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo}; +use risingwave_meta::hummock::{BatchCommitForNewCg, CommitEpochInfo}; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::store::version::read_filter_for_version; @@ -2484,111 +2486,232 @@ async fn test_table_watermark() { #[tokio::test] async fn test_commit_multi_epoch() { let test_env = prepare_hummock_test_env().await; - let sr = SyncResult { - sync_size: 100, - uncommitted_ssts: vec![LocalSstableInfo { - sst_info: SstableInfo { - sst_id: 1, - object_id: 1, - ..Default::default() - }, - table_stats: Default::default(), - }], - table_watermarks: HashMap::new(), - old_value_ssts: vec![], + let context_id = test_env.meta_client.context_id(); + let context_id_map = |object_ids: &[_]| { + HashMap::from_iter(object_ids.iter().map(|object_id| (*object_id, context_id))) }; + let existing_table_id = TableId::new(1); + test_env.register_table_id(existing_table_id).await; + let initial_epoch = test_env + .manager + .get_current_version() + .await + .max_committed_epoch; - let epoch_1 = test_epoch(1); - let epoch_to_ssts = vec![( - BTreeMap::from_iter([( - epoch_1, - vec![LocalSstableInfo { - sst_info: SstableInfo { - sst_id: 2, - object_id: 2, - ..Default::default() - }, + let epoch1 = initial_epoch.next_epoch(); + let sst1_epoch1 = SstableInfo { + sst_id: 1, + object_id: 1, + table_ids: vec![existing_table_id.table_id], + ..Default::default() + }; + test_env + .manager + .commit_epoch(CommitEpochInfo { + sstables: vec![LocalSstableInfo { + sst_info: sst1_epoch1.clone(), table_stats: Default::default(), }], - )]), - vec![TableId::from(2)], - )]; - - test_env - .meta_client - .commit_multi_epoch(epoch_1, sr, epoch_to_ssts) + new_table_watermarks: Default::default(), + sst_to_context: context_id_map(&[sst1_epoch1.object_id]), + new_table_fragment_info: None, + change_log_delta: Default::default(), + table_committed_epoch: BTreeMap::from_iter([( + epoch1, + HashSet::from_iter([existing_table_id]), + )]), + max_committed_epoch: epoch1, + batch_commit_for_new_cg: vec![], + }) .await .unwrap(); - let v = test_env - .meta_client - .hummock_manager_ref() - .get_current_version() - .await; + let old_cg_id_set: HashSet<_> = { + let old_version = test_env.manager.get_current_version().await; + let cg = old_version + .levels + .get(&(StaticCompactionGroupId::StateDefault as _)) + .unwrap(); + let sub_levels = &cg.l0.as_ref().unwrap().sub_levels; + assert_eq!(sub_levels.len(), 1); + let sub_level = &sub_levels[0]; + assert_eq!(sub_level.sub_level_id, epoch1); + assert_eq!(sub_level.table_infos.len(), 1); + assert_eq!(sub_level.table_infos[0], sst1_epoch1); + old_version.levels.keys().cloned().collect() + }; - let levels = &v.levels; - assert_eq!(3, levels.len()); - let new_cg_id = 5; - let ssts = &levels.get(&new_cg_id).unwrap().level0().sub_levels[0].table_infos; - assert_eq!(1, ssts.len()); - let sst = &ssts[0]; - assert_eq!(2, sst.sst_id); - assert!(v - .state_table_info - .compaction_group_member_table_ids(new_cg_id) - .contains(&TableId::from(2))); - - let epoch_2 = epoch_1.next_epoch(); - - let sr = SyncResult { - sync_size: 100, - uncommitted_ssts: vec![LocalSstableInfo { - sst_info: SstableInfo { - sst_id: 3, - object_id: 3, - ..Default::default() - }, - table_stats: Default::default(), - }], - table_watermarks: HashMap::new(), - old_value_ssts: vec![], + let epoch2 = epoch1.next_epoch(); + + let batch_commit_table_id = TableId::new(2); + + let sst_epoch2 = SstableInfo { + sst_id: 2, + object_id: 2, + table_ids: vec![existing_table_id.table_id, batch_commit_table_id.table_id], + ..Default::default() + }; + + let sst2_epoch1 = SstableInfo { + sst_id: 3, + object_id: 3, + table_ids: vec![batch_commit_table_id.table_id], + ..Default::default() }; - let epoch_to_ssts = vec![( - BTreeMap::from_iter([( - epoch_2, - vec![LocalSstableInfo { - sst_info: SstableInfo { - sst_id: 4, - object_id: 4, - ..Default::default() - }, - table_stats: Default::default(), - }], - )]), - vec![TableId::from(5)], - )]; test_env - .meta_client - .commit_multi_epoch(epoch_2, sr, epoch_to_ssts) + .manager + .commit_epoch(CommitEpochInfo { + sstables: vec![LocalSstableInfo { + sst_info: sst_epoch2.clone(), + table_stats: Default::default(), + }], + new_table_watermarks: Default::default(), + sst_to_context: context_id_map(&[sst_epoch2.object_id, sst2_epoch1.object_id]), + new_table_fragment_info: None, + change_log_delta: Default::default(), + table_committed_epoch: BTreeMap::from_iter([( + epoch2, + HashSet::from_iter([existing_table_id, batch_commit_table_id]), + )]), + max_committed_epoch: epoch2, + batch_commit_for_new_cg: vec![BatchCommitForNewCg { + epoch_to_ssts: BTreeMap::from_iter([( + epoch1, + vec![LocalSstableInfo { + sst_info: sst2_epoch1.clone(), + table_stats: Default::default(), + }], + )]), + table_ids: vec![batch_commit_table_id], + }], + }) .await .unwrap(); - let v = test_env - .meta_client - .hummock_manager_ref() - .get_current_version() - .await; - - let levels = &v.levels; - assert_eq!(4, levels.len()); - let new_cg_id = 6; - let ssts = &levels.get(&new_cg_id).unwrap().level0().sub_levels[0].table_infos; - assert_eq!(1, ssts.len()); - let sst = &ssts[0]; - assert_eq!(4, sst.sst_id); - assert!(v - .state_table_info - .compaction_group_member_table_ids(new_cg_id) - .contains(&TableId::from(5))); + let new_version = test_env.manager.get_current_version().await; + let old_cg = new_version + .levels + .get(&(StaticCompactionGroupId::StateDefault as _)) + .unwrap(); + let sub_levels = &old_cg.l0.as_ref().unwrap().sub_levels; + assert_eq!(sub_levels.len(), 2); + let sub_level1 = &sub_levels[0]; + assert_eq!(sub_level1.sub_level_id, epoch1); + assert_eq!(sub_level1.table_infos.len(), 1); + assert_eq!(sub_level1.table_infos[0], sst1_epoch1); + let sub_level2 = &sub_levels[1]; + assert_eq!(sub_level2.sub_level_id, epoch2); + assert_eq!(sub_level2.table_infos.len(), 1); + assert_eq!(sub_level2.table_infos[0].object_id, sst_epoch2.object_id); + + let new_cg_id_set: HashSet<_> = new_version.levels.keys().cloned().collect(); + let added_cg_id_set = &new_cg_id_set - &old_cg_id_set; + assert_eq!(added_cg_id_set.len(), 1); + let new_cg_id = added_cg_id_set.into_iter().next().unwrap(); + + let new_cg = new_version.levels.get(&new_cg_id).unwrap(); + let sub_levels = &new_cg.l0.as_ref().unwrap().sub_levels; + assert_eq!(sub_levels.len(), 2); + let sub_level1 = &sub_levels[0]; + assert_eq!(sub_level1.sub_level_id, epoch1); + assert_eq!(sub_level1.table_infos.len(), 1); + assert_eq!(sub_level1.table_infos[0], sst2_epoch1); + let sub_level2 = &sub_levels[1]; + assert_eq!(sub_level2.sub_level_id, epoch2); + assert_eq!(sub_level2.table_infos.len(), 1); + assert_eq!(sub_level2.table_infos[0].object_id, sst_epoch2.object_id); + + // let epoch_to_ssts = vec![( + // BTreeMap::from_iter([( + // epoch_1, + // vec![LocalSstableInfo { + // sst_info: SstableInfo { + // sst_id: 2, + // object_id: 2, + // ..Default::default() + // }, + // table_stats: Default::default(), + // }], + // )]), + // vec![TableId::from(2)], + // )]; + // + // test_env + // .meta_client + // .commit_multi_epoch(epoch_1, sr, epoch_to_ssts) + // .await + // .unwrap(); + // + // let v = test_env + // .meta_client + // .hummock_manager_ref() + // .get_current_version() + // .await; + // + // let levels = &v.levels; + // assert_eq!(3, levels.len()); + // let new_cg_id = 5; + // let ssts = &levels.get(&new_cg_id).unwrap().level0().sub_levels[0].table_infos; + // assert_eq!(1, ssts.len()); + // let sst = &ssts[0]; + // assert_eq!(2, sst.sst_id); + // assert!(v + // .state_table_info + // .compaction_group_member_table_ids(new_cg_id) + // .contains(&TableId::from(2))); + // + // let epoch_2 = epoch_1.next_epoch(); + // + // let sr = SyncResult { + // sync_size: 100, + // uncommitted_ssts: vec![LocalSstableInfo { + // sst_info: SstableInfo { + // sst_id: 3, + // object_id: 3, + // ..Default::default() + // }, + // table_stats: Default::default(), + // }], + // table_watermarks: HashMap::new(), + // old_value_ssts: vec![], + // }; + // let epoch_to_ssts = vec![( + // BTreeMap::from_iter([( + // epoch_2, + // vec![LocalSstableInfo { + // sst_info: SstableInfo { + // sst_id: 4, + // object_id: 4, + // ..Default::default() + // }, + // table_stats: Default::default(), + // }], + // )]), + // vec![TableId::from(5)], + // )]; + // + // test_env + // .meta_client + // .commit_multi_epoch(epoch_2, sr, epoch_to_ssts) + // .await + // .unwrap(); + // + // let v = test_env + // .meta_client + // .hummock_manager_ref() + // .get_current_version() + // .await; + // + // let levels = &v.levels; + // assert_eq!(4, levels.len()); + // let new_cg_id = 6; + // let ssts = &levels.get(&new_cg_id).unwrap().level0().sub_levels[0].table_infos; + // assert_eq!(1, ssts.len()); + // let sst = &ssts[0]; + // assert_eq!(4, sst.sst_id); + // assert!(v + // .state_table_info + // .compaction_group_member_table_ids(new_cg_id) + // .contains(&TableId::from(5))); } diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 6824d89ff8e07..884b29de5edd3 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -567,7 +567,6 @@ impl LocalInstanceUnsyncData { // start syncing the imm inclusively before the `epoch` // returning data with newer data coming first fn sync(&mut self, epoch: HummockEpoch) -> Vec { - println!("sync epoch {:?}", epoch); // firstly added from old to new let mut ret = Vec::new(); while let Some(epoch_data) = self.sealed_data.back() diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 67fb2394ad410..9663a7787c474 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -12,16 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; -use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{ - HummockSstableObjectId, LocalSstableInfo, SstObjectIdRange, SyncResult, -}; +use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange, SyncResult}; use risingwave_pb::hummock::{ HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, VacuumTask, }; @@ -141,13 +137,4 @@ impl HummockMetaClient for MonitoredHummockMetaClient { async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { self.meta_client.get_version_by_epoch(epoch).await } - - async fn commit_multi_epoch( - &self, - _epoch: HummockEpoch, - _sync_result: SyncResult, - _batch_commit: Vec<(BTreeMap>, Vec)>, - ) -> Result<()> { - panic!("Only meta service can commit_multi_epoch in production.") - } } diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 9b74c31441a63..ae0d775219c53 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -501,7 +501,6 @@ impl LocalStateStore for LocalHummockStorage { } fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) { - println!("seal_current_epoch next_epoch {}", next_epoch); assert!(!self.is_dirty()); if let Some(new_level) = &opts.switch_op_consistency_level { self.mem_table.op_consistency_level.update(new_level); @@ -558,7 +557,6 @@ impl LocalHummockStorage { ) -> StorageResult { let epoch = write_options.epoch; let table_id = write_options.table_id; - println!("flush_inner table_id {:?}", table_id); let table_id_label = table_id.to_string(); self.stats From 006a54cb45cb04d995945c9069494a21224e480c Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 31 Jul 2024 02:53:55 +0800 Subject: [PATCH 3/8] add to compaction group manager --- src/meta/src/hummock/manager/commit_epoch.rs | 34 +++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 88f2017367f74..e8f5d3e496633 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -40,6 +40,7 @@ use crate::hummock::manager::versioning::Versioning; use crate::hummock::metrics_utils::{ get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat, }; +use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; use crate::hummock::{ commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer, @@ -155,6 +156,8 @@ impl HummockManager { batch_commit_for_new_cg, } = commit_info; let mut versioning_guard = self.versioning.write().await; + let mut compaction_group_manager_guard = self.compaction_group_manager.write().await; + let compaction_group_config = compaction_group_manager_guard.default_compaction_config(); let _timer = start_measure_real_process_timer!(self, "commit_epoch"); // Prevent commit new epochs if this flag is set if versioning_guard.disable_commit_epochs { @@ -185,6 +188,8 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); + let mut compaction_group_manager = + compaction_group_manager_guard.start_compaction_groups_txn(); let state_table_info = version.latest_version().state_table_info.clone(); @@ -223,6 +228,13 @@ impl HummockManager { } in batch_commit_for_new_cg { let new_compaction_group_id = next_compaction_group_id(&self.env).await?; + compaction_group_manager.insert( + new_compaction_group_id, + CompactionGroup { + group_id: new_compaction_group_id, + compaction_config: compaction_group_config.clone(), + }, + ); new_id_count += epoch_to_ssts.values().map(|ssts| ssts.len()).sum::(); @@ -240,12 +252,7 @@ impl HummockManager { Some(( batch_commit_info, start_sst_id, - (*self - .compaction_group_manager - .read() - .await - .default_compaction_config()) - .clone(), + (*compaction_group_config).clone(), )) } else { None @@ -343,12 +350,22 @@ impl HummockManager { &versioning.last_time_travel_snapshot_sst_ids, ) .await?; - commit_multi_var_with_provided_txn!(txn, version, version_stats)?; + commit_multi_var_with_provided_txn!( + txn, + version, + version_stats, + compaction_group_manager + )?; if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; } } else { - commit_multi_var!(self.meta_store_ref(), version, version_stats)?; + commit_multi_var!( + self.meta_store_ref(), + version, + version_stats, + compaction_group_manager + )?; } let snapshot = HummockSnapshot { @@ -369,6 +386,7 @@ impl HummockManager { } drop(versioning_guard); + drop(compaction_group_manager_guard); tracing::trace!("new committed epoch {}", epoch); // Don't trigger compactions if we enable deterministic compaction From e1b188cd1dd8d4b5199273d96c90d6e26969a361 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 31 Jul 2024 12:54:49 +0800 Subject: [PATCH 4/8] remove dead code --- .../hummock_test/src/hummock_storage_tests.rs | 93 ------------------- 1 file changed, 93 deletions(-) diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 527b92e6a6b94..63bcff21bdc32 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2621,97 +2621,4 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level2.sub_level_id, epoch2); assert_eq!(sub_level2.table_infos.len(), 1); assert_eq!(sub_level2.table_infos[0].object_id, sst_epoch2.object_id); - - // let epoch_to_ssts = vec![( - // BTreeMap::from_iter([( - // epoch_1, - // vec![LocalSstableInfo { - // sst_info: SstableInfo { - // sst_id: 2, - // object_id: 2, - // ..Default::default() - // }, - // table_stats: Default::default(), - // }], - // )]), - // vec![TableId::from(2)], - // )]; - // - // test_env - // .meta_client - // .commit_multi_epoch(epoch_1, sr, epoch_to_ssts) - // .await - // .unwrap(); - // - // let v = test_env - // .meta_client - // .hummock_manager_ref() - // .get_current_version() - // .await; - // - // let levels = &v.levels; - // assert_eq!(3, levels.len()); - // let new_cg_id = 5; - // let ssts = &levels.get(&new_cg_id).unwrap().level0().sub_levels[0].table_infos; - // assert_eq!(1, ssts.len()); - // let sst = &ssts[0]; - // assert_eq!(2, sst.sst_id); - // assert!(v - // .state_table_info - // .compaction_group_member_table_ids(new_cg_id) - // .contains(&TableId::from(2))); - // - // let epoch_2 = epoch_1.next_epoch(); - // - // let sr = SyncResult { - // sync_size: 100, - // uncommitted_ssts: vec![LocalSstableInfo { - // sst_info: SstableInfo { - // sst_id: 3, - // object_id: 3, - // ..Default::default() - // }, - // table_stats: Default::default(), - // }], - // table_watermarks: HashMap::new(), - // old_value_ssts: vec![], - // }; - // let epoch_to_ssts = vec![( - // BTreeMap::from_iter([( - // epoch_2, - // vec![LocalSstableInfo { - // sst_info: SstableInfo { - // sst_id: 4, - // object_id: 4, - // ..Default::default() - // }, - // table_stats: Default::default(), - // }], - // )]), - // vec![TableId::from(5)], - // )]; - // - // test_env - // .meta_client - // .commit_multi_epoch(epoch_2, sr, epoch_to_ssts) - // .await - // .unwrap(); - // - // let v = test_env - // .meta_client - // .hummock_manager_ref() - // .get_current_version() - // .await; - // - // let levels = &v.levels; - // assert_eq!(4, levels.len()); - // let new_cg_id = 6; - // let ssts = &levels.get(&new_cg_id).unwrap().level0().sub_levels[0].table_infos; - // assert_eq!(1, ssts.len()); - // let sst = &ssts[0]; - // assert_eq!(4, sst.sst_id); - // assert!(v - // .state_table_info - // .compaction_group_member_table_ids(new_cg_id) - // .contains(&TableId::from(5))); } From 8666c2d442b69699ce6be18c1905e03a9607f92a Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 31 Jul 2024 16:17:31 +0800 Subject: [PATCH 5/8] lazily start compaction group transaction --- src/meta/src/hummock/manager/commit_epoch.rs | 97 +++++++++-------- .../manager/compaction_group_manager.rs | 22 +++- src/meta/src/model/mod.rs | 100 +++++++++++++++--- 3 files changed, 161 insertions(+), 58 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index e8f5d3e496633..a2923d4f74e05 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -32,6 +32,7 @@ use risingwave_pb::hummock::HummockSnapshot; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; +use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; use crate::hummock::manager::time_travel::require_sql_meta_store_err; use crate::hummock::manager::transaction::{ HummockVersionStatsTransaction, HummockVersionTransaction, @@ -156,8 +157,6 @@ impl HummockManager { batch_commit_for_new_cg, } = commit_info; let mut versioning_guard = self.versioning.write().await; - let mut compaction_group_manager_guard = self.compaction_group_manager.write().await; - let compaction_group_config = compaction_group_manager_guard.default_compaction_config(); let _timer = start_measure_real_process_timer!(self, "commit_epoch"); // Prevent commit new epochs if this flag is set if versioning_guard.disable_commit_epochs { @@ -188,8 +187,6 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut compaction_group_manager = - compaction_group_manager_guard.start_compaction_groups_txn(); let state_table_info = version.latest_version().state_table_info.clone(); @@ -219,44 +216,55 @@ impl HummockManager { } } - let batch_commit_for_new_cg = if !batch_commit_for_new_cg.is_empty() { - let mut new_id_count = 0; - let mut batch_commit_info = HashMap::new(); - for BatchCommitForNewCg { - epoch_to_ssts, - table_ids, - } in batch_commit_for_new_cg - { - let new_compaction_group_id = next_compaction_group_id(&self.env).await?; - compaction_group_manager.insert( - new_compaction_group_id, - CompactionGroup { - group_id: new_compaction_group_id, - compaction_config: compaction_group_config.clone(), - }, - ); - - new_id_count += epoch_to_ssts.values().map(|ssts| ssts.len()).sum::(); - - on_handle_add_new_table( - &state_table_info, - &table_ids, - new_compaction_group_id, - &mut table_compaction_group_mapping, - &mut new_table_ids, - )?; - - batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts); - } - let start_sst_id = next_sstable_object_id(&self.env, new_id_count).await?; - Some(( - batch_commit_info, - start_sst_id, - (*compaction_group_config).clone(), - )) - } else { - None - }; + let (batch_commit_for_new_cg, compaction_group_manager_txn) = + if !batch_commit_for_new_cg.is_empty() { + let compaction_group_manager_guard = self.compaction_group_manager.write().await; + let compaction_group_config = + compaction_group_manager_guard.default_compaction_config(); + let mut compaction_group_manager = + CompactionGroupManager::start_owned_compaction_groups_txn( + compaction_group_manager_guard, + ); + let mut new_id_count = 0; + let mut batch_commit_info = HashMap::new(); + for BatchCommitForNewCg { + epoch_to_ssts, + table_ids, + } in batch_commit_for_new_cg + { + let new_compaction_group_id = next_compaction_group_id(&self.env).await?; + compaction_group_manager.insert( + new_compaction_group_id, + CompactionGroup { + group_id: new_compaction_group_id, + compaction_config: compaction_group_config.clone(), + }, + ); + + new_id_count += epoch_to_ssts.values().map(|ssts| ssts.len()).sum::(); + + on_handle_add_new_table( + &state_table_info, + &table_ids, + new_compaction_group_id, + &mut table_compaction_group_mapping, + &mut new_table_ids, + )?; + + batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts); + } + let start_sst_id = next_sstable_object_id(&self.env, new_id_count).await?; + ( + Some(( + batch_commit_info, + start_sst_id, + (*compaction_group_config).clone(), + )), + Some(compaction_group_manager), + ) + } else { + (None, None) + }; let commit_sstables = self .correct_commit_ssts(sstables, &table_compaction_group_mapping) @@ -354,7 +362,7 @@ impl HummockManager { txn, version, version_stats, - compaction_group_manager + compaction_group_manager_txn )?; if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; @@ -364,7 +372,7 @@ impl HummockManager { self.meta_store_ref(), version, version_stats, - compaction_group_manager + compaction_group_manager_txn )?; } @@ -386,7 +394,6 @@ impl HummockManager { } drop(versioning_guard); - drop(compaction_group_manager_guard); tracing::trace!("new committed epoch {}", epoch); // Don't trigger compactions if we enable deterministic compaction diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 61191722f800a..7ff0b691bd66f 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -46,7 +46,7 @@ use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; use crate::manager::{MetaSrvEnv, MetaStoreImpl}; -use crate::model::{BTreeMapTransaction, MetadataModel, MetadataModelError}; +use crate::model::{BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModel, MetadataModelError}; type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>; @@ -693,6 +693,26 @@ impl CompactionGroupManager { CompactionGroupTransaction::new(&mut self.compaction_groups) } + pub fn start_owned_compaction_groups_txn>( + inner: P, + ) -> BTreeMapTransactionInner< + CompactionGroupId, + CompactionGroup, + DerefMutForward< + Self, + BTreeMap, + P, + impl Fn(&Self) -> &BTreeMap, + impl Fn(&mut Self) -> &mut BTreeMap, + >, + > { + BTreeMapTransactionInner::new(DerefMutForward::new( + inner, + |mgr| &mgr.compaction_groups, + |mgr| &mut mgr.compaction_groups, + )) + } + /// Tries to get compaction group config for `compaction_group_id`. pub(super) fn try_get_compaction_group_config( &self, diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index 1ca22854c5def..6b5f629a7c04d 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -462,16 +462,20 @@ enum BTreeMapOp { /// are stored in `staging`. On `commit`, it will apply the changes stored in `staging` to the in /// memory btree map. When serve `get` and `get_mut`, it merges the value stored in `staging` and /// `tree_ref`. -pub struct BTreeMapTransaction<'a, K: Ord, V> { +pub struct BTreeMapTransactionInner>> { /// A reference to the original `BTreeMap`. All access to this field should be immutable, /// except when we commit the staging changes to the original map. - tree_ref: &'a mut BTreeMap, + tree_ref: P, /// Store all the staging changes that will be applied to the original map on commit staging: BTreeMap>, } -impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { - pub fn new(tree_ref: &'a mut BTreeMap) -> BTreeMapTransaction<'a, K, V> { +pub type BTreeMapTransaction<'a, K, V> = BTreeMapTransactionInner>; + +impl>> + BTreeMapTransactionInner +{ + pub fn new(tree_ref: P) -> BTreeMapTransactionInner { Self { tree_ref, staging: BTreeMap::default(), @@ -481,7 +485,7 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { /// Start a `BTreeMapEntryTransaction` when the `key` exists #[allow(dead_code)] pub fn new_entry_txn(&mut self, key: K) -> Option> { - BTreeMapEntryTransaction::new(self.tree_ref, key, None) + BTreeMapEntryTransaction::new(&mut self.tree_ref, key, None) } /// Start a `BTreeMapEntryTransaction`. If the `key` does not exist, the the `default_val` will @@ -492,17 +496,17 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { key: K, default_val: V, ) -> BTreeMapEntryTransaction<'_, K, V> { - BTreeMapEntryTransaction::new(self.tree_ref, key, Some(default_val)) + BTreeMapEntryTransaction::new(&mut self.tree_ref, key, Some(default_val)) .expect("default value is provided and should return `Some`") } /// Start a `BTreeMapEntryTransaction` that inserts the `val` into `key`. pub fn new_entry_insert_txn(&mut self, key: K, val: V) -> BTreeMapEntryTransaction<'_, K, V> { - BTreeMapEntryTransaction::new_insert(self.tree_ref, key, val) + BTreeMapEntryTransaction::new_insert(&mut self.tree_ref, key, val) } pub fn tree_ref(&self) -> &BTreeMap { - self.tree_ref + &self.tree_ref } /// Get the value of the provided key by merging the staging value and the original value @@ -578,7 +582,7 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { } } - pub fn commit_memory(self) { + pub fn commit_memory(mut self) { // Apply each op stored in the staging to original tree. for (k, op) in self.staging { match op { @@ -593,14 +597,16 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> { } } -impl<'a, K: Ord + Debug, V: Clone> InMemValTransaction for BTreeMapTransaction<'a, K, V> { +impl>> InMemValTransaction + for BTreeMapTransactionInner +{ fn commit(self) { self.commit_memory(); } } -impl<'a, K: Ord + Debug, V: Transactional + Clone, TXN> ValTransaction - for BTreeMapTransaction<'a, K, V> +impl + Clone, P: DerefMut>, TXN> + ValTransaction for BTreeMapTransactionInner { async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> { // Add the staging operation to txn @@ -696,6 +702,76 @@ impl<'a, K: Ord, V: PartialEq + Transactional, TXN> ValTransaction } } +impl InMemValTransaction for Option { + fn commit(self) { + if let Some(inner) = self { + inner.commit(); + } + } +} + +impl, TXN> ValTransaction for Option { + async fn apply_to_txn(&self, txn: &mut TXN) -> MetadataModelResult<()> { + if let Some(inner) = &self { + inner.apply_to_txn(txn).await?; + } + Ok(()) + } +} + +pub struct DerefMutForward< + Inner, + Target, + P: DerefMut, + F: Fn(&Inner) -> &Target, + FMut: Fn(&mut Inner) -> &mut Target, +> { + ptr: P, + f: F, + f_mut: FMut, +} + +impl< + Inner, + Target, + P: DerefMut, + F: Fn(&Inner) -> &Target, + FMut: Fn(&mut Inner) -> &mut Target, + > DerefMutForward +{ + pub fn new(ptr: P, f: F, f_mut: FMut) -> Self { + Self { ptr, f, f_mut } + } +} + +impl< + Inner, + Target, + P: DerefMut, + F: Fn(&Inner) -> &Target, + FMut: Fn(&mut Inner) -> &mut Target, + > Deref for DerefMutForward +{ + type Target = Target; + + fn deref(&self) -> &Self::Target { + (self.f)(&self.ptr) + } +} + +impl< + Inner, + Target, + P: DerefMut, + F: Fn(&Inner) -> &Target, + FMut: Fn(&mut Inner) -> &mut Target, + > DerefMut for DerefMutForward +{ + fn deref_mut(&mut self) -> &mut Self::Target { + (self.f_mut)(&mut self.ptr) + } +} + #[cfg(test)] mod tests { use itertools::Itertools; From 044c89168b5e659f7f964ea52a7e9db517706b12 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 31 Jul 2024 16:52:10 +0800 Subject: [PATCH 6/8] fix --- src/meta/src/hummock/manager/commit_epoch.rs | 24 +++++-------------- .../manager/compaction_group_manager.rs | 5 +++- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index a2923d4f74e05..65de56b5c06d1 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -99,12 +99,11 @@ impl CommitEpochInfo { impl HummockManager { #[cfg(any(test, feature = "test"))] - pub async fn commit_epoch_with_batch_cg_for_test( + pub async fn commit_epoch_for_test( &self, epoch: HummockEpoch, sstables: Vec>, sst_to_context: HashMap, - batch_commit_for_new_cg: Vec, ) -> Result<()> { let tables = self .versioning @@ -124,23 +123,12 @@ impl HummockManager { HashMap::new(), BTreeMap::from_iter([(epoch, tables)]), epoch, - batch_commit_for_new_cg, + vec![], ); self.commit_epoch(info).await?; Ok(()) } - #[cfg(any(test, feature = "test"))] - pub async fn commit_epoch_for_test( - &self, - epoch: HummockEpoch, - sstables: Vec>, - sst_to_context: HashMap, - ) -> Result<()> { - self.commit_epoch_with_batch_cg_for_test(epoch, sstables, sst_to_context, vec![]) - .await - } - /// Caller should ensure `epoch` > `max_committed_epoch` pub async fn commit_epoch( &self, @@ -188,7 +176,7 @@ impl HummockManager { &self.metrics, ); - let state_table_info = version.latest_version().state_table_info.clone(); + let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); @@ -197,7 +185,7 @@ impl HummockManager { if let Some(new_fragment_table_info) = new_table_fragment_info { if !new_fragment_table_info.internal_table_ids.is_empty() { on_handle_add_new_table( - &state_table_info, + state_table_info, &new_fragment_table_info.internal_table_ids, StaticCompactionGroupId::StateDefault as u64, &mut table_compaction_group_mapping, @@ -207,7 +195,7 @@ impl HummockManager { if let Some(mv_table_id) = new_fragment_table_info.mv_table_id { on_handle_add_new_table( - &state_table_info, + state_table_info, &[mv_table_id], StaticCompactionGroupId::MaterializedView as u64, &mut table_compaction_group_mapping, @@ -244,7 +232,7 @@ impl HummockManager { new_id_count += epoch_to_ssts.values().map(|ssts| ssts.len()).sum::(); on_handle_add_new_table( - &state_table_info, + state_table_info, &table_ids, new_compaction_group_id, &mut table_compaction_group_mapping, diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 7ff0b691bd66f..d585c23e19ee1 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -46,7 +46,10 @@ use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; use crate::manager::{MetaSrvEnv, MetaStoreImpl}; -use crate::model::{BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModel, MetadataModelError}; +use crate::model::{ + BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModel, + MetadataModelError, +}; type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>; From 7f0d985d479975e86da28786cb8506546033142d Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 31 Jul 2024 17:21:00 +0800 Subject: [PATCH 7/8] remove passing start_sst_id for new group --- src/meta/src/hummock/manager/commit_epoch.rs | 10 +--------- src/meta/src/hummock/manager/transaction.rs | 8 ++------ .../hummock_test/src/hummock_storage_tests.rs | 12 ++++++------ 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 82a528f4ad2e6..2054bd4dbb5d2 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -219,7 +219,6 @@ impl HummockManager { CompactionGroupManager::start_owned_compaction_groups_txn( compaction_group_manager_guard, ); - let mut new_id_count = 0; let mut batch_commit_info = HashMap::new(); for BatchCommitForNewCg { epoch_to_ssts, @@ -235,8 +234,6 @@ impl HummockManager { }, ); - new_id_count += epoch_to_ssts.values().map(|ssts| ssts.len()).sum::(); - on_handle_add_new_table( state_table_info, &table_ids, @@ -247,13 +244,8 @@ impl HummockManager { batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts); } - let start_sst_id = next_sstable_object_id(&self.env, new_id_count).await?; ( - Some(( - batch_commit_info, - start_sst_id, - (*compaction_group_config).clone(), - )), + Some((batch_commit_info, (*compaction_group_config).clone())), Some(compaction_group_manager), ) } else { diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 7191a0d18e7b2..7b1de499d3e8a 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -123,7 +123,6 @@ impl<'a> HummockVersionTransaction<'a> { change_log_delta: HashMap, batch_commit_for_new_cg: Option<( HashMap>>, - u64, CompactionConfig, )>, ) -> HummockVersionDelta { @@ -132,11 +131,9 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - if let Some((batch_commit_for_new_cg, start_sst_id, compaction_group_config)) = + if let Some((batch_commit_for_new_cg, compaction_group_config)) = batch_commit_for_new_cg { - let mut start_sst_id = start_sst_id; - for (compaction_group_id, batch_commit_sst) in batch_commit_for_new_cg { let group_deltas = &mut new_version_delta .group_deltas @@ -150,13 +147,12 @@ impl<'a> HummockVersionTransaction<'a> { group_id: compaction_group_id, parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, - new_sst_start_id: start_sst_id, + new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` table_ids: vec![], version: CompatibilityVersion::NoMemberTableIds as i32, })); for (epoch, insert_ssts) in batch_commit_sst { - start_sst_id += insert_ssts.len() as u64; let l0_sub_level_id = epoch; let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( 0, diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 63bcff21bdc32..218ac69f973d9 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2500,7 +2500,7 @@ async fn test_commit_multi_epoch() { let epoch1 = initial_epoch.next_epoch(); let sst1_epoch1 = SstableInfo { - sst_id: 1, + sst_id: 11, object_id: 1, table_ids: vec![existing_table_id.table_id], ..Default::default() @@ -2537,7 +2537,7 @@ async fn test_commit_multi_epoch() { let sub_level = &sub_levels[0]; assert_eq!(sub_level.sub_level_id, epoch1); assert_eq!(sub_level.table_infos.len(), 1); - assert_eq!(sub_level.table_infos[0], sst1_epoch1); + assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch1.object_id); old_version.levels.keys().cloned().collect() }; @@ -2546,14 +2546,14 @@ async fn test_commit_multi_epoch() { let batch_commit_table_id = TableId::new(2); let sst_epoch2 = SstableInfo { - sst_id: 2, + sst_id: 22, object_id: 2, table_ids: vec![existing_table_id.table_id, batch_commit_table_id.table_id], ..Default::default() }; let sst2_epoch1 = SstableInfo { - sst_id: 3, + sst_id: 33, object_id: 3, table_ids: vec![batch_commit_table_id.table_id], ..Default::default() @@ -2599,7 +2599,7 @@ async fn test_commit_multi_epoch() { let sub_level1 = &sub_levels[0]; assert_eq!(sub_level1.sub_level_id, epoch1); assert_eq!(sub_level1.table_infos.len(), 1); - assert_eq!(sub_level1.table_infos[0], sst1_epoch1); + assert_eq!(sub_level1.table_infos[0].object_id, sst1_epoch1.object_id); let sub_level2 = &sub_levels[1]; assert_eq!(sub_level2.sub_level_id, epoch2); assert_eq!(sub_level2.table_infos.len(), 1); @@ -2616,7 +2616,7 @@ async fn test_commit_multi_epoch() { let sub_level1 = &sub_levels[0]; assert_eq!(sub_level1.sub_level_id, epoch1); assert_eq!(sub_level1.table_infos.len(), 1); - assert_eq!(sub_level1.table_infos[0], sst2_epoch1); + assert_eq!(sub_level1.table_infos[0].object_id, sst2_epoch1.object_id); let sub_level2 = &sub_levels[1]; assert_eq!(sub_level2.sub_level_id, epoch2); assert_eq!(sub_level2.table_infos.len(), 1); From 5475645217ef0d89815a0e9ec48c4569c83834ab Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 2 Aug 2024 14:04:04 +0800 Subject: [PATCH 8/8] fix comment --- src/meta/src/hummock/manager/transaction.rs | 50 ++++++++++--------- .../compaction_group/hummock_version_ext.rs | 21 ++++++-- .../hummock_test/src/hummock_storage_tests.rs | 8 +++ 3 files changed, 52 insertions(+), 27 deletions(-) diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 0c0e1cf45de6f..9833dbea6e3af 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -116,7 +116,7 @@ impl<'a> HummockVersionTransaction<'a> { #[expect(clippy::type_complexity)] pub(super) fn pre_commit_epoch( &mut self, - epoch: HummockEpoch, + max_committed_epoch: HummockEpoch, commit_sstables: BTreeMap>, new_table_ids: HashMap, new_table_watermarks: HashMap, @@ -127,7 +127,7 @@ impl<'a> HummockVersionTransaction<'a> { )>, ) -> HummockVersionDelta { let mut new_version_delta = self.new_delta(); - new_version_delta.max_committed_epoch = epoch; + new_version_delta.max_committed_epoch = max_committed_epoch; new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; @@ -151,13 +151,14 @@ impl<'a> HummockVersionTransaction<'a> { })); for (epoch, insert_ssts) in batch_commit_sst { + assert!(epoch < max_committed_epoch); let l0_sub_level_id = epoch; let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( 0, - l0_sub_level_id, // default - vec![], - insert_ssts.into_iter().map(|s| s.sst_info).collect(), // default - 0, // default + l0_sub_level_id, + vec![], // default + insert_ssts.into_iter().map(|s| s.sst_info).collect(), + 0, // default )); group_deltas.push(group_delta); } @@ -171,7 +172,7 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compaction_group_id) .or_default() .group_deltas; - let l0_sub_level_id = epoch; + let l0_sub_level_id = max_committed_epoch; let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( 0, l0_sub_level_id, @@ -186,32 +187,33 @@ impl<'a> HummockVersionTransaction<'a> { // update state table info new_version_delta.with_latest_version(|version, delta| { for (table_id, cg_id) in new_table_ids { + assert!( + !version.state_table_info.info().contains_key(&table_id), + "newly added table exists previously: {:?}", + table_id + ); delta.state_table_info_delta.insert( table_id, StateTableInfoDelta { - committed_epoch: epoch, - safe_epoch: epoch, + committed_epoch: max_committed_epoch, + safe_epoch: max_committed_epoch, compaction_group_id: cg_id, }, ); } for (table_id, info) in version.state_table_info.info() { - assert!( - delta - .state_table_info_delta - .insert( - *table_id, - StateTableInfoDelta { - committed_epoch: epoch, - safe_epoch: info.safe_epoch, - compaction_group_id: info.compaction_group_id, - } - ) - .is_none(), - "newly added table exists previously: {:?}", - table_id - ); + assert!(delta + .state_table_info_delta + .insert( + *table_id, + StateTableInfoDelta { + committed_epoch: max_committed_epoch, + safe_epoch: info.safe_epoch, + compaction_group_id: info.compaction_group_id, + } + ) + .is_none(),); } }); diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index bb465f962b634..0fe019d9afcd3 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -358,9 +358,24 @@ impl HummockVersion { new_sst_start_id: u64, ) { let mut new_sst_id = new_sst_start_id; - if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId - || !self.levels.contains_key(&parent_group_id) - { + if !self.levels.contains_key(&parent_group_id) { + warn!(parent_group_id, "non-existing parent group id to init from"); + return; + } + if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId { + if new_sst_start_id != 0 { + if cfg!(debug_assertions) { + panic!( + "non-zero sst start id {} for NewCompactionGroup", + new_sst_start_id + ); + } else { + warn!( + new_sst_start_id, + "non-zero sst start id for NewCompactionGroup" + ); + } + } return; } let [parent_levels, cur_levels] = self diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 23afa7bb40cca..b3e304305660c 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2621,4 +2621,12 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level2.sub_level_id, epoch2); assert_eq!(sub_level2.table_infos.len(), 1); assert_eq!(sub_level2.table_infos[0].object_id, sst_epoch2.object_id); + + let info = new_version + .state_table_info + .info() + .get(&batch_commit_table_id) + .unwrap(); + assert_eq!(info.committed_epoch, epoch2); + assert_eq!(info.compaction_group_id, new_cg_id); }