Skip to content

Commit

Permalink
feat(frontend): support compaction status
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Oct 25, 2023
1 parent 6939da6 commit 1495925
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ prepare_sys_catalog! {
{ BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_GROUP_CONFIGS), read_hummock_compaction_group_configs await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_META_CONFIGS), read_hummock_meta_configs await},
{ BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_STATUS), read_hummock_compaction_status await },
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod rw_fragments;
mod rw_functions;
mod rw_hummock_branched_objects;
mod rw_hummock_compaction_group_configs;
mod rw_hummock_compaction_status;
mod rw_hummock_meta_configs;
mod rw_hummock_pinned_snapshots;
mod rw_hummock_pinned_versions;
Expand Down Expand Up @@ -54,6 +55,7 @@ pub use rw_fragments::*;
pub use rw_functions::*;
pub use rw_hummock_branched_objects::*;
pub use rw_hummock_compaction_group_configs::*;
pub use rw_hummock_compaction_status::*;
pub use rw_hummock_meta_configs::*;
pub use rw_hummock_pinned_snapshots::*;
pub use rw_hummock_pinned_versions::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_pb::hummock::CompactTaskAssignment;
use serde_json::json;

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_COMPACTION_STATUS: BuiltinTable = BuiltinTable {
name: "rw_hummock_compaction_status",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int64, "compaction_group_id"),
(DataType::Int64, "task_id"),
(DataType::Int32, "select_level"),
(DataType::Int32, "target_level"),
(DataType::Int32, "task_type"),
(DataType::Int32, "task_status"),
(DataType::Int64, "watermark"),
(DataType::Int32, "base_level"),
(DataType::Boolean, "gc_delete_keys"),
(DataType::Int64, "target_file_size"),
(DataType::Int64, "target_sub_level_id"),
(DataType::Jsonb, "compact_task"),
],
pk: &[0],
};

impl SysCatalogReaderImpl {
pub async fn read_hummock_compaction_status(&self) -> Result<Vec<OwnedRow>> {
let (_compaction_status, compaction_assignment, _compaction_progress) =
self.meta_client.list_compaction_status().await?;
Ok(assignments_to_rows(compaction_assignment))
}
}

fn assignments_to_rows(assignments: Vec<CompactTaskAssignment>) -> Vec<OwnedRow> {
let mut rows = vec![];
for assignment in assignments {
let compact_task = assignment.compact_task.unwrap();

let select_level = compact_task.input_ssts[0].level_idx;

rows.push(OwnedRow::new(vec![
Some(ScalarImpl::Int64(compact_task.compaction_group_id as _)),
Some(ScalarImpl::Int64(compact_task.task_id as _)),
Some(ScalarImpl::Int32(select_level as _)),
Some(ScalarImpl::Int32(compact_task.target_level as _)),
Some(ScalarImpl::Int32(compact_task.task_type as _)),
Some(ScalarImpl::Int32(compact_task.task_status as _)),
Some(ScalarImpl::Int64(compact_task.watermark as _)),
Some(ScalarImpl::Int32(compact_task.base_level as _)),
Some(ScalarImpl::Bool(compact_task.gc_delete_keys as _)),
Some(ScalarImpl::Int64(compact_task.target_file_size as _)),
Some(ScalarImpl::Int64(compact_task.target_sub_level_id as _)),
Some(ScalarImpl::Jsonb(json!(compact_task).into())),
]));
}

rows
}
21 changes: 20 additions & 1 deletion src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta,
BranchedObject, CompactStatus, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
HummockSnapshot, HummockVersion, HummockVersionDelta,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand Down Expand Up @@ -93,6 +94,14 @@ pub trait FrontendMetaClient: Send + Sync {
async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;

async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

async fn list_compaction_status(
&self,
) -> Result<(
Vec<CompactStatus>,
Vec<CompactTaskAssignment>,
Vec<CompactTaskProgress>,
)>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -233,4 +242,14 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
self.0.list_hummock_meta_config().await
}

async fn list_compaction_status(
&self,
) -> Result<(
Vec<CompactStatus>,
Vec<CompactTaskAssignment>,
Vec<CompactTaskProgress>,
)> {
self.0.risectl_list_compaction_status().await
}
}
13 changes: 12 additions & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use risingwave_pb::catalog::{
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta,
BranchedObject, CompactStatus, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
HummockSnapshot, HummockVersion, HummockVersionDelta,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand Down Expand Up @@ -863,6 +864,16 @@ impl FrontendMetaClient for MockFrontendMetaClient {
async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
unimplemented!()
}

async fn list_compaction_status(
&self,
) -> RpcResult<(
Vec<CompactStatus>,
Vec<CompactTaskAssignment>,
Vec<CompactTaskProgress>,
)> {
unimplemented!()
}
}

#[cfg(test)]
Expand Down

0 comments on commit 1495925

Please sign in to comment.