diff --git a/e2e_test/backfill/sink/create_sink.slt b/e2e_test/backfill/sink/create_sink.slt
index 016e3bcb2049b..1eea929f3d50b 100644
--- a/e2e_test/backfill/sink/create_sink.slt
+++ b/e2e_test/backfill/sink/create_sink.slt
@@ -5,7 +5,7 @@ statement ok
 create table t (v1 int);
 
 statement ok
-SET STREAMING_RATE_LIMIT = 500;
+SET BACKFILL_RATE_LIMIT = 500;
 
 # Should finish in 20s
 statement ok
diff --git a/e2e_test/backfill/sink/different_pk_and_dist_key.slt b/e2e_test/backfill/sink/different_pk_and_dist_key.slt
index bc8256b28e62a..41c99f315b153 100644
--- a/e2e_test/backfill/sink/different_pk_and_dist_key.slt
+++ b/e2e_test/backfill/sink/different_pk_and_dist_key.slt
@@ -18,7 +18,7 @@ statement ok
 create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);
 
 statement ok
-set streaming_rate_limit = 1;
+set backfill_rate_limit = 1;
 
 statement ok
 set background_ddl = true;
diff --git a/e2e_test/background_ddl/basic.slt b/e2e_test/background_ddl/basic.slt
index b0be505eb1dd9..3c98b6943610e 100644
--- a/e2e_test/background_ddl/basic.slt
+++ b/e2e_test/background_ddl/basic.slt
@@ -14,7 +14,7 @@ statement ok
 FLUSH;
 
 statement ok
-SET STREAMING_RATE_LIMIT=10000;
+SET BACKFILL_RATE_LIMIT=10000;
 
 statement ok
 CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
diff --git a/e2e_test/background_ddl/sim/basic.slt b/e2e_test/background_ddl/sim/basic.slt
index 35f5814fe8b4f..40cb606410130 100644
--- a/e2e_test/background_ddl/sim/basic.slt
+++ b/e2e_test/background_ddl/sim/basic.slt
@@ -14,7 +14,7 @@ statement ok
 FLUSH;
 
 statement ok
-SET STREAMING_RATE_LIMIT=4000;
+SET BACKFILL_RATE_LIMIT=4000;
 
 statement ok
 CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part
index f405cc71c2c0d..5744cd0362cbf 100644
--- a/e2e_test/batch/catalog/pg_settings.slt.part
+++ b/e2e_test/batch/catalog/pg_settings.slt.part
@@ -17,6 +17,7 @@ postmaster license_key
 postmaster max_concurrent_creating_streaming_jobs
 postmaster pause_on_next_bootstrap
 user application_name
+user backfill_rate_limit
 user background_ddl
 user batch_enable_distributed_dml
 user batch_parallelism
@@ -52,10 +53,10 @@ user server_encoding
 user server_version
 user server_version_num
 user sink_decouple
+user source_rate_limit
 user standard_conforming_strings
 user statement_timeout
 user streaming_parallelism
-user streaming_rate_limit
 user streaming_use_arrangement_backfill
 user synchronize_seqscans
 user timezone
diff --git a/e2e_test/ddl/drop/drop_creating_mv.slt b/e2e_test/ddl/drop/drop_creating_mv.slt
index 621ac216d4ec0..e9d8423f3cdec 100644
--- a/e2e_test/ddl/drop/drop_creating_mv.slt
+++ b/e2e_test/ddl/drop/drop_creating_mv.slt
@@ -8,7 +8,7 @@ statement ok
 flush;
 
 statement ok
-set streaming_rate_limit=1;
+set backfill_rate_limit=1;
 
 ############## Test drop foreground mv
 onlyif can-use-recover
@@ -61,7 +61,7 @@ drop materialized view m1;
 
 ############## Make sure the mv can still be successfully created later.
 statement ok
-set streaming_rate_limit=default;
+set backfill_rate_limit=default;
 
 statement ok
 set background_ddl=false;
diff --git a/e2e_test/ddl/throttle.slt b/e2e_test/ddl/throttle.slt
index 9b6c2f053bf63..e8a33b0bfb433 100644
--- a/e2e_test/ddl/throttle.slt
+++ b/e2e_test/ddl/throttle.slt
@@ -1,20 +1,22 @@
-# streaming_rate_limit also applies to create sink and create source, please refer to
-# e2e_test/source/basic/kafka.slt and e2e_test/sink/kafka/create_sink.slt for this part
+# source_rate_limit applies to create source, please refer to
+# e2e_test/source/basic/kafka.slt.
+# backfill_rate_limit applies to create sink, please refer to
+# e2e_test/sink/kafka/create_sink.slt.
 
 statement ok
 create table t1 (v1 int);
 
 # tracked in https://github.com/risingwavelabs/risingwave/issues/13474
-# create with duplicate streaming_rate_limit
+# create with duplicate backfill_rate_limit
 statement error Duplicated option
-create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;
+create materialized view mv1 with (backfill_rate_limit = 1000, backfill_rate_limit = 2000) as select * from t1;
 
 # create with unknown fields
 statement error unexpected options in WITH clause
-create materialized view mv1 with (streaming_rate_limit = 1000, unknown_field = 2000) as select * from t1;
+create materialized view mv1 with (backfill_rate_limit = 1000, unknown_field = 2000) as select * from t1;
 
 statement ok
-create materialized view mv1 with (streaming_rate_limit = 1000) as select * from t1;
+create materialized view mv1 with (backfill_rate_limit = 1000) as select * from t1;
 
 statement ok
 drop materialized view mv1;
diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt
index be4c39c7cb1c6..338465c471af9 100644
--- a/e2e_test/sink/kafka/create_sink.slt
+++ b/e2e_test/sink/kafka/create_sink.slt
@@ -153,7 +153,7 @@ create sink multiple_pk_throttle from t_kafka with (
   topic = 'test-rw-sink-debezium',
   type = 'debezium',
   primary_key = 'id,v_varchar',
-  streaming_rate_limit = 200
+  backfill_rate_limit = 200
 );
 
 statement ok
@@ -165,7 +165,7 @@ create sink multiple_pk_throttle_1
   topic = 'test-rw-sink-debezium',
   type = 'debezium',
   primary_key = 'id,v_varchar',
-  streaming_rate_limit = 200
+  backfill_rate_limit = 200
 );
 
 statement ok
diff --git a/e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt b/e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt
index a2b1a6fc63130..adf133ff7f3e7 100644
--- a/e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt
+++ b/e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt
@@ -5,7 +5,7 @@ statement ok
 insert into t select 2 from generate_series(1, 1000000);
 
 statement ok
-set streaming_rate_limit=1;
+set backfill_rate_limit=1;
 
 statement ok
 set background_ddl=true;
@@ -25,7 +25,7 @@ statement ok
 set background_ddl = false;
 
 statement ok
-set streaming_rate_limit=default;
+set backfill_rate_limit=default;
 
 statement ok
 flush;
diff --git a/e2e_test/slow_tests/udf/always_retry_python.slt b/e2e_test/slow_tests/udf/always_retry_python.slt
index 78bf926c32986..18184846f272a 100644
--- a/e2e_test/slow_tests/udf/always_retry_python.slt
+++ b/e2e_test/slow_tests/udf/always_retry_python.slt
@@ -21,7 +21,7 @@ statement ok
 flush;
 
 statement ok
-SET STREAMING_RATE_LIMIT=1;
+SET BACKFILL_RATE_LIMIT=1;
 
 statement ok
 SET BACKGROUND_DDL=true;
@@ -57,7 +57,7 @@ SELECT count(*) FROM mv_always_retry where s1 is NULL;
 # t
 
 statement ok
-SET STREAMING_RATE_LIMIT TO DEFAULT;
+SET BACKFILL_RATE_LIMIT TO DEFAULT;
 
 statement ok
 SET BACKGROUND_DDL=false;
diff --git a/e2e_test/source/basic/alter/rate_limit_source_kafka.slt b/e2e_test/source/basic/alter/rate_limit_source_kafka.slt
index 7d991083345a1..e76cf72c0220a 100644
--- a/e2e_test/source/basic/alter/rate_limit_source_kafka.slt
+++ b/e2e_test/source/basic/alter/rate_limit_source_kafka.slt
@@ -44,7 +44,7 @@ sleep 3s
 ############## Create MV on source
 
 statement ok
-SET STREAMING_RATE_LIMIT=0;
+SET SOURCE_RATE_LIMIT=0;
 
 statement ok
 create materialized view rl_mv1 as select count(*) from kafka_source;
@@ -56,7 +56,7 @@ statement ok
 create materialized view rl_mv3 as select count(*) from kafka_source;
 
 statement ok
-SET STREAMING_RATE_LIMIT=default;
+SET SOURCE_RATE_LIMIT=default;
 
 ############## MVs should have 0 records, since source has (rate_limit = 0)
 
@@ -82,11 +82,11 @@ select * from rl_mv3;
 
 skipif in-memory
 query I
-alter source kafka_source set streaming_rate_limit to 1000;
+alter source kafka_source set source_rate_limit to 1000;
 
 skipif in-memory
 query I
-alter source kafka_source set streaming_rate_limit to default;
+alter source kafka_source set source_rate_limit to default;
 
 skipif in-memory
 sleep 3s
diff --git a/e2e_test/source/basic/alter/rate_limit_table_kafka.slt b/e2e_test/source/basic/alter/rate_limit_table_kafka.slt
index bf1fd6672d6ea..d93687799cac7 100644
--- a/e2e_test/source/basic/alter/rate_limit_table_kafka.slt
+++ b/e2e_test/source/basic/alter/rate_limit_table_kafka.slt
@@ -27,7 +27,7 @@ create table kafka_source (v1 int) with (
   topic = 'kafka_source',
   properties.bootstrap.server = 'message_queue:29092',
   scan.startup.mode = 'earliest',
-  streaming_rate_limit = 0
+  source_rate_limit = 0
 ) FORMAT PLAIN ENCODE JSON
 
 statement ok
@@ -61,11 +61,11 @@ select count(*) from kafka_source;
 
 skipif in-memory
 query I
-alter table kafka_source set streaming_rate_limit to 1000;
+alter table kafka_source set source_rate_limit to 1000;
 
 skipif in-memory
 query I
-alter table kafka_source set streaming_rate_limit to default;
+alter table kafka_source set source_rate_limit to default;
 
 skipif in-memory
 sleep 3s
diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt
index dee5de1cbb539..40e9b46036112 100644
--- a/e2e_test/source/basic/kafka.slt
+++ b/e2e_test/source/basic/kafka.slt
@@ -391,7 +391,7 @@ create table s29 (id bytea) with (
   topic = 'kafka_source_format_bytes',
   properties.bootstrap.server = 'message_queue:29092',
   scan.startup.mode = 'earliest',
-  streaming_rate_limit = 200
+  source_rate_limit = 200
 ) FORMAT PLAIN ENCODE BYTES
 
 statement ok
diff --git a/e2e_test/streaming/rate_limit/basic.slt b/e2e_test/streaming/rate_limit/basic.slt
index 3d5464d604d8b..5fb3548197fae 100644
--- a/e2e_test/streaming/rate_limit/basic.slt
+++ b/e2e_test/streaming/rate_limit/basic.slt
@@ -5,7 +5,7 @@ statement ok
 CREATE TABLE t1(v1 int, v2 int);
 
 statement ok
-SET STREAMING_RATE_LIMIT TO 2000;
+SET BACKFILL_RATE_LIMIT TO 2000;
 
 statement ok
 CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
@@ -35,7 +35,7 @@ FLUSH;
 
 # Test with small rate_limit. In this case, the stream chunk needs to be split
 statement ok
-SET STREAMING_RATE_LIMIT TO 1;
+SET BACKFILL_RATE_LIMIT TO 1;
 
 statement ok
 CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
diff --git a/e2e_test/streaming/rate_limit/snapshot_amplification.slt b/e2e_test/streaming/rate_limit/snapshot_amplification.slt
index f704bc637c02c..087bc01a80465 100644
--- a/e2e_test/streaming/rate_limit/snapshot_amplification.slt
+++ b/e2e_test/streaming/rate_limit/snapshot_amplification.slt
@@ -6,7 +6,7 @@ statement ok
 SET STREAMING_PARALLELISM=2;
 
 statement ok
-SET STREAMING_RATE_LIMIT=1;
+SET BACKFILL_RATE_LIMIT=1;
 
 statement ok
 CREATE TABLE table (i1 int);
diff --git a/e2e_test/streaming/rate_limit/upstream_amplification.slt b/e2e_test/streaming/rate_limit/upstream_amplification.slt
index 63528472050a8..989a8310bf4ce 100644
--- a/e2e_test/streaming/rate_limit/upstream_amplification.slt
+++ b/e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -7,7 +7,7 @@ statement ok
 SET STREAMING_PARALLELISM=2;
 
 statement ok
-SET STREAMING_RATE_LIMIT=1;
+SET BACKFILL_RATE_LIMIT=1;
 
 statement ok
 CREATE TABLE source_table (i1 int)
diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs
index 8fbfd2cd6e969..16cc337c6f46a 100644
--- a/src/common/src/session_config/mod.rs
+++ b/src/common/src/session_config/mod.rs
@@ -54,7 +54,8 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
 
 // NOTE(kwannoel): We declare it separately as a constant,
 // otherwise seems like it can't infer the type of -1 when written inline.
-const DISABLE_STREAMING_RATE_LIMIT: i32 = -1;
+const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
+const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
 
 #[serde_as]
 /// This is the Session Config of RisingWave.
@@ -253,11 +254,17 @@ pub struct SessionConfig {
     #[parameter(default = STANDARD_CONFORMING_STRINGS)]
     standard_conforming_strings: String,
 
+    /// Set backfill rate limit (rows per second) for each parallelism for mv / source / sink backfilling.
+    /// If set to -1, disable rate limit.
+    /// If set to 0, this pauses the snapshot read / source read.
+    #[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
+    backfill_rate_limit: i32,
+
     /// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
     /// If set to -1, disable rate limit.
     /// If set to 0, this pauses the snapshot read / source read.
-    #[parameter(default = DISABLE_STREAMING_RATE_LIMIT)]
-    streaming_rate_limit: i32,
+    #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
+    source_rate_limit: i32,
 
     /// Cache policy for partition cache in streaming over window.
     /// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs
index e5273b85d57b8..d255da31a52ed 100644
--- a/src/frontend/src/handler/alter_streaming_rate_limit.rs
+++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs
@@ -68,7 +68,7 @@ pub async fn handle_alter_streaming_rate_limit(
             let source_id = if let Some(id) = table.associated_source_id {
                 id.table_id()
             } else {
-                bail!("ALTER STREAMING_RATE_LIMIT is not for table without source")
+                bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
             };
             (StatementType::ALTER_SOURCE, source_id)
         }
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index f8beeedb19438..c3dcf0ce59768 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -697,7 +697,7 @@ pub async fn handle(
         } => alter_table_with_sr::handle_refresh_schema(handler_args, name).await,
         Statement::AlterTable {
             name,
-            operation: AlterTableOperation::SetStreamingRateLimit { rate_limit },
+            operation: AlterTableOperation::SetSourceRateLimit { rate_limit },
         } => {
             alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                 handler_args,
@@ -814,7 +814,7 @@ pub async fn handle(
         Statement::AlterView {
             materialized,
             name,
-            operation: AlterViewOperation::SetStreamingRateLimit { rate_limit },
+            operation: AlterViewOperation::SetBackfillRateLimit { rate_limit },
         } if materialized => {
             alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                 handler_args,
@@ -946,7 +946,7 @@ pub async fn handle(
         } => alter_source_with_sr::handler_refresh_schema(handler_args, name).await,
         Statement::AlterSource {
             name,
-            operation: AlterSourceOperation::SetStreamingRateLimit { rate_limit },
+            operation: AlterSourceOperation::SetSourceRateLimit { rate_limit },
         } => {
             alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
                 handler_args,
diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
index a7aef5195ea5a..406ebdc8bfae5 100644
--- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -256,7 +256,7 @@ impl StreamCdcTableScan {
             // The table desc used by backfill executor
             state_table: Some(catalog),
             cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
-            rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
+            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
             disable_backfill: options.disable_backfill,
             options: Some(options),
         });
diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
index 08516631dc75b..3f8c89d7def1a 100644
--- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -119,7 +119,7 @@ impl StreamNode for StreamFsFetch {
                     .map(|c| c.to_protobuf())
                     .collect_vec(),
                 with_properties,
-                rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
+                rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
                 secret_refs,
             }
         });
diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs
index 980df7911c7f3..159748bbdc18a 100644
--- a/src/frontend/src/optimizer/plan_node/stream_source.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -112,7 +112,7 @@ impl StreamNode for StreamSource {
                    .map(|c| c.to_protobuf())
                    .collect_vec(),
                 with_properties,
-                rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
+                rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
                 secret_refs,
             }
         });
diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs
index 47689c1038790..83c79259952b2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs
@@ -158,7 +158,7 @@ impl StreamSourceScan {
                 .map(|c| c.to_protobuf())
                 .collect_vec(),
             with_properties,
-            rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
+            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
             secret_refs,
         };
 
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 2255194dbee64..8a85a0c324dca 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -320,7 +320,7 @@ impl StreamTableScan {
             table_desc: Some(self.core.table_desc.try_to_protobuf()?),
             state_table: Some(catalog),
             arrangement_table,
-            rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
+            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
             ..Default::default()
         });
 
diff --git a/src/frontend/src/utils/overwrite_options.rs b/src/frontend/src/utils/overwrite_options.rs
index 904518331234f..185f1b80b154b 100644
--- a/src/frontend/src/utils/overwrite_options.rs
+++ b/src/frontend/src/utils/overwrite_options.rs
@@ -16,20 +16,34 @@ use crate::handler::HandlerArgs;
 
 #[derive(Debug, Clone, Default)]
 pub struct OverwriteOptions {
-    pub streaming_rate_limit: Option<u32>,
+    pub source_rate_limit: Option<u32>,
+    pub backfill_rate_limit: Option<u32>,
 }
 
 impl OverwriteOptions {
-    pub(crate) const STREAMING_RATE_LIMIT_KEY: &'static str = "streaming_rate_limit";
+    pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit";
+    pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit";
 
     pub fn new(args: &mut HandlerArgs) -> Self {
-        let streaming_rate_limit = {
-            // CREATE MATERIALIZED VIEW m1 WITH (rate_limit = N) ...
-            if let Some(x) = args.with_options.remove(Self::STREAMING_RATE_LIMIT_KEY) {
+        let source_rate_limit = {
+            if let Some(x) = args.with_options.remove(Self::SOURCE_RATE_LIMIT_KEY) {
                 // FIXME(tabVersion): validate the value
                 Some(x.parse::<u32>().unwrap())
             } else {
-                let rate_limit = args.session.config().streaming_rate_limit();
+                let rate_limit = args.session.config().source_rate_limit();
+                if rate_limit < 0 {
+                    None
+                } else {
+                    Some(rate_limit as u32)
+                }
+            }
+        };
+        let backfill_rate_limit = {
+            if let Some(x) = args.with_options.remove(Self::BACKFILL_RATE_LIMIT_KEY) {
+                // FIXME(tabVersion): validate the value
+                Some(x.parse::<u32>().unwrap())
+            } else {
+                let rate_limit = args.session.config().backfill_rate_limit();
                 if rate_limit < 0 {
                     None
                 } else {
@@ -38,7 +52,8 @@ impl OverwriteOptions {
             }
         };
         Self {
-            streaming_rate_limit,
+            source_rate_limit,
+            backfill_rate_limit,
         }
     }
 }
diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs
index 2456dceb9fb19..e306103c02e39 100644
--- a/src/frontend/src/utils/with_options.rs
+++ b/src/frontend/src/utils/with_options.rs
@@ -89,8 +89,7 @@ impl WithOptions {
             .inner
             .into_iter()
             .filter(|(key, _)| {
-                key != OverwriteOptions::STREAMING_RATE_LIMIT_KEY
-                    && key != options::RETENTION_SECONDS
+                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
             })
             .collect();
 
diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs
index 92683f42e742a..6ea385df950fc 100644
--- a/src/sqlparser/src/ast/ddl.rs
+++ b/src/sqlparser/src/ast/ddl.rs
@@ -102,8 +102,8 @@ pub enum AlterTableOperation {
         deferred: bool,
     },
     RefreshSchema,
-    /// `SET STREAMING_RATE_LIMIT TO <rate_limit>`
-    SetStreamingRateLimit {
+    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
+    SetSourceRateLimit {
         rate_limit: i32,
     },
 }
@@ -138,8 +138,8 @@ pub enum AlterViewOperation {
         parallelism: SetVariableValue,
         deferred: bool,
     },
-    /// `SET STREAMING_RATE_LIMIT TO <rate_limit>`
-    SetStreamingRateLimit {
+    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
+    SetBackfillRateLimit {
         rate_limit: i32,
     },
 }
@@ -180,7 +180,7 @@ pub enum AlterSourceOperation {
     SetSchema { new_schema_name: ObjectName },
     FormatEncode { connector_schema: ConnectorSchema },
     RefreshSchema,
-    SetStreamingRateLimit { rate_limit: i32 },
+    SetSourceRateLimit { rate_limit: i32 },
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -290,8 +290,8 @@ impl fmt::Display for AlterTableOperation {
             AlterTableOperation::RefreshSchema => {
                 write!(f, "REFRESH SCHEMA")
             }
-            AlterTableOperation::SetStreamingRateLimit { rate_limit } => {
-                write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
+            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
+                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
             }
         }
     }
@@ -341,8 +341,8 @@ impl fmt::Display for AlterViewOperation {
                     if *deferred { " DEFERRED" } else { "" }
                 )
             }
-            AlterViewOperation::SetStreamingRateLimit { rate_limit } => {
-                write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
+            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
+                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
             }
         }
     }
@@ -412,8 +412,8 @@ impl fmt::Display for AlterSourceOperation {
             AlterSourceOperation::RefreshSchema => {
                 write!(f, "REFRESH SCHEMA")
             }
-            AlterSourceOperation::SetStreamingRateLimit { rate_limit } => {
-                write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
+            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
+                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
             }
         }
     }
diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs
index 561b602298f11..768301544ef21 100644
--- a/src/sqlparser/src/keywords.rs
+++ b/src/sqlparser/src/keywords.rs
@@ -489,7 +489,6 @@ define_keywords!(
     STDDEV_SAMP,
     STDIN,
     STORED,
-    STREAMING_RATE_LIMIT,
     STRING,
     STRUCT,
     SUBMULTISET,
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 449db7864f2b2..1fe1b16e9601c 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -1812,6 +1812,18 @@ impl Parser<'_> {
         self.expected(expected)
     }
 
+    /// Check if the next token is the expected word, and consume it if so.
+    /// The equality check is case-insensitive.
+    pub fn parse_word(&mut self, expected: &str) -> bool {
+        match self.peek_token().token {
+            Token::Word(w) if w.value.to_uppercase() == expected => {
+                self.next_token();
+                true
+            }
+            _ => false,
+        }
+    }
+
     /// Look for an expected keyword and consume it if it exists
     #[must_use]
     pub fn parse_keyword(&mut self, expected: Keyword) -> bool {
@@ -3078,10 +3090,10 @@ impl Parser<'_> {
                     parallelism: value,
                     deferred,
                 }
-            } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? {
-                AlterTableOperation::SetStreamingRateLimit { rate_limit }
+            } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? {
+                AlterTableOperation::SetSourceRateLimit { rate_limit }
             } else {
-                return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET");
+                return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET");
             }
         } else if self.parse_keyword(Keyword::DROP) {
             let _ = self.parse_keyword(Keyword::COLUMN);
@@ -3133,14 +3145,37 @@ impl Parser<'_> {
         })
     }
 
-    /// STREAMING_RATE_LIMIT = default | NUMBER
-    /// STREAMING_RATE_LIMIT TO default | NUMBER
-    pub fn parse_alter_streaming_rate_limit(&mut self) -> PResult<Option<i32>> {
-        if !self.parse_keyword(Keyword::STREAMING_RATE_LIMIT) {
+    /// BACKFILL_RATE_LIMIT = default | NUMBER
+    /// BACKFILL_RATE_LIMIT TO default | NUMBER
+    pub fn parse_alter_backfill_rate_limit(&mut self) -> PResult<Option<i32>> {
+        if !self.parse_word("BACKFILL_RATE_LIMIT") {
+            return Ok(None);
+        }
+        if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() {
+            return self.expected("TO or = after ALTER MATERIALIZED VIEW SET BACKFILL_RATE_LIMIT");
+        }
+        let rate_limit = if self.parse_keyword(Keyword::DEFAULT) {
+            -1
+        } else {
+            let s = self.parse_number_value()?;
+            if let Ok(n) = s.parse::<i32>() {
+                n
+            } else {
+                return self.expected("number or DEFAULT");
+            }
+        };
+        Ok(Some(rate_limit))
+    }
+
+    /// SOURCE_RATE_LIMIT = default | NUMBER
+    /// SOURCE_RATE_LIMIT TO default | NUMBER
+    pub fn parse_alter_source_rate_limit(&mut self, is_table: bool) -> PResult<Option<i32>> {
+        if !self.parse_word("SOURCE_RATE_LIMIT") {
             return Ok(None);
         }
         if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() {
-            return self.expected("TO or = after ALTER TABLE SET STREAMING_RATE_LIMIT");
+            let ddl = if is_table { "TABLE" } else { "SOURCE" };
+            return self.expected(&format!("TO or = after ALTER {ddl} SET SOURCE_RATE_LIMIT"));
         }
         let rate_limit = if self.parse_keyword(Keyword::DEFAULT) {
             -1
@@ -3229,11 +3264,11 @@ impl Parser<'_> {
                    deferred,
                }
            } else if materialized
-                && let Some(rate_limit) = self.parse_alter_streaming_rate_limit()?
+                && let Some(rate_limit) = self.parse_alter_backfill_rate_limit()?
            {
-                AlterViewOperation::SetStreamingRateLimit { rate_limit }
+                AlterViewOperation::SetBackfillRateLimit { rate_limit }
            } else {
-                return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET");
+                return self.expected("SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET");
            }
        } else {
            return self.expected(&format!(
@@ -3354,8 +3389,8 @@ impl Parser<'_> {
             AlterSourceOperation::SetSchema {
                 new_schema_name: schema_name,
             }
-        } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? {
-            AlterSourceOperation::SetStreamingRateLimit { rate_limit }
+        } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? {
+            AlterSourceOperation::SetSourceRateLimit { rate_limit }
         } else {
             return self.expected("SCHEMA after SET");
         }
@@ -3369,7 +3404,7 @@ impl Parser<'_> {
             AlterSourceOperation::RefreshSchema
         } else {
             return self.expected(
-                "RENAME, ADD COLUMN, OWNER TO, SET or STREAMING_RATE_LIMIT after ALTER SOURCE",
+                "RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE",
             );
         };
 
diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
index 1d8f7bb727101..fb7cb3db6fb8b 100644
--- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs
+++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
@@ -167,7 +167,7 @@ async fn test_arrangement_backfill_replication() -> Result<()> {
     session
         .run("SET STREAMING_USE_ARRANGEMENT_BACKFILL=true")
         .await?;
-    session.run("SET STREAMING_RATE_LIMIT=30").await?;
+    session.run("SET BACKFILL_RATE_LIMIT=30").await?;
     session
         .run("create materialized view m1 as select * from t")
         .await?;
@@ -251,7 +251,7 @@ async fn test_arrangement_backfill_progress() -> Result<()> {
     // Create arrangement backfill with rate limit
     session.run("SET STREAMING_PARALLELISM=1").await?;
     session.run("SET BACKGROUND_DDL=true").await?;
-    session.run("SET STREAMING_RATE_LIMIT=1").await?;
+    session.run("SET BACKFILL_RATE_LIMIT=1").await?;
     session
         .run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t")
         .await?;
@@ -311,7 +311,7 @@ async fn test_enable_arrangement_backfill() -> Result<()> {
 async fn test_recovery_cancels_foreground_ddl() -> Result<()> {
     let mut cluster = Cluster::start(Configuration::enable_arrangement_backfill()).await?;
     let mut session = cluster.start_session();
-    session.run("SET STREAMING_RATE_LIMIT=1").await?;
+    session.run("SET BACKFILL_RATE_LIMIT=1").await?;
     session.run("CREATE TABLE t(v1 int);").await?;
     session
         .run("INSERT INTO t select * from generate_series(1, 100000);")
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 80ff76a3f1020..b3776bcd57b64 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -28,9 +28,9 @@ const DROP_TABLE: &str = "DROP TABLE t;";
 const SEED_TABLE_500: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);";
 const SEED_TABLE_100: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100);";
 const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
-const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;";
-const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;";
-const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=DEFAULT;";
+const SET_RATE_LIMIT_2: &str = "SET BACKFILL_RATE_LIMIT=2;";
+const SET_RATE_LIMIT_1: &str = "SET BACKFILL_RATE_LIMIT=1;";
+const RESET_RATE_LIMIT: &str = "SET BACKFILL_RATE_LIMIT=DEFAULT;";
 const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
 const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;";
 const WAIT: &str = "WAIT;";