From 3fdd6a5cc0c0789118ab726cbb776bfefb41a557 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 20 Nov 2024 21:41:47 +0800 Subject: [PATCH] feat: add rw_rate_limit system catalog (#19466) Signed-off-by: xxchan --- e2e_test/source_inline/fs/posix_fs.slt | 44 ++++++- ...slt => rate_limit_source_kafka.slt.serial} | 26 +++- ...rate_limit_source_kafka_shared.slt.serial} | 34 ++++- ....slt => rate_limit_table_kafka.slt.serial} | 2 +- proto/meta.proto | 14 ++ proto/stream_plan.proto | 2 + .../catalog/system_catalog/rw_catalog/mod.rs | 1 + .../system_catalog/rw_catalog/rw_fragments.rs | 2 +- .../rw_catalog/rw_rate_limit.rs | 50 ++++++++ src/frontend/src/meta_client.rs | 7 + src/frontend/src/stream_fragmenter/mod.rs | 4 + src/frontend/src/test_utils.rs | 5 + src/meta/service/src/stream_service.rs | 12 ++ src/meta/src/controller/streaming_job.rs | 121 ++++++++++++++++-- src/meta/src/manager/metadata.rs | 6 + src/prost/src/lib.rs | 19 +++ src/rpc_client/src/meta_client.rs | 9 ++ src/sqlparser/src/parser.rs | 2 +- 18 files changed, 340 insertions(+), 20 deletions(-) rename e2e_test/source_inline/kafka/alter/{rate_limit_source_kafka.slt => rate_limit_source_kafka.slt.serial} (80%) rename e2e_test/source_inline/kafka/alter/{rate_limit_source_kafka_shared.slt => rate_limit_source_kafka_shared.slt.serial} (74%) rename e2e_test/source_inline/kafka/alter/{rate_limit_table_kafka.slt => rate_limit_table_kafka.slt.serial} (99%) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_rate_limit.rs diff --git a/e2e_test/source_inline/fs/posix_fs.slt b/e2e_test/source_inline/fs/posix_fs.slt index da56502e417e8..5408daf28321a 100644 --- a/e2e_test/source_inline/fs/posix_fs.slt +++ b/e2e_test/source_inline/fs/posix_fs.slt @@ -33,21 +33,36 @@ create materialized view diamonds_mv as select * from diamonds_source; sleep 1s # no output due to rate limit -query TTTT rowsort +statement count 0 select * from diamonds; ----- -query TTTT rowsort + +statement count 0 select * from diamonds_mv; + + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name, node_name; ---- +diamonds FS_FETCH {FS_FETCH} 0 +diamonds SOURCE {SOURCE} 0 +diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0 +diamonds_mv SOURCE {SOURCE} 0 statement ok ALTER TABLE diamonds SET source_rate_limit TO DEFAULT; -statement ok -ALTER source diamonds_source SET source_rate_limit TO DEFAULT; -sleep 10s +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name, node_name; +---- +diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0 +diamonds_mv SOURCE {SOURCE} 0 + + +sleep 3s query TTTT rowsort select * from diamonds; @@ -63,6 +78,23 @@ select * from diamonds; 1.28 Good J 63.1 1.3 Fair E 64.7 + +statement count 0 +select * from diamonds_mv; + + + +statement ok +ALTER SOURCE diamonds_source SET source_rate_limit TO DEFAULT; + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name, node_name; +---- + + +sleep 3s + query TTTT rowsort select * from diamonds_mv; ---- diff --git a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt.serial similarity index 80% rename from e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt rename to e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt.serial index 96fd016c5812d..8353166b5a874 100644 --- a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt +++ b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt.serial @@ -80,16 +80,38 @@ select * from rl_mv3; ---- 0 +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +rl_mv1 SOURCE {SOURCE} 0 +rl_mv2 SOURCE {SOURCE} 0 +rl_mv3 SOURCE {SOURCE} 0 + ############## Alter Source (rate_limit = 0 --> rate_limit = 1000) skipif in-memory -query I +statement count 0 alter source kafka_source set source_rate_limit to 1000; +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +rl_mv1 SOURCE {SOURCE} 1000 +rl_mv2 SOURCE {SOURCE} 1000 +rl_mv3 SOURCE {SOURCE} 1000 + skipif in-memory -query I +statement count 0 alter source kafka_source set source_rate_limit to default; +# rate limit becomes None +query T +select count(*) from rw_rate_limit; +---- +0 + skipif in-memory sleep 3s diff --git a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt.serial similarity index 74% rename from e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt rename to e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt.serial index 29c0b83aa40d8..a9a730930b1b2 100644 --- a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt +++ b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt.serial @@ -84,11 +84,26 @@ SELECT progress from rw_ddl_progress; ---- 0 rows consumed +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +kafka_source SOURCE {SOURCE} 0 +rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0 + + ############## Alter Source (rate_limit = 0 --> rate_limit = 1000) statement ok alter source kafka_source set source_rate_limit to 1000; +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +kafka_source SOURCE {SOURCE} 1000 +rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0 + sleep 3s query I @@ -114,17 +129,34 @@ LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000; ^ +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +kafka_source SOURCE {SOURCE} 1000 +rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0 + + statement ok alter materialized view rl_mv2 set backfill_rate_limit = 2000; + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +kafka_source SOURCE {SOURCE} 1000 +rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 2000 + sleep 3s -query ? +query T select * from rl_mv2; ---- 2000 + ############## Cleanup statement ok diff --git a/e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt b/e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt.serial similarity index 99% rename from e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt rename to e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt.serial index ac2a665fd10c0..5d22fc85dea4f 100644 --- a/e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt +++ b/e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt.serial @@ -63,7 +63,7 @@ select count(*) from kafka_source; ############## Alter source (rate_limit = 0 --> rate_limit = 1000) skipif in-memory -query I +statement ok alter table kafka_source set source_rate_limit to 1000; skipif in-memory diff --git a/proto/meta.proto b/proto/meta.proto index 15a16f36bdddc..5c6d1c64274fd 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -335,6 +335,7 @@ service StreamManagerService { rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse); rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse); rpc Recover(RecoverRequest) returns (RecoverResponse); + rpc ListRateLimits(ListRateLimitsRequest) returns (ListRateLimitsResponse); } // Below for cluster service. @@ -862,3 +863,16 @@ message GetClusterLimitsResponse { service ClusterLimitService { rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse); } + +message ListRateLimitsRequest {} + +message ListRateLimitsResponse { + message RateLimitInfo { + uint32 fragment_id = 1; + uint32 job_id = 2; + uint32 fragment_type_mask = 3; + uint32 rate_limit = 4; + string node_name = 5; + } + repeated RateLimitInfo rate_limits = 1; +} diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 70c0d229394bb..d5a47b53b6af0 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -985,6 +985,8 @@ enum FragmentTypeFlag { FRAGMENT_TYPE_FLAG_CDC_FILTER = 256; FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024; FRAGMENT_TYPE_FLAG_SNAPSHOT_BACKFILL_STREAM_SCAN = 2048; + // Note: this flag is not available in old fragments, so only suitable for debugging purpose. + FRAGMENT_TYPE_FLAG_FS_FETCH = 4096; } // The streaming context associated with a stream plan diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 9c546f1ec7294..947560e44e62e 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -39,6 +39,7 @@ mod rw_indexes; mod rw_internal_tables; mod rw_materialized_views; mod rw_meta_snapshot; +mod rw_rate_limit; mod rw_relation_info; mod rw_relations; mod rw_schemas; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs index 91f818e7919f7..75a040f2733cf 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs @@ -32,7 +32,7 @@ struct RwFragment { max_parallelism: i32, } -fn extract_fragment_type_flag(mask: u32) -> Vec { +pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec { let mut result = vec![]; for i in 0..32 { let bit = 1 << i; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_rate_limit.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_rate_limit.rs new file mode 100644 index 0000000000000..34602461ca3b1 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_rate_limit.rs @@ -0,0 +1,50 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::Fields; +use risingwave_frontend_macro::system_catalog; + +use super::rw_fragments::extract_fragment_type_flag; +use crate::catalog::system_catalog::SysCatalogReaderImpl; +use crate::error::Result; + +#[derive(Fields)] +#[primary_key(fragment_id, node_name)] +struct RwRateLimit { + fragment_id: i32, + fragment_type: Vec, + node_name: String, + table_id: i32, + rate_limit: i32, +} + +#[system_catalog(table, "rw_catalog.rw_rate_limit")] +async fn read_rw_rate_limit(reader: &SysCatalogReaderImpl) -> Result> { + let rate_limits = reader.meta_client.list_rate_limits().await?; + + Ok(rate_limits + .into_iter() + .map(|info| RwRateLimit { + fragment_id: info.fragment_id as i32, + fragment_type: extract_fragment_type_flag(info.fragment_type_mask) + .into_iter() + .flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_")) + .map(|s| s.into()) + .collect(), + table_id: info.job_id as i32, + rate_limit: info.rate_limit as i32, + node_name: info.node_name, + }) + .collect()) +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index a91a0d8abc878..760c7bd450e1e 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -33,6 +33,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; +use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus}; @@ -125,6 +126,8 @@ pub trait FrontendMetaClient: Send + Sync { async fn get_cluster_recovery_status(&self) -> Result; async fn get_cluster_limits(&self) -> Result>; + + async fn list_rate_limits(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -300,4 +303,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn get_cluster_limits(&self) -> Result> { self.0.get_cluster_limits().await } + + async fn list_rate_limits(&self) -> Result> { + self.0.list_rate_limits().await + } } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index daa48d99969ca..f30b0abf5b4c4 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -361,6 +361,10 @@ fn build_fragment( current_fragment.requires_singleton = true; } + NodeBody::StreamFsFetch(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32; + } + _ => {} }; diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index d94b1dd2652d6..15a5281dec5e2 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -56,6 +56,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; +use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{ @@ -1065,6 +1066,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn get_cluster_limits(&self) -> RpcResult> { Ok(vec![]) } + + async fn list_rate_limits(&self) -> RpcResult> { + Ok(vec![]) + } } #[cfg(test)] diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index dfd8ec21187fd..4bb9bfb2d4487 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -433,4 +433,16 @@ impl StreamManagerService for StreamServiceImpl { Ok(Response::new(ListActorSplitsResponse { actor_splits })) } + + async fn list_rate_limits( + &self, + _request: Request, + ) -> Result, Status> { + let rate_limits = self + .metadata_manager + .catalog_controller + .list_rate_limits() + .await?; + Ok(Response::new(ListRateLimitsResponse { rate_limits })) + } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 5139b5069d9d1..a908704129c75 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -39,6 +39,7 @@ use risingwave_meta_model::{ use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; use risingwave_pb::catalog::{PbCreateType, PbTable}; +use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::relation::{PbRelationInfo, RelationInfo}; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, @@ -53,12 +54,12 @@ use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; use risingwave_pb::stream_plan::{ PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, }; -use sea_orm::sea_query::{Expr, Query, SimpleExpr}; +use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr}; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, - JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, - TransactionTrait, + IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, + RelationTrait, TransactionTrait, }; use crate::barrier::{ReplaceTablePlan, Reschedule}; @@ -1332,9 +1333,11 @@ impl CatalogController { }); } if is_fs_source { - // scan all fragments for StreamFsFetch node if using fs connector + // in older versions, there's no fragment type flag for `FsFetch` node, + // so we just scan all fragments for StreamFsFetch node if using fs connector visit_stream_node(stream_node, |node| { if let PbNodeBody::StreamFsFetch(node) = node { + *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32; if let Some(node_inner) = &mut node.node_inner && node_inner.source_id == source_id as u32 { @@ -1352,9 +1355,10 @@ impl CatalogController { "source id should be used by at least one fragment" ); let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec(); - for (id, _, stream_node) in fragments { + for (id, fragment_type_mask, stream_node) in fragments { fragment::ActiveModel { fragment_id: Set(id), + fragment_type_mask: Set(fragment_type_mask), stream_node: Set(StreamNode::from(&stream_node)), ..Default::default() } @@ -1409,9 +1413,7 @@ impl CatalogController { fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { let mut found = false; - if (*fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0) - || (*fragment_type_mask & PbFragmentTypeFlag::SourceScan as i32 != 0) - { + if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 { visit_stream_node(stream_node, |node| match node { PbNodeBody::StreamCdcScan(node) => { node.rate_limit = rate_limit; @@ -1778,4 +1780,107 @@ impl CatalogController { Ok(()) } + + /// Note: `FsFetch` created in old versions are not included. + /// Since this is only used for debugging, it should be fine. + pub async fn list_rate_limits(&self) -> MetaResult> { + let inner = self.inner.read().await; + let txn = inner.db.begin().await?; + + let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find() + .select_only() + .columns([ + fragment::Column::FragmentId, + fragment::Column::JobId, + fragment::Column::FragmentTypeMask, + fragment::Column::StreamNode, + ]) + .filter(fragment_type_mask_intersects( + PbFragmentTypeFlag::rate_limit_fragments(), + )) + .into_tuple() + .all(&txn) + .await?; + + let mut rate_limits = Vec::new(); + for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments { + let mut stream_node = stream_node.to_protobuf(); + let mut rate_limit = None; + let mut node_name = None; + + visit_stream_node(&mut stream_node, |node| { + match node { + // source rate limit + PbNodeBody::Source(node) => { + if let Some(node_inner) = &mut node.source_inner { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node_inner.rate_limit; + node_name = Some("SOURCE"); + } + } + PbNodeBody::StreamFsFetch(node) => { + if let Some(node_inner) = &mut node.node_inner { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node_inner.rate_limit; + node_name = Some("FS_FETCH"); + } + } + // backfill rate limit + PbNodeBody::SourceBackfill(node) => { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node.rate_limit; + node_name = Some("SOURCE_BACKFILL"); + } + PbNodeBody::StreamScan(node) => { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node.rate_limit; + node_name = Some("STREAM_SCAN"); + } + PbNodeBody::StreamCdcScan(node) => { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node.rate_limit; + node_name = Some("STREAM_CDC_SCAN"); + } + _ => {} + } + }); + + if let Some(rate_limit) = rate_limit { + rate_limits.push(RateLimitInfo { + fragment_id: fragment_id as u32, + job_id: job_id as u32, + fragment_type_mask: fragment_type_mask as u32, + rate_limit, + node_name: node_name.unwrap().to_string(), + }); + } + } + + Ok(rate_limits) + } +} + +fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr { + column + .binary(BinOper::Custom("&"), value) + .binary(BinOper::NotEqual, 0) +} + +fn fragment_type_mask_intersects(value: i32) -> SimpleExpr { + bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value) } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index db53f5fb8b6b3..b974ad82b0536 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -25,6 +25,7 @@ use risingwave_pb::catalog::{PbSink, PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; +use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::table_fragments::{Fragment, PbFragment}; use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; @@ -720,6 +721,11 @@ impl MetadataManager { pub fn cluster_id(&self) -> &ClusterId { self.cluster_controller.cluster_id() } + + pub async fn list_rate_limits(&self) -> MetaResult> { + let rate_limits = self.catalog_controller.list_rate_limits().await?; + Ok(rate_limits) + } } impl MetadataManager { diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 5974a05664721..a4678df091270 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -302,6 +302,25 @@ impl stream_plan::StreamNode { } } +impl stream_plan::FragmentTypeFlag { + /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`. + pub fn backfill_rate_limit_fragments() -> i32 { + stream_plan::FragmentTypeFlag::SourceScan as i32 + | stream_plan::FragmentTypeFlag::StreamScan as i32 + } + + /// Fragments that may be affected by `SOURCE_RATE_LIMIT`. + /// Note: for `FsFetch`, old fragments don't have this flag set, so don't use this to check. + pub fn source_rate_limit_fragments() -> i32 { + stream_plan::FragmentTypeFlag::Source as i32 | stream_plan::FragmentTypeFlag::FsFetch as i32 + } + + /// Note: this doesn't include `FsFetch` created in old versions. + pub fn rate_limit_fragments() -> i32 { + Self::backfill_rate_limit_fragments() | Self::source_rate_limit_fragments() + } +} + impl catalog::StreamSourceInfo { /// Refer to [`Self::cdc_source_job`] for details. pub fn is_shared(&self) -> bool { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index be733e8d4ec1d..80213d0deda6c 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,6 +25,7 @@ use async_trait::async_trait; use cluster_limit_service_client::ClusterLimitServiceClient; use either::Either; use futures::stream::BoxStream; +use list_rate_limits_response::RateLimitInfo; use lru::LruCache; use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; @@ -1494,6 +1495,13 @@ impl MetaClient { self.inner.merge_compaction_group(req).await?; Ok(()) } + + /// List all rate limits for sources and backfills + pub async fn list_rate_limits(&self) -> Result> { + let request = ListRateLimitsRequest {}; + let resp = self.inner.list_rate_limits(request).await?; + Ok(resp.rate_limits) + } } #[async_trait] @@ -2044,6 +2052,7 @@ macro_rules! for_all_meta_rpc { ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse } ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse } ,{ stream_client, recover, RecoverRequest, RecoverResponse } + ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse } ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse } ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse } ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index f93d41aeed2cf..f8d449a253c30 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3462,7 +3462,7 @@ impl Parser<'_> { } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? { AlterSourceOperation::SetSourceRateLimit { rate_limit } } else { - return self.expected("SCHEMA after SET"); + return self.expected("SCHEMA or SOURCE_RATE_LIMIT after SET"); } } else if self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) { let format_encode = self.parse_schema()?.unwrap();