diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 1164df548d5d0..2d3fbd7e0178a 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -24,10 +24,9 @@ use risingwave_pb::catalog::{ PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbSubscription, PbTable, PbView, }; -use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan, - PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion, + alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request, + PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion, }; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -175,27 +174,13 @@ pub trait CatalogWriter: Send + Sync { async fn drop_secret(&self, secret_id: SecretId) -> Result<()>; - async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()>; - - async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()>; - - async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()>; - - async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()>; - - async fn alter_subscription_name( + async fn alter_name( &self, - subscription_id: u32, - subscription_name: &str, + object_id: alter_name_request::Object, + object_name: &str, ) -> Result<()>; - async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()>; - - async fn alter_schema_name(&self, schema_id: u32, schema_name: &str) -> Result<()>; - - async fn alter_database_name(&self, database_id: u32, database_name: &str) -> Result<()>; - - async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()>; + async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>; /// Replace the source in the catalog. async fn alter_source(&self, source: PbSource) -> Result<()>; @@ -469,81 +454,16 @@ impl CatalogWriter for CatalogWriterImpl { self.wait_version(version).await } - async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> { - let version = self - .meta_client - .alter_name(alter_name_request::Object::TableId(table_id), table_name) - .await?; - self.wait_version(version).await - } - - async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()> { - let version = self - .meta_client - .alter_name(alter_name_request::Object::ViewId(view_id), view_name) - .await?; - self.wait_version(version).await - } - - async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()> { - let version = self - .meta_client - .alter_name(alter_name_request::Object::IndexId(index_id), index_name) - .await?; - self.wait_version(version).await - } - - async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()> { - let version = self - .meta_client - .alter_name(alter_name_request::Object::SinkId(sink_id), sink_name) - .await?; - self.wait_version(version).await - } - - async fn alter_subscription_name( + async fn alter_name( &self, - subscription_id: u32, - subscription_name: &str, + object_id: alter_name_request::Object, + object_name: &str, ) -> Result<()> { - let version = self - .meta_client - .alter_name( - alter_name_request::Object::SubscriptionId(subscription_id), - subscription_name, - ) - .await?; - self.wait_version(version).await - } - - async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()> { - let version = self - .meta_client - .alter_name(alter_name_request::Object::SourceId(source_id), source_name) - .await?; - self.wait_version(version).await - } - - async fn alter_schema_name(&self, schema_id: u32, schema_name: &str) -> Result<()> { - let version = self - .meta_client - .alter_name(alter_name_request::Object::SchemaId(schema_id), schema_name) - .await?; - self.wait_version(version).await - } - - async fn alter_database_name(&self, database_id: u32, database_name: &str) -> Result<()> { - let version = self - .meta_client - .alter_name( - alter_name_request::Object::DatabaseId(database_id), - database_name, - ) - .await?; + let version = self.meta_client.alter_name(object_id, object_name).await?; self.wait_version(version).await } - async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> { + async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> { let version = self.meta_client.alter_owner(object, owner_id).await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_rename.rs b/src/frontend/src/handler/alter_rename.rs index ba9b647c2fd7e..fe036067b1f57 100644 --- a/src/frontend/src/handler/alter_rename.rs +++ b/src/frontend/src/handler/alter_rename.rs @@ -15,7 +15,8 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::acl::AclMode; use risingwave_common::catalog::is_system_schema; -use risingwave_pb::user::grant_privilege::Object; +use risingwave_pb::ddl_service::alter_name_request; +use risingwave_pb::user::grant_privilege; use risingwave_sqlparser::ast::ObjectName; use super::{HandlerArgs, RwPgResponse}; @@ -58,7 +59,10 @@ pub async fn handle_rename_table( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_table_name(table_id.table_id, &new_table_name) + .alter_name( + alter_name_request::Object::TableId(table_id.table_id), + &new_table_name, + ) .await?; let stmt_type = match table_type { @@ -94,7 +98,10 @@ pub async fn handle_rename_index( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_index_name(index_id.index_id, &new_index_name) + .alter_name( + alter_name_request::Object::IndexId(index_id.index_id), + &new_index_name, + ) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_INDEX)) @@ -124,7 +131,7 @@ pub async fn handle_rename_view( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_view_name(view_id, &new_view_name) + .alter_name(alter_name_request::Object::ViewId(view_id), &new_view_name) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_VIEW)) @@ -154,7 +161,10 @@ pub async fn handle_rename_sink( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_sink_name(sink_id.sink_id, &new_sink_name) + .alter_name( + alter_name_request::Object::SinkId(sink_id.sink_id), + &new_sink_name, + ) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SINK)) @@ -185,7 +195,10 @@ pub async fn handle_rename_subscription( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_subscription_name(subscription_id.subscription_id, &new_subscription_name) + .alter_name( + alter_name_request::Object::SubscriptionId(subscription_id.subscription_id), + &new_subscription_name, + ) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION)) @@ -225,7 +238,10 @@ pub async fn handle_rename_source( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_source_name(source_id, &new_source_name) + .alter_name( + alter_name_request::Object::SourceId(source_id), + &new_source_name, + ) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE)) @@ -260,7 +276,9 @@ pub async fn handle_rename_schema( // To rename a schema you must also have the CREATE privilege for the database. if let Some(user) = user_reader.get_user_by_name(session.user_name()) { - if !user.is_super && !user.check_privilege(&Object::DatabaseId(db_id), AclMode::Create) + if !user.is_super + && !user + .check_privilege(&grant_privilege::Object::DatabaseId(db_id), AclMode::Create) { return Err(ErrorCode::PermissionDenied( "Do not have create privilege on the current database".to_string(), @@ -276,7 +294,10 @@ pub async fn handle_rename_schema( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_schema_name(schema_id, &new_schema_name) + .alter_name( + alter_name_request::Object::SchemaId(schema_id), + &new_schema_name, + ) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA)) @@ -324,7 +345,10 @@ pub async fn handle_rename_database( let catalog_writer = session.catalog_writer()?; catalog_writer - .alter_database_name(database_id, &new_database_name) + .alter_name( + alter_name_request::Object::DatabaseId(database_id), + &new_database_name, + ) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE)) diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 42eedf2122fc2..91e4e60c515d3 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -43,8 +43,8 @@ use risingwave_pb::catalog::{ use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_set_schema_request, create_connection_request, DdlProgress, PbTableJobType, - ReplaceTablePlan, TableJobType, + alter_name_request, alter_set_schema_request, create_connection_request, DdlProgress, + PbTableJobType, ReplaceTablePlan, TableJobType, }; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ @@ -564,11 +564,22 @@ impl CatalogWriter for MockCatalogWriter { Ok(()) } - async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> { - self.catalog - .write() - .alter_table_name_by_id(&table_id.into(), table_name); - Ok(()) + async fn alter_name( + &self, + object_id: alter_name_request::Object, + object_name: &str, + ) -> Result<()> { + match object_id { + alter_name_request::Object::TableId(table_id) => { + self.catalog + .write() + .alter_table_name_by_id(&table_id.into(), object_name); + Ok(()) + } + _ => { + unimplemented!() + } + } } async fn alter_source(&self, source: PbSource) -> Result<()> { @@ -622,38 +633,6 @@ impl CatalogWriter for MockCatalogWriter { } } - async fn alter_view_name(&self, _view_id: u32, _view_name: &str) -> Result<()> { - unreachable!() - } - - async fn alter_index_name(&self, _index_id: u32, _index_name: &str) -> Result<()> { - unreachable!() - } - - async fn alter_sink_name(&self, _sink_id: u32, _sink_name: &str) -> Result<()> { - unreachable!() - } - - async fn alter_subscription_name( - &self, - _subscription_id: u32, - _subscription_name: &str, - ) -> Result<()> { - unreachable!() - } - - async fn alter_source_name(&self, _source_id: u32, _source_name: &str) -> Result<()> { - unreachable!() - } - - async fn alter_schema_name(&self, _schema_id: u32, _schema_name: &str) -> Result<()> { - unreachable!() - } - - async fn alter_database_name(&self, _database_id: u32, _database_name: &str) -> Result<()> { - unreachable!() - } - async fn alter_parallelism( &self, _table_id: u32,