diff --git a/proto/meta.proto b/proto/meta.proto
index 0371b5540a6d..bcb6c331549f 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -273,6 +273,7 @@ enum ThrottleTarget {
   SOURCE = 1;
   MV = 2;
   TABLE_WITH_SOURCE = 3;
+  CDC_TABLE = 4;
 }
 
 message ApplyThrottleRequest {
diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs
index 2dff4338a1de..e916d8ed8b87 100644
--- a/src/frontend/src/handler/alter_streaming_rate_limit.rs
+++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs
@@ -14,7 +14,7 @@
 
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::bail;
-use risingwave_pb::meta::PbThrottleTarget;
+use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
 use risingwave_sqlparser::ast::ObjectName;
 
 use super::{HandlerArgs, RwPgResponse};
@@ -72,6 +72,19 @@ pub async fn handle_alter_streaming_rate_limit(
             };
             (StatementType::ALTER_SOURCE, source_id)
         }
+        PbThrottleTarget::CdcTable => {
+            let reader = session.env().catalog_reader().read_guard();
+            let (table, schema_name) =
+                reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
+            if table.table_type != TableType::Table {
+                return Err(ErrorCode::InvalidInputSyntax(format!(
+                    "\"{table_name}\" is not a table",
+                ))
+                .into());
+            }
+            session.check_privilege_for_drop_alter(schema_name, &**table)?;
+            (StatementType::ALTER_TABLE, table.id.table_id)
+        }
         _ => bail!("Unsupported throttle target: {:?}", kind),
     };
 
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index c3dcf0ce5976..95ebba2662f1 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -707,6 +707,18 @@ pub async fn handle(
             )
             .await
         }
+        Statement::AlterTable {
+            name,
+            operation: AlterTableOperation::SetBackfillRateLimit { rate_limit },
+        } => {
+            alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
+                handler_args,
+                PbThrottleTarget::CdcTable,
+                name,
+                rate_limit,
+            )
+            .await
+        }
         Statement::AlterIndex {
             name,
             operation: AlterIndexOperation::RenameIndex { index_name },
diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs
index d50a088972ee..cfbdda2e9650 100644
--- a/src/meta/service/src/stream_service.rs
+++ b/src/meta/service/src/stream_service.rs
@@ -118,6 +118,11 @@ impl StreamManagerService for StreamServiceImpl {
                     .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
                     .await?
             }
+            ThrottleTarget::CdcTable => {
+                self.metadata_manager
+                    .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
+                    .await?
+            }
             ThrottleTarget::Unspecified => {
                 return Err(Status::invalid_argument("unspecified throttle target"))
             }
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index f5d97eca1c56..4756a9ded458 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -1354,6 +1354,10 @@ impl CatalogController {
                 || (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0)
             {
                 visit_stream_node(stream_node, |node| match node {
+                    PbNodeBody::StreamCdcScan(node) => {
+                        node.rate_limit = rate_limit;
+                        found = true;
+                    }
                     PbNodeBody::StreamScan(node) => {
                         node.rate_limit = rate_limit;
                         found = true;
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index a523bbfeb3e7..b734cdb54602 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -1079,6 +1079,11 @@ impl FragmentManager {
             for actor in &mut fragment.actors {
                 if let Some(node) = actor.nodes.as_mut() {
                     visit_stream_node(node, |node_body| match node_body {
+                        // rate limit for cdc backfill
+                        NodeBody::StreamCdcScan(ref mut node) => {
+                            node.rate_limit = rate_limit;
+                            actor_to_apply.push(actor.actor_id);
+                        }
                         NodeBody::StreamScan(ref mut node) => {
                             node.rate_limit = rate_limit;
                             actor_to_apply.push(actor.actor_id);
diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs
index 6ea385df950f..89e8f24bf592 100644
--- a/src/sqlparser/src/ast/ddl.rs
+++ b/src/sqlparser/src/ast/ddl.rs
@@ -106,6 +106,10 @@ pub enum AlterTableOperation {
     SetSourceRateLimit {
         rate_limit: i32,
     },
+    /// SET BACKFILL_RATE_LIMIT TO
+    SetBackfillRateLimit {
+        rate_limit: i32,
+    },
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -293,6 +297,9 @@ impl fmt::Display for AlterTableOperation {
             AlterTableOperation::SetSourceRateLimit { rate_limit } => {
                 write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
             }
+            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
+                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
+            }
         }
     }
 }
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 996fd9ebe849..8952eb9f215a 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -3092,6 +3092,8 @@ impl Parser<'_> {
                 }
             } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? {
                 AlterTableOperation::SetSourceRateLimit { rate_limit }
+            } else if let Some(rate_limit) = self.parse_alter_backfill_rate_limit()? {
+                AlterTableOperation::SetBackfillRateLimit { rate_limit }
             } else {
                 return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET");
            }
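Taken together, these hunks thread a new BACKFILL_RATE_LIMIT option for CDC tables from the SQL parser (AlterTableOperation::SetBackfillRateLimit) through the frontend handler (PbThrottleTarget::CdcTable) to the meta service, which rewrites the rate_limit field on StreamCdcScan nodes. A minimal usage sketch of the new statement; the table name `orders` and the limit value are illustrative, not part of the diff:

    -- Throttle the CDC backfill of an existing CDC table.
    ALTER TABLE orders SET BACKFILL_RATE_LIMIT TO 1000;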