Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support commit multi epoch for new compaction group #17749

Merged
merged 15 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1342,5 +1342,6 @@ fn collect_commit_epoch_info(
InflightActorInfo::existing_table_ids(&command_ctx.info.fragment_infos).collect(),
)]),
epoch,
vec![],
)
}
74 changes: 71 additions & 3 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ use risingwave_pb::hummock::HummockSnapshot;
use sea_orm::TransactionTrait;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::transaction::{
HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat,
};
use crate::hummock::sequence::next_sstable_object_id;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::{
commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
HummockManager,
Expand All @@ -52,6 +54,11 @@ pub struct NewTableFragmentInfo {
pub internal_table_ids: Vec<TableId>,
}

/// A batch of SSTs, grouped by epoch, that is committed into a newly created compaction group.
///
/// While an epoch is being committed, every batch gets its own freshly allocated compaction
/// group id. `table_ids` are handed to `on_handle_add_new_table` together with that id, and
/// `epoch_to_ssts` is forwarded to `pre_commit_epoch` keyed by that id.
pub struct BatchCommitForNewCg {
/// SSTs to commit, keyed by epoch. In `pre_commit_epoch` each entry becomes its own
/// `IntraLevelDelta` whose L0 sub-level id is the epoch, and each epoch is asserted to be
/// strictly smaller than the `max_committed_epoch` of the enclosing commit.
pub epoch_to_ssts: BTreeMap<HummockEpoch, Vec<LocalSstableInfo>>,
/// Ids of the tables associated with the new compaction group.
pub table_ids: Vec<TableId>,
}

pub struct CommitEpochInfo {
pub sstables: Vec<LocalSstableInfo>,
pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
Expand All @@ -60,6 +67,9 @@ pub struct CommitEpochInfo {
pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
pub table_committed_epoch: BTreeMap<HummockEpoch, HashSet<TableId>>,
pub max_committed_epoch: HummockEpoch,

// Batches of multiple epochs and their SSTs, each committed into a newly created compaction group.
pub batch_commit_for_new_cg: Vec<BatchCommitForNewCg>,
}

impl CommitEpochInfo {
Expand All @@ -71,6 +81,7 @@ impl CommitEpochInfo {
change_log_delta: HashMap<TableId, ChangeLogDelta>,
table_committed_epoch: BTreeMap<HummockEpoch, HashSet<TableId>>,
max_committed_epoch: HummockEpoch,
batch_commit_for_new_cg: Vec<BatchCommitForNewCg>,
) -> Self {
Self {
sstables,
Expand All @@ -80,6 +91,7 @@ impl CommitEpochInfo {
change_log_delta,
table_committed_epoch,
max_committed_epoch,
batch_commit_for_new_cg,
}
}
}
Expand Down Expand Up @@ -110,6 +122,7 @@ impl HummockManager {
HashMap::new(),
BTreeMap::from_iter([(epoch, tables)]),
epoch,
vec![],
);
self.commit_epoch(info).await?;
Ok(())
Expand All @@ -128,6 +141,7 @@ impl HummockManager {
change_log_delta,
table_committed_epoch,
max_committed_epoch: epoch,
batch_commit_for_new_cg,
} = commit_info;
let mut versioning_guard = self.versioning.write().await;
let _timer = start_measure_real_process_timer!(self, "commit_epoch");
Expand Down Expand Up @@ -169,6 +183,7 @@ impl HummockManager {
);

let state_table_info = &version.latest_version().state_table_info;

let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();

let mut new_table_ids = HashMap::new();
Expand All @@ -195,6 +210,48 @@ impl HummockManager {
}
}

let (batch_commit_for_new_cg, compaction_group_manager_txn) =
if !batch_commit_for_new_cg.is_empty() {
let compaction_group_manager_guard = self.compaction_group_manager.write().await;
let compaction_group_config =
compaction_group_manager_guard.default_compaction_config();
let mut compaction_group_manager =
CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
);
let mut batch_commit_info = HashMap::new();
for BatchCommitForNewCg {
epoch_to_ssts,
table_ids,
} in batch_commit_for_new_cg
{
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
},
);

on_handle_add_new_table(
state_table_info,
&table_ids,
new_compaction_group_id,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;

batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts);
}
(
Some((batch_commit_info, (*compaction_group_config).clone())),
Some(compaction_group_manager),
)
} else {
(None, None)
};

let commit_sstables = self
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;
Expand All @@ -207,6 +264,7 @@ impl HummockManager {
new_table_ids,
new_table_watermarks,
change_log_delta,
batch_commit_for_new_cg,
);

// TODO: remove the sanity check when supporting partial checkpoint
Expand Down Expand Up @@ -287,12 +345,22 @@ impl HummockManager {
&versioning.last_time_travel_snapshot_sst_ids,
)
.await?;
commit_multi_var_with_provided_txn!(txn, version, version_stats)?;
commit_multi_var_with_provided_txn!(
txn,
version,
version_stats,
compaction_group_manager_txn
)?;
if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
}
} else {
commit_multi_var!(self.meta_store_ref(), version, version_stats)?;
commit_multi_var!(
self.meta_store_ref(),
version,
version_stats,
compaction_group_manager_txn
)?;
}

let snapshot = HummockSnapshot {
Expand Down
25 changes: 24 additions & 1 deletion src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::manager::{MetaSrvEnv, MetaStoreImpl};
use crate::model::{BTreeMapTransaction, MetadataModel, MetadataModelError};
use crate::model::{
BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModel,
MetadataModelError,
};

type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;

Expand Down Expand Up @@ -693,6 +696,26 @@ impl CompactionGroupManager {
CompactionGroupTransaction::new(&mut self.compaction_groups)
}

/// Starts a transaction over the compaction groups that takes ownership of `inner`.
///
/// `inner` is any handle that mutably dereferences to a `CompactionGroupManager` (for example
/// the write guard of the manager lock). It is wrapped in a `DerefMutForward` that projects the
/// manager onto its `compaction_groups` map, so the returned transaction carries the handle
/// with it instead of borrowing the map from a local `&mut self`.
pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
    inner: P,
) -> BTreeMapTransactionInner<
    CompactionGroupId,
    CompactionGroup,
    DerefMutForward<
        Self,
        BTreeMap<CompactionGroupId, CompactionGroup>,
        P,
        impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
        impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
    >,
> {
    // Project the owned handle onto the map the transaction operates on.
    let forwarded_groups = DerefMutForward::new(
        inner,
        |manager| &manager.compaction_groups,
        |manager| &mut manager.compaction_groups,
    );
    BTreeMapTransactionInner::new(forwarded_groups)
}

/// Tries to get compaction group config for `compaction_group_id`.
pub(super) fn try_get_compaction_group_config(
&self,
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ mod utils;
mod worker;

pub(crate) use commit_epoch::*;
#[cfg(any(test, feature = "test"))]
pub use commit_epoch::{BatchCommitForNewCg, CommitEpochInfo};
use compaction::*;
pub use compaction::{check_cg_write_limit, WriteLimitType};
pub(crate) use utils::*;
Expand Down
89 changes: 67 additions & 22 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ use std::ops::{Deref, DerefMut};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{
GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta,
};
use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId};
use risingwave_pb::hummock::{HummockVersionStats, StateTableInfoDelta};
use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId, LocalSstableInfo};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionStats,
StateTableInfoDelta,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use crate::manager::NotificationManager;
Expand Down Expand Up @@ -109,26 +113,66 @@ impl<'a> HummockVersionTransaction<'a> {
}

/// Returns a duplicate delta, used by time travel.
#[expect(clippy::type_complexity)]
pub(super) fn pre_commit_epoch(
&mut self,
epoch: HummockEpoch,
max_committed_epoch: HummockEpoch,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_table_ids: HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
batch_commit_for_new_cg: Option<(
HashMap<CompactionGroupId, BTreeMap<u64, Vec<LocalSstableInfo>>>,
CompactionConfig,
)>,
) -> HummockVersionDelta {
let mut new_version_delta = self.new_delta();
new_version_delta.max_committed_epoch = epoch;
new_version_delta.max_committed_epoch = max_committed_epoch;
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

if let Some((batch_commit_for_new_cg, compaction_group_config)) = batch_commit_for_new_cg {
for (compaction_group_id, batch_commit_sst) in batch_commit_for_new_cg {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some(compaction_group_config.clone()),
group_id: compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add an assertion in either `init_with_parent_group` or `split_sst` to guarantee that `new_sst_start_id` is 0 if and only if the parent group is `NewCompactionGroup`?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a warning in non-debug context and an assertion in debug context.

table_ids: vec![],
version: CompatibilityVersion::NoMemberTableIds as i32,
}));

for (epoch, insert_ssts) in batch_commit_sst {
assert!(epoch < max_committed_epoch);
let l0_sub_level_id = epoch;
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
0,
l0_sub_level_id,
vec![], // default
insert_ssts.into_iter().map(|s| s.sst_info).collect(),
0, // default
));
group_deltas.push(group_delta);
}
}
}

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
let l0_sub_level_id = epoch;
let l0_sub_level_id = max_committed_epoch;
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
0,
l0_sub_level_id,
Expand All @@ -143,32 +187,33 @@ impl<'a> HummockVersionTransaction<'a> {
// update state table info
new_version_delta.with_latest_version(|version, delta| {
for (table_id, cg_id) in new_table_ids {
assert!(
!version.state_table_info.info().contains_key(&table_id),
"newly added table exists previously: {:?}",
table_id
);
delta.state_table_info_delta.insert(
table_id,
StateTableInfoDelta {
committed_epoch: epoch,
safe_epoch: epoch,
committed_epoch: max_committed_epoch,
safe_epoch: max_committed_epoch,
compaction_group_id: cg_id,
},
);
}

for (table_id, info) in version.state_table_info.info() {
assert!(
delta
.state_table_info_delta
.insert(
*table_id,
StateTableInfoDelta {
committed_epoch: epoch,
safe_epoch: info.safe_epoch,
compaction_group_id: info.compaction_group_id,
}
)
.is_none(),
"newly added table exists previously: {:?}",
table_id
);
assert!(delta
.state_table_info_delta
.insert(
*table_id,
StateTableInfoDelta {
committed_epoch: max_committed_epoch,
safe_epoch: info.safe_epoch,
compaction_group_id: info.compaction_group_id,
}
)
.is_none(),);
}
});

Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ impl MockHummockMetaClient {
.await
.unwrap_or(None)
}

/// Returns the `HummockContextId` stored in this mock client.
pub fn context_id(&self) -> HummockContextId {
self.context_id
}
}

fn mock_err(error: super::error::Error) -> RpcError {
Expand Down Expand Up @@ -193,6 +197,7 @@ impl HummockMetaClient for MockHummockMetaClient {
version.state_table_info.info().keys().cloned().collect(),
)]),
epoch,
vec![],
))
.await
.map_err(mock_err)?;
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::compaction::selector::{default_compaction_selector, LocalSelectorStatistic};
use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
use crate::hummock::level_handler::LevelHandler;
pub use crate::hummock::manager::CommitEpochInfo;
use crate::hummock::model::CompactionGroup;
use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
use crate::manager::{
Expand Down
Loading
Loading