Skip to content

Commit

Permalink
refactor: Refactor names and messages in meta and ctl modules
Browse files Browse the repository at this point in the history
- Renamed `ThrottleRequest` message to `ApplyThrottleRequest` in `proto/meta.proto`
- Renamed `ThrottleResponse` message to `ApplyThrottleResponse` in `proto/meta.proto`
- Renamed `ThrottleTarget` enum value `Source` to `SOURCE_UNSPECIFIED` in `proto/meta.proto`
- Renamed `ThrottleTarget` enum value `Mv` to `MV` in `proto/meta.proto`
- Renamed `ApplyThrottle` RPC request message to `ApplyThrottleRequest` in `proto/meta.proto`
- Renamed `ApplyThrottle` RPC response message to `ApplyThrottleResponse` in `proto/meta.proto`

Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Nov 1, 2023
1 parent 981bb8e commit e5cb7cd
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 13 deletions.
10 changes: 5 additions & 5 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -239,17 +239,17 @@ message ListActorStatesResponse {
}

enum ThrottleTarget {
Source = 0;
Mv = 1;
SOURCE_UNSPECIFIED = 0;
MV = 1;
}

message ThrottleRequest {
message ApplyThrottleRequest {
ThrottleTarget kind = 1;
uint32 id = 2;
optional uint32 rate = 3;
}

message ThrottleResponse {
message ApplyThrottleResponse {
common.Status status = 1;
}

Expand All @@ -262,7 +262,7 @@ service StreamManagerService {
rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse);
rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse);
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
rpc ApplyThrottle(ThrottleRequest) returns (ThrottleResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
}

// Below for cluster service.
Expand Down
7 changes: 6 additions & 1 deletion src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,12 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
}
Commands::Debug(DebugCommands::Dump { common }) => cmd_impl::debug::dump(common).await?,
Commands::Throttle(ThrottleCommands::Source(args)) => {
apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
apply_throttle(
context,
risingwave_pb::meta::PbThrottleTarget::SourceUnspecified,
args,
)
.await?
}
Commands::Throttle(ThrottleCommands::Mv(args)) => {
apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
Expand Down
8 changes: 4 additions & 4 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ impl StreamManagerService for StreamServiceImpl {
#[cfg_attr(coverage, coverage(off))]
async fn apply_throttle(
&self,
request: Request<ThrottleRequest>,
) -> Result<Response<ThrottleResponse>, Status> {
request: Request<ApplyThrottleRequest>,
) -> Result<Response<ApplyThrottleResponse>, Status> {
let request = request.into_inner();
let actor_to_apply = match request.kind() {
ThrottleTarget::Source => {
ThrottleTarget::SourceUnspecified => {
self.fragment_manager
.update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
.await?
Expand Down Expand Up @@ -134,7 +134,7 @@ impl StreamManagerService for StreamServiceImpl {
.run_command(Command::Throttle(mutation))
.await?;

Ok(Response::new(ThrottleResponse { status: None }))
Ok(Response::new(ApplyThrottleResponse { status: None }))
}

async fn cancel_creating_jobs(
Expand Down
6 changes: 3 additions & 3 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,8 @@ impl MetaClient {
kind: PbThrottleTarget,
id: u32,
rate: Option<u32>,
) -> Result<ThrottleResponse> {
let request = ThrottleRequest {
) -> Result<ApplyThrottleResponse> {
let request = ApplyThrottleRequest {
kind: kind as i32,
id,
rate,
Expand Down Expand Up @@ -1735,7 +1735,7 @@ macro_rules! for_all_meta_rpc {
,{ stream_client, flush, FlushRequest, FlushResponse }
,{ stream_client, pause, PauseRequest, PauseResponse }
,{ stream_client, resume, ResumeRequest, ResumeResponse }
,{ stream_client, apply_throttle, ThrottleRequest, ThrottleResponse }
,{ stream_client, apply_throttle, ApplyThrottleRequest, ApplyThrottleResponse }
,{ stream_client, cancel_creating_jobs, CancelCreatingJobsRequest, CancelCreatingJobsResponse }
,{ stream_client, list_table_fragments, ListTableFragmentsRequest, ListTableFragmentsResponse }
,{ stream_client, list_table_fragment_states, ListTableFragmentStatesRequest, ListTableFragmentStatesResponse }
Expand Down

0 comments on commit e5cb7cd

Please sign in to comment.