From ca74b3c1fcf006433c4f3b24f1e38e47ca5a3db8 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 5 Jan 2024 12:36:08 +0800 Subject: [PATCH] chore(storage): ctl support cancel specific task (#14325) --- proto/hummock.proto | 10 ++++++++++ src/ctl/src/cmd_impl/hummock/compaction_group.rs | 11 +++++++++++ src/ctl/src/lib.rs | 8 ++++++++ src/meta/service/src/hummock_service.rs | 14 ++++++++++++++ src/rpc_client/src/meta_client.rs | 11 +++++++++++ 5 files changed, 54 insertions(+) diff --git a/proto/hummock.proto b/proto/hummock.proto index ccbfef42278f1..58007117811d5 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -750,6 +750,15 @@ message ListCompactTaskProgressResponse { repeated CompactTaskProgress task_progress = 1; } +message CancelCompactTaskRequest { + uint64 task_id = 1; + CompactTask.TaskStatus task_status = 2; +} + +message CancelCompactTaskResponse { + bool ret = 1; +} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -788,6 +797,7 @@ service HummockManagerService { rpc GetCompactionScore(GetCompactionScoreRequest) returns (GetCompactionScoreResponse); rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse); rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse); + rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse); } message CompactionConfig { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index 60ec391e9b7c5..068472dd1ce4a 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -18,6 +18,7 @@ use comfy_table::{Row, Table}; use itertools::Itertools; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId}; +use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use crate::CtlContext; @@ -260,3 +261,13 @@ pub async fn get_compaction_score( println!("{table}"); Ok(()) } + +pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::Result<()> { + let meta_client = context.meta_client().await?; + let ret = meta_client + .cancel_compact_task(task_id, TaskStatus::ManualCanceled) + .await?; + println!("cancel_compact_task {} ret {:?}", task_id, ret); + + Ok(()) +} diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 004b36302cf3b..95c9aa25a6b7a 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -266,6 +266,11 @@ enum HummockCommands { ValidateVersion, /// Rebuild table stats RebuildTableStats, + + CancelCompactTask { + #[clap(short, long)] + task_id: u64, + }, } #[derive(Subcommand)] @@ -655,6 +660,9 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Hummock(HummockCommands::RebuildTableStats) => { cmd_impl::hummock::rebuild_table_stats(context).await?; } + Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => { + cmd_impl::hummock::cancel_compact_task(context, task_id).await?; + } Commands::Table(TableCommands::Scan { mv_name, data_dir }) => { cmd_impl::table::scan(context, mv_name, data_dir).await? } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 599b7a7cdb79d..a082b723dd124 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -651,6 +651,20 @@ impl HummockManagerService for HummockServiceImpl { task_progress, })) } + + async fn cancel_compact_task( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let ret = self + .hummock_manager + .cancel_compact_task(request.task_id, request.task_status()) + .await?; + + let response = Response::new(CancelCompactTaskResponse { ret }); + return Ok(response); + } } #[cfg(test)] diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index d8318d79b63b1..2dc51961c901e 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -55,6 +55,7 @@ use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient; use risingwave_pb::ddl_service::drop_table_request::SourceId; use risingwave_pb::ddl_service::*; +use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::get_compaction_score_response::PickerInfo; use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; @@ -1250,6 +1251,15 @@ impl MetaClient { }) .join(); } + + pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result { + let req = CancelCompactTaskRequest { + task_id, + task_status: task_status as _, + }; + let resp = self.inner.cancel_compact_task(req).await?; + Ok(resp.ret) + } } #[async_trait] @@ -1891,6 +1901,7 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse } ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse } ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse } + ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse} ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse }