Skip to content

Commit

Permalink
feat(ctl): support for compaction score query (#12966)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Oct 23, 2023
1 parent 19643ad commit 156c52e
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 171 deletions.
16 changes: 16 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,21 @@ message RiseCtlRebuildTableStatsRequest {}

message RiseCtlRebuildTableStatsResponse {}

message GetCompactionScoreRequest {
uint64 compaction_group_id = 1;
}

message GetCompactionScoreResponse {
message PickerInfo {
uint64 score = 1;
uint64 select_level = 2;
uint64 target_level = 3;
string picker_type = 4;
}
uint64 compaction_group_id = 1;
repeated PickerInfo scores = 2;
}

service HummockManagerService {
rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
Expand Down Expand Up @@ -720,6 +735,7 @@ service HummockManagerService {
rpc ListBranchedObject(ListBranchedObjectRequest) returns (ListBranchedObjectResponse);
rpc ListActiveWriteLimit(ListActiveWriteLimitRequest) returns (ListActiveWriteLimitResponse);
rpc ListHummockMetaConfig(ListHummockMetaConfigRequest) returns (ListHummockMetaConfigResponse);
rpc GetCompactionScore(GetCompactionScoreRequest) returns (GetCompactionScoreResponse);
}

message CompactionConfig {
Expand Down
4 changes: 4 additions & 0 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(result_option_inspect)]
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -72,5 +73,8 @@ pub fn ctl(opts: CtlOpts) {
.build()
.unwrap()
.block_on(risingwave_ctl::start(opts))
.inspect_err(|e| {
eprintln!("{:#?}", e);
})
.unwrap();
}
31 changes: 31 additions & 0 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,34 @@ pub async fn list_compaction_status(context: &CtlContext, verbose: bool) -> anyh
}
Ok(())
}

pub async fn get_compaction_score(
context: &CtlContext,
id: CompactionGroupId,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let scores = meta_client.get_compaction_score(id).await?;
let mut table = Table::new();
table.set_header({
let mut row = Row::new();
row.add_cell("Select Level".into());
row.add_cell("Target Level".into());
row.add_cell("Type".into());
row.add_cell("Score".into());
row
});
for s in scores.into_iter().sorted_by(|a, b| {
a.select_level
.cmp(&b.select_level)
.then_with(|| a.target_level.cmp(&b.target_level))
}) {
let mut row = Row::new();
row.add_cell(s.select_level.into());
row.add_cell(s.target_level.into());
row.add_cell(s.picker_type.into());
row.add_cell(s.score.into());
table.add_row(row);
}
println!("{table}");
Ok(())
}
9 changes: 9 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ enum HummockCommands {
#[clap(short, long = "verbose", default_value_t = false)]
verbose: bool,
},
GetCompactionScore {
#[clap(long)]
compaction_group_id: u64,
},
/// Validate the current HummockVersion.
ValidateVersion,
/// Rebuild table stats
Expand Down Expand Up @@ -608,6 +612,11 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
cmd_impl::hummock::list_compaction_status(context, verbose).await?;
}
Commands::Hummock(HummockCommands::GetCompactionScore {
compaction_group_id,
}) => {
cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
}
Commands::Hummock(HummockCommands::ValidateVersion) => {
cmd_impl::hummock::validate_version(context).await?;
}
Expand Down
24 changes: 24 additions & 0 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, NON_RESERVED_SYS_CATALOG_ID};
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
use risingwave_pb::hummock::version_update_payload::Payload;
Expand Down Expand Up @@ -602,6 +603,29 @@ impl HummockManagerService for HummockServiceImpl {
self.hummock_manager.rebuild_table_stats().await?;
Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
}

async fn get_compaction_score(
&self,
request: Request<GetCompactionScoreRequest>,
) -> Result<Response<GetCompactionScoreResponse>, Status> {
let compaction_group_id = request.into_inner().compaction_group_id;
let scores = self
.hummock_manager
.get_compaction_scores(compaction_group_id)
.await
.into_iter()
.map(|s| PickerInfo {
score: s.score,
select_level: s.select_level as _,
target_level: s.target_level as _,
picker_type: s.picker_type.to_string(),
})
.collect();
Ok(Response::new(GetCompactionScoreResponse {
compaction_group_id,
scores,
}))
}
}

#[cfg(test)]
Expand Down
25 changes: 20 additions & 5 deletions src/meta/src/hummock/compaction/selector/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,23 @@ pub enum PickerType {
BottomLevel,
}

impl ToString for PickerType {
fn to_string(&self) -> String {
match self {
PickerType::Tier => String::from("Tier"),
PickerType::Intra => String::from("Intra"),
PickerType::ToBase => String::from("ToBase"),
PickerType::BottomLevel => String::from("BottomLevel"),
}
}
}

#[derive(Default, Debug)]
pub struct PickerInfo {
score: u64,
select_level: usize,
target_level: usize,
picker_type: PickerType,
pub score: u64,
pub select_level: usize,
pub target_level: usize,
pub picker_type: PickerType,
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -178,7 +189,11 @@ impl DynamicLevelSelectorCore {
ctx
}

fn get_priority_levels(&self, levels: &Levels, handlers: &[LevelHandler]) -> SelectContext {
pub(crate) fn get_priority_levels(
&self,
levels: &Levels,
handlers: &[LevelHandler],
) -> SelectContext {
let mut ctx = self.calculate_level_base_size(levels);

let idle_file_count = levels
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/compaction/selector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// (found in the LICENSE.Apache file in the root directory).

mod emergency_selector;
mod level_selector;
pub(crate) mod level_selector;
mod manual_selector;
mod space_reclaim_selector;
mod tombstone_compaction_selector;
Expand Down
27 changes: 27 additions & 0 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use itertools::Itertools;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::{CompactStatus as PbCompactStatus, CompactTaskAssignment};

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::DynamicLevelSelectorCore;
use crate::hummock::compaction::CompactStatus;
use crate::hummock::manager::read_lock;
use crate::hummock::HummockManager;
Expand Down Expand Up @@ -71,4 +73,29 @@ impl HummockManager {
.collect(),
)
}

#[named]
pub async fn get_compaction_scores(
&self,
compaction_group_id: CompactionGroupId,
) -> Vec<PickerInfo> {
let (status, levels, config) = {
let compaction = read_lock!(self, compaction).await;
let versioning = read_lock!(self, versioning).await;
let config_manager = self.compaction_group_manager.read().await;
match (
compaction.compaction_statuses.get(&compaction_group_id),
versioning.current_version.levels.get(&compaction_group_id),
config_manager.try_get_compaction_group_config(compaction_group_id),
) {
(Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
_ => {
return vec![];
}
}
};
let dynamic_level_core = DynamicLevelSelectorCore::new(config.compaction_config);
let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
ctx.score_levels
}
}
11 changes: 5 additions & 6 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,13 @@ pub struct HummockManager {
catalog_manager: CatalogManagerRef,

fragment_manager: FragmentManagerRef,
// `CompactionGroupManager` manages `CompactionGroup`'s members.
// Note that all hummock state store user should register to `CompactionGroupManager`. It
// includes all state tables of streaming jobs except sink.
compaction_group_manager: tokio::sync::RwLock<CompactionGroupManager>,
// When trying to locks compaction and versioning at the same time, compaction lock should
// be requested before versioning lock.
/// Lock order: compaction, versioning, compaction_group_manager.
/// - Lock compaction first, then versioning, and finally compaction_group_manager.
/// - This order should be strictly followed to prevent deadlock.
compaction: MonitoredRwLock<Compaction>,
versioning: MonitoredRwLock<Versioning>,
/// `CompactionGroupManager` manages compaction configs for compaction groups.
compaction_group_manager: tokio::sync::RwLock<CompactionGroupManager>,
latest_snapshot: Snapshot,

pub metrics: Arc<MetaMetrics>,
Expand Down
13 changes: 13 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
Expand Down Expand Up @@ -1045,6 +1046,17 @@ impl MetaClient {
))
}

pub async fn get_compaction_score(
&self,
compaction_group_id: CompactionGroupId,
) -> Result<Vec<PickerInfo>> {
let req = GetCompactionScoreRequest {
compaction_group_id,
};
let resp = self.inner.get_compaction_score(req).await?;
Ok(resp.scores)
}

pub async fn risectl_rebuild_table_stats(&self) -> Result<()> {
let req = RiseCtlRebuildTableStatsRequest {};
let _resp = self.inner.rise_ctl_rebuild_table_stats(req).await?;
Expand Down Expand Up @@ -1734,6 +1746,7 @@ macro_rules! for_all_meta_rpc {
,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse }
,{ hummock_client, get_compaction_score, GetCompactionScoreRequest, GetCompactionScoreResponse }
,{ hummock_client, rise_ctl_rebuild_table_stats, RiseCtlRebuildTableStatsRequest, RiseCtlRebuildTableStatsResponse }
,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest<Message = SubscribeCompactionEventRequest>, Streaming<SubscribeCompactionEventResponse> }
,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse }
Expand Down
3 changes: 0 additions & 3 deletions src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,6 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {

/// Finish building sst.
///
/// Unlike most LSM-Tree implementations, sstable meta and data are encoded separately.
/// Both meta and data has its own object (file).
///
/// # Format
///
/// data:
Expand Down
Loading

0 comments on commit 156c52e

Please sign in to comment.