Skip to content

Commit

Permalink
feat: Implement logic for throttling rate limits and managing table f…
Browse files Browse the repository at this point in the history
…ragments

- Implemented throttling rate limit logic for `ThrottleTarget::Source` and applied the rate
- Added functions for canceling creating jobs, listing table fragments and fragment distribution, and listing actor states
- Updated rate limit and applied reschedules for fragments associated with specific source and table IDs
- Implemented functions for managing actors, including migrating, getting running actors, and getting upstream and downstream fragments
- Created functions for listing and creating table fragments, and for replacing and dropping fragments
- Added functions for selecting specific table fragments and getting table ID mappings for materialized views
- Updated functions for getting table revisions, checking fragment presence, and locking the fragment manager

Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Nov 1, 2023
1 parent 479f95f commit 6165df7
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,9 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
.await?
}
Commands::Debug(DebugCommands::Dump { common }) => cmd_impl::debug::dump(common).await?,
Commands::Throttle(ThrottleCommands::Source(_args)) => todo!(),
Commands::Throttle(ThrottleCommands::Source(args)) => {
apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
}
Commands::Throttle(ThrottleCommands::Mv(args)) => {
apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
}
Expand Down
7 changes: 6 additions & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_meta::model::ActorId;
use risingwave_meta::stream::ThrottleConfig;
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_table_fragments_response::{
ActorInfo, FragmentInfo, TableFragmentInfo,
Expand Down Expand Up @@ -104,7 +105,11 @@ impl StreamManagerService for StreamServiceImpl {
) -> Result<Response<ThrottleResponse>, Status> {
let request = request.into_inner();
let actor_to_apply = match request.kind() {
ThrottleTarget::Source => todo!(),
ThrottleTarget::Source => {
self.fragment_manager
.update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
.await?
}
ThrottleTarget::Mv => {
self.fragment_manager
.update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
Expand Down
50 changes: 50 additions & 0 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
Expand Down Expand Up @@ -775,6 +776,55 @@ impl FragmentManager {
actor_maps
}

// edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
// return the actor_ids to be applied
pub async fn update_source_rate_limit_by_source_id(
&self,
source_id: SourceId,
rate_limit: Option<u32>,
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
let map = &mut self.core.write().await.table_fragments;
let mut table_id_to_apply = HashSet::new();
'table: for (table_id, table_fragments) in map.iter() {
for fragment in table_fragments.fragments.values() {
for actor in &fragment.actors {
if let Some(stream_node) = actor.nodes.as_ref() {
if let Some(NodeBody::Source(ref node)) = stream_node.node_body.as_ref() {
if let Some(node_inner) = &node.source_inner && node_inner.source_id == source_id {
table_id_to_apply.insert(*table_id);
continue 'table;
}
}
}
}
}
}

let mut table_fragments = BTreeMapTransaction::new(map);
let mut to_apply_fragment = HashMap::new();
for table_id in table_id_to_apply {
let mut table_fragment = table_fragments.get_mut(table_id).unwrap();
for fragment in table_fragment.fragments.values_mut() {
let mut actor_to_apply = Vec::new();
for actor in &mut fragment.actors {
if let Some(stream_node) = actor.nodes.as_mut() {
if let Some(NodeBody::Source(ref mut node)) = stream_node.node_body.as_mut()
{
if let Some(source_inner) = &mut node.source_inner {
source_inner.rate_limit = rate_limit;
}
actor_to_apply.push(actor.actor_id);
}
}
}
to_apply_fragment.insert(fragment.fragment_id, actor_to_apply);
}
}

commit_meta!(self, table_fragments)?;
Ok(to_apply_fragment)
}

// edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
// return the actor_ids to be applied
pub async fn update_mv_rate_limit_by_table_id(
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl FlowControlExecutor {
"actor {:?} rate limit changed to {:?}",
self.actor_ctx.id,
self.rate_limit
)
);
}
}
}
Expand Down

0 comments on commit 6165df7

Please sign in to comment.