diff --git a/Cargo.lock b/Cargo.lock index 20e6b328357c..a38ed0bdf652 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -735,9 +735,9 @@ dependencies = [ [[package]] name = "arrow-udf-js" -version = "0.3.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76cb6d108605c5489fff1ef9c520656946ad05ed0de3ea6d26d56bcb34bdb8c5" +checksum = "dc1d882f3cbdaf8dd02d9936c3acbf9e1deca401987562f804977ec262bf5e10" dependencies = [ "anyhow", "arrow-array 50.0.0", @@ -799,9 +799,9 @@ dependencies = [ [[package]] name = "arrow-udf-python" -version = "0.2.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900ace379560dff311315591fe713121723a7578b7abb3b195b9076aa155eaf1" +checksum = "4506efc6fbc200c083add2a7ed4e3616a859941a745e922320ae7051d90d12ec" dependencies = [ "anyhow", "arrow-array 50.0.0", diff --git a/proto/hummock.proto b/proto/hummock.proto index 2ab63d9ea03b..c99df3992512 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -137,32 +137,6 @@ message TableWatermarks { bool is_ascending = 2; } -message SnapshotGroup { - uint32 table_fragments_id = 1; - repeated uint32 member_table_ids = 2; - uint64 committed_epoch = 3; - // Snapshots with epoch less than the safe epoch have been GCed. - // Reads against such an epoch will fail. 
- uint64 safe_epoch = 4; -} - -message SnapshotGroupDelta { - message NewSnapshotGroup { - repeated uint32 member_table_ids = 1; - uint64 committed_epoch = 2; - uint64 safe_epoch = 3; - } - - message DestroySnapshotGroup {} - - oneof delta { - NewSnapshotGroup new_snapshot_group = 1; - uint64 new_committed_epoch = 2; - uint64 new_safe_epoch = 3; - DestroySnapshotGroup destroy = 4; - } -} - message EpochNewChangeLog { repeated SstableInfo old_value = 1; repeated SstableInfo new_value = 2; @@ -175,6 +149,16 @@ message TableChangeLog { repeated EpochNewChangeLog change_logs = 1; } +message StateTableInfo { + uint64 committed_epoch = 1; + uint64 safe_epoch = 2; +} + +message StateTableInfoDelta { + uint64 committed_epoch = 1; + uint64 safe_epoch = 2; +} + message HummockVersion { message Levels { repeated Level levels = 1; @@ -192,7 +176,7 @@ message HummockVersion { uint64 safe_epoch = 4; map table_watermarks = 5; map table_change_logs = 6; - map snapshot_groups = 7; + map state_table_info = 7; } message HummockVersionDelta { @@ -218,7 +202,7 @@ message HummockVersionDelta { uint64 truncate_epoch = 2; } map change_log_delta = 10; - map snapshot_group_delta = 11; + map state_table_info_delta = 11; } message HummockVersionDeltas { diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs index d1ea946dd1e2..cb3bba6f899a 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs @@ -206,31 +206,25 @@ async fn read_hummock_table_watermarks( } #[derive(Fields)] -struct RwHummockSnapshotGroups { +struct RwHummockSnapshot { #[primary_key] - group_id: i32, + table_id: i32, safe_epoch: i64, committed_epoch: i64, - member_table_ids: Vec, } -#[system_catalog(table, "rw_catalog.rw_hummock_snapshot_groups")] +#[system_catalog(table, "rw_catalog.rw_hummock_snapshot")] async fn 
read_hummock_snapshot_groups( reader: &SysCatalogReaderImpl, -) -> Result> { +) -> Result> { let version = reader.meta_client.get_hummock_current_version().await?; Ok(version - .snapshot_groups + .state_table_info .iter() - .map(|(group_id, group)| RwHummockSnapshotGroups { - group_id: u32::from(*group_id) as _, - safe_epoch: group.safe_epoch as _, - committed_epoch: group.committed_epoch as _, - member_table_ids: group - .member_table_ids - .iter() - .map(|table_id| table_id.table_id as _) - .collect(), + .map(|(table_id, info)| RwHummockSnapshot { + table_id: table_id.table_id as _, + committed_epoch: info.committed_epoch as _, + safe_epoch: info.safe_epoch as _, }) .collect()) } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index a16061445975..d84c8c66f2d3 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -587,13 +587,8 @@ pub async fn start_service_as_election_leader( .unwrap(), ); - let existing_table_fragment_state_tables = metadata_manager - .get_table_fragment_state_table_ids() - .await - .unwrap(); - hummock_manager - .may_fill_backward_snapshot_group(&existing_table_fragment_state_tables) + .may_fill_backward_state_table_info() .await .unwrap(); diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index c02ca598e78a..865835c49ede 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; -use std::iter::once; use std::sync::Arc; use futures::future::try_join_all; @@ -868,15 +867,19 @@ impl CommandContext { Command::DropStreamingJobs { actors, - unregistered_table_fragment_ids, + unregistered_state_table_ids, .. } => { // Tell compute nodes to drop actors. 
self.clean_up(actors.clone()).await?; + let unregistered_state_table_ids = unregistered_state_table_ids + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(); self.barrier_manager_context .hummock_manager - .unregister_table_fragments_ids(unregistered_table_fragment_ids.clone()) + .unregister_table_ids(&unregistered_state_table_ids) .await?; } @@ -894,11 +897,11 @@ impl CommandContext { // since the failure could be recoverable. // As such it needs to be handled here. let table_id = table_fragments.table_id().table_id; + let mut table_ids = table_fragments.internal_table_ids(); + table_ids.push(table_id); self.barrier_manager_context .hummock_manager - .unregister_table_fragments_ids(HashSet::from_iter(once( - table_fragments.table_id(), - ))) + .unregister_table_ids(&table_ids) .await?; match &self.barrier_manager_context.metadata_manager { diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 53de652d4773..f17b4e163901 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -113,18 +113,23 @@ impl GlobalBarrierManagerContext { } async fn purge_state_table_from_hummock(&self) -> MetaResult<()> { - let existing_table_fragment_state_tables = self - .metadata_manager - .get_table_fragment_state_table_ids() - .await?; - self.hummock_manager - .purge(HashSet::from_iter( - existing_table_fragment_state_tables - .keys() - .cloned() - .map(TableId::new), - )) - .await?; + let all_state_table_ids = match &self.metadata_manager { + MetadataManager::V1(mgr) => mgr + .catalog_manager + .list_tables() + .await + .into_iter() + .map(|t| t.id) + .collect_vec(), + MetadataManager::V2(mgr) => mgr + .catalog_controller + .list_all_state_table_ids() + .await? 
+ .into_iter() + .map(|id| id as u32) + .collect_vec(), + }; + self.hummock_manager.purge(&all_state_table_ids).await?; Ok(()) } @@ -308,23 +313,29 @@ impl GlobalBarrierManagerContext { let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled(); let applied = !dropped_actors.is_empty() || !cancelled.is_empty(); if !cancelled.is_empty() { - match &self.metadata_manager { + let unregister_table_ids = match &self.metadata_manager { MetadataManager::V1(mgr) => { mgr.fragment_manager .drop_table_fragments_vec(&cancelled) - .await?; + .await? } MetadataManager::V2(mgr) => { - for job_id in &cancelled { - let _ = mgr + let mut unregister_table_ids = Vec::new(); + for job_id in cancelled { + let (_, table_ids_to_unregister) = mgr .catalog_controller .try_abort_creating_streaming_job(job_id.table_id as _, true) .await?; + unregister_table_ids.extend(table_ids_to_unregister); } + unregister_table_ids + .into_iter() + .map(|table_id| table_id as u32) + .collect() } }; self.hummock_manager - .unregister_table_fragments_ids(HashSet::from_iter(cancelled)) + .unregister_table_ids(&unregister_table_ids) .await?; } Ok(applied) diff --git a/src/meta/src/hummock/error.rs b/src/meta/src/hummock/error.rs index 30e10e32e093..434de4762331 100644 --- a/src/meta/src/hummock/error.rs +++ b/src/meta/src/hummock/error.rs @@ -43,8 +43,6 @@ pub enum Error { CompactorUnreachable(HummockContextId), #[error("compaction group error: {0}")] CompactionGroup(String), - #[error("snapshot group error: {0}")] - SnapshotGroup(String), #[error("SST {0} is invalid")] InvalidSst(HummockSstableObjectId), #[error(transparent)] diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index b98c2eca4b4c..1471ddd32af5 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::collections::{BTreeMap, HashMap, HashSet}; -use std::iter::once; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -28,9 +27,11 @@ use risingwave_hummock_sdk::{ use risingwave_pb::hummock::compact_task::{self}; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta; -use risingwave_pb::hummock::{GroupDelta, HummockSnapshot, IntraLevelDelta}; +use risingwave_pb::hummock::{ + GroupDelta, GroupMetaChange, HummockSnapshot, IntraLevelDelta, StateTableInfoDelta, +}; -use crate::hummock::error::Result; +use crate::hummock::error::{Error, Result}; use crate::hummock::manager::transaction::{ HummockVersionStatsTransaction, HummockVersionTransaction, }; @@ -141,48 +142,80 @@ impl HummockManager { .latest_version() .build_compaction_group_info(); + let mut new_table_ids = None; + // Add new table if let Some(new_fragment_table_info) = new_table_fragment_info { - let mut cg_table_ids = HashMap::new(); + let new_table_ids = new_table_ids.insert(HashSet::new()); if !new_fragment_table_info.internal_table_ids.is_empty() { - cg_table_ids.insert( - StaticCompactionGroupId::StateDefault as u64, - new_fragment_table_info - .internal_table_ids - .iter() - .map(|table_id| table_id.table_id) - .collect(), - ); + if let Some(levels) = new_version_delta + .latest_version() + .levels + .get(&(StaticCompactionGroupId::StateDefault as u64)) + { + for table_id in &new_fragment_table_info.internal_table_ids { + if levels.member_table_ids.contains(&table_id.table_id) { + return Err(Error::CompactionGroup(format!( + "table {} already in group {}", + table_id, + StaticCompactionGroupId::StateDefault as u64 + ))); + } + } + } + + let group_deltas = &mut new_version_delta + .group_deltas + .entry(StaticCompactionGroupId::StateDefault as u64) + .or_default() + .group_deltas; + group_deltas.push(GroupDelta { + delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { + table_ids_add: 
new_fragment_table_info + .internal_table_ids + .iter() + .map(|table_id| table_id.table_id) + .collect(), + ..Default::default() + })), + }); for table_id in &new_fragment_table_info.internal_table_ids { table_compaction_group_mapping .insert(*table_id, StaticCompactionGroupId::StateDefault as u64); + new_table_ids.insert(*table_id); } } if let Some(table_id) = new_fragment_table_info.mv_table_id { - cg_table_ids.insert( - StaticCompactionGroupId::MaterializedView as u64, - HashSet::from_iter(once(table_id.table_id)), - ); - table_compaction_group_mapping + if let Some(levels) = new_version_delta + .latest_version() + .levels + .get(&(StaticCompactionGroupId::MaterializedView as u64)) + { + if levels.member_table_ids.contains(&table_id.table_id) { + return Err(Error::CompactionGroup(format!( + "table {} already in group {}", + table_id, + StaticCompactionGroupId::MaterializedView as u64 + ))); + } + } + let group_deltas = &mut new_version_delta + .group_deltas + .entry(StaticCompactionGroupId::MaterializedView as u64) + .or_default() + .group_deltas; + group_deltas.push(GroupDelta { + delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { + table_ids_add: vec![table_id.table_id], + ..Default::default() + })), + }); + let _ = table_compaction_group_mapping .insert(table_id, StaticCompactionGroupId::MaterializedView as u64); + new_table_ids.insert(table_id); } - - let table_fragments_id = new_fragment_table_info.table_id; - - let (snapshot_group_delta, compaction_group_deltas) = self - .gen_register_table_fragments_delta( - table_fragments_id, - cg_table_ids, - new_version_delta.latest_version(), - ) - .await?; - - new_version_delta - .snapshot_group_delta - .insert(table_fragments_id, snapshot_group_delta); - new_version_delta.group_deltas = compaction_group_deltas; } let mut incorrect_ssts = vec![]; @@ -303,6 +336,28 @@ impl HummockManager { group_deltas.push(group_delta); } + // update state table info + new_version_delta.with_latest_version(|version, 
delta| { + for table_id in new_table_ids + .into_iter() + .flat_map(|ids| ids.into_iter().map(|table_id| table_id.table_id)) + .chain( + version + .levels + .values() + .flat_map(|group| group.member_table_ids.iter().cloned()), + ) + { + delta.state_table_info_delta.insert( + TableId::new(table_id), + StateTableInfoDelta { + committed_epoch: epoch, + safe_epoch: version.safe_epoch, + }, + ); + } + }); + new_version_delta.pre_apply(); // Apply stats changes. diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index 275b5ed9e8b4..5cff0ff8a07b 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -30,7 +30,7 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevels use risingwave_hummock_sdk::table_stats::{ add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap, }; -use risingwave_hummock_sdk::version::{HummockVersion, SnapshotGroupDelta}; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ compact_task_to_string, statistics_compact_task, CompactionGroupId, HummockCompactionTaskId, HummockVersionId, @@ -47,7 +47,7 @@ use risingwave_pb::hummock::subscribe_compaction_event_response::{ use risingwave_pb::hummock::{ compact_task, CompactStatus as PbCompactStatus, CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, InputLevel, IntraLevelDelta, Level, SstableInfo, - SubscribeCompactionEventRequest, TableOption, TableSchema, + StateTableInfoDelta, SubscribeCompactionEventRequest, TableOption, TableSchema, }; use rw_futures_util::pending_on_none; use thiserror_ext::AsReport; @@ -172,14 +172,17 @@ impl<'a> HummockVersionTransaction<'a> { compact_task.watermark, ); if version_delta.latest_version().safe_epoch < version_delta.safe_epoch { - version_delta.snapshot_group_delta = version_delta + version_delta.state_table_info_delta = version_delta .latest_version() - .snapshot_groups - .keys() - 
.map(|group_id| { + .state_table_info + .iter() + .map(|(table_id, info)| { ( - *group_id, - SnapshotGroupDelta::NewSafeEpoch(version_delta.safe_epoch), + *table_id, + StateTableInfoDelta { + committed_epoch: info.committed_epoch, + safe_epoch: version_delta.safe_epoch, + }, ) }) .collect(); diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index cb4e2ba1ab80..cfb770124d61 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -13,17 +13,16 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, HashSet}; -use std::iter::once; use std::ops::DerefMut; use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - get_compaction_group_ids, try_get_compaction_group_id_by_table_id, TableGroupInfo, + get_compaction_group_ids, get_member_table_ids, try_get_compaction_group_id_by_table_id, + TableGroupInfo, }; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; -use risingwave_hummock_sdk::version::{HummockVersion, SnapshotGroupDelta}; use risingwave_hummock_sdk::CompactionGroupId; use risingwave_meta_model_v2::compaction_config; use risingwave_pb::hummock::compact_task::TaskStatus; @@ -34,7 +33,7 @@ use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct, - GroupDelta, GroupDestroy, GroupMetaChange, + GroupDelta, GroupDestroy, GroupMetaChange, StateTableInfoDelta, }; use tokio::sync::OnceCell; @@ -99,27 +98,29 @@ impl HummockManager { /// Registers `table_fragments` to compaction groups. 
pub async fn register_table_fragments( &self, - table_fragments: crate::model::TableFragments, - ) -> Result<()> { - let mut group_table_ids = HashMap::new(); - if let Some(mv_table) = table_fragments.mv_table_id() { + mv_table: Option, + mut internal_tables: Vec, + ) -> Result> { + let mut pairs = vec![]; + if let Some(mv_table) = mv_table { + if internal_tables.extract_if(|t| *t == mv_table).count() > 0 { + tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table); + } // materialized_view - group_table_ids.insert( - StaticCompactionGroupId::MaterializedView as u64, - HashSet::from_iter(once(mv_table)), - ); + pairs.push(( + mv_table, + CompactionGroupId::from(StaticCompactionGroupId::MaterializedView), + )); } // internal states - let internal_tables = table_fragments.internal_table_ids(); - if !internal_tables.is_empty() { - group_table_ids.insert( - StaticCompactionGroupId::StateDefault as u64, - HashSet::from_iter(internal_tables), - ); + for table_id in internal_tables { + pairs.push(( + table_id, + CompactionGroupId::from(StaticCompactionGroupId::StateDefault), + )); } - self.register_table_fragments_tables(table_fragments.table_id(), group_table_ids) - .await?; - Ok(()) + self.register_table_ids_for_test(&pairs).await?; + Ok(pairs.iter().map(|(table_id, ..)| *table_id).collect_vec()) } #[cfg(test)] @@ -128,9 +129,12 @@ impl HummockManager { &self, table_fragments: &[crate::model::TableFragments], ) { - self.unregister_table_fragments_ids(HashSet::from_iter( - table_fragments.iter().map(|tf| tf.table_id()), - )) + self.unregister_table_ids( + &table_fragments + .iter() + .flat_map(|t| t.all_table_ids()) + .collect_vec(), + ) .await .unwrap(); } @@ -138,114 +142,59 @@ impl HummockManager { /// Unregisters stale members and groups /// The caller should ensure `table_fragments_list` remain unchanged during `purge`. /// Currently `purge` is only called during meta service start ups. 
- pub async fn purge(&self, existing_table_fragment_ids: HashSet) -> Result<()> { - let registered_table_fragments_ids: HashSet = self - .versioning - .read() - .await - .current_version - .snapshot_groups - .keys() - .cloned() - .collect(); - let to_unregister: HashSet = - &registered_table_fragments_ids - &existing_table_fragment_ids; + pub async fn purge(&self, valid_ids: &[u32]) -> Result<()> { + let registered_members = + get_member_table_ids(&self.versioning.read().await.current_version); + let to_unregister = registered_members + .into_iter() + .filter(|table_id| !valid_ids.contains(table_id)) + .collect_vec(); // As we have released versioning lock, the version that `to_unregister` is calculated from // may not be the same as the one used in unregister_table_ids. It is OK. - self.unregister_table_fragments_ids(to_unregister).await + self.unregister_table_ids(&to_unregister).await } - pub async fn register_single_table_fragments( - &self, - table_id: TableId, - compaction_group_id: CompactionGroupId, - ) -> Result<()> { - self.register_table_fragments_tables( - table_id, - HashMap::from_iter(once(( - compaction_group_id, - HashSet::from_iter(once(table_id.table_id)), - ))), - ) - .await - } - - pub async fn register_table_fragments_tables( + /// The implementation acquires `versioning` lock. + /// + /// The method name is temporarily added with a `_for_test` suffix to mark + /// that it's currently only used in test. 
+ pub async fn register_table_ids_for_test( &self, - table_fragments_id: TableId, - group_table_ids: HashMap>, + pairs: &[(StateTableId, CompactionGroupId)], ) -> Result<()> { + if pairs.is_empty() { + return Ok(()); + } let mut versioning_guard = self.versioning.write().await; - let versioning: &mut Versioning = &mut versioning_guard; - - let (snapshot_group_delta, compaction_group_deltas) = self - .gen_register_table_fragments_delta( - table_fragments_id, - group_table_ids, - &versioning.current_version, - ) - .await?; + let versioning = versioning_guard.deref_mut(); + let current_version = &versioning.current_version; + for (table_id, _) in pairs { + if let Some(old_group) = + try_get_compaction_group_id_by_table_id(current_version, *table_id) + { + return Err(Error::CompactionGroup(format!( + "table {} already in group {}", + *table_id, old_group + ))); + } + } + // All NewCompactionGroup pairs are mapped to one new compaction group. + let new_compaction_group_id: OnceCell = OnceCell::new(); let mut version = HummockVersionTransaction::new( &mut versioning.current_version, &mut versioning.hummock_version_deltas, self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(); - - new_version_delta - .snapshot_group_delta - .insert(table_fragments_id, snapshot_group_delta); - new_version_delta.group_deltas = compaction_group_deltas; - new_version_delta.pre_apply(); - - commit_multi_var!(self.meta_store_ref(), version)?; - - Ok(()) - } - - pub(super) async fn gen_register_table_fragments_delta( - &self, - table_fragments_id: TableId, - group_table_ids: HashMap>, - current_version: &HummockVersion, - ) -> Result<(SnapshotGroupDelta, HashMap)> { - if let Some(group) = &current_version.snapshot_groups.get(&table_fragments_id) { - return Err(Error::SnapshotGroup(format!( - "table fragments {} already exists. 
{:?}", - table_fragments_id.table_id, group - ))); - } - for (cg_id, table_ids) in &group_table_ids { - if let Some(group) = current_version.levels.get(cg_id) { - for table_id in table_ids { - if group.member_table_ids.contains(table_id) { - return Err(Error::CompactionGroup(format!( - "table {} already in group {}", - *table_id, cg_id, - ))); - } - } - } - } - - let snapshot_group_delta = SnapshotGroupDelta::NewSnapshotGroup { - member_table_ids: group_table_ids - .values() - .flat_map(|table_ids| table_ids.iter().map(|table_id| TableId::new(*table_id))) - .collect(), - committed_epoch: current_version.max_committed_epoch, - safe_epoch: current_version.safe_epoch, + let (committed_epoch, safe_epoch) = { + let version = new_version_delta.latest_version(); + (version.max_committed_epoch, version.safe_epoch) }; - let mut compaction_group_deltas: HashMap = HashMap::new(); - - // All NewCompactionGroup pairs are mapped to one new compaction group. - let new_compaction_group_id: OnceCell = OnceCell::new(); - for (raw_group_id, table_ids) in group_table_ids { - let mut group_id = raw_group_id; + for (table_id, raw_group_id) in pairs { + let mut group_id = *raw_group_id; if group_id == StaticCompactionGroupId::NewCompactionGroup as u64 { let mut is_group_init = false; group_id = *new_compaction_group_id @@ -259,8 +208,11 @@ impl HummockManager { }) .await?; if is_group_init { - let group_deltas = - &mut (compaction_group_deltas.entry(group_id).or_default()).group_deltas; + let group_deltas = &mut new_version_delta + .group_deltas + .entry(group_id) + .or_default() + .group_deltas; let config = self .compaction_group_manager .write() @@ -279,26 +231,36 @@ impl HummockManager { }); } } - let group_deltas = &mut compaction_group_deltas + let group_deltas = &mut new_version_delta + .group_deltas .entry(group_id) .or_default() .group_deltas; group_deltas.push(GroupDelta { delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { - table_ids_add: 
table_ids.iter().cloned().collect(), + table_ids_add: vec![*table_id], ..Default::default() })), }); + assert!(new_version_delta + .state_table_info_delta + .insert( + TableId::new(*table_id), + StateTableInfoDelta { + committed_epoch, + safe_epoch, + } + ) + .is_none()); } + new_version_delta.pre_apply(); + commit_multi_var!(self.meta_store_ref(), version)?; - Ok((snapshot_group_delta, compaction_group_deltas)) + Ok(()) } - pub async fn unregister_table_fragments_ids( - &self, - table_fragments_ids: HashSet, - ) -> Result<()> { - if table_fragments_ids.is_empty() { + pub async fn unregister_table_ids(&self, table_ids: &[StateTableId]) -> Result<()> { + if table_ids.is_empty() { return Ok(()); } let mut versioning_guard = self.versioning.write().await; @@ -310,24 +272,6 @@ impl HummockManager { &self.metrics, ); let mut new_version_delta = version.new_delta(); - let mut table_ids = HashSet::new(); - for table_fragments_id in &table_fragments_ids { - if let Some(group) = new_version_delta - .latest_version() - .snapshot_groups - .get(table_fragments_id) - { - table_ids.extend( - group - .member_table_ids - .iter() - .map(|table_id| table_id.table_id), - ); - new_version_delta - .snapshot_group_delta - .insert(*table_fragments_id, SnapshotGroupDelta::Destroy); - } - } let mut modified_groups: HashMap = HashMap::new(); // Remove member tables @@ -363,7 +307,7 @@ impl HummockManager { ); new_version_delta .removed_table_ids - .push(TableId::new(*table_id)); + .insert(TableId::new(*table_id)); } let groups_to_remove = modified_groups @@ -959,29 +903,17 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashSet}; - use std::iter::once; + use std::collections::BTreeMap; + use itertools::Itertools; use risingwave_common::catalog::TableId; - use risingwave_hummock_sdk::CompactionGroupId; use 
risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::meta::table_fragments::Fragment; use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; use crate::hummock::test_utils::setup_compute_env; - use crate::hummock::HummockManager; use crate::model::TableFragments; - impl HummockManager { - pub(crate) async fn register_table_ids( - &self, - [(table_id, compaction_group_id)]: &[(u32, CompactionGroupId); 1], - ) -> super::Result<()> { - self.register_single_table_fragments(TableId::new(*table_id), *compaction_group_id) - .await - } - } - #[tokio::test] async fn test_inner() { let (env, ..) = setup_compute_env(8080).await; @@ -1061,12 +993,18 @@ mod tests { assert_eq!(registered_number().await, 0); compaction_group_manager - .register_table_fragments(table_fragment_1.clone()) + .register_table_fragments( + Some(table_fragment_1.table_id().table_id), + table_fragment_1.internal_table_ids(), + ) .await .unwrap(); assert_eq!(registered_number().await, 4); compaction_group_manager - .register_table_fragments(table_fragment_2.clone()) + .register_table_fragments( + Some(table_fragment_2.table_id().table_id), + table_fragment_2.internal_table_ids(), + ) .await .unwrap(); assert_eq!(registered_number().await, 8); @@ -1079,20 +1017,20 @@ mod tests { // Test purge_stale_members: table fragments compaction_group_manager - .purge(once(table_fragment_2.table_id()).collect()) + .purge(&table_fragment_2.all_table_ids().collect_vec()) .await .unwrap(); assert_eq!(registered_number().await, 4); - compaction_group_manager - .purge(HashSet::new()) - .await - .unwrap(); + compaction_group_manager.purge(&[]).await.unwrap(); assert_eq!(registered_number().await, 0); assert_eq!(group_number().await, 2); compaction_group_manager - .register_table_fragments(table_fragment_1.clone()) + .register_table_fragments( + Some(table_fragment_1.table_id().table_id), + table_fragment_1.internal_table_ids(), + ) .await 
.unwrap(); assert_eq!(registered_number().await, 4); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 588a6da0d203..62de1e2bcc30 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -69,7 +69,6 @@ mod worker; pub(crate) use commit_epoch::*; use compaction::*; pub use compaction::{check_cg_write_limit, WriteLimitType}; -use risingwave_common::catalog::TableId; pub(crate) use utils::*; type Snapshot = ArcSwap; @@ -511,12 +510,10 @@ impl HummockManager { } for group in &compaction_groups { - for table_id in &group.member_table_ids { - self.register_single_table_fragments(TableId::new(*table_id), group.id) - .await - .unwrap(); + let mut pairs = vec![]; + for table_id in group.member_table_ids.clone() { + pairs.push((table_id as StateTableId, group.id)); } - tracing::info!("Registered table ids {:?}", group.member_table_ids); let group_config = group.compaction_config.clone().unwrap(); self.compaction_group_manager .write() @@ -524,6 +521,8 @@ impl HummockManager { .init_compaction_config_for_replay(group.id, group_config) .await .unwrap(); + self.register_table_ids_for_test(&pairs).await?; + tracing::info!("Registered table ids {:?}", pairs); } // Notify that tables have created diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 4fd988554324..fbf0e7cd9fa7 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -512,6 +512,8 @@ async fn test_hummock_manager_basic() { ); let mut epoch = test_epoch(1); + let mut register_log_count = 0; + let mut commit_log_count = 0; let commit_one = |epoch: HummockEpoch, hummock_manager: HummockManagerRef| async move { let original_tables = generate_test_tables(test_epoch(epoch), get_sst_ids(&hummock_manager, 2).await); @@ -531,13 +533,17 @@ async fn test_hummock_manager_basic() { }; commit_one(epoch, hummock_manager.clone()).await; + register_log_count += 1; + 
commit_log_count += 1; epoch.inc_epoch(); let init_version_id = FIRST_VERSION_ID; - let version_id1 = hummock_manager.get_current_version().await.id; // increased version id - assert!(version_id1 > init_version_id); + assert_eq!( + hummock_manager.get_current_version().await.id, + init_version_id + commit_log_count + register_log_count + ); // min pinned version id if no clients assert_eq!( @@ -556,25 +562,31 @@ async fn test_hummock_manager_basic() { // should pin latest because u64::MAX let version = hummock_manager.pin_version(context_id_1).await.unwrap(); - assert_eq!(version.id, version_id1); + assert_eq!( + version.id, + init_version_id + commit_log_count + register_log_count + ); assert_eq!( hummock_manager.get_min_pinned_version_id().await, - version_id1 + init_version_id + commit_log_count + register_log_count ); } commit_one(epoch, hummock_manager.clone()).await; - - let version_id2 = hummock_manager.get_current_version().await.id; + commit_log_count += 1; + register_log_count += 1; for _ in 0..2 { // should pin latest because deltas cannot contain INVALID_EPOCH let version = hummock_manager.pin_version(context_id_2).await.unwrap(); - assert_eq!(version.id, version_id2); + assert_eq!( + version.id, + init_version_id + commit_log_count + register_log_count + ); // pinned by context_id_1 assert_eq!( hummock_manager.get_min_pinned_version_id().await, - version_id1, + init_version_id + commit_log_count + register_log_count - 2, ); } // objects_to_delete is always empty because no compaction is ever invoked. 
@@ -588,7 +600,7 @@ async fn test_hummock_manager_basic() { ); assert_eq!( hummock_manager.create_version_checkpoint(1).await.unwrap(), - version_id2 - init_version_id + commit_log_count + register_log_count ); assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( @@ -596,7 +608,7 @@ async fn test_hummock_manager_basic() { .delete_version_deltas(usize::MAX) .await .unwrap(), - ((version_id2 - init_version_id) as usize, 0) + ((commit_log_count + register_log_count) as usize, 0) ); hummock_manager .unpin_version_before(context_id_1, u64::MAX) @@ -604,7 +616,7 @@ async fn test_hummock_manager_basic() { .unwrap(); assert_eq!( hummock_manager.get_min_pinned_version_id().await, - version_id2 + init_version_id + commit_log_count + register_log_count ); hummock_manager .unpin_version_before(context_id_2, u64::MAX) @@ -1130,7 +1142,7 @@ async fn test_extend_objects_to_delete() { // Checkpoint assert_eq!( hummock_manager.create_version_checkpoint(1).await.unwrap(), - 8 + 6 ); assert_eq!( hummock_manager.get_objects_to_delete().len(), @@ -1327,11 +1339,11 @@ async fn test_split_compaction_group_on_commit() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; let context_id = worker_node.id; hummock_manager - .register_table_ids(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(101, 3)]) + .register_table_ids_for_test(&[(101, 3)]) .await .unwrap(); let sst_1 = ExtendedSstableInfo { @@ -1413,11 +1425,11 @@ async fn test_split_compaction_group_on_demand_basic() { ); hummock_manager - .register_table_ids(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(101, 2)]) + .register_table_ids_for_test(&[(101, 2)]) .await .unwrap(); let sst_1 = ExtendedSstableInfo { @@ -1477,7 +1489,7 @@ async fn test_split_compaction_group_on_demand_basic() { // Now group 2 has member tables [100,101,102], so split [100, 101] can 
succeed even though // there is no data of 102. hummock_manager - .register_table_ids(&[(102, 2)]) + .register_table_ids_for_test(&[(102, 2)]) .await .unwrap(); @@ -1529,11 +1541,11 @@ async fn test_split_compaction_group_on_demand_non_trivial() { table_stats: Default::default(), }; hummock_manager - .register_table_ids(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(101, 2)]) + .register_table_ids_for_test(&[(101, 2)]) .await .unwrap(); hummock_manager @@ -1591,11 +1603,11 @@ async fn test_split_compaction_group_trivial_expired() { hummock_manager.compactor_manager.add_compactor(context_id); hummock_manager - .register_table_ids(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(101, 2)]) + .register_table_ids_for_test(&[(101, 2)]) .await .unwrap(); let sst_1 = ExtendedSstableInfo { @@ -1657,7 +1669,7 @@ async fn test_split_compaction_group_trivial_expired() { // Now group 2 has member tables [100,101,102], so split [100, 101] can succeed even though // there is no data of 102. 
hummock_manager - .register_table_ids(&[(102, 2)]) + .register_table_ids_for_test(&[(102, 2)]) .await .unwrap(); let task = hummock_manager @@ -1753,11 +1765,11 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { let context_id = worker_node.id; hummock_manager - .register_table_ids(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(101, 2)]) + .register_table_ids_for_test(&[(101, 2)]) .await .unwrap(); @@ -1893,11 +1905,11 @@ async fn test_compaction_task_expiration_due_to_split_group() { let context_id = worker_node.id; hummock_manager - .register_table_ids(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(101, 2)]) + .register_table_ids_for_test(&[(101, 2)]) .await .unwrap(); let sst_1 = ExtendedSstableInfo { @@ -1984,15 +1996,15 @@ async fn test_move_tables_between_compaction_group() { let context_id = worker_node.id; hummock_manager - .register_table_ids(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(101, 2)]) + .register_table_ids_for_test(&[(101, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(102, 2)]) + .register_table_ids_for_test(&[(102, 2)]) .await .unwrap(); let sst_1 = gen_extend_sstable_info(10, 2, 1, vec![100, 101, 102]); @@ -2166,11 +2178,11 @@ async fn test_partition_level() { let context_id = worker_node.id; hummock_manager - .register_table_ids(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2)]) .await .unwrap(); hummock_manager - .register_table_ids(&[(101, 2)]) + .register_table_ids_for_test(&[(101, 2)]) .await .unwrap(); let sst_1 = gen_extend_sstable_info(10, 2, 1, vec![100, 101]); diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index f168a1fc013c..e5f2ba4b325b 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ 
b/src/meta/src/hummock/manager/transaction.rs @@ -158,6 +158,16 @@ impl<'a, 'b> SingleDeltaTransaction<'a, 'b> { pub(super) fn pre_apply(mut self) { self.version_txn.pre_apply(self.delta.take().unwrap()); } + + pub(super) fn with_latest_version( + &mut self, + f: impl FnOnce(&HummockVersion, &mut HummockVersionDelta), + ) { + f( + self.version_txn.latest_version(), + self.delta.as_mut().expect("should exist"), + ) + } } impl<'a, 'b> Deref for SingleDeltaTransaction<'a, 'b> { diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 071867b4579c..2e5723e6361f 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -36,7 +36,6 @@ use risingwave_pb::hummock::{ HummockVersionStats, SstableInfo, TableStats, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use tracing::warn; use super::check_cg_write_limit; use crate::hummock::error::Result; @@ -44,10 +43,11 @@ use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::commit_multi_var; use crate::hummock::manager::context::ContextInfo; use crate::hummock::manager::gc::DeleteObjectTracker; +use crate::hummock::manager::transaction::HummockVersionTransaction; use crate::hummock::metrics_utils::{trigger_write_stop_stats, LocalTableMetrics}; use crate::hummock::model::CompactionGroup; use crate::hummock::HummockManager; -use crate::model::{BTreeMapEntryTransaction, VarTransaction}; +use crate::model::VarTransaction; use crate::MetaResult; #[derive(Default)] @@ -249,30 +249,25 @@ impl HummockManager { Ok(()) } - pub async fn may_fill_backward_snapshot_group( - &self, - existing_table_fragment_state_tables: &HashMap>, - ) -> Result<()> { + pub async fn may_fill_backward_state_table_info(&self) -> Result<()> { let mut versioning = self.versioning.write().await; - if let Some(new_version_delta) = versioning + if versioning .current_version - 
.gen_fill_backward_compatibility_snapshot_group_delta( - existing_table_fragment_state_tables, - ) + .need_fill_backward_compatible_state_table_info_delta() { - warn!( - ?new_version_delta, - "fill snapshot group for backward compatibility" - ); - let mut new_version: HummockVersion = versioning.current_version.clone(); - let new_version_delta = BTreeMapEntryTransaction::new_insert( + let versioning: &mut Versioning = &mut versioning; + let mut version = HummockVersionTransaction::new( + &mut versioning.current_version, &mut versioning.hummock_version_deltas, - new_version_delta.id, - new_version_delta, + self.env.notification_manager(), + &self.metrics, ); - new_version.apply_version_delta(&new_version_delta); - commit_multi_var!(self.meta_store_ref(), new_version_delta)?; - versioning.current_version = new_version; + let mut new_version_delta = version.new_delta(); + new_version_delta.with_latest_version(|version, delta| { + version.may_fill_backward_compatible_state_table_info_delta(delta) + }); + new_version_delta.pre_apply(); + commit_multi_var!(self.meta_store_ref(), version)?; } Ok(()) } @@ -360,7 +355,7 @@ pub(super) fn create_init_version(default_compaction_config: CompactionConfig) - safe_epoch: INVALID_EPOCH, table_watermarks: HashMap::new(), table_change_log: HashMap::new(), - snapshot_groups: HashMap::new(), + state_table_info: HashMap::new(), }; for group_id in [ StaticCompactionGroupId::StateDefault as CompactionGroupId, diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index eea5c4bd084e..0090cceeed1c 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use std::time::Duration; use itertools::Itertools; -use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::key_with_epoch; @@ -236,25 +235,23 @@ pub async 
fn register_table_ids_to_compaction_group( table_ids: &[u32], compaction_group_id: CompactionGroupId, ) { - for table_id in table_ids { - hummock_manager_ref - .register_single_table_fragments(TableId::new(*table_id), compaction_group_id) - .await - .unwrap(); - } + hummock_manager_ref + .register_table_ids_for_test( + &table_ids + .iter() + .map(|table_id| (*table_id, compaction_group_id)) + .collect_vec(), + ) + .await + .unwrap(); } pub async fn unregister_table_ids_from_compaction_group( hummock_manager_ref: &HummockManager, - fragment_tables_ids: &[u32], + table_ids: &[u32], ) { hummock_manager_ref - .unregister_table_fragments_ids( - fragment_tables_ids - .iter() - .map(|table_id| TableId::new(*table_id)) - .collect(), - ) + .unregister_table_ids(table_ids) .await .unwrap(); } diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 71e323630f2b..6cde13507836 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -234,7 +234,7 @@ mod tests { let sst_infos = add_test_tables(hummock_manager.as_ref(), context_id).await; assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0); hummock_manager.create_version_checkpoint(1).await.unwrap(); - assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 8); + assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 6); assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0); assert!(hummock_manager.get_objects_to_delete().is_empty()); diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 51674e78d118..241a47941755 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -534,44 +534,6 @@ impl MetadataManager { } } - pub async fn get_table_fragment_state_table_ids( - &self, - ) -> MetaResult>> { - Ok(match &self { - MetadataManager::V1(mgr) => mgr - .fragment_manager - .list_table_fragments() - .await - .into_iter() - .map(|table_fragment| { - ( - table_fragment.table_id().table_id, - table_fragment - .fragments() - 
.flat_map(|fragment| fragment.state_table_ids.iter().cloned()) - .collect(), - ) - }) - .collect(), - MetadataManager::V2(mgr) => mgr - .catalog_controller - .table_fragments() - .await? - .into_values() - .map(|table_fragments| { - ( - table_fragments.table_id, - table_fragments - .fragments - .values() - .flat_map(|fragment| fragment.state_table_ids.iter().cloned()) - .collect(), - ) - }) - .collect(), - }) - } - pub async fn get_downstream_chain_fragments( &self, job_id: u32, diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 3cb7f1660bbc..702c607afec9 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -26,7 +26,7 @@ use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, GroupDestroy, GroupMetaChange, GroupTableChange, Level, LevelType, OverlappingLevel, PbLevelType, PbTableWatermarks, - SstableInfo, + SstableInfo, StateTableInfo, }; use tracing::warn; @@ -36,7 +36,7 @@ use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::prost_key_range::KeyRangeExt; use crate::table_watermark::{TableWatermarks, VnodeWatermark}; -use crate::version::{HummockVersion, HummockVersionDelta, SnapshotGroup, SnapshotGroupDelta}; +use crate::version::{HummockVersion, HummockVersionDelta}; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; pub struct GroupDeltasSummary { @@ -484,7 +484,8 @@ impl HummockVersion { pub fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) { assert_eq!(self.id, version_delta.prev_id); - let new_committed_epoch = version_delta.max_committed_epoch > self.max_committed_epoch; + + // apply to `levels`, which is different compaction groups for 
(compaction_group_id, group_deltas) in &version_delta.group_deltas { let summary = summarize_group_deltas(group_deltas); if let Some(group_construct) = &summary.group_construct { @@ -589,34 +590,56 @@ impl HummockVersion { } self.id = version_delta.id; self.max_committed_epoch = version_delta.max_committed_epoch; + self.safe_epoch = version_delta.safe_epoch; + + // apply to table watermark - let mut modified_table_watermarks: HashMap = HashMap::new(); + // Store the table watermarks that needs to be updated. None means to remove the table watermark of the table id + let mut modified_table_watermarks: HashMap> = + HashMap::new(); + // apply to table watermark for (table_id, table_watermarks) in &version_delta.new_table_watermarks { if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) { - let mut current_table_watermarks = (**current_table_watermarks).clone(); - current_table_watermarks.apply_new_table_watermarks(table_watermarks); - modified_table_watermarks.insert(*table_id, current_table_watermarks); + if version_delta.removed_table_ids.contains(table_id) { + modified_table_watermarks.insert(*table_id, None); + } else { + let mut current_table_watermarks = (**current_table_watermarks).clone(); + current_table_watermarks.apply_new_table_watermarks(table_watermarks); + modified_table_watermarks.insert(*table_id, Some(current_table_watermarks)); + } } else { - modified_table_watermarks.insert(*table_id, table_watermarks.clone()); + modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone())); } } - if version_delta.safe_epoch != self.safe_epoch { - assert!(version_delta.safe_epoch > self.safe_epoch); - for (table_id, table_watermarks) in &self.table_watermarks { - let table_watermarks = modified_table_watermarks - .entry(*table_id) - .or_insert_with(|| (**table_watermarks).clone()); + for (table_id, table_watermarks) in &self.table_watermarks { + if let Some(table_delta) = version_delta.state_table_info_delta.get(table_id) + && 
let Some(prev_table) = self.state_table_info.get(table_id) + && table_delta.safe_epoch > prev_table.safe_epoch + { + // safe epoch has progressed, need further clear. + } else { + // safe epoch not progressed. No need to truncate + continue; + } + let table_watermarks = modified_table_watermarks + .entry(*table_id) + .or_insert_with(|| Some((**table_watermarks).clone())); + if let Some(table_watermarks) = table_watermarks { table_watermarks.clear_stale_epoch_watermark(version_delta.safe_epoch); } - self.safe_epoch = version_delta.safe_epoch; } - + // apply the staging table watermark to hummock version for (table_id, table_watermarks) in modified_table_watermarks { - self.table_watermarks - .insert(table_id, Arc::new(table_watermarks)); + if let Some(table_watermarks) = table_watermarks { + self.table_watermarks + .insert(table_id, Arc::new(table_watermarks)); + } else { + self.table_watermarks.remove(&table_id); + } } + // apply to table change log for (table_id, change_log_delta) in &version_delta.change_log_delta { let new_change_log = change_log_delta.new_log.as_ref().unwrap(); match self.table_change_log.entry(*table_id) { @@ -636,89 +659,47 @@ impl HummockVersion { }; } - for table_id in &version_delta.removed_table_ids { - let _ = self.table_watermarks.remove(table_id); - let _ = self.table_change_log.remove(table_id); - } - // If a table has no new change log entry (even an empty one), it means we have stopped maintained - // the change log for the table - if new_committed_epoch { - self.table_change_log.retain(|table_id, _| { - let contains = version_delta.change_log_delta.contains_key(table_id); - if !contains { - warn!( + // the change log for the table, and then we will remove the table change log. + // The table change log will also be removed when the table id is removed. 
+ self.table_change_log.retain(|table_id, _| { + if version_delta.removed_table_ids.contains(table_id) { + return false; + } + if let Some(table_info_delta) = version_delta.state_table_info_delta.get(table_id) + && let Some(prev_table_info) = self.state_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch { + // the table exists previously, and its committed epoch has progressed. + } else { + // otherwise, the table change log should be kept anyway + return true; + } + let contains = version_delta.change_log_delta.contains_key(table_id); + if !contains { + warn!( ?table_id, max_committed_epoch = version_delta.max_committed_epoch, "table change log dropped due to no further change log at newly committed epoch", ); - } - contains - }); - } + } + contains + }); + // truncate the remaining table change log for (table_id, change_log_delta) in &version_delta.change_log_delta { if let Some(change_log) = self.table_change_log.get_mut(table_id) { change_log.truncate(change_log_delta.truncate_epoch); } } - for (table_fragments_id, group_delta) in &version_delta.snapshot_group_delta { - match group_delta { - SnapshotGroupDelta::NewSnapshotGroup { - member_table_ids, - committed_epoch, - safe_epoch, - } => { - if let Some(prev_group) = self.snapshot_groups.insert( - *table_fragments_id, - SnapshotGroup { - table_fragments_id: *table_fragments_id, - committed_epoch: *committed_epoch, - safe_epoch: *safe_epoch, - member_table_ids: member_table_ids.clone(), - }, - ) { - panic!( - "add duplicate snapshot group: {:?} {:?} {} {} {:?}", - table_fragments_id, - member_table_ids, - committed_epoch, - safe_epoch, - prev_group - ); - } - } - SnapshotGroupDelta::NewCommittedEpoch(new_committed_epoch) => { - if let Some(group) = self.snapshot_groups.get_mut(table_fragments_id) { - assert!( - *new_committed_epoch >= group.committed_epoch, - "snapshot group {:?} has regressed committed epoch {}, prev is {}", - table_fragments_id, - 
new_committed_epoch, - group.committed_epoch, - ); - group.committed_epoch = *new_committed_epoch; - } - } - SnapshotGroupDelta::NewSafeEpoch(new_safe_epoch) => { - if let Some(group) = self.snapshot_groups.get_mut(table_fragments_id) { - assert!( - *new_safe_epoch >= group.safe_epoch, - "snapshot group {:?} has regressed safe epoch {}, prev is {}", - table_fragments_id, - new_safe_epoch, - group.safe_epoch - ); - group.safe_epoch = *new_safe_epoch; - } - } - SnapshotGroupDelta::Destroy => { - if self.snapshot_groups.remove(table_fragments_id).is_none() { - warn!(?table_fragments_id, "remove non-existing snapshot group"); - } - } - } + // apply the state table info delta + for (table_id, delta) in &version_delta.state_table_info_delta { + self.state_table_info.insert( + *table_id, + StateTableInfo { + committed_epoch: delta.committed_epoch, + safe_epoch: delta.safe_epoch, + }, + ); } } @@ -1161,9 +1142,9 @@ pub fn build_version_delta_after_version(version: &HummockVersion) -> HummockVer max_committed_epoch: version.max_committed_epoch, group_deltas: Default::default(), new_table_watermarks: HashMap::new(), - removed_table_ids: vec![], + removed_table_ids: HashSet::new(), change_log_delta: HashMap::new(), - snapshot_group_delta: Default::default(), + state_table_info_delta: Default::default(), } } diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 1181dea157bc..ba9ffdcd2143 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -22,51 +22,14 @@ use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version::Levels as PbLevels; use risingwave_pb::hummock::hummock_version_delta::{ChangeLogDelta, GroupDeltas as PbGroupDeltas}; use risingwave_pb::hummock::{ - snapshot_group_delta, HummockVersion as PbHummockVersion, - HummockVersionDelta as PbHummockVersionDelta, SnapshotGroup as PbSnapshotGroup, - SnapshotGroupDelta as PbSnapshotGroupDelta, 
SstableInfo, + HummockVersion as PbHummockVersion, HummockVersionDelta as PbHummockVersionDelta, SstableInfo, + StateTableInfo as PbStateTableInfo, StateTableInfoDelta, }; use crate::change_log::TableChangeLog; use crate::table_watermark::TableWatermarks; use crate::{CompactionGroupId, HummockSstableObjectId, HummockVersionId}; -#[derive(Debug, Clone, PartialEq)] -pub struct SnapshotGroup { - pub table_fragments_id: TableId, - pub committed_epoch: u64, - pub safe_epoch: u64, - pub member_table_ids: HashSet, -} - -impl SnapshotGroup { - pub fn to_protobuf(&self) -> PbSnapshotGroup { - PbSnapshotGroup { - table_fragments_id: self.table_fragments_id.into(), - committed_epoch: self.committed_epoch, - safe_epoch: self.safe_epoch, - member_table_ids: self - .member_table_ids - .iter() - .map(|table_id| table_id.table_id) - .collect(), - } - } - - pub fn from_protobuf(group: &PbSnapshotGroup) -> Self { - SnapshotGroup { - table_fragments_id: TableId::new(group.table_fragments_id), - committed_epoch: group.committed_epoch, - safe_epoch: group.safe_epoch, - member_table_ids: group - .member_table_ids - .iter() - .map(|table_id| TableId::new(*table_id)) - .collect(), - } - } -} - #[derive(Debug, Clone, PartialEq)] pub struct HummockVersion { pub id: u64, @@ -75,7 +38,7 @@ pub struct HummockVersion { pub safe_epoch: u64, pub table_watermarks: HashMap>, pub table_change_log: HashMap, - pub snapshot_groups: HashMap, + pub state_table_info: HashMap, } impl Default for HummockVersion { @@ -127,15 +90,10 @@ impl HummockVersion { ) }) .collect(), - snapshot_groups: pb_version - .snapshot_groups + state_table_info: pb_version + .state_table_info .iter() - .map(|(table_fragments_id, group)| { - ( - TableId::new(*table_fragments_id), - SnapshotGroup::from_protobuf(group), - ) - }) + .map(|(table_id, info)| (TableId::new(*table_id), info.clone())) .collect(), } } @@ -160,10 +118,10 @@ impl HummockVersion { .iter() .map(|(table_id, change_log)| (table_id.table_id, 
change_log.to_protobuf())) .collect(), - snapshot_groups: self - .snapshot_groups + state_table_info: self + .state_table_info .iter() - .map(|(group_id, group)| ((*group_id).into(), group.to_protobuf())) + .map(|(table_id, info)| (table_id.table_id(), info.clone())) .collect(), } } @@ -187,107 +145,37 @@ impl HummockVersion { self.id + 1 } - fn need_fill_backward_compatibility_snapshot_group(&self) -> bool { - let has_prev_table = self - .levels - .values() - .any(|group| !group.member_table_ids.is_empty()); - has_prev_table && self.snapshot_groups.is_empty() + pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool { + // state_table_info is not previously filled, but there previously exists some tables + self.state_table_info.is_empty() + && self + .levels + .values() + .any(|group| !group.member_table_ids.is_empty()) } - pub fn gen_fill_backward_compatibility_snapshot_group_delta( + pub fn may_fill_backward_compatible_state_table_info_delta( &self, - existing_table_fragment_state_tables: &HashMap>, - ) -> Option { - if existing_table_fragment_state_tables.is_empty() - || !self.need_fill_backward_compatibility_snapshot_group() - { - return None; - } - let snapshot_group_delta = existing_table_fragment_state_tables - .iter() - .map(|(table_fragments_id, state_table_ids)| { - ( - TableId::new(*table_fragments_id), - SnapshotGroupDelta::NewSnapshotGroup { - member_table_ids: state_table_ids - .iter() - .map(|table_id| TableId::new(*table_id)) - .collect(), - committed_epoch: self.max_committed_epoch, - safe_epoch: self.safe_epoch, - }, - ) - }) - .collect(); - let delta = HummockVersionDelta { - id: self.id + 1, - prev_id: self.id, - max_committed_epoch: self.max_committed_epoch, - safe_epoch: self.safe_epoch, - snapshot_group_delta, - ..Default::default() - }; - Some(delta) - } -} - -#[derive(Debug, PartialEq, Clone)] -pub enum SnapshotGroupDelta { - NewSnapshotGroup { - member_table_ids: HashSet, - committed_epoch: u64, - safe_epoch: u64, - }, 
- NewCommittedEpoch(u64), - NewSafeEpoch(u64), - Destroy, -} - -impl SnapshotGroupDelta { - pub fn to_protobuf(&self) -> PbSnapshotGroupDelta { - let delta = match self { - SnapshotGroupDelta::NewSnapshotGroup { - member_table_ids, - committed_epoch, - safe_epoch, - } => snapshot_group_delta::Delta::NewSnapshotGroup( - snapshot_group_delta::NewSnapshotGroup { - member_table_ids: member_table_ids - .iter() - .map(|table_id| table_id.table_id) - .collect(), - committed_epoch: *committed_epoch, - safe_epoch: *safe_epoch, - }, - ), - SnapshotGroupDelta::NewCommittedEpoch(epoch) => { - snapshot_group_delta::Delta::NewCommittedEpoch(*epoch) - } - SnapshotGroupDelta::NewSafeEpoch(epoch) => { - snapshot_group_delta::Delta::NewSafeEpoch(*epoch) - } - SnapshotGroupDelta::Destroy => { - snapshot_group_delta::Delta::Destroy(snapshot_group_delta::DestroySnapshotGroup {}) - } - }; - PbSnapshotGroupDelta { delta: Some(delta) } - } - - pub fn from_protobuf(delta: &PbSnapshotGroupDelta) -> Self { - match delta.delta.as_ref().unwrap() { - snapshot_group_delta::Delta::NewSnapshotGroup(group) => Self::NewSnapshotGroup { - member_table_ids: HashSet::from_iter( - group.member_table_ids.iter().cloned().map(TableId::new), - ), - committed_epoch: group.committed_epoch, - safe_epoch: group.safe_epoch, - }, - snapshot_group_delta::Delta::NewCommittedEpoch(epoch) => { - Self::NewCommittedEpoch(*epoch) + delta: &mut HummockVersionDelta, + ) { + for (cg_id, group) in &self.levels { + for table_id in &group.member_table_ids { + assert!( + delta + .state_table_info_delta + .insert( + TableId::new(*table_id), + StateTableInfoDelta { + committed_epoch: self.max_committed_epoch, + safe_epoch: self.safe_epoch, + } + ) + .is_none(), + "duplicate table id {} in cg {}", + table_id, + cg_id + ); } - snapshot_group_delta::Delta::NewSafeEpoch(epoch) => Self::NewSafeEpoch(*epoch), - snapshot_group_delta::Delta::Destroy(_) => Self::Destroy, } } } @@ -301,9 +189,9 @@ pub struct HummockVersionDelta { pub 
safe_epoch: u64, pub trivial_move: bool, pub new_table_watermarks: HashMap, - pub removed_table_ids: Vec, + pub removed_table_ids: HashSet, pub change_log_delta: HashMap, - pub snapshot_group_delta: HashMap, + pub state_table_info_delta: HashMap, } impl Default for HummockVersionDelta { @@ -361,15 +249,10 @@ impl HummockVersionDelta { ) }) .collect(), - snapshot_group_delta: delta - .snapshot_group_delta + state_table_info_delta: delta + .state_table_info_delta .iter() - .map(|(table_fragments_id, delta)| { - ( - TableId::new(*table_fragments_id), - SnapshotGroupDelta::from_protobuf(delta), - ) - }) + .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone())) .collect(), } } @@ -397,10 +280,10 @@ impl HummockVersionDelta { .iter() .map(|(table_id, log_delta)| (table_id.table_id, log_delta.clone())) .collect(), - snapshot_group_delta: self - .snapshot_group_delta + state_table_info_delta: self + .state_table_info_delta .iter() - .map(|(group_id, delta)| ((*group_id).into(), delta.to_protobuf())) + .map(|(table_id, delta)| (table_id.table_id, delta.clone())) .collect(), } } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 4e44c1a09fd6..9831f5744856 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -34,7 +34,6 @@ use risingwave_hummock_sdk::key_range::KeyRangeCommon; use risingwave_hummock_sdk::table_watermark::{ TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; -use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::{EpochNewChangeLog, LevelType, SstableInfo}; use sync_point::sync_point; @@ -136,7 +135,6 @@ pub enum StagingData { pub enum VersionUpdate { /// a new staging data entry will be added. 
Staging(StagingData), - CommittedDelta(Box), CommittedSnapshot(CommittedVersion), NewTableWatermark { direction: WatermarkDirection, @@ -344,10 +342,6 @@ impl HummockReadVersion { } }, - VersionUpdate::CommittedDelta(_) => { - unimplemented!() - } - VersionUpdate::CommittedSnapshot(committed_version) => { let max_committed_epoch = committed_version.max_committed_epoch(); self.committed = committed_version;