Skip to content

Commit

Permalink
Merge branch 'main' into xxh/mfa-change-demo
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Sep 8, 2023
2 parents 42e1f74 + 83637cd commit 97d12f6
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/hummock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ mod list_version_deltas;
mod pause_resume;
mod trigger_full_gc;
mod trigger_manual_compaction;
mod validate_version;

pub use compaction_group::*;
pub use list_version_deltas::*;
pub use pause_resume::*;
pub use trigger_full_gc::*;
pub use trigger_manual_compaction::*;
pub use validate_version::*;
32 changes: 32 additions & 0 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
use risingwave_rpc_client::HummockMetaClient;

use crate::CtlContext;

pub async fn validate_version(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let version = meta_client.get_current_version().await?;
let result = hummock_version_ext::validate_version(&version);
if !result.is_empty() {
println!("Invalid HummockVersion. Violation lists:");
for s in result {
println!("{}", s);
}
}

Ok(())
}
5 changes: 5 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ enum HummockCommands {
#[clap(short, long = "verbose", default_value_t = false)]
verbose: bool,
},
/// Validate the current HummockVersion.
ValidateVersion,
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -588,6 +590,9 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
cmd_impl::hummock::list_compaction_status(context, verbose).await?;
}
Commands::Hummock(HummockCommands::ValidateVersion) => {
cmd_impl::hummock::validate_version(context).await?;
}
Commands::Table(TableCommands::Scan { mv_name, data_dir }) => {
cmd_impl::table::scan(context, mv_name, data_dir).await?
}
Expand Down
143 changes: 142 additions & 1 deletion src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

use itertools::Itertools;
Expand All @@ -23,12 +24,13 @@ use risingwave_pb::hummock::hummock_version_delta::GroupDeltas;
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, GroupDestroy, GroupMetaChange,
GroupTableChange, HummockVersion, HummockVersionDelta, Level, LevelType, OverlappingLevel,
SstableInfo,
PbLevelType, SstableInfo,
};
use tracing::warn;

use super::StateTableId;
use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::prost_key_range::KeyRangeExt;
use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId};

Expand Down Expand Up @@ -1059,6 +1061,145 @@ pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockSstableObject
.collect()
}

/// Verify the validity of a `HummockVersion` and return a list of violations if any.
/// Currently this method is only used by risectl validate-version.
pub fn validate_version(version: &HummockVersion) -> Vec<String> {
let mut res = Vec::new();

// Ensure safe_epoch <= max_committed_epoch
if version.safe_epoch > version.max_committed_epoch {
res.push(format!(
"VERSION: safe_epoch {} > max_committed_epoch {}",
version.safe_epoch, version.max_committed_epoch
));
}

let mut table_to_group = HashMap::new();
// Ensure each table maps to only one compaction group
for (group_id, levels) in &version.levels {
// Ensure compaction group id matches
if levels.group_id != *group_id {
res.push(format!(
"GROUP {}: inconsistent group id {} in Levels",
group_id, levels.group_id
));
}

// Ensure table id is sorted
if !levels.member_table_ids.is_sorted() {
res.push(format!(
"GROUP {}: memtable_table_ids is not sorted: {:?}",
group_id, levels.member_table_ids
));
}

// Ensure table id is unique
for table_id in &levels.member_table_ids {
match table_to_group.entry(table_id) {
Entry::Occupied(e) => {
res.push(format!(
"GROUP {}: Duplicated table_id {}. First found in group {}",
group_id,
table_id,
e.get()
));
}
Entry::Vacant(e) => {
e.insert(group_id);
}
}
}

let validate_level = |group: CompactionGroupId,
expected_level_idx: u32,
level: &Level,
res: &mut Vec<String>| {
let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
if level.level_idx == 0 {
level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
// Ensure sub-level is not empty
if level.table_infos.is_empty() {
res.push(format!("{}: empty level", level_identifier));
}
} else if level.level_type() != PbLevelType::Nonoverlapping {
// Ensure non-L0 level is non-overlapping level
res.push(format!(
"{}: level type {:?} is not non-overlapping",
level_identifier,
level.level_type()
));
}

// Ensure level idx matches
if level.level_idx != expected_level_idx {
res.push(format!(
"{}: mismatched level idx {}",
level_identifier, expected_level_idx
));
}

let mut prev_table_info: Option<&SstableInfo> = None;
for table_info in &level.table_infos {
// Ensure table_ids are sorted and unique
if !table_info.table_ids.is_sorted_by(|a, b| {
if a < b {
Some(Ordering::Less)
} else {
Some(Ordering::Greater)
}
}) {
res.push(format!(
"{} SST {}: table_ids not sorted",
level_identifier, table_info.object_id
));
}

// Ensure SSTs in non-overlapping level have non-overlapping key range
if level.level_type() == PbLevelType::Nonoverlapping {
if let Some(prev) = prev_table_info.take() {
if prev
.key_range
.as_ref()
.unwrap()
.compare_right_with(&table_info.key_range.as_ref().unwrap().left)
!= Ordering::Less
{
res.push(format!(
"{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
level_identifier, table_info.object_id, prev, table_info
));
}
}
let _ = prev_table_info.insert(table_info);
}
}
};

if let Some(l0) = &levels.l0 {
let mut prev_sub_level_id = u64::MAX;
for sub_level in &l0.sub_levels {
// Ensure sub_level_id is sorted and unique
if sub_level.sub_level_id >= prev_sub_level_id {
res.push(format!(
"GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
group_id, sub_level.level_idx, prev_sub_level_id
));
}
prev_sub_level_id = sub_level.sub_level_id;

validate_level(*group_id, 0, sub_level, &mut res);
}
} else {
res.push(format!("GROUP {}: level0 not exist", group_id));
}

for idx in 1..=levels.levels.len() {
validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
}
}
res
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#![feature(bound_map)]
#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
#![feature(is_sorted)]

mod key_cmp;
use std::cmp::Ordering;
Expand Down

0 comments on commit 97d12f6

Please sign in to comment.