diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index b6b84d029aac0..6bb08796cf5e1 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -86,6 +86,7 @@ sleep 1 sqllogictest -p 4566 -d dev './e2e_test/udf/udf.slt' pkill python3 +sqllogictest -p 4566 -d dev './e2e_test/udf/alter_function.slt' sqllogictest -p 4566 -d dev './e2e_test/udf/graceful_shutdown_python.slt' echo "--- e2e, $mode, java udf" diff --git a/e2e_test/ddl/alter_set_schema.slt b/e2e_test/ddl/alter_set_schema.slt new file mode 100644 index 0000000000000..8206449a8b727 --- /dev/null +++ b/e2e_test/ddl/alter_set_schema.slt @@ -0,0 +1,107 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +CREATE SCHEMA test_schema; + +statement ok +CREATE TABLE test_table (u INT, v INT) WITH ( + connector = 'datagen', + fields.u.kind = 'sequence', + fields.u.start = '1', + fields.u.end = '10', + fields.v.kind = 'sequence', + fields.v.start = '1', + fields.v.end = '10', + datagen.rows.per.second='15', + datagen.split.num = '1' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE INDEX test_index1 ON test_table(u); + +statement ok +CREATE INDEX test_index2 ON test_table(v); + +statement ok +ALTER TABLE test_table SET SCHEMA public; + +statement ok +ALTER TABLE test_table SET SCHEMA test_schema; + +query TT +SELECT tablename, schemaname FROM pg_tables WHERE schemaname = 'test_schema'; +---- +test_table test_schema + +query TT rowsort +SELECT indexname, schemaname FROM pg_indexes WHERE schemaname = 'test_schema'; +---- +test_index1 test_schema +test_index2 test_schema + +statement ok +CREATE SOURCE test_source (v INT) WITH ( + connector = 'datagen', + fields.v.kind = 'sequence', + fields.v.start = '1', + fields.v.end = '10', + datagen.rows.per.second='15', + datagen.split.num = '1' +) FORMAT PLAIN ENCODE JSON; + +statement ok +ALTER SOURCE test_source SET SCHEMA test_schema; + +query TT +SELECT name AS sourcename, nspname AS schemaname +FROM rw_sources +JOIN pg_namespace ON pg_namespace.oid = rw_sources.schema_id +WHERE nspname = 'test_schema'; +---- +test_source test_schema + +statement ok +CREATE SINK test_sink AS SELECT u FROM test_schema.test_table WITH ( + connector = 'blackhole' +); + +statement ok +ALTER SINK test_sink SET SCHEMA test_schema; + +query TT +SELECT name AS sinkname, nspname AS schemaname +FROM rw_sinks +JOIN pg_namespace ON pg_namespace.oid = rw_sinks.schema_id +WHERE nspname = 'test_schema'; +---- +test_sink test_schema + +statement ok +CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock'); + +statement ok +ALTER CONNECTION test_conn SET SCHEMA test_schema; + +query TT +SELECT name AS connname, nspname AS schemaname +FROM rw_connections +JOIN pg_namespace ON pg_namespace.oid = rw_connections.schema_id +WHERE nspname = 'test_schema'; +---- +test_conn test_schema + +statement ok +DROP CONNECTION test_schema.test_conn; + +statement ok +DROP SINK test_schema.test_sink; + +statement ok +DROP SOURCE test_schema.test_source; + +statement ok +DROP TABLE test_schema.test_table; + +statement ok +DROP SCHEMA test_schema; diff --git a/e2e_test/udf/alter_function.slt b/e2e_test/udf/alter_function.slt new file mode 100644 index 0000000000000..9e650500e0136 --- /dev/null +++ b/e2e_test/udf/alter_function.slt @@ -0,0 +1,45 @@ +system ok +python3 e2e_test/udf/test.py & + +# wait for server to start +sleep 1s + +# Test `ALTER FUNCTION xxx SET SCHEMA xxx` +statement ok +CREATE SCHEMA test_schema; + +statement ok +CREATE FUNCTION test_func() RETURNS INT LANGUAGE python as int_42 using link 'http://localhost:8815'; + +statement ok +CREATE FUNCTION test_func(INT) RETURNS INT LANGUAGE python as sleep using link 'http://localhost:8815'; + +statement error +ALTER FUNCTION test_func SET SCHEMA test_schema; + +statement ok +ALTER FUNCTION test_func() SET SCHEMA test_schema; + +statement ok +ALTER FUNCTION test_func(INT) SET SCHEMA test_schema; + +query TT +SELECT name AS funcname, nspname AS schemaname +FROM rw_functions +JOIN pg_namespace ON pg_namespace.oid = rw_functions.schema_id +WHERE nspname = 'test_schema'; +---- +test_func test_schema +test_func test_schema + +statement ok +DROP FUNCTION test_schema.test_func(); + +statement ok +DROP FUNCTION test_schema.test_func(INT); + +statement ok +DROP SCHEMA test_schema; + +system ok +pkill python diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 7cc76234acdb6..759f8e0f91217 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -194,6 +194,23 @@ message AlterOwnerRequest { uint32 owner_id = 20; } +message AlterSetSchemaRequest { + oneof object { + uint32 table_id = 1; + uint32 view_id = 2; + uint32 source_id = 3; + uint32 sink_id = 4; + uint32 function_id = 5; + uint32 connection_id = 6; + } + uint32 new_schema_id = 20; +} + +message AlterSetSchemaResponse { + common.Status status = 1; + uint64 version = 2; +} + message AlterOwnerResponse { common.Status status = 1; uint64 version = 2; @@ -373,6 +390,7 @@ service DdlService { rpc AlterRelationName(AlterRelationNameRequest) returns (AlterRelationNameResponse); rpc AlterSource(AlterSourceRequest) returns (AlterSourceResponse); rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse); + rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse); rpc DropTable(DropTableRequest) returns (DropTableResponse); rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse); rpc CreateView(CreateViewRequest) returns (CreateViewResponse); diff --git a/proto/user.proto b/proto/user.proto index c998f66d15133..69661d46f0db3 100644 --- a/proto/user.proto +++ b/proto/user.proto @@ -44,6 +44,7 @@ message GrantPrivilege { CREATE = 5; CONNECT = 6; USAGE = 7; + EXECUTE = 8; } message ActionWithGrantOption { diff --git a/src/common/src/acl/mod.rs b/src/common/src/acl/mod.rs index 929577437571d..7acb3d0c274b1 100644 --- a/src/common/src/acl/mod.rs +++ b/src/common/src/acl/mod.rs @@ -69,6 +69,7 @@ impl From for AclMode { PbAction::Create => AclMode::Create, PbAction::Connect => AclMode::Connect, PbAction::Usage => AclMode::Usage, + PbAction::Execute => AclMode::Execute, } } } @@ -83,6 +84,7 @@ impl From for PbAction { AclMode::Create => PbAction::Create, AclMode::Connect => PbAction::Connect, AclMode::Usage => PbAction::Usage, + AclMode::Execute => PbAction::Execute, _ => unreachable!(), } } diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index afb7ecad6c2d8..a7ef67c46d962 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -26,7 +26,9 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; -use risingwave_pb::ddl_service::{create_connection_request, PbTableJobType}; +use risingwave_pb::ddl_service::{ + alter_set_schema_request, create_connection_request, PbTableJobType, +}; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_rpc_client::MetaClient; use tokio::sync::watch::Receiver; @@ -158,6 +160,12 @@ pub trait CatalogWriter: Send + Sync { async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()>; async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()>; + + async fn alter_set_schema( + &self, + object: alter_set_schema_request::Object, + new_schema_id: u32, + ) -> Result<()>; } #[derive(Clone)] @@ -421,6 +429,18 @@ impl CatalogWriter for CatalogWriterImpl { let version = self.meta_client.alter_owner(object, owner_id).await?; self.wait_version(version).await } + + async fn alter_set_schema( + &self, + object: alter_set_schema_request::Object, + new_schema_id: u32, + ) -> Result<()> { + let version = self + .meta_client + .alter_set_schema(object, new_schema_id) + .await?; + self.wait_version(version).await + } } impl CatalogWriterImpl { diff --git a/src/frontend/src/catalog/database_catalog.rs b/src/frontend/src/catalog/database_catalog.rs index 17ae8725bc80b..d2de6d935df5d 100644 --- a/src/frontend/src/catalog/database_catalog.rs +++ b/src/frontend/src/catalog/database_catalog.rs @@ -75,6 +75,10 @@ impl DatabaseCatalog { self.schema_by_name.values() } + pub fn iter_schemas_mut(&mut self) -> impl Iterator { + self.schema_by_name.values_mut() + } + pub fn get_schema_by_name(&self, name: &str) -> Option<&SchemaCatalog> { self.schema_by_name.get(name) } diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index fde63498709b6..bd1a4815023df 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -231,6 +231,25 @@ impl Catalog { .drop_connection(connection_id); } + pub fn update_connection(&mut self, proto: &PbConnection) { + let database = self.get_database_mut(proto.database_id).unwrap(); + let schema = database.get_schema_mut(proto.schema_id).unwrap(); + if schema.get_connection_by_id(&proto.id).is_some() { + schema.update_connection(proto); + } else { + // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement. + schema.create_connection(proto); + database + .iter_schemas_mut() + .find(|schema| { + schema.id() != proto.schema_id + && schema.get_connection_by_id(&proto.id).is_some() + }) + .unwrap() + .drop_connection(proto.id); + } + } + pub fn drop_database(&mut self, db_id: DatabaseId) { let name = self.db_name_by_id.remove(&db_id).unwrap(); let database = self.database_by_name.remove(&name).unwrap(); @@ -253,12 +272,24 @@ impl Catalog { } pub fn update_table(&mut self, proto: &PbTable) { - let table = self - .get_database_mut(proto.database_id) - .unwrap() - .get_schema_mut(proto.schema_id) - .unwrap() - .update_table(proto); + let database = self.get_database_mut(proto.database_id).unwrap(); + let schema = database.get_schema_mut(proto.schema_id).unwrap(); + let table = if schema.get_table_by_id(&proto.id.into()).is_some() { + schema.update_table(proto) + } else { + // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement. + let new_table = schema.create_table(proto); + database + .iter_schemas_mut() + .find(|schema| { + schema.id() != proto.schema_id + && schema.get_table_by_id(&proto.id.into()).is_some() + }) + .unwrap() + .drop_table(proto.id.into()); + new_table + }; + self.table_by_id.insert(proto.id.into(), table); } @@ -275,11 +306,22 @@ impl Catalog { } pub fn update_index(&mut self, proto: &PbIndex) { - self.get_database_mut(proto.database_id) - .unwrap() - .get_schema_mut(proto.schema_id) - .unwrap() - .update_index(proto); + let database = self.get_database_mut(proto.database_id).unwrap(); + let schema = database.get_schema_mut(proto.schema_id).unwrap(); + if schema.get_index_by_id(&proto.id.into()).is_some() { + schema.update_index(proto); + } else { + // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement. + schema.create_index(proto); + database + .iter_schemas_mut() + .find(|schema| { + schema.id() != proto.schema_id + && schema.get_index_by_id(&proto.id.into()).is_some() + }) + .unwrap() + .drop_index(proto.id.into()); + } } pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) { @@ -291,11 +333,21 @@ impl Catalog { } pub fn update_source(&mut self, proto: &PbSource) { - self.get_database_mut(proto.database_id) - .unwrap() - .get_schema_mut(proto.schema_id) - .unwrap() - .update_source(proto); + let database = self.get_database_mut(proto.database_id).unwrap(); + let schema = database.get_schema_mut(proto.schema_id).unwrap(); + if schema.get_source_by_id(&proto.id).is_some() { + schema.update_source(proto); + } else { + // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement. + schema.create_source(proto); + database + .iter_schemas_mut() + .find(|schema| { + schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some() + }) + .unwrap() + .drop_source(proto.id); + } } pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) { @@ -307,11 +359,21 @@ impl Catalog { } pub fn update_sink(&mut self, proto: &PbSink) { - self.get_database_mut(proto.database_id) - .unwrap() - .get_schema_mut(proto.schema_id) - .unwrap() - .update_sink(proto); + let database = self.get_database_mut(proto.database_id).unwrap(); + let schema = database.get_schema_mut(proto.schema_id).unwrap(); + if schema.get_sink_by_id(&proto.id).is_some() { + schema.update_sink(proto); + } else { + // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement. + schema.create_sink(proto); + database + .iter_schemas_mut() + .find(|schema| { + schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some() + }) + .unwrap() + .drop_sink(proto.id); + } } pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) { @@ -331,11 +393,21 @@ impl Catalog { } pub fn update_view(&mut self, proto: &PbView) { - self.get_database_mut(proto.database_id) - .unwrap() - .get_schema_mut(proto.schema_id) - .unwrap() - .update_view(proto); + let database = self.get_database_mut(proto.database_id).unwrap(); + let schema = database.get_schema_mut(proto.schema_id).unwrap(); + if schema.get_view_by_id(&proto.id).is_some() { + schema.update_view(proto); + } else { + // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement. + schema.create_view(proto); + database + .iter_schemas_mut() + .find(|schema| { + schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some() + }) + .unwrap() + .drop_view(proto.id); + } } pub fn drop_function( @@ -351,6 +423,31 @@ impl Catalog { .drop_function(function_id); } + pub fn update_function(&mut self, proto: &PbFunction) { + let database = self.get_database_mut(proto.database_id).unwrap(); + let schema = database.get_schema_mut(proto.schema_id).unwrap(); + if schema.get_function_by_id(proto.id.into()).is_some() { + schema.update_function(proto); + } else { + // Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement. + schema.create_function(proto); + database + .iter_schemas_mut() + .find(|schema| { + schema.id() != proto.schema_id + && schema.get_function_by_id(proto.id.into()).is_some() + }) + .unwrap() + .drop_function(proto.id.into()); + } + + self.get_database_mut(proto.database_id) + .unwrap() + .get_schema_mut(proto.schema_id) + .unwrap() + .update_function(proto); + } + pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> { self.database_by_name .get(db_name) @@ -681,6 +778,29 @@ impl Catalog { } } + pub fn check_function_name_duplicated( + &self, + db_name: &str, + schema_name: &str, + function_name: &str, + arg_types: &[DataType], + ) -> CatalogResult<()> { + let schema = self.get_schema_by_name(db_name, schema_name)?; + + if schema + .get_function_by_name_args(function_name, arg_types) + .is_some() + { + let name = format!( + "{function_name}({})", + arg_types.iter().map(|t| t.to_string()).join(",") + ); + Err(CatalogError::Duplicated("function", name)) + } else { + Ok(()) + } + } + /// Check if the name duplicates with existing connection. pub fn check_connection_name_duplicated( &self, diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index ab69c95180e01..00f9145d34e3f 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -192,7 +192,7 @@ impl SchemaCatalog { .unwrap(); entry.get_mut().remove(pos); } - Vacant(_entry) => unreachable!(), + Vacant(_entry) => (), }; } @@ -356,6 +356,32 @@ impl SchemaCatalog { .expect("function not found by argument types"); } + pub fn update_function(&mut self, prost: &PbFunction) { + let name = prost.name.clone(); + let id = prost.id.into(); + let function = FunctionCatalog::from(prost); + let function_ref = Arc::new(function); + + let old_function_by_id = self.function_by_id.get(&id).unwrap(); + let old_function_by_name = self + .function_by_name + .get_mut(&old_function_by_id.name) + .unwrap(); + // check if function name get updated. + if old_function_by_id.name != name { + old_function_by_name.remove(&old_function_by_id.arg_types); + if old_function_by_name.is_empty() { + self.function_by_name.remove(&old_function_by_id.name); + } + } + + self.function_by_name + .entry(name) + .or_default() + .insert(old_function_by_id.arg_types.clone(), function_ref.clone()); + self.function_by_id.insert(id, function_ref); + } + pub fn create_connection(&mut self, prost: &PbConnection) { let name = prost.name.clone(); let id = prost.id; @@ -369,6 +395,22 @@ impl SchemaCatalog { .unwrap(); } + pub fn update_connection(&mut self, prost: &PbConnection) { + let name = prost.name.clone(); + let id = prost.id; + let connection = ConnectionCatalog::from(prost); + let connection_ref = Arc::new(connection); + + let old_connection = self.connection_by_id.get(&id).unwrap(); + // check if connection name get updated. + if old_connection.name != name { + self.connection_by_name.remove(&old_connection.name); + } + + self.connection_by_name.insert(name, connection_ref.clone()); + self.connection_by_id.insert(id, connection_ref); + } + pub fn drop_connection(&mut self, connection_id: ConnectionId) { let connection_ref = self .connection_by_id @@ -498,6 +540,10 @@ impl SchemaCatalog { .map(|table| table.name.clone()) } + pub fn get_function_by_id(&self, function_id: FunctionId) -> Option<&Arc> { + self.function_by_id.get(&function_id) + } + pub fn get_function_by_name_args( &self, name: &str, @@ -514,6 +560,13 @@ impl SchemaCatalog { Some(functions.values().collect()) } + pub fn get_connection_by_id( + &self, + connection_id: &ConnectionId, + ) -> Option<&Arc> { + self.connection_by_id.get(connection_id) + } + pub fn get_connection_by_name(&self, connection_name: &str) -> Option<&Arc> { self.connection_by_name.get(connection_name) } diff --git a/src/frontend/src/handler/alter_set_schema.rs b/src/frontend/src/handler/alter_set_schema.rs new file mode 100644 index 0000000000000..4a6b8f6543af9 --- /dev/null +++ b/src/frontend/src/handler/alter_set_schema.rs @@ -0,0 +1,217 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_response::StatementType; +use risingwave_common::error::{ErrorCode, Result}; +use risingwave_pb::ddl_service::alter_set_schema_request::Object; +use risingwave_sqlparser::ast::{ObjectName, OperateFunctionArg}; + +use super::{HandlerArgs, RwPgResponse}; +use crate::catalog::root_catalog::SchemaPath; +use crate::{bind_data_type, Binder}; + +// Steps for validation: +// 1. Check permission to alter original object. +// 2. Check duplicate name in the new schema. +// 3. Check permission to create in the new schema. + +/// Handle `ALTER [TABLE | [MATERIALIZED] VIEW | SOURCE | SINK | CONNECTION | FUNCTION] SET SCHEMA ` statements. +pub async fn handle_alter_set_schema( + handler_args: HandlerArgs, + obj_name: ObjectName, + new_schema_name: ObjectName, + stmt_type: StatementType, + func_args: Option>, +) -> Result { + let session = handler_args.session; + let db_name = session.database(); + let (schema_name, real_obj_name) = + Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?; + let search_path = session.config().get_search_path(); + let user_name = &session.auth_context().user_name; + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let new_schema_name = Binder::resolve_schema_name(new_schema_name)?; + let object = { + let catalog_reader = session.env().catalog_reader().read_guard(); + + match stmt_type { + StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => { + let (table, old_schema_name) = + catalog_reader.get_table_by_name(db_name, schema_path, &real_obj_name)?; + if old_schema_name == new_schema_name { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + session.check_privilege_for_drop_alter(old_schema_name, &**table)?; + catalog_reader.check_relation_name_duplicated( + db_name, + &new_schema_name, + table.name(), + )?; + Object::TableId(table.id.table_id) + } + StatementType::ALTER_VIEW => { + let (view, old_schema_name) = + catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?; + if old_schema_name == new_schema_name { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + session.check_privilege_for_drop_alter(old_schema_name, &**view)?; + catalog_reader.check_relation_name_duplicated( + db_name, + &new_schema_name, + view.name(), + )?; + Object::ViewId(view.id) + } + StatementType::ALTER_SOURCE => { + let (source, old_schema_name) = + catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?; + if old_schema_name == new_schema_name { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + session.check_privilege_for_drop_alter(old_schema_name, &**source)?; + catalog_reader.check_relation_name_duplicated( + db_name, + &new_schema_name, + &source.name, + )?; + Object::SourceId(source.id) + } + StatementType::ALTER_SINK => { + let (sink, old_schema_name) = + catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?; + if old_schema_name == new_schema_name { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + session.check_privilege_for_drop_alter(old_schema_name, &**sink)?; + catalog_reader.check_relation_name_duplicated( + db_name, + &new_schema_name, + &sink.name, + )?; + Object::SinkId(sink.id.sink_id) + } + StatementType::ALTER_CONNECTION => { + let (connection, old_schema_name) = + catalog_reader.get_connection_by_name(db_name, schema_path, &real_obj_name)?; + if old_schema_name == new_schema_name { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + session.check_privilege_for_drop_alter(old_schema_name, &**connection)?; + catalog_reader.check_connection_name_duplicated( + db_name, + &new_schema_name, + &connection.name, + )?; + Object::ConnectionId(connection.id) + } + StatementType::ALTER_FUNCTION => { + let (function, old_schema_name) = if let Some(args) = func_args { + let mut arg_types = Vec::with_capacity(args.len()); + for arg in args { + arg_types.push(bind_data_type(&arg.data_type)?); + } + catalog_reader.get_function_by_name_args( + db_name, + schema_path, + &real_obj_name, + &arg_types, + )? + } else { + let (functions, old_schema_name) = catalog_reader.get_functions_by_name( + db_name, + schema_path, + &real_obj_name, + )?; + if functions.len() > 1 { + return Err(ErrorCode::CatalogError(format!("function name {real_obj_name:?} is not unique\nHINT: Specify the argument list to select the function unambiguously.").into()).into()); + } + ( + functions.into_iter().next().expect("no functions"), + old_schema_name, + ) + }; + if old_schema_name == new_schema_name { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + session.check_privilege_for_drop_alter(old_schema_name, &**function)?; + catalog_reader.check_function_name_duplicated( + db_name, + &new_schema_name, + &function.name, + &function.arg_types, + )?; + Object::FunctionId(function.id.function_id()) + } + _ => unreachable!(), + } + }; + + let (_, new_schema_id) = + session.get_database_and_schema_id_for_create(Some(new_schema_name))?; + + let catalog_writer = session.catalog_writer()?; + catalog_writer + .alter_set_schema(object, new_schema_id) + .await?; + + Ok(RwPgResponse::empty_result(stmt_type)) +} + +#[cfg(test)] +pub mod tests { + use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME}; + + use crate::catalog::root_catalog::SchemaPath; + use crate::test_utils::LocalFrontend; + + #[tokio::test] + async fn test_alter_set_schema_handler() { + let frontend = LocalFrontend::new(Default::default()).await; + let session = frontend.session_ref(); + + let sql = "CREATE TABLE test_table (u INT, v INT);"; + frontend.run_sql(sql).await.unwrap(); + + let sql = "CREATE SCHEMA test_schema;"; + frontend.run_sql(sql).await.unwrap(); + + let get_table = |schema_name| { + let catalog_reader = session.env().catalog_reader().read_guard(); + let schema_path = SchemaPath::Name(schema_name); + catalog_reader + .get_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "test_table") + .unwrap() + .0 + .clone() + }; + + let get_schema_name_by_table_id = |table_id| { + let catalog_reader = session.env().catalog_reader().read_guard(); + catalog_reader + .get_schema_by_table_id(DEFAULT_DATABASE_NAME, &table_id) + .unwrap() + .name() + }; + + let old_schema_name = get_schema_name_by_table_id(get_table(DEFAULT_SCHEMA_NAME).id); + assert_eq!(old_schema_name, "public"); + + let sql = "ALTER TABLE test_table SET SCHEMA test_schema;"; + frontend.run_sql(sql).await.unwrap(); + let new_schema_name = get_schema_name_by_table_id(get_table("test_schema").id); + assert_eq!(new_schema_name, "test_schema"); + } +} diff --git a/src/frontend/src/handler/drop_function.rs b/src/frontend/src/handler/drop_function.rs index 0e4f271b2e731..c3bda771ec7cd 100644 --- a/src/frontend/src/handler/drop_function.rs +++ b/src/frontend/src/handler/drop_function.rs @@ -13,7 +13,7 @@ // limitations under the License. use pgwire::pg_response::StatementType; -use risingwave_sqlparser::ast::{DropFunctionDesc, ReferentialAction}; +use risingwave_sqlparser::ast::{FunctionDesc, ReferentialAction}; use super::*; use crate::catalog::root_catalog::SchemaPath; @@ -23,7 +23,7 @@ use crate::{bind_data_type, Binder}; pub async fn handle_drop_function( handler_args: HandlerArgs, if_exists: bool, - mut func_desc: Vec, + mut func_desc: Vec, _option: Option, ) -> Result { if func_desc.len() != 1 { diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index cc7b242e07056..1a7d3bcca5515 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -36,6 +36,7 @@ use crate::utils::WithOptions; mod alter_owner; mod alter_relation_rename; +mod alter_set_schema; mod alter_source_column; mod alter_system; mod alter_table_column; @@ -513,6 +514,19 @@ pub async fn handle( ) .await } + Statement::AlterTable { + name, + operation: AlterTableOperation::SetSchema { new_schema_name }, + } => { + alter_set_schema::handle_alter_set_schema( + handler_args, + name, + new_schema_name, + StatementType::ALTER_TABLE, + None, + ) + .await + } Statement::AlterIndex { name, operation: AlterIndexOperation::RenameIndex { index_name }, @@ -557,6 +571,31 @@ pub async fn handle( .await } } + Statement::AlterView { + materialized, + name, + operation: AlterViewOperation::SetSchema { new_schema_name }, + } => { + if materialized { + alter_set_schema::handle_alter_set_schema( + handler_args, + name, + new_schema_name, + StatementType::ALTER_MATERIALIZED_VIEW, + None, + ) + .await + } else { + alter_set_schema::handle_alter_set_schema( + handler_args, + name, + new_schema_name, + StatementType::ALTER_VIEW, + None, + ) + .await + } + } Statement::AlterSink { name, operation: AlterSinkOperation::RenameSink { sink_name }, @@ -573,6 +612,19 @@ pub async fn handle( ) .await } + Statement::AlterSink { + name, + operation: AlterSinkOperation::SetSchema { new_schema_name }, + } => { + alter_set_schema::handle_alter_set_schema( + handler_args, + name, + new_schema_name, + StatementType::ALTER_SINK, + None, + ) + .await + } Statement::AlterSource { name, operation: AlterSourceOperation::RenameSource { source_name }, @@ -593,6 +645,46 @@ pub async fn handle( ) .await } + Statement::AlterSource { + name, + operation: AlterSourceOperation::SetSchema { new_schema_name }, + } => { + alter_set_schema::handle_alter_set_schema( + handler_args, + name, + new_schema_name, + StatementType::ALTER_SOURCE, + None, + ) + .await + } + Statement::AlterFunction { + name, + args, + operation: AlterFunctionOperation::SetSchema { new_schema_name }, + } => { + alter_set_schema::handle_alter_set_schema( + handler_args, + name, + new_schema_name, + StatementType::ALTER_FUNCTION, + args, + ) + .await + } + Statement::AlterConnection { + name, + operation: AlterConnectionOperation::SetSchema { new_schema_name }, + } => { + alter_set_schema::handle_alter_set_schema( + handler_args, + name, + new_schema_name, + StatementType::ALTER_CONNECTION, + None, + ) + .await + } Statement::AlterSystem { param, value } => { alter_system::handle_alter_system(handler_args, param, value).await } diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 84fac6da59b42..90a39aea1d68d 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -298,6 +298,7 @@ impl FrontendObserverNode { function.schema_id, function.id.into(), ), + Operation::Update => catalog_guard.update_function(function), _ => panic!("receive an unsupported notify {:?}", resp), }, Info::Connection(connection) => match resp.operation() { @@ -307,6 +308,7 @@ impl FrontendObserverNode { connection.schema_id, connection.id, ), + Operation::Update => catalog_guard.update_connection(connection), _ => panic!("receive an unsupported notify {:?}", resp), }, _ => unreachable!(), diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 4e336b29c002d..4106da0c7c57a 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -37,7 +37,9 @@ use risingwave_pb::catalog::{ PbComment, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table, }; use risingwave_pb::ddl_service::alter_owner_request::Object; -use risingwave_pb::ddl_service::{create_connection_request, DdlProgress, PbTableJobType}; +use risingwave_pb::ddl_service::{ + alter_set_schema_request, create_connection_request, DdlProgress, PbTableJobType, +}; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, @@ -525,6 +527,30 @@ impl CatalogWriter for MockCatalogWriter { Err(ErrorCode::ItemNotFound(format!("object not found: {:?}", object)).into()) } + async fn alter_set_schema( + &self, + object: alter_set_schema_request::Object, + new_schema_id: u32, + ) -> Result<()> { + match object { + alter_set_schema_request::Object::TableId(table_id) => { + let &schema_id = self.table_id_to_schema_id.read().get(&table_id).unwrap(); + let database_id = self.get_database_id_by_schema(schema_id); + let pb_table = { + let reader = self.catalog.read(); + let table = reader.get_table_by_id(&table_id.into())?.to_owned(); + table.to_prost(new_schema_id, database_id) + }; + self.catalog.write().update_table(&pb_table); + self.table_id_to_schema_id + .write() + .insert(table_id, new_schema_id); + Ok(()) + } + _ => unreachable!(), + } + } + async fn alter_view_name(&self, _view_id: u32, _view_name: &str) -> Result<()> { unreachable!() } diff --git a/src/meta/model_v2/src/user_privilege.rs b/src/meta/model_v2/src/user_privilege.rs index f77c146cfa66b..e521229ac7e15 100644 --- a/src/meta/model_v2/src/user_privilege.rs +++ b/src/meta/model_v2/src/user_privilege.rs @@ -34,6 +34,8 @@ pub enum Action { Create, #[sea_orm(string_value = "CONNECT")] Connect, + #[sea_orm(string_value = "EXECUTE")] + Execute, } impl From for Action { @@ -47,6 +49,7 @@ impl From for Action { PbAction::Usage => Self::Usage, PbAction::Create => Self::Create, PbAction::Connect => Self::Connect, + PbAction::Execute => Self::Execute, } } } @@ -61,6 +64,7 @@ impl From for PbAction { Action::Usage => Self::Usage, Action::Create => Self::Create, Action::Connect => Self::Connect, + Action::Execute => Self::Execute, } } } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 10d4524db5370..f3843b7f4b19e 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -671,6 +671,24 @@ impl DdlService for DdlServiceImpl { })) } + async fn alter_set_schema( + &self, + request: Request, + ) -> Result, Status> { + let AlterSetSchemaRequest { + object, + new_schema_id, + } = request.into_inner(); + let version = self + .ddl_controller + .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id)) + .await?; + Ok(Response::new(AlterSetSchemaResponse { + status: None, + version, + })) + } + async fn get_ddl_progress( &self, _request: Request, diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index d1a002408bc25..735ef4c338e6a 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -23,6 +23,7 @@ use risingwave_pb::catalog::{ Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, StreamJobStatus, Table, View, }; +use risingwave_pb::data::DataType; use super::{ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, ViewId}; use crate::manager::{IndexId, MetaSrvEnv, TableId}; @@ -44,6 +45,7 @@ pub type Catalog = ( type DatabaseKey = String; type SchemaKey = (DatabaseId, String); type RelationKey = (DatabaseId, SchemaId, String); +type FunctionKey = (DatabaseId, SchemaId, String, Vec); /// [`DatabaseManager`] caches meta catalog information and maintains dependent relationship /// between tables. @@ -235,14 +237,14 @@ impl DatabaseManager { } } - pub fn check_function_duplicated(&self, function: &Function) -> MetaResult<()> { + pub fn check_function_duplicated(&self, function_key: &FunctionKey) -> MetaResult<()> { if self.functions.values().any(|x| { - x.database_id == function.database_id - && x.schema_id == function.schema_id - && x.name.eq(&function.name) - && x.arg_types == function.arg_types + x.database_id == function_key.0 + && x.schema_id == function_key.1 + && x.name.eq(&function_key.2) + && x.arg_types == function_key.3 }) { - Err(MetaError::catalog_duplicated("function", &function.name)) + Err(MetaError::catalog_duplicated("function", &function_key.2)) } else { Ok(()) } @@ -541,6 +543,14 @@ impl DatabaseManager { } } + pub fn ensure_function_id(&self, function_id: FunctionId) -> MetaResult<()> { + if self.functions.contains_key(&function_id) { + Ok(()) + } else { + Err(MetaError::catalog_id_not_found("function", function_id)) + } + } + // TODO(zehua): refactor when using SourceId. pub fn ensure_table_view_or_source_id(&self, table_id: &TableId) -> MetaResult<()> { if self.tables.contains_key(table_id) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index abd48bc868402..70106be4ece3f 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -37,7 +37,7 @@ use risingwave_pb::catalog::{ Comment, Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, StreamJobStatus, Table, View, }; -use risingwave_pb::ddl_service::alter_owner_request; +use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object}; use risingwave_pb::user::update_user_request::UpdateField; @@ -583,7 +583,12 @@ impl CatalogManager { let user_core = &mut core.user; database_core.ensure_database_id(function.database_id)?; database_core.ensure_schema_id(function.schema_id)?; - database_core.check_function_duplicated(function)?; + database_core.check_function_duplicated(&( + function.database_id, + function.schema_id, + function.name.clone(), + function.arg_types.clone(), + ))?; #[cfg(not(test))] user_core.ensure_user_id(function.owner)?; @@ -1973,6 +1978,239 @@ impl CatalogManager { Ok(version) } + pub async fn alter_set_schema( + &self, + fragment_manager: FragmentManagerRef, + object: alter_set_schema_request::Object, + new_schema_id: SchemaId, + ) -> MetaResult { + let core = &mut *self.core.lock().await; + let database_core = &mut core.database; + + database_core.ensure_schema_id(new_schema_id)?; + let database_id = database_core + .schemas + .get(&new_schema_id) + .unwrap() + .get_database_id(); + + let mut relation_infos = Vec::new(); + match object { + alter_set_schema_request::Object::TableId(table_id) => { + database_core.ensure_table_id(table_id)?; + let Table { + name, + optional_associated_source_id, + schema_id, + .. + } = database_core.tables.get(&table_id).unwrap(); + if *schema_id == new_schema_id { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + + database_core.check_relation_name_duplicated(&( + database_id, + new_schema_id, + name.to_owned(), + ))?; + + // associated source id. + let to_update_source_id = if let Some( + OptionalAssociatedSourceId::AssociatedSourceId(associated_source_id), + ) = optional_associated_source_id + { + Some(*associated_source_id) + } else { + None + }; + + let mut to_update_table_ids = vec![table_id]; + let mut to_update_internal_table_ids = vec![]; + + // indexes and index tables. + let (to_update_index_ids, index_table_ids): (Vec<_>, Vec<_>) = database_core + .indexes + .iter() + .filter(|(_, index)| index.primary_table_id == table_id) + .map(|(index_id, index)| (*index_id, index.index_table_id)) + .unzip(); + to_update_table_ids.extend(index_table_ids); + + // internal tables. + for table_id in &to_update_table_ids { + let table_fragment = fragment_manager + .select_table_fragments_by_table_id(&(table_id.into())) + .await?; + to_update_internal_table_ids.extend(table_fragment.internal_table_ids()); + } + + let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + for table_id in to_update_table_ids + .into_iter() + .chain(to_update_internal_table_ids) + { + let mut table = tables.get_mut(table_id).unwrap(); + table.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::Table(table.clone()))); + } + + let mut sources = BTreeMapTransaction::new(&mut database_core.sources); + if let Some(source_id) = to_update_source_id { + let mut source = sources.get_mut(source_id).unwrap(); + source.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::Source(source.clone()))); + } + + let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes); + for index_id in to_update_index_ids { + let mut index = indexes.get_mut(index_id).unwrap(); + index.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::Index(index.clone()))); + } + + commit_meta!(self, tables, sources, indexes)?; + } + alter_set_schema_request::Object::ViewId(view_id) => { + database_core.ensure_view_id(view_id)?; + let View { + name, schema_id, .. + } = database_core.views.get(&view_id).unwrap(); + if *schema_id == new_schema_id { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + + database_core.check_relation_name_duplicated(&( + database_id, + new_schema_id, + name.to_owned(), + ))?; + let mut views = BTreeMapTransaction::new(&mut database_core.views); + let mut view = views.get_mut(view_id).unwrap(); + view.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::View(view.clone()))); + commit_meta!(self, views)?; + } + alter_set_schema_request::Object::SourceId(source_id) => { + database_core.ensure_source_id(source_id)?; + let Source { + name, schema_id, .. + } = database_core.sources.get(&source_id).unwrap(); + if *schema_id == new_schema_id { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + + database_core.check_relation_name_duplicated(&( + database_id, + new_schema_id, + name.to_owned(), + ))?; + let mut sources = BTreeMapTransaction::new(&mut database_core.sources); + let mut source = sources.get_mut(source_id).unwrap(); + source.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::Source(source.clone()))); + commit_meta!(self, sources)?; + } + alter_set_schema_request::Object::SinkId(sink_id) => { + database_core.ensure_sink_id(sink_id)?; + let Sink { + name, schema_id, .. + } = database_core.sinks.get(&sink_id).unwrap(); + if *schema_id == new_schema_id { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + + // internal tables. + let to_update_internal_table_ids = Vec::from_iter( + fragment_manager + .select_table_fragments_by_table_id(&(sink_id.into())) + .await? + .internal_table_ids(), + ); + + database_core.check_relation_name_duplicated(&( + database_id, + new_schema_id, + name.to_owned(), + ))?; + let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks); + let mut sink = sinks.get_mut(sink_id).unwrap(); + sink.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::Sink(sink.clone()))); + + let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + for table_id in to_update_internal_table_ids { + let mut table = tables.get_mut(table_id).unwrap(); + table.schema_id = new_schema_id; + relation_infos.push(Some(RelationInfo::Table(table.clone()))); + } + + commit_meta!(self, sinks, tables)?; + } + alter_set_schema_request::Object::ConnectionId(connection_id) => { + database_core.ensure_connection_id(connection_id)?; + let Connection { + name, schema_id, .. + } = database_core.connections.get(&connection_id).unwrap(); + if *schema_id == new_schema_id { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + + database_core.check_connection_name_duplicated(&( + database_id, + new_schema_id, + name.to_owned(), + ))?; + + let mut connections = BTreeMapTransaction::new(&mut database_core.connections); + let mut connection = connections.get_mut(connection_id).unwrap(); + connection.schema_id = new_schema_id; + let notify_info = Info::Connection(connection.clone()); + commit_meta!(self, connections)?; + let version = self.notify_frontend(Operation::Update, notify_info).await; + return Ok(version); + } + alter_set_schema_request::Object::FunctionId(function_id) => { + database_core.ensure_function_id(function_id)?; + let Function { + name, + arg_types, + schema_id, + .. + } = database_core.functions.get(&function_id).unwrap(); + if *schema_id == new_schema_id { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + + database_core.check_function_duplicated(&( + database_id, + new_schema_id, + name.to_owned(), + arg_types.to_owned(), + ))?; + let mut functions = BTreeMapTransaction::new(&mut database_core.functions); + let mut function = functions.get_mut(function_id).unwrap(); + function.schema_id = new_schema_id; + let notify_info = Info::Function(function.clone()); + commit_meta!(self, functions)?; + let version = self.notify_frontend(Operation::Update, notify_info).await; + return Ok(version); + } + } + + let version = self + .notify_frontend( + Operation::Update, + Info::RelationGroup(RelationGroup { + relations: relation_infos + .into_iter() + .map(|relation_info| Relation { relation_info }) + .collect_vec(), + }), + ) + .await; + Ok(version) + } + pub async fn alter_index_name( &self, index_id: IndexId, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index bf171951da8a5..776c39b7131fb 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -29,7 +29,7 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; -use risingwave_pb::ddl_service::DdlProgress; +use risingwave_pb::ddl_service::{alter_set_schema_request, DdlProgress}; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use tokio::sync::Semaphore; use tokio::time::sleep; @@ -104,6 +104,7 @@ pub enum DdlCommand { AlterRelationName(Relation, String), AlterSourceColumn(Source), AlterTableOwner(Object, UserId), + AlterSetSchema(alter_set_schema_request::Object, SchemaId), CreateConnection(Connection), DropConnection(ConnectionId), CommentOn(Comment), @@ -260,6 +261,9 @@ impl DdlController { DdlCommand::AlterTableOwner(object, owner_id) => { ctrl.alter_owner(object, owner_id).await } + DdlCommand::AlterSetSchema(object, new_schema_id) => { + ctrl.alter_set_schema(object, new_schema_id).await + } DdlCommand::CreateConnection(connection) => { ctrl.create_connection(connection).await } @@ -1142,6 +1146,16 @@ impl DdlController { .await } + async fn alter_set_schema( + &self, + object: alter_set_schema_request::Object, + new_schema_id: SchemaId, + ) -> MetaResult { + self.catalog_manager + .alter_set_schema(self.fragment_manager.clone(), object, new_schema_id) + .await + } + pub async fn wait(&self) -> MetaResult<()> { let timeout_secs = 30 * 60; for _ in 0..timeout_secs { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 0b9de6439d19b..f18a1f92cd4f3 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -466,6 +466,19 @@ impl MetaClient { Ok(resp.version) } + pub async fn alter_set_schema( + &self, + object: alter_set_schema_request::Object, + new_schema_id: u32, + ) -> Result { + let request = AlterSetSchemaRequest { + new_schema_id, + object: Some(object), + }; + let resp = self.inner.alter_set_schema(request).await?; + Ok(resp.version) + } + pub async fn replace_table( &self, source: Option, @@ -1738,6 +1751,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse } ,{ ddl_client, alter_relation_name, AlterRelationNameRequest, AlterRelationNameResponse } ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse } + ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse } ,{ ddl_client, create_materialized_view, CreateMaterializedViewRequest, CreateMaterializedViewResponse } ,{ ddl_client, create_view, CreateViewRequest, CreateViewResponse } ,{ ddl_client, create_source, CreateSourceRequest, CreateSourceResponse } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 502b3230c89e9..3db23193fe346 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -44,13 +44,9 @@ pub enum AlterTableOperation { /// `ADD ` AddConstraint(TableConstraint), /// `ADD [ COLUMN ] ` - AddColumn { - column_def: ColumnDef, - }, + AddColumn { column_def: ColumnDef }, /// TODO: implement `DROP CONSTRAINT ` - DropConstraint { - name: Ident, - }, + DropConstraint { name: Ident }, /// `DROP [ COLUMN ] [ IF EXISTS ] [ CASCADE ]` DropColumn { column_name: Ident, @@ -63,9 +59,7 @@ pub enum AlterTableOperation { new_column_name: Ident, }, /// `RENAME TO ` - RenameTable { - table_name: ObjectName, - }, + RenameTable { table_name: ObjectName }, // CHANGE [ COLUMN ] [ ] ChangeColumn { old_name: Ident, @@ -76,19 +70,16 @@ pub enum AlterTableOperation { /// `RENAME CONSTRAINT TO ` /// /// Note: this is a PostgreSQL-specific operation. - RenameConstraint { - old_name: Ident, - new_name: Ident, - }, + RenameConstraint { old_name: Ident, new_name: Ident }, /// `ALTER [ COLUMN ]` AlterColumn { column_name: Ident, op: AlterColumnOperation, }, - - ChangeOwner { - new_owner_name: Ident, - }, + /// `OWNER TO ` + ChangeOwner { new_owner_name: Ident }, + /// `SET SCHEMA ` + SetSchema { new_schema_name: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -104,6 +95,7 @@ pub enum AlterIndexOperation { pub enum AlterViewOperation { RenameView { view_name: ObjectName }, ChangeOwner { new_owner_name: Ident }, + SetSchema { new_schema_name: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -112,6 +104,7 @@ pub enum AlterViewOperation { pub enum AlterSinkOperation { RenameSink { sink_name: ObjectName }, ChangeOwner { new_owner_name: Ident }, + SetSchema { new_schema_name: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -121,6 +114,21 @@ pub enum AlterSourceOperation { RenameSource { source_name: ObjectName }, AddColumn { column_def: ColumnDef }, ChangeOwner { new_owner_name: Ident }, + SetSchema { new_schema_name: ObjectName }, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] +pub enum AlterFunctionOperation { + SetSchema { new_schema_name: ObjectName }, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] +pub enum AlterConnectionOperation { + SetSchema { new_schema_name: ObjectName }, } impl fmt::Display for AlterDatabaseOperation { @@ -195,6 +203,9 @@ impl fmt::Display for AlterTableOperation { AlterTableOperation::ChangeOwner { new_owner_name } => { write!(f, "OWNER TO {}", new_owner_name) } + AlterTableOperation::SetSchema { new_schema_name } => { + write!(f, "SET SCHEMA {}", new_schema_name) + } } } } @@ -218,6 +229,9 @@ impl fmt::Display for AlterViewOperation { AlterViewOperation::ChangeOwner { new_owner_name } => { write!(f, "OWNER TO {}", new_owner_name) } + AlterViewOperation::SetSchema { new_schema_name } => { + write!(f, "SET SCHEMA {}", new_schema_name) + } } } } @@ -231,6 +245,9 @@ impl fmt::Display for AlterSinkOperation { AlterSinkOperation::ChangeOwner { new_owner_name } => { write!(f, "OWNER TO {}", new_owner_name) } + AlterSinkOperation::SetSchema { new_schema_name } => { + write!(f, "SET SCHEMA {}", new_schema_name) + } } } } @@ -247,6 +264,29 @@ impl fmt::Display for AlterSourceOperation { AlterSourceOperation::ChangeOwner { new_owner_name } => { write!(f, "OWNER TO {}", new_owner_name) } + AlterSourceOperation::SetSchema { new_schema_name } => { + write!(f, "SET SCHEMA {}", new_schema_name) + } + } + } +} + +impl fmt::Display for AlterFunctionOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlterFunctionOperation::SetSchema { new_schema_name } => { + write!(f, "SET SCHEMA {new_schema_name}") + } + } + } +} + +impl fmt::Display for AlterConnectionOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlterConnectionOperation::SetSchema { new_schema_name } => { + write!(f, "SET SCHEMA {new_schema_name}") + } } } } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 3f253343be274..db610c8f8f504 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -32,8 +32,9 @@ use serde::{Deserialize, Serialize}; pub use self::data_type::{DataType, StructField}; pub use self::ddl::{ - AlterColumnOperation, AlterDatabaseOperation, AlterSchemaOperation, AlterTableOperation, - ColumnDef, ColumnOption, ColumnOptionDef, ReferentialAction, SourceWatermark, TableConstraint, + AlterColumnOperation, AlterConnectionOperation, AlterDatabaseOperation, AlterFunctionOperation, + AlterSchemaOperation, AlterTableOperation, ColumnDef, ColumnOption, ColumnOptionDef, + ReferentialAction, SourceWatermark, TableConstraint, }; pub use self::operator::{BinaryOperator, QualifiedOperator, UnaryOperator}; pub use self::query::{ @@ -1178,6 +1179,19 @@ pub enum Statement { name: ObjectName, operation: AlterSourceOperation, }, + /// ALTER FUNCTION + AlterFunction { + /// Function name + name: ObjectName, + args: Option>, + operation: AlterFunctionOperation, + }, + /// ALTER CONNECTION + AlterConnection { + /// Connection name + name: ObjectName, + operation: AlterConnectionOperation, + }, /// DESCRIBE TABLE OR SOURCE Describe { /// Table or Source name @@ -1206,7 +1220,7 @@ pub enum Statement { DropFunction { if_exists: bool, /// One or more function to drop - func_desc: Vec, + func_desc: Vec, /// `CASCADE` or `RESTRICT` option: Option, }, @@ -1637,6 +1651,16 @@ impl fmt::Display for Statement { Statement::AlterSource { name, operation } => { write!(f, "ALTER SOURCE {} {}", name, operation) } + Statement::AlterFunction { name, args, operation } => { + write!(f, "ALTER FUNCTION {}", name)?; + if let Some(args) = args { + write!(f, "({})", display_comma_separated(args))?; + } + write!(f, " {}", operation) + } + Statement::AlterConnection { name, operation } => { + write!(f, "ALTER CONNECTION {} {}", name, operation) + } Statement::Drop(stmt) => write!(f, "DROP {}", stmt), Statement::DropFunction { if_exists, @@ -2416,12 +2440,12 @@ impl fmt::Display for DropFunctionOption { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "visitor", derive(Visit, VisitMut))] -pub struct DropFunctionDesc { +pub struct FunctionDesc { pub name: ObjectName, pub args: Option>, } -impl fmt::Display for DropFunctionDesc { +impl fmt::Display for FunctionDesc { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.name)?; if let Some(args) = &self.args { diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index c4c70c8a938ce..a8a20db439f11 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -26,8 +26,8 @@ use itertools::Itertools; use tracing::{debug, instrument}; use crate::ast::ddl::{ - AlterDatabaseOperation, AlterIndexOperation, AlterSchemaOperation, AlterSinkOperation, - AlterViewOperation, SourceWatermark, + AlterConnectionOperation, AlterDatabaseOperation, AlterFunctionOperation, AlterIndexOperation, + AlterSchemaOperation, AlterSinkOperation, AlterViewOperation, SourceWatermark, }; use crate::ast::{ParseTo, *}; use crate::keywords::{self, Keyword}; @@ -2360,7 +2360,7 @@ impl Parser { /// ``` fn parse_drop_function(&mut self) -> Result { let if_exists = self.parse_keywords(&[Keyword::IF, Keyword::EXISTS]); - let func_desc = self.parse_comma_separated(Parser::parse_drop_function_desc)?; + let func_desc = self.parse_comma_separated(Parser::parse_function_desc)?; let option = match self.parse_one_of_keywords(&[Keyword::CASCADE, Keyword::RESTRICT]) { Some(Keyword::CASCADE) => Some(ReferentialAction::Cascade), Some(Keyword::RESTRICT) => Some(ReferentialAction::Restrict), @@ -2373,7 +2373,7 @@ impl Parser { }) } - fn parse_drop_function_desc(&mut self) -> Result { + fn parse_function_desc(&mut self) -> Result { let name = self.parse_object_name()?; let args = if self.consume_token(&Token::LParen) { @@ -2388,7 +2388,7 @@ impl Parser { None }; - Ok(DropFunctionDesc { name, args }) + Ok(FunctionDesc { name, args }) } pub fn parse_create_index(&mut self, unique: bool) -> Result { @@ -2801,13 +2801,17 @@ impl Parser { self.parse_alter_sink() } else if self.parse_keyword(Keyword::SOURCE) { self.parse_alter_source() + } else if self.parse_keyword(Keyword::FUNCTION) { + self.parse_alter_function() + } else if self.parse_keyword(Keyword::CONNECTION) { + self.parse_alter_connection() } else if self.parse_keyword(Keyword::USER) { self.parse_alter_user() } else if self.parse_keyword(Keyword::SYSTEM) { self.parse_alter_system() } else { self.expected( - "DATABASE, SCHEMA, TABLE, INDEX, MATERIALIZED, VIEW, SINK, SOURCE, USER or SYSTEM after ALTER", + "DATABASE, SCHEMA, TABLE, INDEX, MATERIALIZED, VIEW, SINK, SOURCE, FUNCTION, USER or SYSTEM after ALTER", self.peek_token(), ) } @@ -2888,6 +2892,15 @@ impl Parser { AlterTableOperation::ChangeOwner { new_owner_name: owner_name, } + } else if self.parse_keyword(Keyword::SET) { + if self.parse_keyword(Keyword::SCHEMA) { + let schema_name = self.parse_object_name()?; + AlterTableOperation::SetSchema { + new_schema_name: schema_name, + } + } else { + return self.expected("SCHEMA after SET", self.peek_token()); + } } else if self.parse_keyword(Keyword::DROP) { let _ = self.parse_keyword(Keyword::COLUMN); let if_exists = self.parse_keywords(&[Keyword::IF, Keyword::EXISTS]); @@ -2931,7 +2944,7 @@ impl Parser { AlterTableOperation::AlterColumn { column_name, op } } else { return self.expected( - "ADD, RENAME, OWNER TO or DROP after ALTER TABLE", + "ADD or RENAME or OWNER TO or SET or DROP after ALTER TABLE", self.peek_token(), ); }; @@ -2974,10 +2987,19 @@ impl Parser { AlterViewOperation::ChangeOwner { new_owner_name: owner_name, } + } else if self.parse_keyword(Keyword::SET) { + if self.parse_keyword(Keyword::SCHEMA) { + let schema_name = self.parse_object_name()?; + AlterViewOperation::SetSchema { + new_schema_name: schema_name, + } + } else { + return self.expected("SCHEMA after SET", self.peek_token()); + } } else { return self.expected( &format!( - "RENAME or OWNER TO after ALTER {}VIEW", + "RENAME or OWNER TO or SET after ALTER {}VIEW", if materialized { "MATERIALIZED " } else { "" } ), self.peek_token(), @@ -3005,8 +3027,20 @@ impl Parser { AlterSinkOperation::ChangeOwner { new_owner_name: owner_name, } + } else if self.parse_keyword(Keyword::SET) { + if self.parse_keyword(Keyword::SCHEMA) { + let schema_name = self.parse_object_name()?; + AlterSinkOperation::SetSchema { + new_schema_name: schema_name, + } + } else { + return self.expected("SCHEMA after SET", self.peek_token()); + } } else { - return self.expected("RENAME or OWNER TO after ALTER SINK", self.peek_token()); + return self.expected( + "RENAME or OWNER TO or SET after ALTER SINK", + self.peek_token(), + ); }; Ok(Statement::AlterSink { @@ -3034,9 +3068,18 @@ impl Parser { AlterSourceOperation::ChangeOwner { new_owner_name: owner_name, } + } else if self.parse_keyword(Keyword::SET) { + if self.parse_keyword(Keyword::SCHEMA) { + let schema_name = self.parse_object_name()?; + AlterSourceOperation::SetSchema { + new_schema_name: schema_name, + } + } else { + return self.expected("SCHEMA after SET", self.peek_token()); + } } else { return self.expected( - "RENAME, ADD COLUMN or OWNER TO after ALTER SOURCE", + "RENAME, ADD COLUMN or OWNER TO or SET after ALTER SOURCE", self.peek_token(), ); }; @@ -3047,6 +3090,50 @@ impl Parser { }) } + pub fn parse_alter_function(&mut self) -> Result { + let FunctionDesc { name, args } = self.parse_function_desc()?; + + let operation = if self.parse_keyword(Keyword::SET) { + if self.parse_keyword(Keyword::SCHEMA) { + let schema_name = self.parse_object_name()?; + AlterFunctionOperation::SetSchema { + new_schema_name: schema_name, + } + } else { + return self.expected("SCHEMA after SET", self.peek_token()); + } + } else { + return self.expected("SET after ALTER FUNCTION", self.peek_token()); + }; + + Ok(Statement::AlterFunction { + name, + args, + operation, + }) + } + + pub fn parse_alter_connection(&mut self) -> Result { + let connection_name = self.parse_object_name()?; + let operation = if self.parse_keyword(Keyword::SET) { + if self.parse_keyword(Keyword::SCHEMA) { + let schema_name = self.parse_object_name()?; + AlterConnectionOperation::SetSchema { + new_schema_name: schema_name, + } + } else { + return self.expected("SCHEMA after SET", self.peek_token()); + } + } else { + return self.expected("SET after ALTER CONNECTION", self.peek_token()); + }; + + Ok(Statement::AlterConnection { + name: connection_name, + operation, + }) + } + pub fn parse_alter_system(&mut self) -> Result { self.expect_keyword(Keyword::SET)?; let param = self.parse_identifier()?; diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index d94f1b06b166b..f80f95b7a6d06 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -853,7 +853,7 @@ fn parse_drop_function() { verified_stmt(sql), Statement::DropFunction { if_exists: true, - func_desc: vec![DropFunctionDesc { + func_desc: vec![FunctionDesc { name: ObjectName(vec![Ident::new_unchecked("test_func")]), args: None }], @@ -866,7 +866,7 @@ fn parse_drop_function() { verified_stmt(sql), Statement::DropFunction { if_exists: true, - func_desc: vec![DropFunctionDesc { + func_desc: vec![FunctionDesc { name: ObjectName(vec![Ident::new_unchecked("test_func")]), args: Some(vec![ OperateFunctionArg::with_name("a", DataType::Int), @@ -888,7 +888,7 @@ fn parse_drop_function() { Statement::DropFunction { if_exists: true, func_desc: vec![ - DropFunctionDesc { + FunctionDesc { name: ObjectName(vec![Ident::new_unchecked("test_func1")]), args: Some(vec![ OperateFunctionArg::with_name("a", DataType::Int), @@ -900,7 +900,7 @@ fn parse_drop_function() { } ]), }, - DropFunctionDesc { + FunctionDesc { name: ObjectName(vec![Ident::new_unchecked("test_func2")]), args: Some(vec![ OperateFunctionArg::with_name("a", DataType::Varchar), diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index d6a35f35e5e1d..915f05e122042 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -75,6 +75,8 @@ pub enum StatementType { ALTER_MATERIALIZED_VIEW, ALTER_SINK, ALTER_SOURCE, + ALTER_FUNCTION, + ALTER_CONNECTION, ALTER_SYSTEM, REVOKE_PRIVILEGE, // Introduce ORDER_BY statement type cuz Calcite unvalidated AST has SqlKind.ORDER_BY. Note