diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 53ce748915c0..5b81c8333bf7 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -105,18 +105,15 @@ impl StreamManagerService for StreamServiceImpl { request: Request, ) -> Result, Status> { let request = request.into_inner(); - let MetadataManager::V1(mgr) = &self.metadata_manager else { - return Err(Status::unimplemented("not supported in v2")); - }; let actor_to_apply = match request.kind() { ThrottleTarget::Source => { - mgr.fragment_manager + self.metadata_manager .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate) .await? } ThrottleTarget::Mv => { - mgr.fragment_manager + self.metadata_manager .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate) .await? } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 56f86aab8d85..bcd86e28c528 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -15,32 +15,39 @@ use std::collections::HashMap; use itertools::Itertools; +use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::object::ObjectType; -use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Object, ObjectDependency, Table}; +use risingwave_meta_model_v2::prelude::{ + Actor, ActorDispatcher, Fragment, Object, ObjectDependency, Source, Table, +}; use risingwave_meta_model_v2::{ - actor, actor_dispatcher, index, object_dependency, sink, source, streaming_job, table, ActorId, - CreateType, DatabaseId, JobStatus, ObjectId, SchemaId, UserId, + actor, actor_dispatcher, fragment, index, object_dependency, sink, source, streaming_job, + table, ActorId, CreateType, DatabaseId, FragmentId, JobStatus, ObjectId, SchemaId, SourceId, + StreamNode, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId; use risingwave_pb::catalog::{PbCreateType, PbTable}; use risingwave_pb::meta::PbTableFragments; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; -use risingwave_pb::stream_plan::Dispatcher; +use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use risingwave_pb::stream_plan::{Dispatcher, PbFragmentTypeFlag}; use sea_orm::sea_query::SimpleExpr; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, - NotSet, QueryFilter, TransactionTrait, + NotSet, QueryFilter, QuerySelect, TransactionTrait, }; use crate::controller::catalog::CatalogController; -use crate::controller::utils::{check_relation_name_duplicate, ensure_object_id, ensure_user_id}; +use crate::controller::utils::{ + check_relation_name_duplicate, ensure_object_id, ensure_user_id, get_fragment_actor_ids, +}; use crate::manager::StreamingJob; use crate::model::StreamContext; use crate::stream::SplitAssignment; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; impl CatalogController { pub async fn create_streaming_job_obj( @@ -398,4 +405,150 @@ impl CatalogController { Ok(()) } + + // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments + // return the actor_ids to be applied + pub async fn update_source_rate_limit_by_source_id( + &self, + source_id: SourceId, + rate_limit: Option, + ) -> MetaResult>> { + let inner = self.inner.read().await; + let txn = inner.db.begin().await?; + + let source = Source::find_by_id(source_id) + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id) + })?; + let streaming_job_ids: Vec = + if let Some(table_id) = source.optional_associated_table_id { + vec![table_id] + } else if let Some(source_info) = &source.source_info + && source_info.inner_ref().cdc_source_job + { + vec![source_id] + } else { + ObjectDependency::find() + .select_only() + .column(object_dependency::Column::UsedBy) + .filter(object_dependency::Column::Oid.eq(source_id)) + .into_tuple() + .all(&txn) + .await? + }; + + if streaming_job_ids.is_empty() { + return Err(MetaError::invalid_parameter(format!( + "source id {source_id} not used by any streaming job" + ))); + } + + let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() + .select_only() + .columns([ + fragment::Column::FragmentId, + fragment::Column::FragmentTypeMask, + fragment::Column::StreamNode, + ]) + .filter(fragment::Column::JobId.is_in(streaming_job_ids)) + .into_tuple() + .all(&txn) + .await?; + + fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { + let mut found = false; + if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 { + visit_stream_node(&mut stream_node.0, |node| { + if let PbNodeBody::Source(node) = node { + if let Some(node_inner) = &mut node.source_inner + && node_inner.source_id == source_id as u32 + { + node_inner.rate_limit = rate_limit; + found = true; + } + } + }); + } + found + }); + + assert!( + !fragments.is_empty(), + "source id should be used by at least one fragment" + ); + let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec(); + for (id, _, stream_node) in fragments { + fragment::ActiveModel { + fragment_id: Set(id), + stream_node: Set(stream_node), + ..Default::default() + } + .update(&txn) + .await?; + } + let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?; + + txn.commit().await?; + + Ok(fragment_actors) + } + + // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments + // return the actor_ids to be applied + pub async fn update_mv_rate_limit_by_job_id( + &self, + job_id: ObjectId, + rate_limit: Option, + ) -> MetaResult>> { + let inner = self.inner.read().await; + let txn = inner.db.begin().await?; + + let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() + .select_only() + .columns([ + fragment::Column::FragmentId, + fragment::Column::FragmentTypeMask, + fragment::Column::StreamNode, + ]) + .filter(fragment::Column::JobId.eq(job_id)) + .into_tuple() + .all(&txn) + .await?; + + fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { + let mut found = false; + if *fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0 { + visit_stream_node(&mut stream_node.0, |node| { + if let PbNodeBody::StreamScan(node) = node { + node.rate_limit = rate_limit; + found = true; + } + }); + } + found + }); + + if fragments.is_empty() { + return Err(MetaError::invalid_parameter(format!( + "stream scan node not found in job id {job_id}" + ))); + } + let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec(); + for (id, _, stream_node) in fragments { + fragment::ActiveModel { + fragment_id: Set(id), + stream_node: Set(stream_node), + ..Default::default() + } + .update(&txn) + .await?; + } + let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?; + + txn.commit().await?; + + Ok(fragment_actors) + } } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 41bad85b3f4c..76364697195f 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -15,16 +15,17 @@ use std::collections::HashMap; use anyhow::anyhow; +use itertools::Itertools; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::DistributionType; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ - actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, - schema, sink, source, table, user, user_privilege, view, worker_property, ActorId, - DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId, - SchemaId, UserId, WorkerId, + actor, actor_dispatcher, connection, database, fragment, function, index, object, + object_dependency, schema, sink, source, table, user, user_privilege, view, worker_property, + ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, + PrivilegeId, SchemaId, UserId, WorkerId, }; use risingwave_pb::catalog::{PbConnection, PbFunction}; use risingwave_pb::common::PbParallelUnit; @@ -690,3 +691,22 @@ where }) .collect()) } + +/// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments. +pub async fn get_fragment_actor_ids( + db: &C, + fragment_ids: Vec, +) -> MetaResult>> +where + C: ConnectionTrait, +{ + let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find() + .select_only() + .columns([actor::Column::FragmentId, actor::Column::ActorId]) + .filter(actor::Column::FragmentId.is_in(fragment_ids)) + .into_tuple() + .all(db) + .await?; + + Ok(fragment_actors.into_iter().into_group_map()) +} diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 67f077f55316..0d50f7e1dc8c 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -15,6 +15,7 @@ use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_meta_model_v2::SourceId; use risingwave_pb::catalog::PbSource; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType}; @@ -366,4 +367,52 @@ impl MetadataManager { } } } + + pub async fn update_source_rate_limit_by_source_id( + &self, + source_id: SourceId, + rate_limit: Option, + ) -> MetaResult>> { + match self { + MetadataManager::V1(mgr) => { + mgr.fragment_manager + .update_source_rate_limit_by_source_id(source_id, rate_limit) + .await + } + MetadataManager::V2(mgr) => { + let fragment_actors = mgr + .catalog_controller + .update_source_rate_limit_by_source_id(source_id as _, rate_limit) + .await?; + Ok(fragment_actors + .into_iter() + .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect())) + .collect()) + } + } + } + + pub async fn update_mv_rate_limit_by_table_id( + &self, + table_id: TableId, + rate_limit: Option, + ) -> MetaResult>> { + match self { + MetadataManager::V1(mgr) => { + mgr.fragment_manager + .update_mv_rate_limit_by_table_id(table_id, rate_limit) + .await + } + MetadataManager::V2(mgr) => { + let fragment_actors = mgr + .catalog_controller + .update_mv_rate_limit_by_job_id(table_id.table_id as _, rate_limit) + .await?; + Ok(fragment_actors + .into_iter() + .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect())) + .collect()) + } + } + } }