diff --git a/src/common/src/acl/mod.rs b/src/common/src/acl/mod.rs index b1c41bc8ec6ad..b0bf939a2353c 100644 --- a/src/common/src/acl/mod.rs +++ b/src/common/src/acl/mod.rs @@ -104,7 +104,7 @@ pub static ALL_AVAILABLE_TABLE_MODES: LazyLock = LazyLock::new(|| make_bitflags!(AclMode::{Select | Insert | Update | Delete}).into()); pub static ALL_AVAILABLE_SOURCE_MODES: LazyLock = LazyLock::new(AclModeSet::readonly); pub static ALL_AVAILABLE_MVIEW_MODES: LazyLock = LazyLock::new(AclModeSet::readonly); -pub static ALL_AVAILABLE_SINK_MODES: LazyLock = LazyLock::new(AclModeSet::empty); +pub static ALL_AVAILABLE_SINK_MODES: LazyLock = LazyLock::new(AclModeSet::readonly); pub static ALL_AVAILABLE_SUBSCRIPTION_MODES: LazyLock = LazyLock::new(AclModeSet::empty); pub static ALL_AVAILABLE_FUNCTION_MODES: LazyLock = diff --git a/src/frontend/src/handler/handle_privilege.rs b/src/frontend/src/handler/handle_privilege.rs index 851797c7265b3..c70b638c13e2b 100644 --- a/src/frontend/src/handler/handle_privilege.rs +++ b/src/frontend/src/handler/handle_privilege.rs @@ -136,6 +136,20 @@ fn make_prost_privilege( grant_objs.push(PbObject::SourceId(source.id)); } } + GrantObjects::Sinks(sinks) => { + let db_name = session.database(); + let search_path = session.config().search_path(); + let user_name = &session.auth_context().user_name; + + for name in sinks { + let (schema_name, sink_name) = + Binder::resolve_schema_qualified_name(db_name, name)?; + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let (sink, _) = reader.get_sink_by_name(db_name, schema_path, &sink_name)?; + grant_objs.push(PbObject::SinkId(sink.id.sink_id)); + } + } GrantObjects::AllSourcesInSchema { schemas } => { for schema in schemas { let schema_name = Binder::resolve_schema_name(schema)?; diff --git a/src/meta/model/src/user_privilege.rs b/src/meta/model/src/user_privilege.rs index bebf8485f4643..a0743a1f0b56b 100644 --- a/src/meta/model/src/user_privilege.rs +++ b/src/meta/model/src/user_privilege.rs @@ -18,7 +18,9 @@ use serde::{Deserialize, Serialize}; use crate::{ObjectId, PrivilegeId, UserId}; -#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[derive( + Clone, Copy, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize, +)] #[sea_orm(rs_type = "String", db_type = "string(None)")] pub enum Action { #[sea_orm(string_value = "INSERT")] diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 227807b679d94..2bdd59ea39124 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -64,8 +64,8 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::info; use super::utils::{ - check_subscription_name_duplicate, get_fragment_ids_by_jobs, rename_relation, - rename_relation_refer, + check_subscription_name_duplicate, get_fragment_ids_by_jobs, get_internal_tables_by_id, + rename_relation, rename_relation_refer, }; use crate::controller::utils::{ build_relation_group_for_delete, check_connection_name_duplicate, @@ -3493,13 +3493,7 @@ async fn update_internal_tables( new_value: Value, relations_to_notify: &mut Vec, ) -> MetaResult<()> { - let internal_tables: Vec = Table::find() - .select_only() - .column(table::Column::TableId) - .filter(table::Column::BelongsToJobId.eq(object_id)) - .into_tuple() - .all(txn) - .await?; + let internal_tables = get_internal_tables_by_id(object_id, txn).await?; if !internal_tables.is_empty() { Object::update_many() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 665a68ab14d6d..8496904d44c0c 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -66,7 +66,7 @@ use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ build_relation_group_for_delete, check_relation_name_duplicate, check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings, - rebuild_fragment_mapping_from_actors, PartialObject, + get_internal_tables_by_id, rebuild_fragment_mapping_from_actors, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, StreamingJob}; @@ -521,13 +521,7 @@ impl CatalogController { } } - let internal_table_ids: Vec = Table::find() - .select_only() - .column(table::Column::TableId) - .filter(table::Column::BelongsToJobId.eq(job_id)) - .into_tuple() - .all(&txn) - .await?; + let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?; // Get the notification info if the job is a materialized view. let table_obj = Table::find_by_id(job_id).one(&txn).await?; diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs index 3a8d728c84fd9..5e319d93133bc 100644 --- a/src/meta/src/controller/user.rs +++ b/src/meta/src/controller/user.rs @@ -34,8 +34,9 @@ use sea_orm::{ use crate::controller::catalog::CatalogController; use crate::controller::utils::{ check_user_name_duplicate, ensure_privileges_not_referred, ensure_user_id, - extract_grant_obj_id, get_object_owner, get_referring_privileges_cascade, get_user_privilege, - list_user_info_by_ids, PartialUserPrivilege, + extract_grant_obj_id, get_index_state_tables_by_table_id, get_internal_tables_by_id, + get_object_owner, get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids, + PartialUserPrivilege, }; use crate::manager::{NotificationVersion, IGNORED_NOTIFICATION_VERSION}; use crate::{MetaError, MetaResult}; @@ -230,14 +231,31 @@ impl CatalogController { let mut privileges = vec![]; for gp in new_grant_privileges { let id = extract_grant_obj_id(gp.get_object()?); + let internal_table_ids = get_internal_tables_by_id(id, &txn).await?; + let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?; for action_with_opt in &gp.action_with_opts { + let action = action_with_opt.get_action()?.into(); privileges.push(user_privilege::ActiveModel { oid: Set(id), granted_by: Set(grantor), - action: Set(action_with_opt.get_action()?.into()), + action: Set(action), with_grant_option: Set(action_with_opt.with_grant_option), ..Default::default() }); + if action == Action::Select { + privileges.extend( + internal_table_ids + .iter() + .chain(index_state_table_ids.iter()) + .map(|&tid| user_privilege::ActiveModel { + oid: Set(tid), + granted_by: Set(grantor), + action: Set(Action::Select), + with_grant_option: Set(action_with_opt.with_grant_option), + ..Default::default() + }), + ); + } } } @@ -254,7 +272,7 @@ impl CatalogController { let filter = user_privilege::Column::UserId .eq(grantor) .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref())) - .and(user_privilege::Column::Action.eq(privilege.action.as_ref().clone())) + .and(user_privilege::Column::Action.eq(*privilege.action.as_ref())) .and(user_privilege::Column::WithGrantOption.eq(true)); let privilege_id: Option = UserPrivilege::find() .select_only() @@ -351,12 +369,29 @@ impl CatalogController { let mut revoke_items = HashMap::new(); for privilege in revoke_grant_privileges { let obj = extract_grant_obj_id(privilege.get_object()?); + let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?; + let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?; + let mut include_select = false; let actions = privilege .action_with_opts .iter() - .map(|ao| Action::from(ao.get_action().unwrap())) + .map(|ao| { + let action = Action::from(ao.get_action().unwrap()); + if action == Action::Select { + include_select = true; + } + action + }) .collect_vec(); revoke_items.insert(obj, actions); + if include_select { + revoke_items.extend( + internal_table_ids + .iter() + .chain(index_state_table_ids.iter()) + .map(|&tid| (tid, vec![Action::Select])), + ); + } } let filter = if !revoke_user.is_super { diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 7a2158b8584d2..84d3d4da6736c 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -23,6 +23,7 @@ use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; use risingwave_meta_model::prelude::*; +use risingwave_meta_model::table::TableType; use risingwave_meta_model::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege, @@ -735,6 +736,58 @@ pub fn construct_privilege_dependency_query(ids: Vec) -> WithQuery .to_owned() } +pub async fn get_internal_tables_by_id(job_id: ObjectId, db: &C) -> MetaResult> +where + C: ConnectionTrait, +{ + let table_ids: Vec = Table::find() + .select_only() + .column(table::Column::TableId) + .filter( + table::Column::TableType + .eq(TableType::Internal) + .and(table::Column::BelongsToJobId.eq(job_id)), + ) + .into_tuple() + .all(db) + .await?; + Ok(table_ids) +} + +pub async fn get_index_state_tables_by_table_id( + table_id: TableId, + db: &C, +) -> MetaResult> +where + C: ConnectionTrait, +{ + let mut index_table_ids: Vec = Index::find() + .select_only() + .column(index::Column::IndexTableId) + .filter(index::Column::PrimaryTableId.eq(table_id)) + .into_tuple() + .all(db) + .await?; + + if !index_table_ids.is_empty() { + let internal_table_ids: Vec = Table::find() + .select_only() + .column(table::Column::TableId) + .filter( + table::Column::TableType + .eq(TableType::Internal) + .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())), + ) + .into_tuple() + .all(db) + .await?; + + index_table_ids.extend(internal_table_ids.into_iter()); + } + + Ok(index_table_ids) +} + #[derive(Clone, DerivePartialModel, FromQueryResult)] #[sea_orm(entity = "UserPrivilege")] pub struct PartialUserPrivilege { diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index d5582f31a64de..7d01a2b35cf67 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -4999,6 +4999,7 @@ impl Parser<'_> { Keyword::SCHEMA, Keyword::TABLE, Keyword::SOURCE, + Keyword::SINK, ]); let objects = self.parse_comma_separated(Parser::parse_object_name); match object_type { @@ -5006,6 +5007,7 @@ impl Parser<'_> { Some(Keyword::SCHEMA) => GrantObjects::Schemas(objects?), Some(Keyword::SEQUENCE) => GrantObjects::Sequences(objects?), Some(Keyword::SOURCE) => GrantObjects::Sources(objects?), + Some(Keyword::SINK) => GrantObjects::Sinks(objects?), Some(Keyword::TABLE) | None => GrantObjects::Tables(objects?), _ => unreachable!(), }