Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): support commit epoch on multiple graphs at once #18819

Merged
merged 14 commits into from
Oct 16, 2024
1 change: 0 additions & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,9 @@
common.Status status = 1;
}

message ValidationTask {

Check failure on line 528 in proto/hummock.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "epoch" on message "ValidationTask" was deleted without reserving the name "epoch".

Check failure on line 528 in proto/hummock.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "epoch" on message "ValidationTask" was deleted without reserving the number "3".
repeated SstableInfo sst_infos = 1;
map<uint64, uint32> sst_id_to_worker_id = 2;
uint64 epoch = 3;
}

// Delete SSTs in object store
Expand Down
535 changes: 225 additions & 310 deletions src/meta/src/barrier/mod.rs

Large diffs are not rendered by default.

83 changes: 50 additions & 33 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand All @@ -28,6 +29,7 @@ use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
};
use risingwave_pb::hummock::compact_task::{self};
use risingwave_pb::hummock::CompactionConfig;
use sea_orm::TransactionTrait;

use crate::hummock::error::{Error, Result};
Expand All @@ -47,7 +49,6 @@ use crate::hummock::{
};

pub enum NewTableFragmentInfo {
None,
Normal {
mv_table_id: Option<TableId>,
internal_table_ids: Vec<TableId>,
Expand All @@ -57,14 +58,15 @@ pub enum NewTableFragmentInfo {
},
}

#[derive(Default)]
pub struct CommitEpochInfo {
pub sstables: Vec<LocalSstableInfo>,
pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
pub new_table_fragment_info: NewTableFragmentInfo,
pub new_table_fragment_infos: Vec<NewTableFragmentInfo>,
pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
pub committed_epoch: u64,
pub tables_to_commit: HashSet<TableId>,
/// `table_id` -> `committed_epoch`
pub tables_to_commit: HashMap<TableId, u64>,
}

impl HummockManager {
Expand All @@ -75,9 +77,8 @@ impl HummockManager {
mut sstables,
new_table_watermarks,
sst_to_context,
new_table_fragment_info,
new_table_fragment_infos,
change_log_delta,
committed_epoch,
tables_to_commit,
} = commit_info;
let mut versioning_guard = self.versioning.write().await;
Expand All @@ -91,7 +92,6 @@ impl HummockManager {

let versioning: &mut Versioning = &mut versioning_guard;
self.commit_epoch_sanity_check(
committed_epoch,
&tables_to_commit,
&sstables,
&sst_to_context,
Expand Down Expand Up @@ -124,15 +124,18 @@ impl HummockManager {

let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
let mut new_table_ids = HashMap::new();
let mut new_compaction_groups = HashMap::new();
let mut compaction_group_manager_txn = None;
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

// Add new table
let (new_table_ids, new_compaction_group, compaction_group_manager_txn) =
for new_table_fragment_info in new_table_fragment_infos {
match new_table_fragment_info {
NewTableFragmentInfo::Normal {
mv_table_id,
internal_table_ids,
} => {
let mut new_table_ids = HashMap::new();
on_handle_add_new_table(
state_table_info,
&internal_table_ids,
Expand All @@ -148,24 +151,40 @@ impl HummockManager {
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
(new_table_ids, None, None)
}
NewTableFragmentInfo::NewCompactionGroup { table_ids } => {
let compaction_group_manager_guard =
self.compaction_group_manager.write().await;
let compaction_group_config =
compaction_group_manager_guard.default_compaction_config();
let mut compaction_group_manager =
CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
);
let mut new_table_ids = HashMap::new();
let (compaction_group_manager, compaction_group_config) =
if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
(
compaction_group_manager,
(*compaction_group_config
.as_ref()
.expect("must be set with compaction_group_manager_txn"))
.clone(),
)
} else {
let compaction_group_manager_guard =
self.compaction_group_manager.write().await;
let new_compaction_group_config =
compaction_group_manager_guard.default_compaction_config();
compaction_group_config = Some(new_compaction_group_config.clone());
(
compaction_group_manager_txn.insert(
CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
),
),
new_compaction_group_config,
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
new_compaction_groups
.insert(new_compaction_group_id, compaction_group_config.clone());
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
compaction_config: compaction_group_config,
},
);

Expand All @@ -176,14 +195,9 @@ impl HummockManager {
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
(
new_table_ids,
Some((new_compaction_group_id, (*compaction_group_config).clone())),
Some(compaction_group_manager),
)
}
NewTableFragmentInfo::None => (HashMap::new(), None, None),
};
}
}

let commit_sstables = self
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
Expand All @@ -192,9 +206,8 @@ impl HummockManager {
let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();

let time_travel_delta = version.pre_commit_epoch(
committed_epoch,
&tables_to_commit,
new_compaction_group,
new_compaction_groups,
commit_sstables,
&new_table_ids,
new_table_watermarks,
Expand Down Expand Up @@ -251,9 +264,14 @@ impl HummockManager {
.values()
.map(|g| (g.group_id, g.parent_group_id))
.collect();
let time_travel_tables_to_commit = table_compaction_group_mapping
.iter()
.filter(|(table_id, _)| tables_to_commit.contains(table_id));
let time_travel_tables_to_commit =
table_compaction_group_mapping
.iter()
.filter_map(|(table_id, cg_id)| {
tables_to_commit
.get(table_id)
.map(|committed_epoch| (table_id, cg_id, *committed_epoch))
});
let mut txn = self.env.meta_store_ref().conn.begin().await?;
let version_snapshot_sst_ids = self
.write_time_travel_metadata(
Expand All @@ -263,7 +281,6 @@ impl HummockManager {
&group_parents,
&versioning.last_time_travel_snapshot_sst_ids,
time_travel_tables_to_commit,
committed_epoch,
)
.await?;
commit_multi_var_with_provided_txn!(
Expand Down
10 changes: 4 additions & 6 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo,
HummockContextId, HummockSstableObjectId, HummockVersionId, LocalSstableInfo,
INVALID_VERSION_ID,
};
use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask};
Expand Down Expand Up @@ -189,8 +189,7 @@ impl HummockManager {

pub async fn commit_epoch_sanity_check(
&self,
committed_epoch: HummockEpoch,
tables_to_commit: &HashSet<TableId>,
tables_to_commit: &HashMap<TableId, u64>,
sstables: &[LocalSstableInfo],
sst_to_context: &HashMap<HummockSstableObjectId, HummockContextId>,
current_version: &HummockVersion,
Expand All @@ -216,9 +215,9 @@ impl HummockManager {
}

// sanity check on monotonically increasing table committed epoch
for table_id in tables_to_commit {
for (table_id, committed_epoch) in tables_to_commit {
if let Some(info) = current_version.state_table_info.info().get(table_id) {
if committed_epoch <= info.committed_epoch {
if *committed_epoch <= info.committed_epoch {
return Err(anyhow::anyhow!(
"table {} Epoch {} <= committed_epoch {}",
table_id,
Expand Down Expand Up @@ -265,7 +264,6 @@ impl HummockManager {
.send_event(ResponseEvent::ValidationTask(ValidationTask {
sst_infos: sst_infos.into_iter().map(|sst| sst.into()).collect_vec(),
sst_id_to_worker_id: sst_to_context.clone(),
epoch: committed_epoch,
}))
.is_err()
{
Expand Down
5 changes: 2 additions & 3 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,7 @@ impl HummockManager {
delta: HummockVersionDelta,
group_parents: &HashMap<CompactionGroupId, CompactionGroupId>,
skip_sst_ids: &HashSet<HummockSstableId>,
tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId)>,
committed_epoch: u64,
tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
) -> Result<Option<HashSet<HummockSstableId>>> {
let select_groups = group_parents
.iter()
Expand Down Expand Up @@ -397,7 +396,7 @@ impl HummockManager {
Ok(count)
}

for (table_id, cg_id) in tables_to_commit {
for (table_id, cg_id, committed_epoch) in tables_to_commit {
if !select_groups.contains(cg_id) {
continue;
}
Expand Down
21 changes: 10 additions & 11 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand All @@ -23,9 +24,7 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{
GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta,
};
use risingwave_hummock_sdk::{
CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId,
};
use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
HummockVersionStats, StateTableInfoDelta,
Expand Down Expand Up @@ -113,9 +112,8 @@ impl<'a> HummockVersionTransaction<'a> {
/// Returns a duplicate delta, used by time travel.
pub(super) fn pre_commit_epoch(
&mut self,
committed_epoch: HummockEpoch,
tables_to_commit: &HashSet<TableId>,
new_compaction_group: Option<(CompactionGroupId, CompactionConfig)>,
tables_to_commit: &HashMap<TableId, u64>,
new_compaction_groups: HashMap<CompactionGroupId, Arc<CompactionConfig>>,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_table_ids: &HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
Expand All @@ -125,7 +123,7 @@ impl<'a> HummockVersionTransaction<'a> {
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

if let Some((compaction_group_id, compaction_group_config)) = new_compaction_group {
for (compaction_group_id, compaction_group_config) in new_compaction_groups {
{
let group_deltas = &mut new_version_delta
.group_deltas
Expand All @@ -135,7 +133,7 @@ impl<'a> HummockVersionTransaction<'a> {

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some(compaction_group_config.clone()),
group_config: Some((*compaction_group_config).clone()),
group_id: compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
Expand Down Expand Up @@ -173,6 +171,7 @@ impl<'a> HummockVersionTransaction<'a> {
"newly added table exists previously: {:?}",
table_id
);
let committed_epoch = *tables_to_commit.get(table_id).expect("newly added table must exist in tables_to_commit");
delta.state_table_info_delta.insert(
*table_id,
StateTableInfoDelta {
Expand All @@ -182,7 +181,7 @@ impl<'a> HummockVersionTransaction<'a> {
);
}

for table_id in tables_to_commit {
for (table_id, committed_epoch) in tables_to_commit {
if new_table_ids.contains_key(table_id) {
continue;
}
Expand All @@ -194,7 +193,7 @@ impl<'a> HummockVersionTransaction<'a> {
.insert(
*table_id,
StateTableInfoDelta {
committed_epoch,
committed_epoch: *committed_epoch,
compaction_group_id: info.compaction_group_id,
}
)
Expand Down
13 changes: 6 additions & 7 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,20 @@ impl HummockMetaClient for MockHummockMetaClient {
.chain(table_ids.iter().cloned())
.collect::<BTreeSet<_>>();

let new_table_fragment_info = if commit_table_ids
let new_table_fragment_infos = if commit_table_ids
.iter()
.all(|table_id| table_ids.contains(table_id))
{
NewTableFragmentInfo::None
vec![]
} else {
NewTableFragmentInfo::Normal {
vec![NewTableFragmentInfo::Normal {
mv_table_id: None,
internal_table_ids: commit_table_ids
.iter()
.cloned()
.map(TableId::from)
.collect_vec(),
}
}]
};

let sst_to_context = sync_result
Expand Down Expand Up @@ -215,13 +215,12 @@ impl HummockMetaClient for MockHummockMetaClient {
sstables: sync_result.uncommitted_ssts,
new_table_watermarks: new_table_watermark,
sst_to_context,
new_table_fragment_info,
new_table_fragment_infos,
change_log_delta: table_change_log,
committed_epoch: epoch,
tables_to_commit: commit_table_ids
.iter()
.cloned()
.map(TableId::from)
.map(|table_id| (TableId::new(table_id), epoch))
.collect(),
})
.await
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#![feature(const_option)]
#![feature(anonymous_lifetime_in_impl_trait)]
#![feature(duration_millis_float)]
#![feature(option_get_or_insert_default)]

pub mod backup_restore;
pub mod barrier;
Expand Down
Loading
Loading