Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
Signed-off-by: tabversion <[email protected]>
  • Loading branch information
tabversion committed Jul 25, 2024
1 parent 6834de8 commit 18cf195
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 26 deletions.
79 changes: 79 additions & 0 deletions src/frontend/src/handler/alter_table_connector_attr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_connector::bail_invalid_option_error;
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_sqlparser::ast::{ObjectName, Value, WithProperties};

use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::{HandlerArgs, RwPgResponse};

const REQUIRE_CLEAN_STATE: &str = "clean_state";

pub fn handle_alter_table_connector_attr(
handler_args: HandlerArgs,
table_name: ObjectName,
mut new_attr: WithProperties,
) -> crate::error::Result<RwPgResponse> {
let session = handler_args.session;
let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

if original_table.associated_source_id.is_none() {
bail_invalid_option_error!("Cannot alter a table without connector")
}
let clean_state_flag = check_attr(&mut new_attr)?;

return PgResponse::empty_result(StatementType::ALTER_TABLE);
}

fn check_attr(attr: &mut WithProperties) -> crate::error::Result<bool> {
// check for REQUIRE_CLEAN_STATE, should be contained in attr
let mut contain_clean_state_flag = false;
let mut clean_state_flag = false;

// cannot change connector
if attr.0.iter().any(|item| {
item.name
.real_value()
.eq_ignore_ascii_case(UPSTREAM_SOURCE_KEY)
}) {
bail_invalid_option_error!("cannot alter attribute {}", UPSTREAM_SOURCE_KEY);
}

for idx in 0..attr.0.len() {
if attr.0[idx]
.name
.real_value()
.eq_ignore_ascii_case(REQUIRE_CLEAN_STATE)
{
contain_clean_state_flag = true;
if let Value::Boolean(b) = &attr.0[idx].value {
clean_state_flag = *b
} else {
bail_invalid_option_error!("{} should be a boolean", REQUIRE_CLEAN_STATE);
}
}
attr.0.remove(idx);
break;
}
if !contain_clean_state_flag {
bail_invalid_option_error!(
"{} should be contained in the WITH clause",
REQUIRE_CLEAN_STATE
)
}

return Ok(clean_state_flag);
}
6 changes: 6 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use risingwave_sqlparser::ast::*;
use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::handler::alter_table_connector_attr::handle_alter_table_connector_attr;
use crate::handler::cancel_job::handle_cancel;
use crate::handler::kill_process::handle_kill;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
Expand All @@ -48,6 +49,7 @@ mod alter_source_with_sr;
mod alter_streaming_rate_limit;
mod alter_system;
mod alter_table_column;
mod alter_table_connector_attr;
mod alter_table_with_sr;
pub mod alter_user;
pub mod cancel_job;
Expand Down Expand Up @@ -707,6 +709,10 @@ pub async fn handle(
)
.await
}
Statement::AlterTable {
name,
operation: AlterTableOperation::AlterConnectorAttr { attr },
} => handle_alter_table_connector_attr(handler_args, name, attr),
Statement::AlterIndex {
name,
operation: AlterIndexOperation::RenameIndex { index_name },
Expand Down
5 changes: 4 additions & 1 deletion src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use core::fmt;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use super::ConnectorSchema;
use super::{ConnectorSchema, WithProperties};
use crate::ast::{
display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SetVariableValue,
};
Expand Down Expand Up @@ -106,6 +106,9 @@ pub enum AlterTableOperation {
SetStreamingRateLimit {
rate_limit: i32,
},
AlterConnectorAttr {
attr: WithProperties,
},
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ define_keywords!(
CONNECT,
CONNECTION,
CONNECTIONS,
CONNECTOR,
CONSTRAINT,
CONTAINS,
CONVERT,
Expand Down
60 changes: 35 additions & 25 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3094,34 +3094,44 @@ impl Parser<'_> {
cascade,
}
} else if self.parse_keyword(Keyword::ALTER) {
let _ = self.parse_keyword(Keyword::COLUMN);
let column_name = self.parse_identifier_non_reserved()?;

let op = if self.parse_keywords(&[Keyword::SET, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::SetNotNull {}
} else if self.parse_keywords(&[Keyword::DROP, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::DropNotNull {}
} else if self.parse_keywords(&[Keyword::SET, Keyword::DEFAULT]) {
AlterColumnOperation::SetDefault {
value: self.parse_expr()?,
}
} else if self.parse_keywords(&[Keyword::DROP, Keyword::DEFAULT]) {
AlterColumnOperation::DropDefault {}
} else if self.parse_keywords(&[Keyword::SET, Keyword::DATA, Keyword::TYPE])
|| (self.parse_keyword(Keyword::TYPE))
{
let data_type = self.parse_data_type()?;
let using = if self.parse_keyword(Keyword::USING) {
Some(self.parse_expr()?)
if self.parse_keyword(Keyword::COLUMN) {
// ALTER COLUMN
let column_name = self.parse_identifier_non_reserved()?;
let op = if self.parse_keywords(&[Keyword::SET, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::SetNotNull {}
} else if self.parse_keywords(&[Keyword::DROP, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::DropNotNull {}
} else if self.parse_keywords(&[Keyword::SET, Keyword::DEFAULT]) {
AlterColumnOperation::SetDefault {
value: self.parse_expr()?,
}
} else if self.parse_keywords(&[Keyword::DROP, Keyword::DEFAULT]) {
AlterColumnOperation::DropDefault {}
} else if self.parse_keywords(&[Keyword::SET, Keyword::DATA, Keyword::TYPE])
|| (self.parse_keyword(Keyword::TYPE))
{
let data_type = self.parse_data_type()?;
let using = if self.parse_keyword(Keyword::USING) {
Some(self.parse_expr()?)
} else {
None
};
AlterColumnOperation::SetDataType { data_type, using }
} else {
None
return self.expected(
"SET/DROP NOT NULL, SET DEFAULT, SET DATA TYPE after ALTER COLUMN",
);
};
AlterColumnOperation::SetDataType { data_type, using }
AlterTableOperation::AlterColumn { column_name, op }
} else if self.parse_keyword(Keyword::CONNECTOR) {
// ALTER CONNECTOR WITH (...)
let attr = self.parse_with_properties()?;
AlterTableOperation::AlterConnectorAttr {
attr: WithProperties(attr),
}
} else {
return self
.expected("SET/DROP NOT NULL, SET DEFAULT, SET DATA TYPE after ALTER COLUMN");
};
AlterTableOperation::AlterColumn { column_name, op }
return self.expected("COLUMN or CONNECTOR after ALTER");
}
} else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) {
AlterTableOperation::RefreshSchema
} else {
Expand Down

0 comments on commit 18cf195

Please sign in to comment.