diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs index cb3bba6f899a..e3c0578ac686 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs @@ -220,6 +220,7 @@ async fn read_hummock_snapshot_groups( let version = reader.meta_client.get_hummock_current_version().await?; Ok(version .state_table_info + .info() .iter() .map(|(table_id, info)| RwHummockSnapshot { table_id: table_id.table_id as _, diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index 5cff0ff8a07b..e485b138f491 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -175,6 +175,7 @@ impl<'a> HummockVersionTransaction<'a> { version_delta.state_table_info_delta = version_delta .latest_version() .state_table_info + .info() .iter() .map(|(table_id, info)| { ( diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 2e5723e6361f..4200b10e0b44 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -24,7 +24,7 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ }; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map; -use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo}; use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID, @@ -355,7 +355,7 @@ pub(super) fn create_init_version(default_compaction_config: CompactionConfig) - safe_epoch: INVALID_EPOCH, table_watermarks: HashMap::new(), table_change_log: HashMap::new(), - state_table_info: HashMap::new(), + state_table_info: HummockVersionStateTableInfo::empty(), }; for group_id in [ StaticCompactionGroupId::StateDefault as CompactionGroupId, diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 702c607afec9..52f3c1cb15ca 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -26,7 +26,7 @@ use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, GroupDestroy, GroupMetaChange, GroupTableChange, Level, LevelType, OverlappingLevel, PbLevelType, PbTableWatermarks, - SstableInfo, StateTableInfo, + SstableInfo, }; use tracing::warn; @@ -254,17 +254,6 @@ impl HummockVersion { } } -pub type SstSplitInfo = ( - // Object id. - HummockSstableObjectId, - // SST id. - HummockSstableId, - // Old SST id in parent group. - HummockSstableId, - // New SST id in parent group. - HummockSstableId, -); - impl HummockVersion { pub fn count_new_ssts_in_group_split( &self, @@ -485,6 +474,11 @@ impl HummockVersion { pub fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) { assert_eq!(self.id, version_delta.prev_id); + let changed_table_info = self.state_table_info.apply_delta( + &version_delta.state_table_info_delta, + &version_delta.removed_table_ids, + ); + // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { let summary = summarize_group_deltas(group_deltas); @@ -614,7 +608,7 @@ impl HummockVersion { } for (table_id, table_watermarks) in &self.table_watermarks { if let Some(table_delta) = version_delta.state_table_info_delta.get(table_id) - && let Some(prev_table) = self.state_table_info.get(table_id) + && let Some(Some(prev_table)) = changed_table_info.get(table_id) && table_delta.safe_epoch > prev_table.safe_epoch { // safe epoch has progressed, need further clear. @@ -667,7 +661,7 @@ impl HummockVersion { return false; } if let Some(table_info_delta) = version_delta.state_table_info_delta.get(table_id) - && let Some(prev_table_info) = self.state_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch { + && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch { // the table exists previously, and its committed epoch has progressed. } else { // otherwise, the table change log should be kept anyway @@ -690,17 +684,6 @@ impl HummockVersion { change_log.truncate(change_log_delta.truncate_epoch); } } - - // apply the state table info delta - for (table_id, delta) in &version_delta.state_table_info_delta { - self.state_table_info.insert( - *table_id, - StateTableInfo { - committed_epoch: delta.committed_epoch, - safe_epoch: delta.safe_epoch, - }, - ); - } } pub fn build_compaction_group_info(&self) -> HashMap { diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index ba9ffdcd2143..a82082e584b3 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::mem::size_of; +use std::mem::{replace, size_of}; use std::sync::Arc; +use itertools::Itertools; use prost::Message; use risingwave_common::catalog::TableId; use risingwave_pb::hummock::group_delta::DeltaType; @@ -23,13 +25,85 @@ use risingwave_pb::hummock::hummock_version::Levels as PbLevels; use risingwave_pb::hummock::hummock_version_delta::{ChangeLogDelta, GroupDeltas as PbGroupDeltas}; use risingwave_pb::hummock::{ HummockVersion as PbHummockVersion, HummockVersionDelta as PbHummockVersionDelta, SstableInfo, - StateTableInfo as PbStateTableInfo, StateTableInfoDelta, + StateTableInfo as PbStateTableInfo, StateTableInfo, StateTableInfoDelta, }; +use tracing::{info, warn}; use crate::change_log::TableChangeLog; use crate::table_watermark::TableWatermarks; use crate::{CompactionGroupId, HummockSstableObjectId, HummockVersionId}; +#[derive(Debug, Clone, PartialEq)] +pub struct HummockVersionStateTableInfo { + state_table_info: HashMap, +} + +impl HummockVersionStateTableInfo { + pub fn empty() -> Self { + Self { + state_table_info: HashMap::new(), + } + } + + pub fn from_protobuf(state_table_info: &HashMap) -> Self { + let state_table_info = state_table_info + .iter() + .map(|(table_id, info)| (TableId::new(*table_id), info.clone())) + .collect(); + Self { state_table_info } + } + + pub fn to_protobuf(&self) -> HashMap { + self.state_table_info + .iter() + .map(|(table_id, info)| (table_id.table_id, info.clone())) + .collect() + } + + pub fn apply_delta( + &mut self, + delta: &HashMap, + removed_table_id: &HashSet, + ) -> HashMap> { + let mut changed_table = HashMap::new(); + info!(removed_table_id = ?removed_table_id.iter().map(|table_id| table_id.table_id).collect_vec(), "removing table id"); + for table_id in removed_table_id { + if let Some(prev_info) = self.state_table_info.remove(table_id) { + assert!(changed_table.insert(*table_id, Some(prev_info)).is_none()); + } else { + warn!( + table_id = table_id.table_id, + "table to remove does not exist" + ); + } + } + for (table_id, delta) in delta { + if removed_table_id.contains(table_id) { + continue; + } + let new_info = StateTableInfo { + committed_epoch: delta.committed_epoch, + safe_epoch: delta.safe_epoch, + }; + match self.state_table_info.entry(*table_id) { + Entry::Occupied(mut entry) => { + let prev_info = replace(entry.get_mut(), new_info); + changed_table.insert(*table_id, Some(prev_info)); + } + Entry::Vacant(entry) => { + entry.insert(new_info); + changed_table.insert(*table_id, None); + } + } + } + changed_table + } + + pub fn info(&self) -> &HashMap { + &self.state_table_info + } +} + #[derive(Debug, Clone, PartialEq)] pub struct HummockVersion { pub id: u64, @@ -38,7 +112,7 @@ pub struct HummockVersion { pub safe_epoch: u64, pub table_watermarks: HashMap>, pub table_change_log: HashMap, - pub state_table_info: HashMap, + pub state_table_info: HummockVersionStateTableInfo, } impl Default for HummockVersion { @@ -90,11 +164,9 @@ impl HummockVersion { ) }) .collect(), - state_table_info: pb_version - .state_table_info - .iter() - .map(|(table_id, info)| (TableId::new(*table_id), info.clone())) - .collect(), + state_table_info: HummockVersionStateTableInfo::from_protobuf( + &pb_version.state_table_info, + ), } } @@ -118,11 +190,7 @@ impl HummockVersion { .iter() .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) .collect(), - state_table_info: self - .state_table_info - .iter() - .map(|(table_id, info)| (table_id.table_id(), info.clone())) - .collect(), + state_table_info: self.state_table_info.to_protobuf(), } } @@ -147,7 +215,7 @@ impl HummockVersion { pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool { // state_table_info is not previously filled, but there previously exists some tables - self.state_table_info.is_empty() + self.state_table_info.state_table_info.is_empty() && self .levels .values() diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index 9a4e674e1f8a..7c6be2eb041c 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -81,7 +81,7 @@ pub async fn prepare_first_valid_version( }; ( - PinnedVersion::new(hummock_version, unbounded_channel().0), + PinnedVersion::new(*hummock_version, unbounded_channel().0), tx, rx, ) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index c04ad40ae7ed..6004f9830f20 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -717,7 +717,7 @@ impl HummockEventHandler { version_to_apply } - HummockVersionUpdate::PinnedVersion(version) => version, + HummockVersionUpdate::PinnedVersion(version) => *version, }; validate_table_key_range(&newly_pinned_version); @@ -1127,7 +1127,9 @@ mod tests { let version1 = make_new_version(epoch1); { version_update_tx - .send(HummockVersionUpdate::PinnedVersion(version1.clone())) + .send(HummockVersionUpdate::PinnedVersion(Box::new( + version1.clone(), + ))) .unwrap(); let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap(); assert_eq!(old_version.version(), initial_version.version()); @@ -1147,10 +1149,14 @@ mod tests { let version3 = make_new_version(epoch3); { version_update_tx - .send(HummockVersionUpdate::PinnedVersion(version2.clone())) + .send(HummockVersionUpdate::PinnedVersion(Box::new( + version2.clone(), + ))) .unwrap(); version_update_tx - .send(HummockVersionUpdate::PinnedVersion(version3.clone())) + .send(HummockVersionUpdate::PinnedVersion(Box::new( + version3.clone(), + ))) .unwrap(); let (old_version2, new_version2, _refill_finish_tx2) = refill_task_rx.recv().await.unwrap(); @@ -1180,11 +1186,15 @@ mod tests { let mut rx = send_clear(epoch5); assert_pending(&mut rx).await; version_update_tx - .send(HummockVersionUpdate::PinnedVersion(version4.clone())) + .send(HummockVersionUpdate::PinnedVersion(Box::new( + version4.clone(), + ))) .unwrap(); assert_pending(&mut rx).await; version_update_tx - .send(HummockVersionUpdate::PinnedVersion(version5.clone())) + .send(HummockVersionUpdate::PinnedVersion(Box::new( + version5.clone(), + ))) .unwrap(); rx.await.unwrap(); assert_eq!(latest_version.load().version(), &version5); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index efbae0ac2154..8ffaa6831781 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -47,7 +47,7 @@ pub struct BufferWriteRequest { #[derive(Debug)] pub enum HummockVersionUpdate { VersionDeltas(Vec), - PinnedVersion(HummockVersion), + PinnedVersion(Box), } pub enum HummockEvent { diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index 4e10d9a52395..0725424aaca7 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -121,13 +121,13 @@ impl ObserverState for HummockObserverNode { ); let _ = self .version_update_sender - .send(HummockVersionUpdate::PinnedVersion( + .send(HummockVersionUpdate::PinnedVersion(Box::new( HummockVersion::from_rpc_protobuf( &snapshot .hummock_version .expect("should get hummock version"), ), - )) + ))) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send full version"); }); diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 10c5e0e421ad..3108c45b7fb9 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -170,7 +170,7 @@ impl HummockStorage { observer_manager.start().await; let hummock_version = match version_update_rx.recv().await { - Some(HummockVersionUpdate::PinnedVersion(version)) => version, + Some(HummockVersionUpdate::PinnedVersion(version)) => *version, _ => unreachable!("the hummock observer manager is the first one to take the event tx. Should be full hummock version") }; @@ -653,7 +653,7 @@ impl HummockStorage { use tokio::task::yield_now; let version_id = version.id; self._version_update_sender - .send(HummockVersionUpdate::PinnedVersion(version)) + .send(HummockVersionUpdate::PinnedVersion(Box::new(version))) .unwrap(); loop { if self.pinned_version.load().id() >= version_id {