Skip to content

Commit

Permalink
fix: ALTER xxx OWNER TO xxx should check the CREATE privilege on …
Browse files Browse the repository at this point in the history
…the schema (#13593)
  • Loading branch information
Rossil2012 authored Nov 23, 2023
1 parent 5e47fd2 commit 0c4f8c6
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 59 deletions.
162 changes: 104 additions & 58 deletions src/frontend/src/handler/alter_owner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use pgwire::pg_response::StatementType;
use risingwave_common::acl::AclMode;
use risingwave_common::error::ErrorCode::PermissionDenied;
use risingwave_common::error::Result;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::user::grant_privilege;
use risingwave_sqlparser::ast::{Ident, ObjectName};

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::{CatalogError, OwnedByUserCatalog};
use crate::session::SessionImpl;
use crate::user::user_catalog::UserCatalog;
use crate::Binder;

pub fn check_schema_create_privilege(
session: &Arc<SessionImpl>,
new_owner: &UserCatalog,
schema_id: u32,
) -> Result<()> {
if session.is_super_user() {
return Ok(());
}
if !new_owner.is_super
&& !new_owner.check_privilege(
&grant_privilege::Object::SchemaId(schema_id),
AclMode::Create,
)
{
return Err(PermissionDenied(
"Require new owner to have create privilege on the object.".to_string(),
)
.into());
}
Ok(())
}

pub async fn handle_alter_owner(
handler_args: HandlerArgs,
obj_name: ObjectName,
Expand All @@ -37,71 +66,88 @@ pub async fn handle_alter_owner(
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
let owner_id = session
.env()
.user_info_reader()
.read_guard()
.get_user_by_name(&new_owner_name)
.map(|u| u.id)
.ok_or(CatalogError::NotFound("user", new_owner_name))?;

let object = {
let (object, owner_id) = {
let catalog_reader = session.env().catalog_reader().read_guard();
match stmt_type {
StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
let (table, schema_name) =
catalog_reader.get_table_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
if table.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
let user_reader = session.env().user_info_reader().read_guard();
let new_owner = user_reader
.get_user_by_name(&new_owner_name)
.ok_or(CatalogError::NotFound("user", new_owner_name))?;
let owner_id = new_owner.id;
(
match stmt_type {
StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
let (table, schema_name) =
catalog_reader.get_table_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
let schema_id = catalog_reader
.get_schema_by_name(db_name, schema_name)?
.id();
check_schema_create_privilege(&session, new_owner, schema_id)?;
if table.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::TableId(table.id.table_id)
}
Object::TableId(table.id.table_id)
}
StatementType::ALTER_VIEW => {
let (view, schema_name) =
catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**view)?;
if view.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_VIEW => {
let (view, schema_name) =
catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**view)?;
let schema_id = catalog_reader
.get_schema_by_name(db_name, schema_name)?
.id();
check_schema_create_privilege(&session, new_owner, schema_id)?;
if view.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::ViewId(view.id)
}
Object::ViewId(view.id)
}
StatementType::ALTER_SOURCE => {
let (source, schema_name) =
catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**source)?;
if source.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_SOURCE => {
let (source, schema_name) =
catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**source)?;
let schema_id = catalog_reader
.get_schema_by_name(db_name, schema_name)?
.id();
check_schema_create_privilege(&session, new_owner, schema_id)?;
if source.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::SourceId(source.id)
}
Object::SourceId(source.id)
}
StatementType::ALTER_SINK => {
let (sink, schema_name) =
catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**sink)?;
if sink.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_SINK => {
let (sink, schema_name) =
catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**sink)?;
let schema_id = catalog_reader
.get_schema_by_name(db_name, schema_name)?
.id();
check_schema_create_privilege(&session, new_owner, schema_id)?;
if sink.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::SinkId(sink.id.sink_id)
}
Object::SinkId(sink.id.sink_id)
}
StatementType::ALTER_DATABASE => {
let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
session.check_privilege_for_drop_alter_db_schema(database)?;
if database.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_DATABASE => {
let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
session.check_privilege_for_drop_alter_db_schema(database)?;
if database.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::DatabaseId(database.id())
}
Object::DatabaseId(database.id())
}
StatementType::ALTER_SCHEMA => {
let schema = catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
session.check_privilege_for_drop_alter_db_schema(schema)?;
if schema.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_SCHEMA => {
let schema =
catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
session.check_privilege_for_drop_alter_db_schema(schema)?;
if schema.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::SchemaId(schema.id())
}
Object::SchemaId(schema.id())
}
_ => unreachable!(),
}
_ => unreachable!(),
},
owner_id,
)
};

let catalog_writer = session.catalog_writer()?;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/privilege.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl SessionImpl {
}

/// Returns `true` if the user of the current session is a super user.
fn is_super_user(&self) -> bool {
pub fn is_super_user(&self) -> bool {
let reader = self.env().user_info_reader().read_guard();

if let Some(info) = reader.get_user_by_name(self.user_name()) {
Expand Down

0 comments on commit 0c4f8c6

Please sign in to comment.