From 717e5dc99d3a4db4f91e614e85630f950b56e4a7 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 29 Jul 2024 17:35:18 +0800 Subject: [PATCH] change streaming rate limit to source rate limit for sources --- src/frontend/src/handler/mod.rs | 2 +- src/sqlparser/src/ast/ddl.rs | 6 +++--- src/sqlparser/src/keywords.rs | 2 -- src/sqlparser/src/parser.rs | 31 +++++++++++++++++++++---------- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 21f956d53942..a12c5e8b4a68 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -946,7 +946,7 @@ pub async fn handle( } => alter_source_with_sr::handler_refresh_schema(handler_args, name).await, Statement::AlterSource { name, - operation: AlterSourceOperation::SetStreamingRateLimit { rate_limit }, + operation: AlterSourceOperation::SetSourceRateLimit { rate_limit }, } => { alter_streaming_rate_limit::handle_alter_streaming_rate_limit( handler_args, diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 4a318f640a70..39f73a1a172e 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -180,7 +180,7 @@ pub enum AlterSourceOperation { SetSchema { new_schema_name: ObjectName }, FormatEncode { connector_schema: ConnectorSchema }, RefreshSchema, - SetStreamingRateLimit { rate_limit: i32 }, + SetSourceRateLimit { rate_limit: i32 }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -412,8 +412,8 @@ impl fmt::Display for AlterSourceOperation { AlterSourceOperation::RefreshSchema => { write!(f, "REFRESH SCHEMA") } - AlterSourceOperation::SetStreamingRateLimit { rate_limit } => { - write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit) + AlterSourceOperation::SetSourceRateLimit { rate_limit } => { + write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit) } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 4f827f3612bf..768301544ef2 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -95,7 +95,6 @@ define_keywords!( AUTHORIZATION, AUTO, AVG, - BACKFILL_RATE_LIMIT, BASE64, BEGIN, BEGIN_FRAME, @@ -490,7 +489,6 @@ define_keywords!( STDDEV_SAMP, STDIN, STORED, - STREAMING_RATE_LIMIT, STRING, STRUCT, SUBMULTISET, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index ada9995de783..fabc50568650 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -1812,6 +1812,16 @@ impl Parser<'_> { self.expected(expected) } + pub fn parse_word(&mut self, expected: &str) -> bool { + match self.peek_token().token { + Token::Word(w) if w.value == expected => { + self.next_token(); + true + } + _ => false, + } + } + /// Look for an expected keyword and consume it if it exists #[must_use] pub fn parse_keyword(&mut self, expected: Keyword) -> bool { @@ -3078,10 +3088,10 @@ impl Parser<'_> { parallelism: value, deferred, } - } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? { + } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? { AlterTableOperation::SetStreamingRateLimit { rate_limit } } else { - return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET"); + return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET"); } } else if self.parse_keyword(Keyword::DROP) { let _ = self.parse_keyword(Keyword::COLUMN); @@ -3136,7 +3146,7 @@ impl Parser<'_> { /// BACKFILL_RATE_LIMIT = default | NUMBER /// BACKFILL_RATE_LIMIT TO default | NUMBER pub fn parse_alter_backfill_rate_limit(&mut self) -> PResult> { - if !self.parse_keyword(Keyword::BACKFILL_RATE_LIMIT) { + if !self.parse_word("BACKFILL_RATE_LIMIT") { return Ok(None); } if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() { @@ -3155,14 +3165,15 @@ impl Parser<'_> { Ok(Some(rate_limit)) } - /// STREAMING_RATE_LIMIT = default | NUMBER - /// STREAMING_RATE_LIMIT TO default | NUMBER - pub fn parse_alter_streaming_rate_limit(&mut self) -> PResult> { - if !self.parse_keyword(Keyword::STREAMING_RATE_LIMIT) { + /// SOURCE_RATE_LIMIT = default | NUMBER + /// SOURCE_RATE_LIMIT TO default | NUMBER + pub fn parse_alter_source_rate_limit(&mut self, is_table: bool) -> PResult> { + if !self.parse_word("SOURCE_RATE_LIMIT") { return Ok(None); } if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() { - return self.expected("TO or = after ALTER TABLE SET STREAMING_RATE_LIMIT"); + let ddl = if is_table { "TABLE" } else { "SOURCE" }; + return self.expected(&format!("TO or = after ALTER {ddl} SET SOURCE_RATE_LIMIT")); } let rate_limit = if self.parse_keyword(Keyword::DEFAULT) { -1 @@ -3376,8 +3387,8 @@ impl Parser<'_> { AlterSourceOperation::SetSchema { new_schema_name: schema_name, } - } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? { - AlterSourceOperation::SetStreamingRateLimit { rate_limit } + } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? { + AlterSourceOperation::SetSourceRateLimit { rate_limit } } else { return self.expected("SCHEMA after SET"); }