Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support commit multi epoch for new compaction group #17749

Merged
merged 15 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1342,5 +1342,6 @@ fn collect_commit_epoch_info(
InflightActorInfo::existing_table_ids(&command_ctx.info.fragment_infos).collect(),
)]),
epoch,
vec![],
)
}
95 changes: 88 additions & 7 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat,
};
use crate::hummock::sequence::next_sstable_object_id;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::{
commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer,
HummockManager,
Expand All @@ -53,6 +54,11 @@ pub struct NewTableFragmentInfo {
pub internal_table_ids: Vec<TableId>,
}

pub struct BatchCommitForNewCg {
pub epoch_to_ssts: BTreeMap<HummockEpoch, Vec<LocalSstableInfo>>,
pub table_ids: Vec<TableId>,
}

pub struct CommitEpochInfo {
pub sstables: Vec<LocalSstableInfo>,
pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
Expand All @@ -61,6 +67,9 @@ pub struct CommitEpochInfo {
pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
pub table_committed_epoch: BTreeMap<HummockEpoch, HashSet<TableId>>,
pub max_committed_epoch: HummockEpoch,

// commit multi Epoch and SSTs for new compaction group
pub batch_commit_for_new_cg: Vec<BatchCommitForNewCg>,
}

impl CommitEpochInfo {
Expand All @@ -72,6 +81,7 @@ impl CommitEpochInfo {
change_log_delta: HashMap<TableId, ChangeLogDelta>,
table_committed_epoch: BTreeMap<HummockEpoch, HashSet<TableId>>,
max_committed_epoch: HummockEpoch,
batch_commit_for_new_cg: Vec<BatchCommitForNewCg>,
) -> Self {
Self {
sstables,
Expand All @@ -81,17 +91,19 @@ impl CommitEpochInfo {
change_log_delta,
table_committed_epoch,
max_committed_epoch,
batch_commit_for_new_cg,
}
}
}

impl HummockManager {
#[cfg(any(test, feature = "test"))]
pub async fn commit_epoch_for_test(
pub async fn commit_epoch_with_batch_cg_for_test(
&self,
epoch: HummockEpoch,
sstables: Vec<impl Into<LocalSstableInfo>>,
sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
batch_commit_for_new_cg: Vec<BatchCommitForNewCg>,
) -> Result<()> {
let tables = self
.versioning
Expand All @@ -111,11 +123,23 @@ impl HummockManager {
HashMap::new(),
BTreeMap::from_iter([(epoch, tables)]),
epoch,
batch_commit_for_new_cg,
);
self.commit_epoch(info).await?;
Ok(())
}

#[cfg(any(test, feature = "test"))]
pub async fn commit_epoch_for_test(
&self,
epoch: HummockEpoch,
sstables: Vec<impl Into<LocalSstableInfo>>,
sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
) -> Result<()> {
self.commit_epoch_with_batch_cg_for_test(epoch, sstables, sst_to_context, vec![])
.await
}

/// Caller should ensure `epoch` > `max_committed_epoch`
pub async fn commit_epoch(
&self,
Expand All @@ -129,8 +153,11 @@ impl HummockManager {
change_log_delta,
table_committed_epoch,
max_committed_epoch: epoch,
batch_commit_for_new_cg,
} = commit_info;
let mut versioning_guard = self.versioning.write().await;
let mut compaction_group_manager_guard = self.compaction_group_manager.write().await;
let compaction_group_config = compaction_group_manager_guard.default_compaction_config();
let _timer = start_measure_real_process_timer!(self, "commit_epoch");
// Prevent commit new epochs if this flag is set
if versioning_guard.disable_commit_epochs {
Expand Down Expand Up @@ -161,16 +188,19 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut compaction_group_manager =
compaction_group_manager_guard.start_compaction_groups_txn();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we only acquire the compaction_group_manager lock only when batch_commit_for_new_cg is not empty in L222?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.

The original BTreeMapTransaction cannot fulfill this, because it holds the mutation reference to the RwLockGuard, and they cannot be created and stored together, and otherwise there will be the problem of self-referencing.

In this PR, I change to let BTreeMapTransaction support holding any pointer type that impl DerefMut<BTreeMap>, so that the ownership of RwLockGuard can be held by BTreeMapTransaction and there won't be self-referencing. We further introduce a new struct DerefMutForward, which forward &mut CompactionGroupManager to &mut BTreeMap<...>. This is required because RwLockGuard wraps CompactionGroupManager, but we actually need impl DerefMut<BTreeMap<...>>.


let state_table_info = version.latest_version().state_table_info.clone();

let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();

let mut new_table_ids = HashMap::new();
// Add new table
if let Some(new_fragment_table_info) = new_table_fragment_info {
if !new_fragment_table_info.internal_table_ids.is_empty() {
on_handle_add_new_table(
state_table_info,
&state_table_info,
&new_fragment_table_info.internal_table_ids,
StaticCompactionGroupId::StateDefault as u64,
&mut table_compaction_group_mapping,
Expand All @@ -180,7 +210,7 @@ impl HummockManager {

if let Some(mv_table_id) = new_fragment_table_info.mv_table_id {
on_handle_add_new_table(
state_table_info,
&state_table_info,
&[mv_table_id],
StaticCompactionGroupId::MaterializedView as u64,
&mut table_compaction_group_mapping,
Expand All @@ -189,6 +219,45 @@ impl HummockManager {
}
}

let batch_commit_for_new_cg = if !batch_commit_for_new_cg.is_empty() {
let mut new_id_count = 0;
let mut batch_commit_info = HashMap::new();
for BatchCommitForNewCg {
epoch_to_ssts,
table_ids,
} in batch_commit_for_new_cg
{
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
},
);

new_id_count += epoch_to_ssts.values().map(|ssts| ssts.len()).sum::<usize>();

on_handle_add_new_table(
&state_table_info,
&table_ids,
new_compaction_group_id,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;

batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts);
}
let start_sst_id = next_sstable_object_id(&self.env, new_id_count).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to reserve new sst ids for the new compaction group, because its parent group will be StaticCompactionGroupId::NewCompactionGroup so that the reserved new sst ids won't never used anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this commit_epoch, we will insert data to the newly created compaction together in a single version delta, so we may need to reserve some sst ids for the newly added data?

Copy link
Contributor

@zwang28 zwang28 Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly added SSTs in commit_epoch already have their deterministic SST ids.
We only need reserve SST ids in advance during splitting compaction group, where branched SSTs are assigned ids in the reserved range.

See

pub fn split_sst(sst_info: &mut SstableInfo, new_sst_id: &mut u64) -> SstableInfo {

@Li0k Could you please confirm this again?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get it. I just check the usage of new_sst_start_id , and indeed there is no need to set it when we create an empty new compaction group. Have changed to avoid passing this new_sst_start_id value along the way.

Some((
batch_commit_info,
start_sst_id,
(*compaction_group_config).clone(),
))
} else {
None
};

let commit_sstables = self
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;
Expand All @@ -201,6 +270,7 @@ impl HummockManager {
new_table_ids,
new_table_watermarks,
change_log_delta,
batch_commit_for_new_cg,
);

// TODO: remove the sanity check when supporting partial checkpoint
Expand Down Expand Up @@ -280,12 +350,22 @@ impl HummockManager {
&versioning.last_time_travel_snapshot_sst_ids,
)
.await?;
commit_multi_var_with_provided_txn!(txn, version, version_stats)?;
commit_multi_var_with_provided_txn!(
txn,
version,
version_stats,
compaction_group_manager
)?;
if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids {
versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids;
}
} else {
commit_multi_var!(self.meta_store_ref(), version, version_stats)?;
commit_multi_var!(
self.meta_store_ref(),
version,
version_stats,
compaction_group_manager
)?;
}

let snapshot = HummockSnapshot {
Expand All @@ -306,6 +386,7 @@ impl HummockManager {
}

drop(versioning_guard);
drop(compaction_group_manager_guard);
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
tracing::trace!("new committed epoch {}", epoch);

// Don't trigger compactions if we enable deterministic compaction
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ mod utils;
mod worker;

pub(crate) use commit_epoch::*;
#[cfg(any(test, feature = "test"))]
pub use commit_epoch::{BatchCommitForNewCg, CommitEpochInfo};
use compaction::*;
pub use compaction::{check_cg_write_limit, WriteLimitType};
pub(crate) use utils::*;
Expand Down
53 changes: 51 additions & 2 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ use std::ops::{Deref, DerefMut};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{
GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta,
};
use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId};
use risingwave_pb::hummock::{HummockVersionStats, StateTableInfoDelta};
use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId, LocalSstableInfo};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionStats,
StateTableInfoDelta,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use crate::manager::NotificationManager;
Expand Down Expand Up @@ -109,18 +113,63 @@ impl<'a> HummockVersionTransaction<'a> {
}

/// Returns a duplicate delta, used by time travel.
#[expect(clippy::type_complexity)]
pub(super) fn pre_commit_epoch(
&mut self,
epoch: HummockEpoch,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_table_ids: HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
batch_commit_for_new_cg: Option<(
HashMap<CompactionGroupId, BTreeMap<u64, Vec<LocalSstableInfo>>>,
u64,
CompactionConfig,
)>,
) -> HummockVersionDelta {
let mut new_version_delta = self.new_delta();
new_version_delta.max_committed_epoch = epoch;
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

if let Some((batch_commit_for_new_cg, start_sst_id, compaction_group_config)) =
batch_commit_for_new_cg
{
let mut start_sst_id = start_sst_id;

for (compaction_group_id, batch_commit_sst) in batch_commit_for_new_cg {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some(compaction_group_config.clone()),
group_id: compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
new_sst_start_id: start_sst_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This specific new_sst_start_id will never be used, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we just fill in a field required to create new compaction group. What is this field used for? If it's not used for newly created compaction group, I can change to fill a trivial value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this field used for

#17749 (comment)

table_ids: vec![],
version: CompatibilityVersion::NoMemberTableIds as i32,
}));

for (epoch, insert_ssts) in batch_commit_sst {
start_sst_id += insert_ssts.len() as u64;
let l0_sub_level_id = epoch;
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
0,
l0_sub_level_id, // default
vec![],
insert_ssts.into_iter().map(|s| s.sst_info).collect(), // default
0, // default
));
group_deltas.push(group_delta);
}
}
}

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
let group_deltas = &mut new_version_delta
Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ impl MockHummockMetaClient {
.await
.unwrap_or(None)
}

pub fn context_id(&self) -> HummockContextId {
self.context_id
}
}

fn mock_err(error: super::error::Error) -> RpcError {
Expand Down Expand Up @@ -193,6 +197,7 @@ impl HummockMetaClient for MockHummockMetaClient {
version.state_table_info.info().keys().cloned().collect(),
)]),
epoch,
vec![],
))
.await
.map_err(mock_err)?;
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::compaction::selector::{default_compaction_selector, LocalSelectorStatistic};
use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
use crate::hummock::level_handler::LevelHandler;
pub use crate::hummock::manager::CommitEpochInfo;
use crate::hummock::model::CompactionGroup;
use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
use crate::manager::{
Expand Down
Loading
Loading