Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): filter out SSTs for time travel metadata #19694

Merged
merged 6 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ macro_rules! impl_system_params_for_test {
ret.backup_storage_url = Some("memory".into());
ret.backup_storage_directory = Some("backup".into());
ret.use_new_object_prefix_strategy = Some(false);
ret.time_travel_retention_ms = Some(0);
ret
}
};
Expand Down
35 changes: 19 additions & 16 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2799,11 +2799,6 @@ impl CatalogController {
inner.list_all_state_tables().await
}

/// Acquires `self.inner` through `read()` and forwards to the inner
/// controller's `list_all_state_table_ids`, returning its result unchanged.
pub async fn list_all_state_table_ids(&self) -> MetaResult<Vec<TableId>> {
    self.inner.read().await.list_all_state_table_ids().await
}

pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
let inner = self.inner.read().await;
let table_ids: Vec<TableId> = Table::find()
Expand Down Expand Up @@ -3228,6 +3223,10 @@ impl CatalogController {
.collect();
Ok(res)
}

/// Acquires `self.inner` through `read()` and forwards to the inner
/// controller's `list_time_travel_table_ids`, returning its result unchanged.
pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
    let inner = self.inner.read().await;
    inner.list_time_travel_table_ids().await
}
}

/// `CatalogStats` is a struct to store the statistics of all catalogs.
Expand Down Expand Up @@ -3360,17 +3359,6 @@ impl CatalogControllerInner {
.collect())
}

/// `list_all_state_table_ids` returns the `TableId` column of every row found
/// by `Table::find()`; no filter is applied.
pub async fn list_all_state_table_ids(&self) -> MetaResult<Vec<TableId>> {
    // Project only the id column so that full rows are not fetched.
    let id_query = Table::find()
        .select_only()
        .column(table::Column::TableId);
    let table_ids: Vec<TableId> = id_query.into_tuple().all(&self.db).await?;
    Ok(table_ids)
}

/// `list_tables` return all `CREATED` tables, `CREATING` materialized views and internal tables that belong to them.
async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
let table_objs = Table::find()
Expand Down Expand Up @@ -3589,6 +3577,21 @@ impl CatalogControllerInner {
let _ = tx.send(Err(err.clone()));
}
}

/// Returns the ids of all tables whose `TableType` is `Table`,
/// `MaterializedView` or `Index`.
///
/// Only the `TableId` column is selected from the table catalog.
pub async fn list_time_travel_table_ids(&self) -> MetaResult<Vec<TableId>> {
    // Table types whose ids are reported by this query.
    let time_travel_types = vec![
        TableType::Table,
        TableType::MaterializedView,
        TableType::Index,
    ];
    let id_query = Table::find()
        .select_only()
        .column(table::Column::TableId)
        .filter(table::Column::TableType.is_in(time_travel_types));
    let table_ids: Vec<TableId> = id_query.into_tuple().all(&self.db).await?;
    Ok(table_ids)
}
}

async fn update_internal_tables(
Expand Down
17 changes: 2 additions & 15 deletions src/meta/src/controller/system_param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use anyhow::anyhow;
use risingwave_common::system_param::common::CommonHandler;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::system_param::{
check_missing_params, default, derive_missing_fields, set_system_param,
check_missing_params, derive_missing_fields, set_system_param,
};
use risingwave_common::{for_all_params, key_of};
use risingwave_meta_model::prelude::SystemParameter;
Expand Down Expand Up @@ -132,18 +132,6 @@ for_all_params!(impl_system_params_from_db);
for_all_params!(impl_merge_params);
for_all_params!(impl_system_params_to_models);

/// Overrides `time_travel_retention_ms` with `default::time_travel_retention_ms()`
/// when the parameter is either unset or set to `0`, and logs the override.
///
/// Any other value is left untouched.
fn apply_hard_code_override(params: &mut PbSystemParams) {
    // Nothing to do when a non-zero retention has been configured.
    if !matches!(params.time_travel_retention_ms, None | Some(0)) {
        return;
    }
    let default_v = default::time_travel_retention_ms();
    tracing::info!("time_travel_retention_ms has been overridden to {default_v}");
    params.time_travel_retention_ms = Some(default_v);
}

impl SystemParamsController {
pub async fn new(
sql_meta_store: SqlMetaStore,
Expand All @@ -152,8 +140,7 @@ impl SystemParamsController {
) -> MetaResult<Self> {
let db = sql_meta_store.conn;
let params = SystemParameter::find().all(&db).await?;
let mut params = merge_params(system_params_from_db(params)?, init_params);
apply_hard_code_override(&mut params);
let params = merge_params(system_params_from_db(params)?, init_params);
tracing::info!(initial_params = ?SystemParamsReader::new(&params), "initialize system parameters");
check_missing_params(&params).map_err(|e| anyhow!(e))?;
let ctl = Self {
Expand Down
17 changes: 10 additions & 7 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,6 @@ impl HummockManager {
.time_travel_snapshot_interval_counter
.saturating_add(1);
}
let group_parents = version
.latest_version()
.levels
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let time_travel_tables_to_commit =
table_compaction_group_mapping
.iter()
Expand All @@ -261,13 +255,22 @@ impl HummockManager {
.get(table_id)
.map(|committed_epoch| (table_id, cg_id, *committed_epoch))
});
let time_travel_table_ids: HashSet<_> = self
.metadata_manager
.catalog_controller
.list_time_travel_table_ids()
.await
.map_err(|e| Error::Internal(e.into()))?
.into_iter()
.map(|id| id.try_into().unwrap())
.collect();
let mut txn = self.env.meta_store_ref().conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
&txn,
time_travel_version,
time_travel_delta,
&group_parents,
time_travel_table_ids,
&versioning.last_time_travel_snapshot_sst_ids,
time_travel_tables_to_commit,
)
Expand Down
90 changes: 42 additions & 48 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use std::collections::{HashMap, HashSet, VecDeque};

use anyhow::anyhow;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::time_travel::{
refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta,
Expand Down Expand Up @@ -380,20 +381,19 @@ impl HummockManager {
txn: &DatabaseTransaction,
version: Option<&HummockVersion>,
delta: HummockVersionDelta,
group_parents: &HashMap<CompactionGroupId, CompactionGroupId>,
time_travel_table_ids: HashSet<StateTableId>,
skip_sst_ids: &HashSet<HummockSstableId>,
tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
) -> Result<Option<HashSet<HummockSstableId>>> {
let select_groups = group_parents
.iter()
.filter_map(|(cg_id, _)| {
if should_ignore_group(find_root_group(*cg_id, group_parents)) {
None
} else {
Some(*cg_id)
}
})
.collect::<HashSet<_>>();
if self
.env
.system_params_reader()
.await
.time_travel_retention_ms()
== 0
{
return Ok(None);
}
async fn write_sstable_infos(
mut sst_infos: impl Iterator<Item = &SstableInfo>,
txn: &DatabaseTransaction,
Expand Down Expand Up @@ -428,10 +428,7 @@ impl HummockManager {
Ok(count)
}

for (table_id, cg_id, committed_epoch) in tables_to_commit {
if !select_groups.contains(cg_id) {
continue;
}
for (table_id, _cg_id, committed_epoch) in tables_to_commit {
let version_id: u64 = delta.id.to_u64();
let m = hummock_epoch_to_version::ActiveModel {
epoch: Set(committed_epoch.try_into().unwrap()),
Expand All @@ -446,16 +443,28 @@ impl HummockManager {

let mut version_sst_ids = None;
if let Some(version) = version {
// `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
version_sst_ids = Some(
version
.get_sst_infos_from_groups(&select_groups)
.map(|s| s.sst_id)
.get_sst_infos()
.filter_map(|s| {
if s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
{
return Some(s.sst_id);
}
None
})
.collect(),
);
write_sstable_infos(
version
.get_sst_infos_from_groups(&select_groups)
.filter(|s| !skip_sst_ids.contains(&s.sst_id)),
version.get_sst_infos().filter(|s| {
!skip_sst_ids.contains(&s.sst_id)
&& s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
}),
txn,
self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
)
Expand All @@ -465,19 +474,24 @@ impl HummockManager {
version.id.to_u64(),
)
.unwrap()),
version: Set((&IncompleteHummockVersion::from((version, &select_groups))
.to_protobuf())
.into()),
version: Set(
(&IncompleteHummockVersion::from((version, &time_travel_table_ids))
.to_protobuf())
.into(),
),
};
hummock_time_travel_version::Entity::insert(m)
.on_conflict_do_nothing()
.exec(txn)
.await?;
}
let written = write_sstable_infos(
delta
.newly_added_sst_infos(Some(&select_groups))
.filter(|s| !skip_sst_ids.contains(&s.sst_id)),
delta.newly_added_sst_infos().filter(|s| {
!skip_sst_ids.contains(&s.sst_id)
&& s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
}),
txn,
self.env.opts.hummock_time_travel_sst_info_insert_batch_size,
)
Expand All @@ -491,7 +505,7 @@ impl HummockManager {
.unwrap()),
version_delta: Set((&IncompleteHummockVersionDelta::from((
&delta,
&select_groups,
&time_travel_table_ids,
))
.to_protobuf())
.into()),
Expand Down Expand Up @@ -531,26 +545,6 @@ fn replay_archive(
last_version
}

fn find_root_group(
group_id: CompactionGroupId,
parents: &HashMap<CompactionGroupId, CompactionGroupId>,
) -> CompactionGroupId {
let mut root = group_id;
while let Some(parent) = parents.get(&root)
&& *parent != 0
{
root = *parent;
}
root
}

/// Returns `true` when `root_group_id` is the `StaticCompactionGroupId::StateDefault` group.
fn should_ignore_group(root_group_id: CompactionGroupId) -> bool {
    // Some intermediate groups may have been dropped, in which case it is
    // impossible to tell whether the root group is MaterializedView or not.
    // Such groups are simply treated as MaterializedView for correctness.
    let state_default = StaticCompactionGroupId::StateDefault as CompactionGroupId;
    state_default == root_group_id
}

pub fn require_sql_meta_store_err() -> Error {
Error::TimeTravel(anyhow!("require SQL meta store"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,35 +84,6 @@ impl HummockVersion {
.map(|s| s.sst_id)
}

/// Iterates over the SST infos of the compaction groups listed in `select_group`,
/// followed by every SST info referenced by the table change logs.
///
/// `select_group` is only a hint: the change-log SSTs are appended without any
/// group check, so returned SST infos are not guaranteed to belong to `select_group`.
/// This method is kept separate from `get_sst_infos` because it may be further
/// customized in the future.
pub fn get_sst_infos_from_groups<'a>(
    &'a self,
    select_group: &'a HashSet<CompactionGroupId>,
) -> impl Iterator<Item = &'a SstableInfo> + 'a {
    // SSTs of the selected groups: L0 sub-levels in reverse order, then the other levels.
    let group_ssts = self
        .levels
        .iter()
        .filter(move |(cg_id, _)| select_group.contains(*cg_id))
        .flat_map(|(_, group)| group.l0.sub_levels.iter().rev().chain(group.levels.iter()))
        .flat_map(|level| level.table_infos.iter());

    // TODO: optimization: strip table change log
    let change_log_ssts = self.table_change_log.values().flat_map(|table_log| {
        table_log.0.iter().flat_map(|epoch_log| {
            epoch_log
                .old_value
                .iter()
                .chain(epoch_log.new_value.iter())
        })
    });

    group_ssts.chain(change_log_ssts)
}

pub fn level_iter<F: FnMut(&Level) -> bool>(
&self,
compaction_group_id: CompactionGroupId,
Expand Down
Loading
Loading