Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(WIP): alter table connector attr #17815

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,16 @@ message ApplyThrottleResponse {
common.Status status = 1;
}

message AlterConnectorConfigRequest {
uint32 source_id = 1;
bool require_clean_state = 2;
map<string, string> config = 3;
}

message AlterConnectorConfigResponse {
common.Status status = 1;
}

message RecoverRequest {}

message RecoverResponse {}
Expand All @@ -301,6 +311,7 @@ service StreamManagerService {
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
rpc AlterConnectorConfig(AlterConnectorConfigRequest) returns (AlterConnectorConfigResponse);
}

// Below for cluster service.
Expand Down
86 changes: 86 additions & 0 deletions src/frontend/src/handler/alter_table_connector_attr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_sqlparser::ast::{ObjectName, Value, WithProperties};

use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::{HandlerArgs, RwPgResponse};

const REQUIRE_CLEAN_STATE: &str = "clean_state";

pub fn handle_alter_table_connector_attr(
handler_args: HandlerArgs,
table_name: ObjectName,
mut new_attr: WithProperties,
) -> crate::error::Result<RwPgResponse> {
let session = handler_args.session;
let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

if original_table.associated_source_id.is_none() {
bail!("Cannot alter a table without connector")
}
let clean_state_flag = check_attr(&mut new_attr)?;

if clean_state_flag {
bail_not_implemented!(
"alter connector params requiring clean its state is not implemented yet, {} should be set to false", REQUIRE_CLEAN_STATE
);
}

Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

fn check_attr(attr: &mut WithProperties) -> crate::error::Result<bool> {
// check for REQUIRE_CLEAN_STATE, should be contained in attr
let mut contain_clean_state_flag = false;
let mut clean_state_flag = false;

// cannot change connector
if attr.0.iter().any(|item| {
item.name
.real_value()
.eq_ignore_ascii_case(UPSTREAM_SOURCE_KEY)
}) {
bail!("cannot alter attribute {}", UPSTREAM_SOURCE_KEY);
}

for idx in 0..attr.0.len() {
if attr.0[idx]
.name
.real_value()
.eq_ignore_ascii_case(REQUIRE_CLEAN_STATE)
{
contain_clean_state_flag = true;
if let Value::Boolean(b) = &attr.0[idx].value {
clean_state_flag = *b
} else {
bail!("{} should be a boolean", REQUIRE_CLEAN_STATE);
}

attr.0.remove(idx);
break;
}
}
if !contain_clean_state_flag {
bail!(
"{} should be contained in the WITH clause",
REQUIRE_CLEAN_STATE
)
}

Ok(clean_state_flag)
}
6 changes: 6 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use risingwave_sqlparser::ast::*;
use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::handler::alter_table_connector_attr::handle_alter_table_connector_attr;
use crate::handler::cancel_job::handle_cancel;
use crate::handler::kill_process::handle_kill;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
Expand All @@ -48,6 +49,7 @@ mod alter_source_with_sr;
mod alter_streaming_rate_limit;
mod alter_system;
mod alter_table_column;
mod alter_table_connector_attr;
mod alter_table_with_sr;
pub mod alter_user;
pub mod cancel_job;
Expand Down Expand Up @@ -707,6 +709,10 @@ pub async fn handle(
)
.await
}
Statement::AlterTable {
name,
operation: AlterTableOperation::AlterConnectorConfig { config: attr },
} => handle_alter_table_connector_attr(handler_args, name, attr),
Statement::AlterIndex {
name,
operation: AlterIndexOperation::RenameIndex { index_name },
Expand Down
6 changes: 6 additions & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,4 +423,10 @@ impl StreamManagerService for StreamServiceImpl {
.await;
Ok(Response::new(RecoverResponse {}))
}

async fn alter_connector_config(
&self,
request: Request<AlterConnectorConfigRequest>,
) -> Result<Response<AlterConnectorConfigResponse>, Status> {
}
}
8 changes: 7 additions & 1 deletion src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use core::fmt;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use super::ConnectorSchema;
use super::{ConnectorSchema, WithProperties};
use crate::ast::{
display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SetVariableValue,
};
Expand Down Expand Up @@ -106,6 +106,9 @@ pub enum AlterTableOperation {
SetStreamingRateLimit {
rate_limit: i32,
},
AlterConnectorConfig {
config: WithProperties,
},
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -293,6 +296,9 @@ impl fmt::Display for AlterTableOperation {
AlterTableOperation::SetStreamingRateLimit { rate_limit } => {
write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
}
AlterTableOperation::AlterConnectorConfig { config } => {
write!(f, " CONNECTOR CONFIG {}", config)
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ define_keywords!(
CONNECT,
CONNECTION,
CONNECTIONS,
CONNECTOR,
CONSTRAINT,
CONTAINS,
CONVERT,
Expand Down
59 changes: 34 additions & 25 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3094,34 +3094,43 @@ impl Parser<'_> {
cascade,
}
} else if self.parse_keyword(Keyword::ALTER) {
let _ = self.parse_keyword(Keyword::COLUMN);
let column_name = self.parse_identifier_non_reserved()?;

let op = if self.parse_keywords(&[Keyword::SET, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::SetNotNull {}
} else if self.parse_keywords(&[Keyword::DROP, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::DropNotNull {}
} else if self.parse_keywords(&[Keyword::SET, Keyword::DEFAULT]) {
AlterColumnOperation::SetDefault {
value: self.parse_expr()?,
if self.parse_keyword(Keyword::CONNECTOR) {
// ALTER CONNECTOR WITH (...)
let config = self.parse_with_properties()?;
AlterTableOperation::AlterConnectorConfig {
config: WithProperties(config),
}
} else if self.parse_keywords(&[Keyword::DROP, Keyword::DEFAULT]) {
AlterColumnOperation::DropDefault {}
} else if self.parse_keywords(&[Keyword::SET, Keyword::DATA, Keyword::TYPE])
|| (self.parse_keyword(Keyword::TYPE))
{
let data_type = self.parse_data_type()?;
let using = if self.parse_keyword(Keyword::USING) {
Some(self.parse_expr()?)
} else {
// ALTER [ COLUMN ] column_name SET/DROP NOT NULL
let _ = self.parse_keyword(Keyword::COLUMN);
let column_name = self.parse_identifier_non_reserved()?;
let op = if self.parse_keywords(&[Keyword::SET, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::SetNotNull {}
} else if self.parse_keywords(&[Keyword::DROP, Keyword::NOT, Keyword::NULL]) {
AlterColumnOperation::DropNotNull {}
} else if self.parse_keywords(&[Keyword::SET, Keyword::DEFAULT]) {
AlterColumnOperation::SetDefault {
value: self.parse_expr()?,
}
} else if self.parse_keywords(&[Keyword::DROP, Keyword::DEFAULT]) {
AlterColumnOperation::DropDefault {}
} else if self.parse_keywords(&[Keyword::SET, Keyword::DATA, Keyword::TYPE])
|| (self.parse_keyword(Keyword::TYPE))
{
let data_type = self.parse_data_type()?;
let using = if self.parse_keyword(Keyword::USING) {
Some(self.parse_expr()?)
} else {
None
};
AlterColumnOperation::SetDataType { data_type, using }
} else {
None
return self.expected(
"SET/DROP NOT NULL, SET DEFAULT, SET DATA TYPE after ALTER COLUMN",
);
};
AlterColumnOperation::SetDataType { data_type, using }
} else {
return self
.expected("SET/DROP NOT NULL, SET DEFAULT, SET DATA TYPE after ALTER COLUMN");
};
AlterTableOperation::AlterColumn { column_name, op }
AlterTableOperation::AlterColumn { column_name, op }
}
} else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) {
AlterTableOperation::RefreshSchema
} else {
Expand Down
Loading