From 18cf195d13e70812e8dbaff7e861543284f6dbe9 Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 25 Jul 2024 17:00:46 +0800 Subject: [PATCH 1/2] stash Signed-off-by: tabversion --- .../src/handler/alter_table_connector_attr.rs | 79 +++++++++++++++++++ src/frontend/src/handler/mod.rs | 6 ++ src/sqlparser/src/ast/ddl.rs | 5 +- src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 60 ++++++++------ 5 files changed, 125 insertions(+), 26 deletions(-) create mode 100644 src/frontend/src/handler/alter_table_connector_attr.rs diff --git a/src/frontend/src/handler/alter_table_connector_attr.rs b/src/frontend/src/handler/alter_table_connector_attr.rs new file mode 100644 index 0000000000000..3ee19169c6805 --- /dev/null +++ b/src/frontend/src/handler/alter_table_connector_attr.rs @@ -0,0 +1,79 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_connector::bail_invalid_option_error; +use risingwave_connector::source::UPSTREAM_SOURCE_KEY; +use risingwave_sqlparser::ast::{ObjectName, Value, WithProperties}; + +use crate::handler::alter_table_column::fetch_table_catalog_for_alter; +use crate::handler::{HandlerArgs, RwPgResponse}; + +const REQUIRE_CLEAN_STATE: &str = "clean_state"; + +pub fn handle_alter_table_connector_attr( + handler_args: HandlerArgs, + table_name: ObjectName, + mut new_attr: WithProperties, +) -> crate::error::Result { + let session = handler_args.session; + let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?; + + if original_table.associated_source_id.is_none() { + bail_invalid_option_error!("Cannot alter a table without connector") + } + let clean_state_flag = check_attr(&mut new_attr)?; + + return PgResponse::empty_result(StatementType::ALTER_TABLE); +} + +fn check_attr(attr: &mut WithProperties) -> crate::error::Result { + // check for REQUIRE_CLEAN_STATE, should be contained in attr + let mut contain_clean_state_flag = false; + let mut clean_state_flag = false; + + // cannot change connector + if attr.0.iter().any(|item| { + item.name + .real_value() + .eq_ignore_ascii_case(UPSTREAM_SOURCE_KEY) + }) { + bail_invalid_option_error!("cannot alter attribute {}", UPSTREAM_SOURCE_KEY); + } + + for idx in 0..attr.0.len() { + if attr.0[idx] + .name + .real_value() + .eq_ignore_ascii_case(REQUIRE_CLEAN_STATE) + { + contain_clean_state_flag = true; + if let Value::Boolean(b) = &attr.0[idx].value { + clean_state_flag = *b + } else { + bail_invalid_option_error!("{} should be a boolean", REQUIRE_CLEAN_STATE); + } + } + attr.0.remove(idx); + break; + } + if !contain_clean_state_flag { + bail_invalid_option_error!( + "{} should be contained in the WITH clause", + REQUIRE_CLEAN_STATE + ) + } + + return Ok(clean_state_flag); +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index f8beeedb19438..5ace00bbb46f2 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -33,6 +33,7 @@ use risingwave_sqlparser::ast::*; use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt}; use crate::catalog::table_catalog::TableType; use crate::error::{ErrorCode, Result}; +use crate::handler::alter_table_connector_attr::handle_alter_table_connector_attr; use crate::handler::cancel_job::handle_cancel; use crate::handler::kill_process::handle_kill; use crate::scheduler::{DistributedQueryStream, LocalQueryStream}; @@ -48,6 +49,7 @@ mod alter_source_with_sr; mod alter_streaming_rate_limit; mod alter_system; mod alter_table_column; +mod alter_table_connector_attr; mod alter_table_with_sr; pub mod alter_user; pub mod cancel_job; @@ -707,6 +709,10 @@ pub async fn handle( ) .await } + Statement::AlterTable { + name, + operation: AlterTableOperation::AlterConnectorAttr { attr }, + } => handle_alter_table_connector_attr(handler_args, name, attr), Statement::AlterIndex { name, operation: AlterIndexOperation::RenameIndex { index_name }, diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 92683f42e742a..cf21b83abaee4 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -20,7 +20,7 @@ use core::fmt; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use super::ConnectorSchema; +use super::{ConnectorSchema, WithProperties}; use crate::ast::{ display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SetVariableValue, }; @@ -106,6 +106,9 @@ pub enum AlterTableOperation { SetStreamingRateLimit { rate_limit: i32, }, + AlterConnectorAttr { + attr: WithProperties, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 561b602298f11..ec58b939c63a9 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -146,6 +146,7 @@ define_keywords!( CONNECT, CONNECTION, CONNECTIONS, + CONNECTOR, CONSTRAINT, CONTAINS, CONVERT, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 449db7864f2b2..9dd05d181ddd9 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3094,34 +3094,44 @@ impl Parser<'_> { cascade, } } else if self.parse_keyword(Keyword::ALTER) { - let _ = self.parse_keyword(Keyword::COLUMN); - let column_name = self.parse_identifier_non_reserved()?; - - let op = if self.parse_keywords(&[Keyword::SET, Keyword::NOT, Keyword::NULL]) { - AlterColumnOperation::SetNotNull {} - } else if self.parse_keywords(&[Keyword::DROP, Keyword::NOT, Keyword::NULL]) { - AlterColumnOperation::DropNotNull {} - } else if self.parse_keywords(&[Keyword::SET, Keyword::DEFAULT]) { - AlterColumnOperation::SetDefault { - value: self.parse_expr()?, - } - } else if self.parse_keywords(&[Keyword::DROP, Keyword::DEFAULT]) { - AlterColumnOperation::DropDefault {} - } else if self.parse_keywords(&[Keyword::SET, Keyword::DATA, Keyword::TYPE]) - || (self.parse_keyword(Keyword::TYPE)) - { - let data_type = self.parse_data_type()?; - let using = if self.parse_keyword(Keyword::USING) { - Some(self.parse_expr()?) + if self.parse_keyword(Keyword::COLUMN) { + // ALTER COLUMN + let column_name = self.parse_identifier_non_reserved()?; + let op = if self.parse_keywords(&[Keyword::SET, Keyword::NOT, Keyword::NULL]) { + AlterColumnOperation::SetNotNull {} + } else if self.parse_keywords(&[Keyword::DROP, Keyword::NOT, Keyword::NULL]) { + AlterColumnOperation::DropNotNull {} + } else if self.parse_keywords(&[Keyword::SET, Keyword::DEFAULT]) { + AlterColumnOperation::SetDefault { + value: self.parse_expr()?, + } + } else if self.parse_keywords(&[Keyword::DROP, Keyword::DEFAULT]) { + AlterColumnOperation::DropDefault {} + } else if self.parse_keywords(&[Keyword::SET, Keyword::DATA, Keyword::TYPE]) + || (self.parse_keyword(Keyword::TYPE)) + { + let data_type = self.parse_data_type()?; + let using = if self.parse_keyword(Keyword::USING) { + Some(self.parse_expr()?) + } else { + None + }; + AlterColumnOperation::SetDataType { data_type, using } } else { - None + return self.expected( + "SET/DROP NOT NULL, SET DEFAULT, SET DATA TYPE after ALTER COLUMN", + ); }; - AlterColumnOperation::SetDataType { data_type, using } + AlterTableOperation::AlterColumn { column_name, op } + } else if self.parse_keyword(Keyword::CONNECTOR) { + // ALTER CONNECTOR WITH (...) + let attr = self.parse_with_properties()?; + AlterTableOperation::AlterConnectorAttr { + attr: WithProperties(attr), + } } else { - return self - .expected("SET/DROP NOT NULL, SET DEFAULT, SET DATA TYPE after ALTER COLUMN"); - }; - AlterTableOperation::AlterColumn { column_name, op } + return self.expected("COLUMN or CONNECTOR after ALTER"); + } } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) { AlterTableOperation::RefreshSchema } else { From 316b0355fe4d9e2069d172669e0ff44544982d6f Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 26 Jul 2024 16:45:42 +0800 Subject: [PATCH 2/2] stash Signed-off-by: tabVersion --- proto/meta.proto | 11 ++++++++ .../src/handler/alter_table_connector_attr.rs | 25 ++++++++++++------- src/frontend/src/handler/mod.rs | 2 +- src/meta/service/src/stream_service.rs | 6 +++++ src/sqlparser/src/ast/ddl.rs | 7 ++++-- src/sqlparser/src/parser.rs | 19 +++++++------- 6 files changed, 48 insertions(+), 22 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 2023ad432a438..d5bbdf2b87fa1 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -285,6 +285,16 @@ message ApplyThrottleResponse { common.Status status = 1; } +message AlterConnectorConfigRequest { + uint32 source_id = 1; + bool require_clean_state = 2; + map config = 3; +} + +message AlterConnectorConfigResponse { + common.Status status = 1; +} + message RecoverRequest {} message RecoverResponse {} @@ -301,6 +311,7 @@ service StreamManagerService { rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse); rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse); rpc Recover(RecoverRequest) returns (RecoverResponse); + rpc AlterConnectorConfig(AlterConnectorConfigRequest) returns (AlterConnectorConfigResponse); } // Below for cluster service. diff --git a/src/frontend/src/handler/alter_table_connector_attr.rs b/src/frontend/src/handler/alter_table_connector_attr.rs index 3ee19169c6805..cce8f49a2e5c3 100644 --- a/src/frontend/src/handler/alter_table_connector_attr.rs +++ b/src/frontend/src/handler/alter_table_connector_attr.rs @@ -13,7 +13,7 @@ // limitations under the License. use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_connector::bail_invalid_option_error; +use risingwave_common::{bail, bail_not_implemented}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_sqlparser::ast::{ObjectName, Value, WithProperties}; @@ -31,11 +31,17 @@ pub fn handle_alter_table_connector_attr( let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?; if original_table.associated_source_id.is_none() { - bail_invalid_option_error!("Cannot alter a table without connector") + bail!("Cannot alter a table without connector") } let clean_state_flag = check_attr(&mut new_attr)?; - return PgResponse::empty_result(StatementType::ALTER_TABLE); + if clean_state_flag { + bail_not_implemented!( + "alter connector params requiring clean its state is not implemented yet, {} should be set to false", REQUIRE_CLEAN_STATE + ); + } + + Ok(PgResponse::empty_result(StatementType::ALTER_TABLE)) } fn check_attr(attr: &mut WithProperties) -> crate::error::Result { @@ -49,7 +55,7 @@ fn check_attr(attr: &mut WithProperties) -> crate::error::Result { .real_value() .eq_ignore_ascii_case(UPSTREAM_SOURCE_KEY) }) { - bail_invalid_option_error!("cannot alter attribute {}", UPSTREAM_SOURCE_KEY); + bail!("cannot alter attribute {}", UPSTREAM_SOURCE_KEY); } for idx in 0..attr.0.len() { @@ -62,18 +68,19 @@ fn check_attr(attr: &mut WithProperties) -> crate::error::Result { if let Value::Boolean(b) = &attr.0[idx].value { clean_state_flag = *b } else { - bail_invalid_option_error!("{} should be a boolean", REQUIRE_CLEAN_STATE); + bail!("{} should be a boolean", REQUIRE_CLEAN_STATE); } + + attr.0.remove(idx); + break; } - attr.0.remove(idx); - break; } if !contain_clean_state_flag { - bail_invalid_option_error!( + bail!( "{} should be contained in the WITH clause", REQUIRE_CLEAN_STATE ) } - return Ok(clean_state_flag); + Ok(clean_state_flag) } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 5ace00bbb46f2..a1289f5b6d232 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -711,7 +711,7 @@ pub async fn handle( } Statement::AlterTable { name, - operation: AlterTableOperation::AlterConnectorAttr { attr }, + operation: AlterTableOperation::AlterConnectorConfig { config: attr }, } => handle_alter_table_connector_attr(handler_args, name, attr), Statement::AlterIndex { name, diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index d50a088972eeb..f05fdf63b90b0 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -423,4 +423,10 @@ impl StreamManagerService for StreamServiceImpl { .await; Ok(Response::new(RecoverResponse {})) } + + async fn alter_connector_config( + &self, + request: Request, + ) -> Result, Status> { + } } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index cf21b83abaee4..6414b9ff8d38f 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -106,8 +106,8 @@ pub enum AlterTableOperation { SetStreamingRateLimit { rate_limit: i32, }, - AlterConnectorAttr { - attr: WithProperties, + AlterConnectorConfig { + config: WithProperties, }, } @@ -296,6 +296,9 @@ impl fmt::Display for AlterTableOperation { AlterTableOperation::SetStreamingRateLimit { rate_limit } => { write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit) } + AlterTableOperation::AlterConnectorConfig { config } => { + write!(f, " CONNECTOR CONFIG {}", config) + } } } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 9dd05d181ddd9..2025343b36a18 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3094,8 +3094,15 @@ impl Parser<'_> { cascade, } } else if self.parse_keyword(Keyword::ALTER) { - if self.parse_keyword(Keyword::COLUMN) { - // ALTER COLUMN + if self.parse_keyword(Keyword::CONNECTOR) { + // ALTER CONNECTOR WITH (...) + let config = self.parse_with_properties()?; + AlterTableOperation::AlterConnectorConfig { + config: WithProperties(config), + } + } else { + // ALTER [ COLUMN ] column_name SET/DROP NOT NULL + let _ = self.parse_keyword(Keyword::COLUMN); let column_name = self.parse_identifier_non_reserved()?; let op = if self.parse_keywords(&[Keyword::SET, Keyword::NOT, Keyword::NULL]) { AlterColumnOperation::SetNotNull {} @@ -3123,14 +3130,6 @@ impl Parser<'_> { ); }; AlterTableOperation::AlterColumn { column_name, op } - } else if self.parse_keyword(Keyword::CONNECTOR) { - // ALTER CONNECTOR WITH (...) - let attr = self.parse_with_properties()?; - AlterTableOperation::AlterConnectorAttr { - attr: WithProperties(attr), - } - } else { - return self.expected("COLUMN or CONNECTOR after ALTER"); } } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) { AlterTableOperation::RefreshSchema