Skip to content

Commit

Permalink
change streaming rate limit to source rate limit for sources
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 29, 2024
1 parent 2a7e975 commit 717e5dc
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ pub async fn handle(
} => alter_source_with_sr::handler_refresh_schema(handler_args, name).await,
Statement::AlterSource {
name,
operation: AlterSourceOperation::SetStreamingRateLimit { rate_limit },
operation: AlterSourceOperation::SetSourceRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down
6 changes: 3 additions & 3 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub enum AlterSourceOperation {
SetSchema { new_schema_name: ObjectName },
FormatEncode { connector_schema: ConnectorSchema },
RefreshSchema,
SetStreamingRateLimit { rate_limit: i32 },
SetSourceRateLimit { rate_limit: i32 },
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -412,8 +412,8 @@ impl fmt::Display for AlterSourceOperation {
AlterSourceOperation::RefreshSchema => {
write!(f, "REFRESH SCHEMA")
}
AlterSourceOperation::SetStreamingRateLimit { rate_limit } => {
write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ define_keywords!(
AUTHORIZATION,
AUTO,
AVG,
BACKFILL_RATE_LIMIT,
BASE64,
BEGIN,
BEGIN_FRAME,
Expand Down Expand Up @@ -490,7 +489,6 @@ define_keywords!(
STDDEV_SAMP,
STDIN,
STORED,
STREAMING_RATE_LIMIT,
STRING,
STRUCT,
SUBMULTISET,
Expand Down
31 changes: 21 additions & 10 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,16 @@ impl Parser<'_> {
self.expected(expected)
}

pub fn parse_word(&mut self, expected: &str) -> bool {
match self.peek_token().token {
Token::Word(w) if w.value == expected => {
self.next_token();
true
}
_ => false,
}
}

/// Look for an expected keyword and consume it if it exists
#[must_use]
pub fn parse_keyword(&mut self, expected: Keyword) -> bool {
Expand Down Expand Up @@ -3078,10 +3088,10 @@ impl Parser<'_> {
parallelism: value,
deferred,
}
} else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? {
} else if let Some(rate_limit) = self.parse_alter_source_rate_limit(true)? {
AlterTableOperation::SetStreamingRateLimit { rate_limit }
} else {
return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET");
return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET");
}
} else if self.parse_keyword(Keyword::DROP) {
let _ = self.parse_keyword(Keyword::COLUMN);
Expand Down Expand Up @@ -3136,7 +3146,7 @@ impl Parser<'_> {
/// BACKFILL_RATE_LIMIT = default | NUMBER
/// BACKFILL_RATE_LIMIT TO default | NUMBER
pub fn parse_alter_backfill_rate_limit(&mut self) -> PResult<Option<i32>> {
if !self.parse_keyword(Keyword::BACKFILL_RATE_LIMIT) {
if !self.parse_word("BACKFILL_RATE_LIMIT") {
return Ok(None);
}
if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() {
Expand All @@ -3155,14 +3165,15 @@ impl Parser<'_> {
Ok(Some(rate_limit))
}

/// STREAMING_RATE_LIMIT = default | NUMBER
/// STREAMING_RATE_LIMIT TO default | NUMBER
pub fn parse_alter_streaming_rate_limit(&mut self) -> PResult<Option<i32>> {
if !self.parse_keyword(Keyword::STREAMING_RATE_LIMIT) {
/// SOURCE_RATE_LIMIT = default | NUMBER
/// SOURCE_RATE_LIMIT TO default | NUMBER
pub fn parse_alter_source_rate_limit(&mut self, is_table: bool) -> PResult<Option<i32>> {
if !self.parse_word("SOURCE_RATE_LIMIT") {
return Ok(None);
}
if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() {
return self.expected("TO or = after ALTER TABLE SET STREAMING_RATE_LIMIT");
let ddl = if is_table { "TABLE" } else { "SOURCE" };
return self.expected(&format!("TO or = after ALTER {ddl} SET SOURCE_RATE_LIMIT"));
}
let rate_limit = if self.parse_keyword(Keyword::DEFAULT) {
-1
Expand Down Expand Up @@ -3376,8 +3387,8 @@ impl Parser<'_> {
AlterSourceOperation::SetSchema {
new_schema_name: schema_name,
}
} else if let Some(rate_limit) = self.parse_alter_streaming_rate_limit()? {
AlterSourceOperation::SetStreamingRateLimit { rate_limit }
} else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? {
AlterSourceOperation::SetSourceRateLimit { rate_limit }
} else {
return self.expected("SCHEMA after SET");
}
Expand Down

0 comments on commit 717e5dc

Please sign in to comment.