diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 2dca39237a49b..58b365875302f 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -706,7 +706,9 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { .await? } Commands::Debug(DebugCommands::Dump { common }) => cmd_impl::debug::dump(common).await?, - Commands::Throttle(ThrottleCommands::Source(_args)) => todo!(), + Commands::Throttle(ThrottleCommands::Source(args)) => { + apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await? + } Commands::Throttle(ThrottleCommands::Mv(args)) => { apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?; } diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 958712ff13266..f47afb29ec634 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -18,6 +18,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_meta::model::ActorId; use risingwave_meta::stream::ThrottleConfig; +use risingwave_meta_model_v2::SourceId; use risingwave_pb::meta::cancel_creating_jobs_request::Jobs; use risingwave_pb::meta::list_table_fragments_response::{ ActorInfo, FragmentInfo, TableFragmentInfo, @@ -104,7 +105,11 @@ impl StreamManagerService for StreamServiceImpl { ) -> Result, Status> { let request = request.into_inner(); let actor_to_apply = match request.kind() { - ThrottleTarget::Source => todo!(), + ThrottleTarget::Source => { + self.fragment_manager + .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate) + .await? + } ThrottleTarget::Mv => { self.fragment_manager .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index cbec5cf842aa0..95bcc4c97f8ee 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_connector::source::SplitImpl; +use risingwave_meta_model_v2::SourceId; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; @@ -775,6 +776,55 @@ impl FragmentManager { actor_maps } + // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments + // return the actor_ids to be applied + pub async fn update_source_rate_limit_by_source_id( + &self, + source_id: SourceId, + rate_limit: Option, + ) -> MetaResult>> { + let map = &mut self.core.write().await.table_fragments; + let mut table_id_to_apply = HashSet::new(); + 'table: for (table_id, table_fragments) in map.iter() { + for fragment in table_fragments.fragments.values() { + for actor in &fragment.actors { + if let Some(stream_node) = actor.nodes.as_ref() { + if let Some(NodeBody::Source(ref node)) = stream_node.node_body.as_ref() { + if let Some(node_inner) = &node.source_inner && node_inner.source_id == source_id { + table_id_to_apply.insert(*table_id); + continue 'table; + } + } + } + } + } + } + + let mut table_fragments = BTreeMapTransaction::new(map); + let mut to_apply_fragment = HashMap::new(); + for table_id in table_id_to_apply { + let mut table_fragment = table_fragments.get_mut(table_id).unwrap(); + for fragment in table_fragment.fragments.values_mut() { + let mut actor_to_apply = Vec::new(); + for actor in &mut fragment.actors { + if let Some(stream_node) = actor.nodes.as_mut() { + if let Some(NodeBody::Source(ref mut node)) = stream_node.node_body.as_mut() + { + if let Some(source_inner) = &mut node.source_inner { + source_inner.rate_limit = rate_limit; + } + actor_to_apply.push(actor.actor_id); + } + } + } + to_apply_fragment.insert(fragment.fragment_id, actor_to_apply); + } + } + + commit_meta!(self, table_fragments)?; + Ok(to_apply_fragment) + } + // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments // return the actor_ids to be applied pub async fn update_mv_rate_limit_by_table_id( diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index 3c14271ce03a3..5bdad291deb8f 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -94,7 +94,7 @@ impl FlowControlExecutor { "actor {:?} rate limit changed to {:?}", self.actor_ctx.id, self.rate_limit - ) + ); } } }