Skip to content

Commit

Permalink
fix(storage): address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Jul 23, 2024
1 parent 705f7b0 commit bf1addf
Show file tree
Hide file tree
Showing 54 changed files with 333 additions and 587 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions src/ctl/src/cmd_impl/hummock/list_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ pub async fn list_version(
let l0 = levels.l0.as_mut().unwrap();
for sub_level in &mut l0.sub_levels {
for t in &mut sub_level.table_infos {
t.key_range = None;
t.remove_key_range();
}
}
}

// l1 ~ lmax
for level in &mut levels.levels {
for t in &mut level.table_infos {
t.key_range = None;
t.remove_key_range();
}
}
});
Expand All @@ -63,18 +63,18 @@ pub async fn list_version(
println!(
"sub_level_id {} type {} sst_num {} size {}",
sub_level.sub_level_id,
sub_level.level_type().as_str_name(),
sub_level.level_type.as_str_name(),
sub_level.table_infos.len(),
sub_level.total_file_size
)
}
}

for level in levels.get_levels() {
for level in &levels.levels {
println!(
"level_idx {} type {} sst_num {} size {}",
level.level_idx,
level.level_type().as_str_name(),
level.level_type.as_str_name(),
level.table_infos.len(),
level.total_file_size
)
Expand Down
6 changes: 3 additions & 3 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
for level in version.get_combined_levels() {
for sstable_info in &level.table_infos {
if let Some(object_id) = &args.object_id {
if *object_id == sstable_info.get_object_id() {
if *object_id == sstable_info.object_id {
print_level(level, sstable_info);
sst_dump_via_sstable_store(
&sstable_store,
sstable_info.get_object_id(),
sstable_info.object_id,
sstable_info.meta_offset,
sstable_info.file_size,
&table_data,
Expand All @@ -100,7 +100,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
print_level(level, sstable_info);
sst_dump_via_sstable_store(
&sstable_store,
sstable_info.get_object_id(),
sstable_info.object_id,
sstable_info.meta_offset,
sstable_info.file_size,
&table_data,
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn print_user_key_in_version(
.chain(cg.levels.iter())
{
for sstable_info in &level.table_infos {
let key_range = sstable_info.key_range.as_ref().unwrap();
let key_range = &sstable_info.key_range;
let left_user_key = FullKey::decode(&key_range.left);
let right_user_key = FullKey::decode(&key_range.right);
if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn remove_key_range_from_version(mut version: HummockVersion) -> HummockVersion
.chain(cg.l0.as_mut().unwrap().sub_levels.iter_mut())
{
for sst in &mut level.table_infos {
sst.key_range.take();
sst.remove_key_range();
}
}
}
Expand All @@ -117,7 +117,7 @@ fn version_to_sstable_rows(version: HummockVersion) -> Vec<RwHummockSstable> {
for cg in version.levels.into_values() {
for level in cg.levels.into_iter().chain(cg.l0.unwrap().sub_levels) {
for sst in level.table_infos {
let key_range = sst.key_range.unwrap();
let key_range = sst.key_range;
sstables.push(RwHummockSstable {
sstable_id: sst.sst_id as _,
object_id: sst.object_id as _,
Expand Down
3 changes: 1 addition & 2 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ normal = ["workspace-hack"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
bytes = { version = "1" }
either = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = { workspace = true }
prost ={ workspace = true }
prost = { workspace = true }
rand = { workspace = true }
regex = "1"
risingwave_common = { workspace = true }
Expand Down
11 changes: 7 additions & 4 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};
use std::time::Duration;

use compact_task::PbTaskStatus;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID};
Expand Down Expand Up @@ -225,7 +226,6 @@ impl HummockManagerService for HummockServiceImpl {
&self,
request: Request<TriggerManualCompactionRequest>,
) -> Result<Response<TriggerManualCompactionResponse>, Status> {
use bytes::Bytes;
let request = request.into_inner();
let compaction_group_id = request.compaction_group_id;
let mut option = ManualCompactionOption {
Expand All @@ -238,8 +238,8 @@ impl HummockManagerService for HummockServiceImpl {
match request.key_range {
Some(pb_key_range) => {
option.key_range = KeyRange {
left: Bytes::from(pb_key_range.left),
right: Bytes::from(pb_key_range.right),
left: pb_key_range.left.into(),
right: pb_key_range.right.into(),
right_exclusive: pb_key_range.right_exclusive,
};
}
Expand Down Expand Up @@ -666,7 +666,10 @@ impl HummockManagerService for HummockServiceImpl {
let request = request.into_inner();
let ret = self
.hummock_manager
.cancel_compact_task(request.task_id, request.task_status())
.cancel_compact_task(
request.task_id,
PbTaskStatus::try_from(request.task_status).unwrap(),
)
.await?;

let response = Response::new(CancelCompactTaskResponse { ret });
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ fn collect_commit_epoch_info(
for resp in resps {
let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| {
let sst_info = grouped.sst.expect("field not None");
sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
sst_to_worker.insert(sst_info.object_id, resp.worker_id);
LocalSstableInfo::new(
sst_info.into(),
from_prost_table_stats_map(grouped.table_stats_map),
Expand Down
16 changes: 4 additions & 12 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::{CompactionConfig, LevelType, PbCompactTask};
use risingwave_pb::hummock::{CompactionConfig, LevelType};
pub use selector::{CompactionSelector, CompactionSelectorContext};

use self::selector::{EmergencySelector, LocalSelectorStatistic};
Expand Down Expand Up @@ -145,12 +145,11 @@ impl CompactStatus {
}

pub fn is_trivial_move_task(task: &CompactTask) -> bool {
if task.task_type() != TaskType::Dynamic && task.task_type() != TaskType::Emergency {
if task.task_type != TaskType::Dynamic && task.task_type != TaskType::Emergency {
return false;
}

if task.input_ssts.len() != 2
|| task.input_ssts[0].level_type() != LevelType::Nonoverlapping
if task.input_ssts.len() != 2 || task.input_ssts[0].level_type != LevelType::Nonoverlapping
{
return false;
}
Expand All @@ -173,7 +172,7 @@ impl CompactStatus {

pub fn is_trivial_reclaim(task: &CompactTask) -> bool {
// Currently all VnodeWatermark tasks are trivial reclaim.
if task.task_type() == TaskType::VnodeWatermark {
if task.task_type == TaskType::VnodeWatermark {
return true;
}
let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone());
Expand All @@ -186,13 +185,6 @@ impl CompactStatus {
})
}

/// Declares a task as either succeeded, failed or canceled.
pub fn report_compact_task_pb(&mut self, compact_task: &PbCompactTask) {
for level in &compact_task.input_ssts {
self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
}
}

pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
for level in &compact_task.input_ssts {
self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
Expand Down
22 changes: 7 additions & 15 deletions src/meta/src/hummock/compaction/overlap_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,14 @@ impl OverlapInfo for RangeOverlapInfo {
match self.target_range.as_ref() {
Some(key_range) => {
let overlap_begin = others.partition_point(|table_status| {
table_status
.key_range
.as_ref()
.unwrap()
.compare_right_with(&key_range.left)
table_status.key_range.compare_right_with(&key_range.left)
== cmp::Ordering::Less
});
if overlap_begin >= others.len() {
return overlap_begin..overlap_begin;
}
let overlap_end = others.partition_point(|table_status| {
key_range.compare_right_with(&table_status.key_range.as_ref().unwrap().left)
key_range.compare_right_with(&table_status.key_range.left)
!= cmp::Ordering::Less
});
overlap_begin..overlap_end
Expand All @@ -110,7 +106,7 @@ impl OverlapInfo for RangeOverlapInfo {
Some(key_range) => {
let overlap_begin = others.partition_point(|table_status| {
KeyComparator::compare_encoded_full_key(
&table_status.key_range.as_ref().unwrap().left,
&table_status.key_range.left,
&key_range.left,
) == cmp::Ordering::Less
});
Expand All @@ -119,9 +115,7 @@ impl OverlapInfo for RangeOverlapInfo {
}
let mut overlap_end = overlap_begin;
for table in &others[overlap_begin..] {
if key_range.compare_right_with(&table.key_range.as_ref().unwrap().right)
== cmp::Ordering::Less
{
if key_range.compare_right_with(&table.key_range.right) == cmp::Ordering::Less {
break;
}
overlap_end += 1;
Expand All @@ -133,7 +127,7 @@ impl OverlapInfo for RangeOverlapInfo {
}

fn update(&mut self, table: &SstableInfo) {
let other = table.key_range.as_ref().unwrap();
let other = &table.key_range;
if let Some(range) = self.target_range.as_mut() {
range.full_key_extend(other);
return;
Expand All @@ -147,8 +141,7 @@ pub struct RangeOverlapStrategy {}

impl OverlapStrategy for RangeOverlapStrategy {
fn check_overlap(&self, a: &SstableInfo, b: &SstableInfo) -> bool {
let key_range = a.key_range.as_ref().unwrap();
check_table_overlap(key_range, b)
check_table_overlap(&a.key_range, b)
}

fn create_overlap_info(&self) -> Box<dyn OverlapInfo> {
Expand All @@ -157,6 +150,5 @@ impl OverlapStrategy for RangeOverlapStrategy {
}

fn check_table_overlap(key_range: &KeyRange, table: &SstableInfo) -> bool {
let other = table.key_range.as_ref().unwrap();
key_range.sstable_overlap(other)
key_range.sstable_overlap(&table.key_range)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::config::default::compaction_config;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_hummock_sdk::version::{InputLevel, Level, Levels, OverlappingLevel};
use risingwave_pb::hummock::{CompactionConfig, LevelType};

Expand Down Expand Up @@ -338,8 +337,8 @@ pub mod tests {
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels[0].table_infos.len(), 1);
assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 4);
assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 1);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 4);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 1);

ret.add_pending_task(0, &mut levels_handler);
{
Expand All @@ -350,13 +349,13 @@ pub mod tests {
.unwrap();

assert_eq!(ret2.input_levels[0].table_infos.len(), 1);
assert_eq!(ret2.input_levels[0].table_infos[0].get_sst_id(), 6);
assert_eq!(ret2.input_levels[1].table_infos[0].get_sst_id(), 5);
assert_eq!(ret2.input_levels[0].table_infos[0].sst_id, 6);
assert_eq!(ret2.input_levels[1].table_infos[0].sst_id, 5);
}

levels.l0.as_mut().unwrap().sub_levels[0]
.table_infos
.retain(|table| table.get_sst_id() != 4);
.retain(|table| table.sst_id != 4);
levels.l0.as_mut().unwrap().total_file_size -= ret.input_levels[0].table_infos[0].file_size;

levels_handler[0].remove_task(0);
Expand All @@ -366,11 +365,11 @@ pub mod tests {
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels.len(), 3);
assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 6);
assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 5);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
assert_eq!(ret.input_levels[2].table_infos.len(), 2);
assert_eq!(ret.input_levels[2].table_infos[0].get_sst_id(), 3);
assert_eq!(ret.input_levels[2].table_infos[1].get_sst_id(), 2);
assert_eq!(ret.input_levels[2].table_infos[0].sst_id, 3);
assert_eq!(ret.input_levels[2].table_infos[1].sst_id, 2);
ret.add_pending_task(1, &mut levels_handler);

let mut local_stats = LocalPickerStatistic::default();
Expand All @@ -387,8 +386,8 @@ pub mod tests {
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels.len(), 3);
assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 6);
assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 5);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
assert_eq!(ret.input_levels[2].table_infos.len(), 2);
}
#[test]
Expand Down Expand Up @@ -447,7 +446,7 @@ pub mod tests {
ret.input_levels[0]
.table_infos
.iter()
.map(|t| t.get_sst_id())
.map(|t| t.sst_id)
.collect_vec(),
vec![1]
);
Expand All @@ -456,7 +455,7 @@ pub mod tests {
ret.input_levels[1]
.table_infos
.iter()
.map(|t| t.get_sst_id())
.map(|t| t.sst_id)
.collect_vec(),
vec![3,]
);
Expand Down Expand Up @@ -606,7 +605,7 @@ pub mod tests {
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 7);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 7);
assert_eq!(
3,
ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
Expand Down Expand Up @@ -634,7 +633,7 @@ pub mod tests {
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 6);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
assert_eq!(
2,
ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
Expand Down
Loading

0 comments on commit bf1addf

Please sign in to comment.