Skip to content

Commit

Permalink
refactor: unify duplicated alter_*_name methods
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Sep 29, 2024
1 parent b4ac5ab commit fe849f7
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 140 deletions.
102 changes: 11 additions & 91 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ use risingwave_pb::catalog::{
PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan,
PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request,
PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -175,27 +174,13 @@ pub trait CatalogWriter: Send + Sync {

async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;

async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()>;

async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()>;

async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()>;

async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()>;

async fn alter_subscription_name(
async fn alter_name(
&self,
subscription_id: u32,
subscription_name: &str,
object_id: alter_name_request::Object,
object_name: &str,
) -> Result<()>;

async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()>;

async fn alter_schema_name(&self, schema_id: u32, schema_name: &str) -> Result<()>;

async fn alter_database_name(&self, database_id: u32, database_name: &str) -> Result<()>;

async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()>;
async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;

/// Replace the source in the catalog.
async fn alter_source(&self, source: PbSource) -> Result<()>;
Expand Down Expand Up @@ -469,81 +454,16 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_name(alter_name_request::Object::TableId(table_id), table_name)
.await?;
self.wait_version(version).await
}

async fn alter_view_name(&self, view_id: u32, view_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_name(alter_name_request::Object::ViewId(view_id), view_name)
.await?;
self.wait_version(version).await
}

async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_name(alter_name_request::Object::IndexId(index_id), index_name)
.await?;
self.wait_version(version).await
}

async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_name(alter_name_request::Object::SinkId(sink_id), sink_name)
.await?;
self.wait_version(version).await
}

async fn alter_subscription_name(
async fn alter_name(
&self,
subscription_id: u32,
subscription_name: &str,
object_id: alter_name_request::Object,
object_name: &str,
) -> Result<()> {
let version = self
.meta_client
.alter_name(
alter_name_request::Object::SubscriptionId(subscription_id),
subscription_name,
)
.await?;
self.wait_version(version).await
}

async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_name(alter_name_request::Object::SourceId(source_id), source_name)
.await?;
self.wait_version(version).await
}

async fn alter_schema_name(&self, schema_id: u32, schema_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_name(alter_name_request::Object::SchemaId(schema_id), schema_name)
.await?;
self.wait_version(version).await
}

async fn alter_database_name(&self, database_id: u32, database_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_name(
alter_name_request::Object::DatabaseId(database_id),
database_name,
)
.await?;
let version = self.meta_client.alter_name(object_id, object_name).await?;
self.wait_version(version).await
}

async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()> {
async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()> {
let version = self.meta_client.alter_owner(object, owner_id).await?;
self.wait_version(version).await
}
Expand Down
44 changes: 34 additions & 10 deletions src/frontend/src/handler/alter_rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::acl::AclMode;
use risingwave_common::catalog::is_system_schema;
use risingwave_pb::user::grant_privilege::Object;
use risingwave_pb::ddl_service::alter_name_request;
use risingwave_pb::user::grant_privilege;
use risingwave_sqlparser::ast::ObjectName;

use super::{HandlerArgs, RwPgResponse};
Expand Down Expand Up @@ -58,7 +59,10 @@ pub async fn handle_rename_table(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_table_name(table_id.table_id, &new_table_name)
.alter_name(
alter_name_request::Object::TableId(table_id.table_id),
&new_table_name,
)
.await?;

let stmt_type = match table_type {
Expand Down Expand Up @@ -94,7 +98,10 @@ pub async fn handle_rename_index(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_index_name(index_id.index_id, &new_index_name)
.alter_name(
alter_name_request::Object::IndexId(index_id.index_id),
&new_index_name,
)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_INDEX))
Expand Down Expand Up @@ -124,7 +131,7 @@ pub async fn handle_rename_view(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_view_name(view_id, &new_view_name)
.alter_name(alter_name_request::Object::ViewId(view_id), &new_view_name)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_VIEW))
Expand Down Expand Up @@ -154,7 +161,10 @@ pub async fn handle_rename_sink(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_sink_name(sink_id.sink_id, &new_sink_name)
.alter_name(
alter_name_request::Object::SinkId(sink_id.sink_id),
&new_sink_name,
)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
Expand Down Expand Up @@ -185,7 +195,10 @@ pub async fn handle_rename_subscription(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_subscription_name(subscription_id.subscription_id, &new_subscription_name)
.alter_name(
alter_name_request::Object::SubscriptionId(subscription_id.subscription_id),
&new_subscription_name,
)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SUBSCRIPTION))
Expand Down Expand Up @@ -225,7 +238,10 @@ pub async fn handle_rename_source(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_source_name(source_id, &new_source_name)
.alter_name(
alter_name_request::Object::SourceId(source_id),
&new_source_name,
)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
Expand Down Expand Up @@ -260,7 +276,9 @@ pub async fn handle_rename_schema(

// To rename a schema you must also have the CREATE privilege for the database.
if let Some(user) = user_reader.get_user_by_name(session.user_name()) {
if !user.is_super && !user.check_privilege(&Object::DatabaseId(db_id), AclMode::Create)
if !user.is_super
&& !user
.check_privilege(&grant_privilege::Object::DatabaseId(db_id), AclMode::Create)
{
return Err(ErrorCode::PermissionDenied(
"Do not have create privilege on the current database".to_string(),
Expand All @@ -276,7 +294,10 @@ pub async fn handle_rename_schema(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_schema_name(schema_id, &new_schema_name)
.alter_name(
alter_name_request::Object::SchemaId(schema_id),
&new_schema_name,
)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SCHEMA))
Expand Down Expand Up @@ -324,7 +345,10 @@ pub async fn handle_rename_database(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_database_name(database_id, &new_database_name)
.alter_name(
alter_name_request::Object::DatabaseId(database_id),
&new_database_name,
)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_DATABASE))
Expand Down
57 changes: 18 additions & 39 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use risingwave_pb::catalog::{
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_set_schema_request, create_connection_request, DdlProgress, PbTableJobType,
ReplaceTablePlan, TableJobType,
alter_name_request, alter_set_schema_request, create_connection_request, DdlProgress,
PbTableJobType, ReplaceTablePlan, TableJobType,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -564,11 +564,22 @@ impl CatalogWriter for MockCatalogWriter {
Ok(())
}

async fn alter_table_name(&self, table_id: u32, table_name: &str) -> Result<()> {
self.catalog
.write()
.alter_table_name_by_id(&table_id.into(), table_name);
Ok(())
async fn alter_name(
&self,
object_id: alter_name_request::Object,
object_name: &str,
) -> Result<()> {
match object_id {
alter_name_request::Object::TableId(table_id) => {
self.catalog
.write()
.alter_table_name_by_id(&table_id.into(), object_name);
Ok(())
}
_ => {
unimplemented!()
}
}
}

async fn alter_source(&self, source: PbSource) -> Result<()> {
Expand Down Expand Up @@ -622,38 +633,6 @@ impl CatalogWriter for MockCatalogWriter {
}
}

async fn alter_view_name(&self, _view_id: u32, _view_name: &str) -> Result<()> {
unreachable!()
}

async fn alter_index_name(&self, _index_id: u32, _index_name: &str) -> Result<()> {
unreachable!()
}

async fn alter_sink_name(&self, _sink_id: u32, _sink_name: &str) -> Result<()> {
unreachable!()
}

async fn alter_subscription_name(
&self,
_subscription_id: u32,
_subscription_name: &str,
) -> Result<()> {
unreachable!()
}

async fn alter_source_name(&self, _source_id: u32, _source_name: &str) -> Result<()> {
unreachable!()
}

async fn alter_schema_name(&self, _schema_id: u32, _schema_name: &str) -> Result<()> {
unreachable!()
}

async fn alter_database_name(&self, _database_id: u32, _database_name: &str) -> Result<()> {
unreachable!()
}

async fn alter_parallelism(
&self,
_table_id: u32,
Expand Down

0 comments on commit fe849f7

Please sign in to comment.