-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(storage): support commit multi epoch for new compaction group #17749
Changes from 8 commits
0d71708
b68a0b3
36f2175
cdad987
9c47322
006a54c
c5bcda2
e1b188c
8666c2d
044c891
c2194d7
7f0d985
913c5d1
a7c301d
5475645
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -40,7 +40,8 @@ use crate::hummock::manager::versioning::Versioning; | |||||
use crate::hummock::metrics_utils::{ | ||||||
get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat, | ||||||
}; | ||||||
use crate::hummock::sequence::next_sstable_object_id; | ||||||
use crate::hummock::model::CompactionGroup; | ||||||
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; | ||||||
use crate::hummock::{ | ||||||
commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer, | ||||||
HummockManager, | ||||||
|
@@ -53,6 +54,11 @@ pub struct NewTableFragmentInfo { | |||||
pub internal_table_ids: Vec<TableId>, | ||||||
} | ||||||
|
||||||
/// One batch of multi-epoch SSTs to be committed into a compaction group that `commit_epoch` creates for it. | ||||||
pub struct BatchCommitForNewCg { | ||||||
/// SSTs to commit, keyed by epoch; a `BTreeMap` so that iteration visits epochs in ascending order. | ||||||
pub epoch_to_ssts: BTreeMap<HummockEpoch, Vec<LocalSstableInfo>>, | ||||||
/// Tables to register in the newly created compaction group. | ||||||
pub table_ids: Vec<TableId>, | ||||||
} | ||||||
|
||||||
pub struct CommitEpochInfo { | ||||||
pub sstables: Vec<LocalSstableInfo>, | ||||||
pub new_table_watermarks: HashMap<TableId, TableWatermarks>, | ||||||
|
@@ -61,6 +67,9 @@ pub struct CommitEpochInfo { | |||||
pub change_log_delta: HashMap<TableId, ChangeLogDelta>, | ||||||
pub table_committed_epoch: BTreeMap<HummockEpoch, HashSet<TableId>>, | ||||||
pub max_committed_epoch: HummockEpoch, | ||||||
|
||||||
// Batches of multi-epoch SSTs, each to be committed into a newly created compaction group | ||||||
pub batch_commit_for_new_cg: Vec<BatchCommitForNewCg>, | ||||||
} | ||||||
|
||||||
impl CommitEpochInfo { | ||||||
|
@@ -72,6 +81,7 @@ impl CommitEpochInfo { | |||||
change_log_delta: HashMap<TableId, ChangeLogDelta>, | ||||||
table_committed_epoch: BTreeMap<HummockEpoch, HashSet<TableId>>, | ||||||
max_committed_epoch: HummockEpoch, | ||||||
batch_commit_for_new_cg: Vec<BatchCommitForNewCg>, | ||||||
) -> Self { | ||||||
Self { | ||||||
sstables, | ||||||
|
@@ -81,17 +91,19 @@ impl CommitEpochInfo { | |||||
change_log_delta, | ||||||
table_committed_epoch, | ||||||
max_committed_epoch, | ||||||
batch_commit_for_new_cg, | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
impl HummockManager { | ||||||
#[cfg(any(test, feature = "test"))] | ||||||
pub async fn commit_epoch_for_test( | ||||||
pub async fn commit_epoch_with_batch_cg_for_test( | ||||||
&self, | ||||||
epoch: HummockEpoch, | ||||||
sstables: Vec<impl Into<LocalSstableInfo>>, | ||||||
sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>, | ||||||
batch_commit_for_new_cg: Vec<BatchCommitForNewCg>, | ||||||
) -> Result<()> { | ||||||
let tables = self | ||||||
.versioning | ||||||
|
@@ -111,11 +123,23 @@ impl HummockManager { | |||||
HashMap::new(), | ||||||
BTreeMap::from_iter([(epoch, tables)]), | ||||||
epoch, | ||||||
batch_commit_for_new_cg, | ||||||
); | ||||||
self.commit_epoch(info).await?; | ||||||
Ok(()) | ||||||
} | ||||||
|
||||||
/// Test helper: commits `epoch` with the given `sstables` and no batch commit for new compaction groups. | ||||||
/// Delegates to `commit_epoch_with_batch_cg_for_test`, passing an empty batch list. | ||||||
#[cfg(any(test, feature = "test"))] | ||||||
pub async fn commit_epoch_for_test( | ||||||
&self, | ||||||
epoch: HummockEpoch, | ||||||
sstables: Vec<impl Into<LocalSstableInfo>>, | ||||||
sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>, | ||||||
) -> Result<()> { | ||||||
self.commit_epoch_with_batch_cg_for_test(epoch, sstables, sst_to_context, vec![]) | ||||||
.await | ||||||
} | ||||||
|
||||||
/// Caller should ensure `epoch` > `max_committed_epoch` | ||||||
pub async fn commit_epoch( | ||||||
&self, | ||||||
|
@@ -129,8 +153,11 @@ impl HummockManager { | |||||
change_log_delta, | ||||||
table_committed_epoch, | ||||||
max_committed_epoch: epoch, | ||||||
batch_commit_for_new_cg, | ||||||
} = commit_info; | ||||||
let mut versioning_guard = self.versioning.write().await; | ||||||
let mut compaction_group_manager_guard = self.compaction_group_manager.write().await; | ||||||
let compaction_group_config = compaction_group_manager_guard.default_compaction_config(); | ||||||
let _timer = start_measure_real_process_timer!(self, "commit_epoch"); | ||||||
// Prevent commit new epochs if this flag is set | ||||||
if versioning_guard.disable_commit_epochs { | ||||||
|
@@ -161,16 +188,19 @@ impl HummockManager { | |||||
self.env.notification_manager(), | ||||||
&self.metrics, | ||||||
); | ||||||
let mut compaction_group_manager = | ||||||
compaction_group_manager_guard.start_compaction_groups_txn(); | ||||||
|
||||||
let state_table_info = version.latest_version().state_table_info.clone(); | ||||||
|
||||||
let state_table_info = &version.latest_version().state_table_info; | ||||||
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); | ||||||
|
||||||
let mut new_table_ids = HashMap::new(); | ||||||
// Add new table | ||||||
if let Some(new_fragment_table_info) = new_table_fragment_info { | ||||||
if !new_fragment_table_info.internal_table_ids.is_empty() { | ||||||
on_handle_add_new_table( | ||||||
state_table_info, | ||||||
&state_table_info, | ||||||
&new_fragment_table_info.internal_table_ids, | ||||||
StaticCompactionGroupId::StateDefault as u64, | ||||||
&mut table_compaction_group_mapping, | ||||||
|
@@ -180,7 +210,7 @@ impl HummockManager { | |||||
|
||||||
if let Some(mv_table_id) = new_fragment_table_info.mv_table_id { | ||||||
on_handle_add_new_table( | ||||||
state_table_info, | ||||||
&state_table_info, | ||||||
&[mv_table_id], | ||||||
StaticCompactionGroupId::MaterializedView as u64, | ||||||
&mut table_compaction_group_mapping, | ||||||
|
@@ -189,6 +219,45 @@ impl HummockManager { | |||||
} | ||||||
} | ||||||
|
||||||
let batch_commit_for_new_cg = if !batch_commit_for_new_cg.is_empty() { | ||||||
let mut new_id_count = 0; | ||||||
let mut batch_commit_info = HashMap::new(); | ||||||
for BatchCommitForNewCg { | ||||||
epoch_to_ssts, | ||||||
table_ids, | ||||||
} in batch_commit_for_new_cg | ||||||
{ | ||||||
let new_compaction_group_id = next_compaction_group_id(&self.env).await?; | ||||||
compaction_group_manager.insert( | ||||||
new_compaction_group_id, | ||||||
CompactionGroup { | ||||||
group_id: new_compaction_group_id, | ||||||
compaction_config: compaction_group_config.clone(), | ||||||
}, | ||||||
); | ||||||
|
||||||
new_id_count += epoch_to_ssts.values().map(|ssts| ssts.len()).sum::<usize>(); | ||||||
|
||||||
on_handle_add_new_table( | ||||||
&state_table_info, | ||||||
&table_ids, | ||||||
new_compaction_group_id, | ||||||
&mut table_compaction_group_mapping, | ||||||
&mut new_table_ids, | ||||||
)?; | ||||||
|
||||||
batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts); | ||||||
} | ||||||
let start_sst_id = next_sstable_object_id(&self.env, new_id_count).await?; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we don't need to reserve new sst ids for the new compaction group, because its parent group will be StaticCompactionGroupId::NewCompactionGroup, so the reserved new sst ids will never be used anyway. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In this There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The newly added SSTs in See
@Li0k Could you please confirm this again? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Got it. I just checked the usage of |
||||||
Some(( | ||||||
batch_commit_info, | ||||||
start_sst_id, | ||||||
(*compaction_group_config).clone(), | ||||||
)) | ||||||
} else { | ||||||
None | ||||||
}; | ||||||
|
||||||
let commit_sstables = self | ||||||
.correct_commit_ssts(sstables, &table_compaction_group_mapping) | ||||||
.await?; | ||||||
|
@@ -201,6 +270,7 @@ impl HummockManager { | |||||
new_table_ids, | ||||||
new_table_watermarks, | ||||||
change_log_delta, | ||||||
batch_commit_for_new_cg, | ||||||
); | ||||||
|
||||||
// TODO: remove the sanity check when supporting partial checkpoint | ||||||
|
@@ -280,12 +350,22 @@ impl HummockManager { | |||||
&versioning.last_time_travel_snapshot_sst_ids, | ||||||
) | ||||||
.await?; | ||||||
commit_multi_var_with_provided_txn!(txn, version, version_stats)?; | ||||||
commit_multi_var_with_provided_txn!( | ||||||
txn, | ||||||
version, | ||||||
version_stats, | ||||||
compaction_group_manager | ||||||
)?; | ||||||
if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { | ||||||
versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; | ||||||
} | ||||||
} else { | ||||||
commit_multi_var!(self.meta_store_ref(), version, version_stats)?; | ||||||
commit_multi_var!( | ||||||
self.meta_store_ref(), | ||||||
version, | ||||||
version_stats, | ||||||
compaction_group_manager | ||||||
)?; | ||||||
} | ||||||
|
||||||
let snapshot = HummockSnapshot { | ||||||
|
@@ -306,6 +386,7 @@ impl HummockManager { | |||||
} | ||||||
|
||||||
drop(versioning_guard); | ||||||
drop(compaction_group_manager_guard); | ||||||
wenym1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
tracing::trace!("new committed epoch {}", epoch); | ||||||
|
||||||
// Don't trigger compactions if we enable deterministic compaction | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,17 @@ use std::ops::{Deref, DerefMut}; | |
|
||
use risingwave_common::catalog::TableId; | ||
use risingwave_hummock_sdk::change_log::ChangeLogDelta; | ||
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; | ||
use risingwave_hummock_sdk::sstable_info::SstableInfo; | ||
use risingwave_hummock_sdk::table_watermark::TableWatermarks; | ||
use risingwave_hummock_sdk::version::{ | ||
GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, | ||
}; | ||
use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId}; | ||
use risingwave_pb::hummock::{HummockVersionStats, StateTableInfoDelta}; | ||
use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, HummockVersionId, LocalSstableInfo}; | ||
use risingwave_pb::hummock::{ | ||
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionStats, | ||
StateTableInfoDelta, | ||
}; | ||
use risingwave_pb::meta::subscribe_response::{Info, Operation}; | ||
|
||
use crate::manager::NotificationManager; | ||
|
@@ -109,18 +113,63 @@ impl<'a> HummockVersionTransaction<'a> { | |
} | ||
|
||
/// Returns a duplicate delta, used by time travel. | ||
#[expect(clippy::type_complexity)] | ||
pub(super) fn pre_commit_epoch( | ||
&mut self, | ||
epoch: HummockEpoch, | ||
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>, | ||
new_table_ids: HashMap<TableId, CompactionGroupId>, | ||
new_table_watermarks: HashMap<TableId, TableWatermarks>, | ||
change_log_delta: HashMap<TableId, ChangeLogDelta>, | ||
batch_commit_for_new_cg: Option<( | ||
HashMap<CompactionGroupId, BTreeMap<u64, Vec<LocalSstableInfo>>>, | ||
u64, | ||
CompactionConfig, | ||
)>, | ||
) -> HummockVersionDelta { | ||
let mut new_version_delta = self.new_delta(); | ||
new_version_delta.max_committed_epoch = epoch; | ||
new_version_delta.new_table_watermarks = new_table_watermarks; | ||
new_version_delta.change_log_delta = change_log_delta; | ||
|
||
if let Some((batch_commit_for_new_cg, start_sst_id, compaction_group_config)) = | ||
batch_commit_for_new_cg | ||
{ | ||
let mut start_sst_id = start_sst_id; | ||
|
||
for (compaction_group_id, batch_commit_sst) in batch_commit_for_new_cg { | ||
let group_deltas = &mut new_version_delta | ||
.group_deltas | ||
.entry(compaction_group_id) | ||
.or_default() | ||
.group_deltas; | ||
|
||
#[expect(deprecated)] | ||
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { | ||
group_config: Some(compaction_group_config.clone()), | ||
group_id: compaction_group_id, | ||
parent_group_id: StaticCompactionGroupId::NewCompactionGroup | ||
as CompactionGroupId, | ||
new_sst_start_id: start_sst_id, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This specific new_sst_start_id will never be used, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think here we just fill in a field that is required to create a new compaction group. What is this field used for? If it's not used for a newly created compaction group, I can change it to a trivial value. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
table_ids: vec![], | ||
version: CompatibilityVersion::NoMemberTableIds as i32, | ||
})); | ||
|
||
for (epoch, insert_ssts) in batch_commit_sst { | ||
start_sst_id += insert_ssts.len() as u64; | ||
let l0_sub_level_id = epoch; | ||
wenym1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( | ||
0, | ||
l0_sub_level_id, // default | ||
vec![], | ||
insert_ssts.into_iter().map(|s| s.sst_info).collect(), // default | ||
0, // default | ||
)); | ||
group_deltas.push(group_delta); | ||
} | ||
} | ||
} | ||
|
||
// Append SSTs to a new version. | ||
for (compaction_group_id, inserted_table_infos) in commit_sstables { | ||
let group_deltas = &mut new_version_delta | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we acquire the
compaction_group_manager
lock only when batch_commit_for_new_cg
is not empty in L222? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved.
The original
BTreeMapTransaction
cannot fulfill this, because it holds a mutable reference to the RwLockGuard, so the two cannot be created and stored together; otherwise there would be a self-referencing problem. In this PR, I let
BTreeMapTransaction
support holding any pointer type that implements DerefMut<BTreeMap>
, so that the ownership of RwLockGuard
can be held by BTreeMapTransaction
and there won't be self-referencing. We further introduce a new struct DerefMutForward
, which forwards &mut CompactionGroupManager
to &mut BTreeMap<...>
. This is required because RwLockGuard
wraps CompactionGroupManager
, but we actually need impl DerefMut<BTreeMap<...>>
.