From 0c4f8c68c1529f485eb938b97617b9581c650f24 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Thu, 23 Nov 2023 17:01:05 +0800 Subject: [PATCH] fix: `ALTER xxx OWNER TO xxx` should check the `CREATE` privilege on the schema (#13593) --- src/frontend/src/handler/alter_owner.rs | 162 +++++++++++++++--------- src/frontend/src/handler/privilege.rs | 2 +- 2 files changed, 105 insertions(+), 59 deletions(-) diff --git a/src/frontend/src/handler/alter_owner.rs b/src/frontend/src/handler/alter_owner.rs index 4f4ce2550fc98..349dda495e907 100644 --- a/src/frontend/src/handler/alter_owner.rs +++ b/src/frontend/src/handler/alter_owner.rs @@ -12,16 +12,45 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use pgwire::pg_response::StatementType; +use risingwave_common::acl::AclMode; +use risingwave_common::error::ErrorCode::PermissionDenied; use risingwave_common::error::Result; use risingwave_pb::ddl_service::alter_owner_request::Object; +use risingwave_pb::user::grant_privilege; use risingwave_sqlparser::ast::{Ident, ObjectName}; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::{CatalogError, OwnedByUserCatalog}; +use crate::session::SessionImpl; +use crate::user::user_catalog::UserCatalog; use crate::Binder; +pub fn check_schema_create_privilege( + session: &Arc, + new_owner: &UserCatalog, + schema_id: u32, +) -> Result<()> { + if session.is_super_user() { + return Ok(()); + } + if !new_owner.is_super + && !new_owner.check_privilege( + &grant_privilege::Object::SchemaId(schema_id), + AclMode::Create, + ) + { + return Err(PermissionDenied( + "Require new owner to have create privilege on the object.".to_string(), + ) + .into()); + } + Ok(()) +} + pub async fn handle_alter_owner( handler_args: HandlerArgs, obj_name: ObjectName, @@ -37,71 +66,88 @@ pub async fn handle_alter_owner( let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?; - let owner_id = session - .env() - .user_info_reader() - .read_guard() - .get_user_by_name(&new_owner_name) - .map(|u| u.id) - .ok_or(CatalogError::NotFound("user", new_owner_name))?; - - let object = { + let (object, owner_id) = { let catalog_reader = session.env().catalog_reader().read_guard(); - match stmt_type { - StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => { - let (table, schema_name) = - catalog_reader.get_table_by_name(db_name, schema_path, &real_obj_name)?; - session.check_privilege_for_drop_alter(schema_name, &**table)?; - if table.owner() == owner_id { - return Ok(RwPgResponse::empty_result(stmt_type)); + let user_reader = session.env().user_info_reader().read_guard(); + let new_owner = user_reader + .get_user_by_name(&new_owner_name) + .ok_or(CatalogError::NotFound("user", new_owner_name))?; + let owner_id = new_owner.id; + ( + match stmt_type { + StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => { + let (table, schema_name) = + catalog_reader.get_table_by_name(db_name, schema_path, &real_obj_name)?; + session.check_privilege_for_drop_alter(schema_name, &**table)?; + let schema_id = catalog_reader + .get_schema_by_name(db_name, schema_name)? + .id(); + check_schema_create_privilege(&session, new_owner, schema_id)?; + if table.owner() == owner_id { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + Object::TableId(table.id.table_id) } - Object::TableId(table.id.table_id) - } - StatementType::ALTER_VIEW => { - let (view, schema_name) = - catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?; - session.check_privilege_for_drop_alter(schema_name, &**view)?; - if view.owner() == owner_id { - return Ok(RwPgResponse::empty_result(stmt_type)); + StatementType::ALTER_VIEW => { + let (view, schema_name) = + catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?; + session.check_privilege_for_drop_alter(schema_name, &**view)?; + let schema_id = catalog_reader + .get_schema_by_name(db_name, schema_name)? + .id(); + check_schema_create_privilege(&session, new_owner, schema_id)?; + if view.owner() == owner_id { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + Object::ViewId(view.id) } - Object::ViewId(view.id) - } - StatementType::ALTER_SOURCE => { - let (source, schema_name) = - catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?; - session.check_privilege_for_drop_alter(schema_name, &**source)?; - if source.owner() == owner_id { - return Ok(RwPgResponse::empty_result(stmt_type)); + StatementType::ALTER_SOURCE => { + let (source, schema_name) = + catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?; + session.check_privilege_for_drop_alter(schema_name, &**source)?; + let schema_id = catalog_reader + .get_schema_by_name(db_name, schema_name)? + .id(); + check_schema_create_privilege(&session, new_owner, schema_id)?; + if source.owner() == owner_id { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + Object::SourceId(source.id) } - Object::SourceId(source.id) - } - StatementType::ALTER_SINK => { - let (sink, schema_name) = - catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?; - session.check_privilege_for_drop_alter(schema_name, &**sink)?; - if sink.owner() == owner_id { - return Ok(RwPgResponse::empty_result(stmt_type)); + StatementType::ALTER_SINK => { + let (sink, schema_name) = + catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?; + session.check_privilege_for_drop_alter(schema_name, &**sink)?; + let schema_id = catalog_reader + .get_schema_by_name(db_name, schema_name)? + .id(); + check_schema_create_privilege(&session, new_owner, schema_id)?; + if sink.owner() == owner_id { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + Object::SinkId(sink.id.sink_id) } - Object::SinkId(sink.id.sink_id) - } - StatementType::ALTER_DATABASE => { - let database = catalog_reader.get_database_by_name(&obj_name.real_value())?; - session.check_privilege_for_drop_alter_db_schema(database)?; - if database.owner() == owner_id { - return Ok(RwPgResponse::empty_result(stmt_type)); + StatementType::ALTER_DATABASE => { + let database = catalog_reader.get_database_by_name(&obj_name.real_value())?; + session.check_privilege_for_drop_alter_db_schema(database)?; + if database.owner() == owner_id { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + Object::DatabaseId(database.id()) } - Object::DatabaseId(database.id()) - } - StatementType::ALTER_SCHEMA => { - let schema = catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?; - session.check_privilege_for_drop_alter_db_schema(schema)?; - if schema.owner() == owner_id { - return Ok(RwPgResponse::empty_result(stmt_type)); + StatementType::ALTER_SCHEMA => { + let schema = + catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?; + session.check_privilege_for_drop_alter_db_schema(schema)?; + if schema.owner() == owner_id { + return Ok(RwPgResponse::empty_result(stmt_type)); + } + Object::SchemaId(schema.id()) } - Object::SchemaId(schema.id()) - } - _ => unreachable!(), - } + _ => unreachable!(), + }, + owner_id, + ) }; let catalog_writer = session.catalog_writer()?; diff --git a/src/frontend/src/handler/privilege.rs b/src/frontend/src/handler/privilege.rs index 39984929a5dea..d89cad27b450c 100644 --- a/src/frontend/src/handler/privilege.rs +++ b/src/frontend/src/handler/privilege.rs @@ -157,7 +157,7 @@ impl SessionImpl { } /// Returns `true` if the user of the current session is a super user. - fn is_super_user(&self) -> bool { + pub fn is_super_user(&self) -> bool { let reader = self.env().user_info_reader().read_guard(); if let Some(info) = reader.get_user_by_name(self.user_name()) {