Skip to content

Commit

Permalink
chore(storage): ctl support cancel specific task (#14325)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Jan 10, 2024
1 parent 7a784cc commit ca74b3c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 0 deletions.
10 changes: 10 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,15 @@ message ListCompactTaskProgressResponse {
repeated CompactTaskProgress task_progress = 1;
}

message CancelCompactTaskRequest {
uint64 task_id = 1;
CompactTask.TaskStatus task_status = 2;
}

message CancelCompactTaskResponse {
bool ret = 1;
}

service HummockManagerService {
rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
Expand Down Expand Up @@ -788,6 +797,7 @@ service HummockManagerService {
rpc GetCompactionScore(GetCompactionScoreRequest) returns (GetCompactionScoreResponse);
rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse);
rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
}

message CompactionConfig {
Expand Down
11 changes: 11 additions & 0 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use comfy_table::{Row, Table};
use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;

use crate::CtlContext;
Expand Down Expand Up @@ -260,3 +261,13 @@ pub async fn get_compaction_score(
println!("{table}");
Ok(())
}

pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let ret = meta_client
.cancel_compact_task(task_id, TaskStatus::ManualCanceled)
.await?;
println!("cancel_compact_task {} ret {:?}", task_id, ret);

Ok(())
}
8 changes: 8 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ enum HummockCommands {
ValidateVersion,
/// Rebuild table stats
RebuildTableStats,

CancelCompactTask {
#[clap(short, long)]
task_id: u64,
},
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -655,6 +660,9 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Hummock(HummockCommands::RebuildTableStats) => {
cmd_impl::hummock::rebuild_table_stats(context).await?;
}
Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
}
Commands::Table(TableCommands::Scan { mv_name, data_dir }) => {
cmd_impl::table::scan(context, mv_name, data_dir).await?
}
Expand Down
14 changes: 14 additions & 0 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,20 @@ impl HummockManagerService for HummockServiceImpl {
task_progress,
}))
}

async fn cancel_compact_task(
&self,
request: Request<CancelCompactTaskRequest>,
) -> Result<Response<CancelCompactTaskResponse>, Status> {
let request = request.into_inner();
let ret = self
.hummock_manager
.cancel_compact_task(request.task_id, request.task_status())
.await?;

let response = Response::new(CancelCompactTaskResponse { ret });
return Ok(response);
}
}

#[cfg(test)]
Expand Down
11 changes: 11 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
Expand Down Expand Up @@ -1250,6 +1251,15 @@ impl MetaClient {
})
.join();
}

pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
let req = CancelCompactTaskRequest {
task_id,
task_status: task_status as _,
};
let resp = self.inner.cancel_compact_task(req).await?;
Ok(resp.ret)
}
}

#[async_trait]
Expand Down Expand Up @@ -1891,6 +1901,7 @@ macro_rules! for_all_meta_rpc {
,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse }
,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse }
,{ user_client, drop_user, DropUserRequest, DropUserResponse }
Expand Down

0 comments on commit ca74b3c

Please sign in to comment.