From 6d702a902ffb39245f18cd04ba12d212b894d63c Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 6 Dec 2024 10:35:09 +0800 Subject: [PATCH 1/5] refactor(meta): clean up legacy code --- src/meta/src/hummock/manager/commit_epoch.rs | 7 --- src/meta/src/hummock/manager/time_travel.rs | 61 +++---------------- .../compaction_group/hummock_version_ext.rs | 29 --------- src/storage/hummock_sdk/src/time_travel.rs | 36 ++++------- src/storage/hummock_sdk/src/version.rs | 20 ++---- 5 files changed, 23 insertions(+), 130 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index e56a79307b042..cee70feb19b97 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -247,12 +247,6 @@ impl HummockManager { .time_travel_snapshot_interval_counter .saturating_add(1); } - let group_parents = version - .latest_version() - .levels - .values() - .map(|g| (g.group_id, g.parent_group_id)) - .collect(); let time_travel_tables_to_commit = table_compaction_group_mapping .iter() @@ -267,7 +261,6 @@ impl HummockManager { &txn, time_travel_version, time_travel_delta, - &group_parents, &versioning.last_time_travel_snapshot_sst_ids, time_travel_tables_to_commit, ) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index e33d9a3e148a8..da9bfcb4f4111 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -17,7 +17,6 @@ use std::collections::{HashMap, HashSet, VecDeque}; use anyhow::anyhow; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; -use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::time_travel::{ refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta, @@ -380,20 +379,9 @@ impl HummockManager { txn: &DatabaseTransaction, version: Option<&HummockVersion>, delta: HummockVersionDelta, - group_parents: &HashMap, skip_sst_ids: &HashSet, tables_to_commit: impl Iterator, ) -> Result>> { - let select_groups = group_parents - .iter() - .filter_map(|(cg_id, _)| { - if should_ignore_group(find_root_group(*cg_id, group_parents)) { - None - } else { - Some(*cg_id) - } - }) - .collect::>(); async fn write_sstable_infos( mut sst_infos: impl Iterator, txn: &DatabaseTransaction, @@ -428,10 +416,7 @@ impl HummockManager { Ok(count) } - for (table_id, cg_id, committed_epoch) in tables_to_commit { - if !select_groups.contains(cg_id) { - continue; - } + for (table_id, _cg_id, committed_epoch) in tables_to_commit { let version_id: u64 = delta.id.to_u64(); let m = hummock_epoch_to_version::ActiveModel { epoch: Set(committed_epoch.try_into().unwrap()), @@ -446,15 +431,10 @@ impl HummockManager { let mut version_sst_ids = None; if let Some(version) = version { - version_sst_ids = Some( - version - .get_sst_infos_from_groups(&select_groups) - .map(|s| s.sst_id) - .collect(), - ); + version_sst_ids = Some(version.get_sst_ids()); write_sstable_infos( version - .get_sst_infos_from_groups(&select_groups) + .get_sst_infos() .filter(|s| !skip_sst_ids.contains(&s.sst_id)), txn, self.env.opts.hummock_time_travel_sst_info_insert_batch_size, @@ -465,9 +445,7 @@ impl HummockManager { version.id.to_u64(), ) .unwrap()), - version: Set((&IncompleteHummockVersion::from((version, &select_groups)) - .to_protobuf()) - .into()), + version: Set((&IncompleteHummockVersion::from(version).to_protobuf()).into()), }; hummock_time_travel_version::Entity::insert(m) .on_conflict_do_nothing() @@ -476,7 +454,7 @@ impl HummockManager { } let written = write_sstable_infos( delta - .newly_added_sst_infos(Some(&select_groups)) + .newly_added_sst_infos() .filter(|s| !skip_sst_ids.contains(&s.sst_id)), txn, self.env.opts.hummock_time_travel_sst_info_insert_batch_size, @@ -489,12 +467,9 @@ impl HummockManager { delta.id.to_u64(), ) .unwrap()), - version_delta: Set((&IncompleteHummockVersionDelta::from(( - &delta, - &select_groups, - )) - .to_protobuf()) - .into()), + version_delta: Set( + (&IncompleteHummockVersionDelta::from(&delta).to_protobuf()).into() + ), }; hummock_time_travel_delta::Entity::insert(m) .on_conflict_do_nothing() @@ -531,26 +506,6 @@ fn replay_archive( last_version } -fn find_root_group( - group_id: CompactionGroupId, - parents: &HashMap, -) -> CompactionGroupId { - let mut root = group_id; - while let Some(parent) = parents.get(&root) - && *parent != 0 - { - root = *parent; - } - root -} - -fn should_ignore_group(root_group_id: CompactionGroupId) -> bool { - // It is possible some intermediate groups has been dropped, - // so it's impossible to tell whether the root group is MaterializedView or not. - // Just treat them as MaterializedView for correctness. - root_group_id == StaticCompactionGroupId::StateDefault as CompactionGroupId -} - pub fn require_sql_meta_store_err() -> Error { Error::TimeTravel(anyhow!("require SQL meta store")) } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index bcf99e18bfe7a..6575d69886968 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -84,35 +84,6 @@ impl HummockVersion { .map(|s| s.sst_id) } - /// `get_sst_infos_from_groups` doesn't guarantee that all returned sst info belongs to `select_group`. - /// i.e. `select_group` is just a hint. - /// We separate `get_sst_infos_from_groups` and `get_sst_infos` because `get_sst_infos_from_groups` may be further customized in the future. - pub fn get_sst_infos_from_groups<'a>( - &'a self, - select_group: &'a HashSet, - ) -> impl Iterator + 'a { - self.levels - .iter() - .filter_map(|(cg_id, level)| { - if select_group.contains(cg_id) { - Some(level) - } else { - None - } - }) - .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) - .flat_map(|level| level.table_infos.iter()) - .chain(self.table_change_log.values().flat_map(|change_log| { - // TODO: optimization: strip table change log - change_log.0.iter().flat_map(|epoch_change_log| { - epoch_change_log - .old_value - .iter() - .chain(epoch_change_log.new_value.iter()) - }) - })) - } - pub fn level_iter bool>( &self, compaction_group_id: CompactionGroupId, diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 4f2508a5772b7..a8133cbeb7873 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use risingwave_pb::hummock::hummock_version::PbLevels; use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas}; @@ -89,29 +89,23 @@ fn refill_sstable_info( } /// `SStableInfo` will be stripped. -impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { - fn from(p: (&HummockVersion, &HashSet)) -> Self { - let (version, select_group) = p; +impl From<&HummockVersion> for IncompleteHummockVersion { + fn from(version: &HummockVersion) -> Self { #[expect(deprecated)] Self { id: version.id, levels: version .levels .iter() - .filter_map(|(group_id, levels)| { - if select_group.contains(group_id) { - Some(( - *group_id as CompactionGroupId, - PbLevels::from(levels).into(), - )) - } else { - None - } + .map(|(group_id, levels)| { + ( + *group_id as CompactionGroupId, + PbLevels::from(levels).into(), + ) }) .collect(), max_committed_epoch: version.max_committed_epoch, table_watermarks: version.table_watermarks.clone(), - // TODO: optimization: strip table change log based on select_group table_change_log: version .table_change_log .iter() @@ -135,9 +129,8 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon; /// `SStableInfo` will be stripped. -impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { - fn from(p: (&HummockVersionDelta, &HashSet)) -> Self { - let (delta, select_group) = p; +impl From<&HummockVersionDelta> for IncompleteHummockVersionDelta { + fn from(delta: &HummockVersionDelta) -> Self { #[expect(deprecated)] Self { id: delta.id, @@ -145,19 +138,12 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHum group_deltas: delta .group_deltas .iter() - .filter_map(|(cg_id, deltas)| { - if select_group.contains(cg_id) { - Some((*cg_id, PbGroupDeltas::from(deltas).into())) - } else { - None - } - }) + .map(|(cg_id, deltas)| (*cg_id, PbGroupDeltas::from(deltas).into())) .collect(), max_committed_epoch: delta.max_committed_epoch, trivial_move: delta.trivial_move, new_table_watermarks: delta.new_table_watermarks.clone(), removed_table_ids: delta.removed_table_ids.clone(), - // TODO: optimization: strip table change log based on select_group change_log_delta: delta .change_log_delta .iter() diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 6266ee84474b3..7f4b4dafb1bf7 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -524,32 +524,20 @@ where /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`, /// but it is possible that the object is moved or split from other compaction groups or levels. pub fn newly_added_object_ids(&self) -> HashSet { - self.newly_added_sst_infos(None) + self.newly_added_sst_infos() .map(|sst| sst.object_id()) .collect() } pub fn newly_added_sst_ids(&self) -> HashSet { - self.newly_added_sst_infos(None) + self.newly_added_sst_infos() .map(|sst| sst.sst_id()) .collect() } - pub fn newly_added_sst_infos<'a>( - &'a self, - select_group: Option<&'a HashSet>, - ) -> impl Iterator + 'a { + pub fn newly_added_sst_infos(&self) -> impl Iterator { self.group_deltas - .iter() - .filter_map(move |(cg_id, group_deltas)| { - if let Some(select_group) = select_group - && !select_group.contains(cg_id) - { - None - } else { - Some(group_deltas) - } - }) + .values() .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { let sst_slice = match &group_delta { From 2040af3861adf45c8354fe3411578a8b2823cd98 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 6 Dec 2024 11:35:57 +0800 Subject: [PATCH 2/5] disable time travel in unit test --- src/common/src/system_param/mod.rs | 1 + src/meta/src/controller/system_param.rs | 17 ++--------------- src/meta/src/hummock/manager/time_travel.rs | 10 ++++++++++ 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 4d3a994631534..7ade9b71799ea 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -382,6 +382,7 @@ macro_rules! impl_system_params_for_test { ret.backup_storage_url = Some("memory".into()); ret.backup_storage_directory = Some("backup".into()); ret.use_new_object_prefix_strategy = Some(false); + ret.time_travel_retention_ms = Some(0); ret } }; diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index 4bb36c8e1962c..14f6d7c667084 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -19,7 +19,7 @@ use anyhow::anyhow; use risingwave_common::system_param::common::CommonHandler; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::system_param::{ - check_missing_params, default, derive_missing_fields, set_system_param, + check_missing_params, derive_missing_fields, set_system_param, }; use risingwave_common::{for_all_params, key_of}; use risingwave_meta_model::prelude::SystemParameter; @@ -132,18 +132,6 @@ for_all_params!(impl_system_params_from_db); for_all_params!(impl_merge_params); for_all_params!(impl_system_params_to_models); -fn apply_hard_code_override(params: &mut PbSystemParams) { - if params - .time_travel_retention_ms - .map(|v| v == 0) - .unwrap_or(true) - { - let default_v = default::time_travel_retention_ms(); - tracing::info!("time_travel_retention_ms has been overridden to {default_v}"); - params.time_travel_retention_ms = Some(default_v); - } -} - impl SystemParamsController { pub async fn new( sql_meta_store: SqlMetaStore, @@ -152,8 +140,7 @@ impl SystemParamsController { ) -> MetaResult { let db = sql_meta_store.conn; let params = SystemParameter::find().all(&db).await?; - let mut params = merge_params(system_params_from_db(params)?, init_params); - apply_hard_code_override(&mut params); + let params = merge_params(system_params_from_db(params)?, init_params); tracing::info!(initial_params = ?SystemParamsReader::new(¶ms), "initialize system parameters"); check_missing_params(¶ms).map_err(|e| anyhow!(e))?; let ctl = Self { diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index da9bfcb4f4111..5fad8c5cc9c9d 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use anyhow::anyhow; use risingwave_common::catalog::TableId; +use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::time_travel::{ @@ -382,6 +383,15 @@ impl HummockManager { skip_sst_ids: &HashSet, tables_to_commit: impl Iterator, ) -> Result>> { + if self + .env + .system_params_reader() + .await + .time_travel_retention_ms() + == 0 + { + return Ok(None); + } async fn write_sstable_infos( mut sst_infos: impl Iterator, txn: &DatabaseTransaction, From 2bfd3d4a378f44e859ad390fa98f758b033b2f67 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 6 Dec 2024 13:45:33 +0800 Subject: [PATCH 3/5] filter out SSTs by table ids --- src/meta/src/controller/catalog.rs | 35 ++++---- src/meta/src/hummock/manager/commit_epoch.rs | 10 +++ src/meta/src/hummock/manager/time_travel.rs | 35 +++++--- src/storage/hummock_sdk/src/time_travel.rs | 86 ++++++++++++++++---- 4 files changed, 126 insertions(+), 40 deletions(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index b927f13c6023c..3e836e627c862 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2799,11 +2799,6 @@ impl CatalogController { inner.list_all_state_tables().await } - pub async fn list_all_state_table_ids(&self) -> MetaResult> { - let inner = self.inner.read().await; - inner.list_all_state_table_ids().await - } - pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult> { let inner = self.inner.read().await; let table_ids: Vec = Table::find() @@ -3231,6 +3226,10 @@ impl CatalogController { .collect(); Ok(res) } + + pub async fn list_time_travel_table_ids(&self) -> MetaResult> { + self.inner.read().await.list_time_travel_table_ids().await + } } /// `CatalogStats` is a struct to store the statistics of all catalogs. @@ -3363,17 +3362,6 @@ impl CatalogControllerInner { .collect()) } - /// `list_all_tables` return all ids of state tables. - pub async fn list_all_state_table_ids(&self) -> MetaResult> { - let table_ids: Vec = Table::find() - .select_only() - .column(table::Column::TableId) - .into_tuple() - .all(&self.db) - .await?; - Ok(table_ids) - } - /// `list_tables` return all `CREATED` tables, `CREATING` materialized views and internal tables that belong to them. async fn list_tables(&self) -> MetaResult> { let table_objs = Table::find() @@ -3592,6 +3580,21 @@ impl CatalogControllerInner { let _ = tx.send(Err(err.clone())); } } + + pub async fn list_time_travel_table_ids(&self) -> MetaResult> { + let table_ids: Vec = Table::find() + .select_only() + .filter(table::Column::TableType.is_in(vec![ + TableType::Table, + TableType::MaterializedView, + TableType::Index, + ])) + .column(table::Column::TableId) + .into_tuple() + .all(&self.db) + .await?; + Ok(table_ids) + } } async fn update_internal_tables( diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index cee70feb19b97..0407682d0a1ad 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -255,12 +255,22 @@ impl HummockManager { .get(table_id) .map(|committed_epoch| (table_id, cg_id, *committed_epoch)) }); + let time_travel_table_ids: HashSet<_> = self + .metadata_manager + .catalog_controller + .list_time_travel_table_ids() + .await + .map_err(|e| Error::Internal(e.into()))? + .into_iter() + .map(|id| id.try_into().unwrap()) + .collect(); let mut txn = self.env.meta_store_ref().conn.begin().await?; let version_snapshot_sst_ids = self .write_time_travel_metadata( &txn, time_travel_version, time_travel_delta, + time_travel_table_ids, &versioning.last_time_travel_snapshot_sst_ids, time_travel_tables_to_commit, ) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 5fad8c5cc9c9d..280197c49dfb0 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -18,6 +18,7 @@ use anyhow::anyhow; use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; +use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::time_travel::{ refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta, @@ -380,6 +381,7 @@ impl HummockManager { txn: &DatabaseTransaction, version: Option<&HummockVersion>, delta: HummockVersionDelta, + time_travel_table_ids: HashSet, skip_sst_ids: &HashSet, tables_to_commit: impl Iterator, ) -> Result>> { @@ -443,9 +445,12 @@ impl HummockManager { if let Some(version) = version { version_sst_ids = Some(version.get_sst_ids()); write_sstable_infos( - version - .get_sst_infos() - .filter(|s| !skip_sst_ids.contains(&s.sst_id)), + version.get_sst_infos().filter(|s| { + !skip_sst_ids.contains(&s.sst_id) + && s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }), txn, self.env.opts.hummock_time_travel_sst_info_insert_batch_size, ) @@ -455,7 +460,11 @@ impl HummockManager { version.id.to_u64(), ) .unwrap()), - version: Set((&IncompleteHummockVersion::from(version).to_protobuf()).into()), + version: Set( + (&IncompleteHummockVersion::from((version, &time_travel_table_ids)) + .to_protobuf()) + .into(), + ), }; hummock_time_travel_version::Entity::insert(m) .on_conflict_do_nothing() @@ -463,9 +472,12 @@ impl HummockManager { .await?; } let written = write_sstable_infos( - delta - .newly_added_sst_infos() - .filter(|s| !skip_sst_ids.contains(&s.sst_id)), + delta.newly_added_sst_infos().filter(|s| { + !skip_sst_ids.contains(&s.sst_id) + && s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }), txn, self.env.opts.hummock_time_travel_sst_info_insert_batch_size, ) @@ -477,9 +489,12 @@ impl HummockManager { delta.id.to_u64(), ) .unwrap()), - version_delta: Set( - (&IncompleteHummockVersionDelta::from(&delta).to_protobuf()).into() - ), + version_delta: Set((&IncompleteHummockVersionDelta::from(( + &delta, + &time_travel_table_ids, + )) + .to_protobuf()) + .into()), }; hummock_time_travel_delta::Entity::insert(m) .on_conflict_do_nothing() diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index a8133cbeb7873..f3793b1f30be1 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use risingwave_pb::hummock::hummock_version::PbLevels; use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas}; -use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo}; +use risingwave_pb::hummock::{group_delta, PbEpochNewChangeLog, PbLevel, PbSstableInfo}; use crate::change_log::{TableChangeLog, TableChangeLogCommon}; +use crate::compaction_group::StateTableId; use crate::level::Level; use crate::sstable_info::SstableInfo; use crate::version::{ @@ -89,8 +90,9 @@ fn refill_sstable_info( } /// `SStableInfo` will be stripped. -impl From<&HummockVersion> for IncompleteHummockVersion { - fn from(version: &HummockVersion) -> Self { +impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { + fn from(p: (&HummockVersion, &HashSet)) -> Self { + let (version, time_travel_table_ids) = p; #[expect(deprecated)] Self { id: version.id, @@ -98,10 +100,8 @@ impl From<&HummockVersion> for IncompleteHummockVersion { .levels .iter() .map(|(group_id, levels)| { - ( - *group_id as CompactionGroupId, - PbLevels::from(levels).into(), - ) + let pblevels = rewrite_levels(PbLevels::from(levels), time_travel_table_ids); + (*group_id as CompactionGroupId, pblevels.into()) }) .collect(), max_committed_epoch: version.max_committed_epoch, @@ -109,13 +109,17 @@ impl From<&HummockVersion> for IncompleteHummockVersion { table_change_log: version .table_change_log .iter() - .map(|(table_id, change_log)| { + .filter_map(|(table_id, change_log)| { + if !time_travel_table_ids.contains(&table_id.table_id()) { + return None; + } + // TODO: sst must contains table_id let incomplete_table_change_log = change_log .0 .iter() .map(|e| PbEpochNewChangeLog::from(e).into()) .collect(); - (*table_id, TableChangeLogCommon(incomplete_table_change_log)) + Some((*table_id, TableChangeLogCommon(incomplete_table_change_log))) }) .collect(), state_table_info: version.state_table_info.clone(), @@ -123,14 +127,37 @@ impl From<&HummockVersion> for IncompleteHummockVersion { } } +/// Removes SST refs that don't contain any of `time_travel_table_ids`. +fn rewrite_levels(mut levels: PbLevels, time_travel_table_ids: &HashSet) -> PbLevels { + fn rewrite_level(level: &mut PbLevel, time_travel_table_ids: &HashSet) { + // The stats like `total_file_size` are not updated accordingly since they won't be used in time travel query. + level.table_infos.retain(|sst| { + sst.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }); + } + for level in &mut levels.levels { + rewrite_level(level, time_travel_table_ids); + } + if let Some(l0) = levels.l0.as_mut() { + for sub_level in &mut l0.sub_levels { + rewrite_level(sub_level, time_travel_table_ids); + } + l0.sub_levels.retain(|s| !s.table_infos.is_empty()); + } + levels +} + /// [`IncompleteHummockVersionDelta`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: /// - `PbGroupDeltas` /// - `ChangeLogDelta` pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon; /// `SStableInfo` will be stripped. -impl From<&HummockVersionDelta> for IncompleteHummockVersionDelta { - fn from(delta: &HummockVersionDelta) -> Self { +impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { + fn from(p: (&HummockVersionDelta, &HashSet)) -> Self { + let (delta, time_travel_table_ids) = p; #[expect(deprecated)] Self { id: delta.id, @@ -138,7 +165,11 @@ impl From<&HummockVersionDelta> for IncompleteHummockVersionDelta { group_deltas: delta .group_deltas .iter() - .map(|(cg_id, deltas)| (*cg_id, PbGroupDeltas::from(deltas).into())) + .map(|(cg_id, deltas)| { + let pb_group_deltas = + rewrite_group_deltas(PbGroupDeltas::from(deltas), time_travel_table_ids); + (*cg_id, pb_group_deltas.into()) + }) .collect(), max_committed_epoch: delta.max_committed_epoch, trivial_move: delta.trivial_move, @@ -147,13 +178,40 @@ impl From<&HummockVersionDelta> for IncompleteHummockVersionDelta { change_log_delta: delta .change_log_delta .iter() - .map(|(table_id, log_delta)| (*table_id, PbChangeLogDelta::from(log_delta).into())) + .filter_map(|(table_id, log_delta)| { + if !time_travel_table_ids.contains(&table_id.table_id()) { + return None; + } + // TODO: sst must contains table_id + Some((*table_id, PbChangeLogDelta::from(log_delta).into())) + }) .collect(), state_table_info_delta: delta.state_table_info_delta.clone(), } } } +/// Removes SST refs that don't contain any of `time_travel_table_ids`. +fn rewrite_group_deltas( + mut group_deltas: PbGroupDeltas, + time_travel_table_ids: &HashSet, +) -> PbGroupDeltas { + for group_delta in &mut group_deltas.group_deltas { + let Some(group_delta::DeltaType::NewL0SubLevel(new_sub_level)) = + &mut group_delta.delta_type + else { + tracing::error!(?group_delta, "unexpected delta type"); + continue; + }; + new_sub_level.inserted_table_infos.retain(|sst| { + sst.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }); + } + group_deltas +} + pub struct SstableIdInVersion { sst_id: HummockSstableId, object_id: HummockSstableObjectId, From b2cb1aa53be03bf06167f5d1bcc5445ad6a2c6b7 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 6 Dec 2024 15:29:50 +0800 Subject: [PATCH 4/5] refactor --- src/meta/src/hummock/manager/time_travel.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 280197c49dfb0..3e333ecafb94c 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -443,7 +443,21 @@ impl HummockManager { let mut version_sst_ids = None; if let Some(version) = version { - version_sst_ids = Some(version.get_sst_ids()); + // `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`. + version_sst_ids = Some( + version + .get_sst_infos() + .filter_map(|s| { + if s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + { + return Some(s.sst_id); + } + None + }) + .collect(), + ); write_sstable_infos( version.get_sst_infos().filter(|s| { !skip_sst_ids.contains(&s.sst_id) From 15c87fbc8025c9e9fedb81f45d5c0e3d6933c633 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 6 Dec 2024 16:17:39 +0800 Subject: [PATCH 5/5] add assertion --- src/storage/hummock_sdk/src/time_travel.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index f3793b1f30be1..d088d3e2ae843 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -113,7 +113,13 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersio if !time_travel_table_ids.contains(&table_id.table_id()) { return None; } - // TODO: sst must contains table_id + debug_assert!(change_log.0.iter().all(|d| { + d.new_value.iter().chain(d.old_value.iter()).all(|s| { + s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }) + })); let incomplete_table_change_log = change_log .0 .iter() @@ -182,7 +188,17 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockV if !time_travel_table_ids.contains(&table_id.table_id()) { return None; } - // TODO: sst must contains table_id + debug_assert!(log_delta + .new_log + .as_ref() + .map(|d| { + d.new_value.iter().chain(d.old_value.iter()).all(|s| { + s.table_ids + .iter() + .any(|tid| time_travel_table_ids.contains(tid)) + }) + }) + .unwrap_or(true)); Some((*table_id, PbChangeLogDelta::from(log_delta).into())) }) .collect(),