diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index 21f956d539421..a12c5e8b4a68e 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -946,7 +946,7 @@ pub async fn handle(
} => alter_source_with_sr::handler_refresh_schema(handler_args, name).await,
Statement::AlterSource {
name,
- operation: AlterSourceOperation::SetStreamingRateLimit { rate_limit },
+ operation: AlterSourceOperation::SetSourceRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs
index 4a318f640a706..39f73a1a172e1 100644
--- a/src/sqlparser/src/ast/ddl.rs
+++ b/src/sqlparser/src/ast/ddl.rs
@@ -180,7 +180,7 @@ pub enum AlterSourceOperation {
SetSchema { new_schema_name: ObjectName },
FormatEncode { connector_schema: ConnectorSchema },
RefreshSchema,
- SetStreamingRateLimit { rate_limit: i32 },
+ SetSourceRateLimit { rate_limit: i32 },
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -412,8 +412,8 @@ impl fmt::Display for AlterSourceOperation {
AlterSourceOperation::RefreshSchema => {
write!(f, "REFRESH SCHEMA")
}
- AlterSourceOperation::SetStreamingRateLimit { rate_limit } => {
- write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
+ AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
+ write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
}
}
diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs
index 4f827f3612bf9..768301544ef21 100644
--- a/src/sqlparser/src/keywords.rs
+++ b/src/sqlparser/src/keywords.rs
@@ -95,7 +95,6 @@ define_keywords!(
AUTHORIZATION,
AUTO,
AVG,
- BACKFILL_RATE_LIMIT,
BASE64,
BEGIN,
BEGIN_FRAME,
@@ -490,7 +489,6 @@ define_keywords!(
STDDEV_SAMP,
STDIN,
STORED,
- STREAMING_RATE_LIMIT,
STRING,
STRUCT,
SUBMULTISET,
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index ada9995de783c..fabc505686507 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -1812,6 +1812,16 @@ impl Parser<'_> {
self.expected(expected)
}
+ pub fn parse_word(&mut self, expected: &str) -> bool {
+ match self.peek_token().token {
+ Token::Word(w) if w.value == expected => {
+ self.next_token();
+ true
+ }
+ _ => false,
+ }
+ }
+
/// Look for an expected keyword and consume it if it exists
#[must_use]
pub fn parse_keyword(&mut self, expected: Keyword) -> bool {
@@ -3078,10 +3088,10 @@ impl Parser<'_> {
parallelism: value,
deferred,
}
- } else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? {
+ } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? {
AlterTableOperation::SetStreamingRateLimit { rate_limit }
} else {
- return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET");
+ return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET");
}
} else if self.parse_keyword(Keyword::DROP) {
let _ = self.parse_keyword(Keyword::COLUMN);
@@ -3136,7 +3146,7 @@ impl Parser<'_> {
/// BACKFILL_RATE_LIMIT = default | NUMBER
/// BACKFILL_RATE_LIMIT TO default | NUMBER
pub fn parse_alter_backfill_rate_limit(&mut self) -> PResult