Skip to content

Commit

Permalink
feat(sql-backend): support throttle rate limit for sql-backend (#14305)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored and Little-Wallace committed Jan 20, 2024
1 parent 62320ce commit 2dd61a6
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 16 deletions.
7 changes: 2 additions & 5 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,15 @@ impl StreamManagerService for StreamServiceImpl {
request: Request<ApplyThrottleRequest>,
) -> Result<Response<ApplyThrottleResponse>, Status> {
let request = request.into_inner();
let MetadataManager::V1(mgr) = &self.metadata_manager else {
return Err(Status::unimplemented("not supported in v2"));
};

let actor_to_apply = match request.kind() {
ThrottleTarget::Source => {
mgr.fragment_manager
self.metadata_manager
.update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
.await?
}
ThrottleTarget::Mv => {
mgr.fragment_manager
self.metadata_manager
.update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
Expand Down
167 changes: 160 additions & 7 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,39 @@
use std::collections::HashMap;

use itertools::Itertools;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Object, ObjectDependency, Table};
use risingwave_meta_model_v2::prelude::{
Actor, ActorDispatcher, Fragment, Object, ObjectDependency, Source, Table,
};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, index, object_dependency, sink, source, streaming_job, table, ActorId,
CreateType, DatabaseId, JobStatus, ObjectId, SchemaId, UserId,
actor, actor_dispatcher, fragment, index, object_dependency, sink, source, streaming_job,
table, ActorId, CreateType, DatabaseId, FragmentId, JobStatus, ObjectId, SchemaId, SourceId,
StreamNode, UserId,
};
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::Dispatcher;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{Dispatcher, PbFragmentTypeFlag};
use sea_orm::sea_query::SimpleExpr;
use sea_orm::ActiveValue::Set;
use sea_orm::{
ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
NotSet, QueryFilter, TransactionTrait,
NotSet, QueryFilter, QuerySelect, TransactionTrait,
};

use crate::controller::catalog::CatalogController;
use crate::controller::utils::{check_relation_name_duplicate, ensure_object_id, ensure_user_id};
use crate::controller::utils::{
check_relation_name_duplicate, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
};
use crate::manager::StreamingJob;
use crate::model::StreamContext;
use crate::stream::SplitAssignment;
use crate::MetaResult;
use crate::{MetaError, MetaResult};

impl CatalogController {
pub async fn create_streaming_job_obj(
Expand Down Expand Up @@ -398,4 +405,150 @@ impl CatalogController {

Ok(())
}

// edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
// return the actor_ids to be applied
pub async fn update_source_rate_limit_by_source_id(
&self,
source_id: SourceId,
rate_limit: Option<u32>,
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;

let source = Source::find_by_id(source_id)
.one(&txn)
.await?
.ok_or_else(|| {
MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
})?;
let streaming_job_ids: Vec<ObjectId> =
if let Some(table_id) = source.optional_associated_table_id {
vec![table_id]
} else if let Some(source_info) = &source.source_info
&& source_info.inner_ref().cdc_source_job
{
vec![source_id]
} else {
ObjectDependency::find()
.select_only()
.column(object_dependency::Column::UsedBy)
.filter(object_dependency::Column::Oid.eq(source_id))
.into_tuple()
.all(&txn)
.await?
};

if streaming_job_ids.is_empty() {
return Err(MetaError::invalid_parameter(format!(
"source id {source_id} not used by any streaming job"
)));
}

let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
.select_only()
.columns([
fragment::Column::FragmentId,
fragment::Column::FragmentTypeMask,
fragment::Column::StreamNode,
])
.filter(fragment::Column::JobId.is_in(streaming_job_ids))
.into_tuple()
.all(&txn)
.await?;

fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
let mut found = false;
if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
visit_stream_node(&mut stream_node.0, |node| {
if let PbNodeBody::Source(node) = node {
if let Some(node_inner) = &mut node.source_inner
&& node_inner.source_id == source_id as u32
{
node_inner.rate_limit = rate_limit;
found = true;
}
}
});
}
found
});

assert!(
!fragments.is_empty(),
"source id should be used by at least one fragment"
);
let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
for (id, _, stream_node) in fragments {
fragment::ActiveModel {
fragment_id: Set(id),
stream_node: Set(stream_node),
..Default::default()
}
.update(&txn)
.await?;
}
let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

txn.commit().await?;

Ok(fragment_actors)
}

// edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
// return the actor_ids to be applied
pub async fn update_mv_rate_limit_by_job_id(
&self,
job_id: ObjectId,
rate_limit: Option<u32>,
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;

let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
.select_only()
.columns([
fragment::Column::FragmentId,
fragment::Column::FragmentTypeMask,
fragment::Column::StreamNode,
])
.filter(fragment::Column::JobId.eq(job_id))
.into_tuple()
.all(&txn)
.await?;

fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
let mut found = false;
if *fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0 {
visit_stream_node(&mut stream_node.0, |node| {
if let PbNodeBody::StreamScan(node) = node {
node.rate_limit = rate_limit;
found = true;
}
});
}
found
});

if fragments.is_empty() {
return Err(MetaError::invalid_parameter(format!(
"stream scan node not found in job id {job_id}"
)));
}
let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
for (id, _, stream_node) in fragments {
fragment::ActiveModel {
fragment_id: Set(id),
stream_node: Set(stream_node),
..Default::default()
}
.update(&txn)
.await?;
}
let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

txn.commit().await?;

Ok(fragment_actors)
}
}
28 changes: 24 additions & 4 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
use std::collections::HashMap;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_meta_model_migration::WithQuery;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::DistributionType;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::{
actor_dispatcher, connection, database, fragment, function, index, object, object_dependency,
schema, sink, source, table, user, user_privilege, view, worker_property, ActorId,
DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId,
SchemaId, UserId, WorkerId,
actor, actor_dispatcher, connection, database, fragment, function, index, object,
object_dependency, schema, sink, source, table, user, user_privilege, view, worker_property,
ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId,
PrivilegeId, SchemaId, UserId, WorkerId,
};
use risingwave_pb::catalog::{PbConnection, PbFunction};
use risingwave_pb::common::PbParallelUnit;
Expand Down Expand Up @@ -690,3 +691,22 @@ where
})
.collect())
}

/// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments.
pub async fn get_fragment_actor_ids<C>(
db: &C,
fragment_ids: Vec<FragmentId>,
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>>
where
C: ConnectionTrait,
{
let fragment_actors: Vec<(FragmentId, ActorId)> = Actor::find()
.select_only()
.columns([actor::Column::FragmentId, actor::Column::ActorId])
.filter(actor::Column::FragmentId.is_in(fragment_ids))
.into_tuple()
.all(db)
.await?;

Ok(fragment_actors.into_iter().into_group_map())
}
49 changes: 49 additions & 0 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::{TableId, TableOption};
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::catalog::PbSource;
use risingwave_pb::common::worker_node::{PbResource, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType};
Expand Down Expand Up @@ -366,4 +367,52 @@ impl MetadataManager {
}
}
}

pub async fn update_source_rate_limit_by_source_id(
&self,
source_id: SourceId,
rate_limit: Option<u32>,
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.update_source_rate_limit_by_source_id(source_id, rate_limit)
.await
}
MetadataManager::V2(mgr) => {
let fragment_actors = mgr
.catalog_controller
.update_source_rate_limit_by_source_id(source_id as _, rate_limit)
.await?;
Ok(fragment_actors
.into_iter()
.map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
.collect())
}
}
}

pub async fn update_mv_rate_limit_by_table_id(
&self,
table_id: TableId,
rate_limit: Option<u32>,
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.update_mv_rate_limit_by_table_id(table_id, rate_limit)
.await
}
MetadataManager::V2(mgr) => {
let fragment_actors = mgr
.catalog_controller
.update_mv_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
.await?;
Ok(fragment_actors
.into_iter()
.map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
.collect())
}
}
}
}

0 comments on commit 2dd61a6

Please sign in to comment.