Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ALTER xxx OWNER TO xxx should check the CREATE privilege on the schema #13593

Merged
merged 6 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 102 additions & 58 deletions src/frontend/src/handler/alter_owner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,43 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use pgwire::pg_response::StatementType;
use risingwave_common::acl::AclMode;
use risingwave_common::error::ErrorCode::PermissionDenied;
use risingwave_common::error::Result;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::user::grant_privilege;
use risingwave_sqlparser::ast::{Ident, ObjectName};

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::{CatalogError, OwnedByUserCatalog};
use crate::session::SessionImpl;
use crate::user::user_catalog::UserCatalog;
use crate::Binder;

pub fn check_schema_create_privilege(
session: &Arc<SessionImpl>,
new_owner: &UserCatalog,
schema_id: u32,
) -> Result<()> {
if !session.is_super_user()
|| (!new_owner.is_super
&& !new_owner.check_privilege(
&grant_privilege::Object::SchemaId(schema_id),
AclMode::Create,
))
{
return Err(PermissionDenied(
"Require new owner to have create privilege on the object.".to_string(),
)
.into());
}
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

pub async fn handle_alter_owner(
handler_args: HandlerArgs,
obj_name: ObjectName,
Expand All @@ -37,71 +64,88 @@ pub async fn handle_alter_owner(
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
let owner_id = session
.env()
.user_info_reader()
.read_guard()
.get_user_by_name(&new_owner_name)
.map(|u| u.id)
.ok_or(CatalogError::NotFound("user", new_owner_name))?;

let object = {
let (object, owner_id) = {
let catalog_reader = session.env().catalog_reader().read_guard();
match stmt_type {
StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
let (table, schema_name) =
catalog_reader.get_table_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
if table.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
let user_reader = session.env().user_info_reader().read_guard();
let new_owner = user_reader
.get_user_by_name(&new_owner_name)
.ok_or(CatalogError::NotFound("user", new_owner_name))?;
let owner_id = new_owner.id;
(
match stmt_type {
StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
let (table, schema_name) =
catalog_reader.get_table_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
let schema_id = catalog_reader
.get_schema_by_name(db_name, schema_name)?
.id();
check_schema_create_privilege(&session, new_owner, schema_id)?;
if table.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::TableId(table.id.table_id)
}
Object::TableId(table.id.table_id)
}
StatementType::ALTER_VIEW => {
let (view, schema_name) =
catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**view)?;
if view.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_VIEW => {
let (view, schema_name) =
catalog_reader.get_view_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**view)?;
let schema_id = catalog_reader
.get_schema_by_name(db_name, schema_name)?
.id();
check_schema_create_privilege(&session, new_owner, schema_id)?;
if view.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::ViewId(view.id)
}
Object::ViewId(view.id)
}
StatementType::ALTER_SOURCE => {
let (source, schema_name) =
catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**source)?;
if source.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_SOURCE => {
let (source, schema_name) =
catalog_reader.get_source_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**source)?;
let schema_id = catalog_reader
.get_schema_by_name(db_name, schema_name)?
.id();
check_schema_create_privilege(&session, new_owner, schema_id)?;
if source.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::SourceId(source.id)
}
Object::SourceId(source.id)
}
StatementType::ALTER_SINK => {
let (sink, schema_name) =
catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**sink)?;
if sink.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_SINK => {
let (sink, schema_name) =
catalog_reader.get_sink_by_name(db_name, schema_path, &real_obj_name)?;
session.check_privilege_for_drop_alter(schema_name, &**sink)?;
let schema_id = catalog_reader
.get_schema_by_name(db_name, schema_name)?
.id();
check_schema_create_privilege(&session, new_owner, schema_id)?;
if sink.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::SinkId(sink.id.sink_id)
}
Object::SinkId(sink.id.sink_id)
}
StatementType::ALTER_DATABASE => {
let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
session.check_privilege_for_drop_alter_db_schema(database)?;
if database.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_DATABASE => {
let database = catalog_reader.get_database_by_name(&obj_name.real_value())?;
session.check_privilege_for_drop_alter_db_schema(database)?;
if database.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::DatabaseId(database.id())
}
Object::DatabaseId(database.id())
}
StatementType::ALTER_SCHEMA => {
let schema = catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
session.check_privilege_for_drop_alter_db_schema(schema)?;
if schema.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
StatementType::ALTER_SCHEMA => {
let schema =
catalog_reader.get_schema_by_name(db_name, &obj_name.real_value())?;
session.check_privilege_for_drop_alter_db_schema(schema)?;
if schema.owner() == owner_id {
return Ok(RwPgResponse::empty_result(stmt_type));
}
Object::SchemaId(schema.id())
}
Object::SchemaId(schema.id())
}
_ => unreachable!(),
}
_ => unreachable!(),
},
owner_id,
)
};

let catalog_writer = session.catalog_writer()?;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/privilege.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl SessionImpl {
}

/// Returns `true` if the user of the current session is a super user.
fn is_super_user(&self) -> bool {
pub fn is_super_user(&self) -> bool {
let reader = self.env().user_info_reader().read_guard();

if let Some(info) = reader.get_user_by_name(self.user_name()) {
Expand Down
Loading