diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 4cd271f0495b9..48251dcd6e824 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -412,6 +412,7 @@ prepare_sys_catalog! { { BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await }, { BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_GROUP_CONFIGS), read_hummock_compaction_group_configs await }, { BuiltinCatalog::Table(&RW_HUMMOCK_META_CONFIGS), read_hummock_meta_configs await}, + { BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_STATUS), read_hummock_compaction_status await }, } #[cfg(test)] diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 9f89c9eed5e81..9ef99966baace 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -21,6 +21,7 @@ mod rw_fragments; mod rw_functions; mod rw_hummock_branched_objects; mod rw_hummock_compaction_group_configs; +mod rw_hummock_compaction_status; mod rw_hummock_meta_configs; mod rw_hummock_pinned_snapshots; mod rw_hummock_pinned_versions; @@ -54,6 +55,7 @@ pub use rw_fragments::*; pub use rw_functions::*; pub use rw_hummock_branched_objects::*; pub use rw_hummock_compaction_group_configs::*; +pub use rw_hummock_compaction_status::*; pub use rw_hummock_meta_configs::*; pub use rw_hummock_pinned_snapshots::*; pub use rw_hummock_pinned_versions::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_status.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_status.rs new file mode 100644 index 0000000000000..98d0a6de7207a --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_status.rs @@ -0,0 +1,76 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_pb::hummock::CompactTaskAssignment; +use serde_json::json; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_COMPACTION_STATUS: BuiltinTable = BuiltinTable { + name: "rw_hummock_compaction_status", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int64, "compaction_group_id"), + (DataType::Int64, "task_id"), + (DataType::Int32, "select_level"), + (DataType::Int32, "target_level"), + (DataType::Int32, "task_type"), + (DataType::Int32, "task_status"), + (DataType::Int64, "watermark"), + (DataType::Int32, "base_level"), + (DataType::Boolean, "gc_delete_keys"), + (DataType::Int64, "target_file_size"), + (DataType::Int64, "target_sub_level_id"), + (DataType::Jsonb, "compact_task"), + ], + pk: &[0], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_compaction_status(&self) -> Result> { + let (_compaction_status, compaction_assignment, _compaction_progress) = + self.meta_client.list_compaction_status().await?; + Ok(assignments_to_rows(compaction_assignment)) + } +} + +fn assignments_to_rows(assignments: Vec) -> Vec { + let mut rows = vec![]; + for assignment in assignments { + let compact_task = assignment.compact_task.unwrap(); + + let select_level = compact_task.input_ssts[0].level_idx; + + rows.push(OwnedRow::new(vec![ + Some(ScalarImpl::Int64(compact_task.compaction_group_id as _)), + Some(ScalarImpl::Int64(compact_task.task_id as _)), + Some(ScalarImpl::Int32(select_level as _)), + Some(ScalarImpl::Int32(compact_task.target_level as _)), + Some(ScalarImpl::Int32(compact_task.task_type as _)), + Some(ScalarImpl::Int32(compact_task.task_status as _)), + Some(ScalarImpl::Int64(compact_task.watermark as _)), + Some(ScalarImpl::Int32(compact_task.base_level as _)), + Some(ScalarImpl::Bool(compact_task.gc_delete_keys as _)), + Some(ScalarImpl::Int64(compact_task.target_file_size as _)), + Some(ScalarImpl::Int64(compact_task.target_sub_level_id as _)), + Some(ScalarImpl::Jsonb(json!(compact_task).into())), + ])); + } + + rows +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index ae90c2e345f9f..3071d347f6dc9 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -20,7 +20,8 @@ use risingwave_pb::catalog::Table; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, + BranchedObject, CompactStatus, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo, + HummockSnapshot, HummockVersion, HummockVersionDelta, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; @@ -93,6 +94,14 @@ pub trait FrontendMetaClient: Send + Sync { async fn list_hummock_active_write_limits(&self) -> Result>; async fn list_hummock_meta_configs(&self) -> Result>; + + async fn list_compaction_status( + &self, + ) -> Result<( + Vec, + Vec, + Vec, + )>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -233,4 +242,14 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn list_hummock_meta_configs(&self) -> Result> { self.0.list_hummock_meta_config().await } + + async fn list_compaction_status( + &self, + ) -> Result<( + Vec, + Vec, + Vec, + )> { + self.0.risectl_list_compaction_status().await + } } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 20eb252fc5053..191af6e72fea7 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -37,7 +37,8 @@ use risingwave_pb::catalog::{ use risingwave_pb::ddl_service::{create_connection_request, DdlProgress}; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, + BranchedObject, CompactStatus, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo, + HummockSnapshot, HummockVersion, HummockVersionDelta, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; @@ -863,6 +864,16 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn list_hummock_meta_configs(&self) -> RpcResult> { unimplemented!() } + + async fn list_compaction_status( + &self, + ) -> RpcResult<( + Vec, + Vec, + Vec, + )> { + unimplemented!() + } } #[cfg(test)]