diff --git a/src/common/src/acl/mod.rs b/src/common/src/acl/mod.rs index b1c41bc8ec6a..b0bf939a2353 100644 --- a/src/common/src/acl/mod.rs +++ b/src/common/src/acl/mod.rs @@ -104,7 +104,7 @@ pub static ALL_AVAILABLE_TABLE_MODES: LazyLock = LazyLock::new(|| make_bitflags!(AclMode::{Select | Insert | Update | Delete}).into()); pub static ALL_AVAILABLE_SOURCE_MODES: LazyLock = LazyLock::new(AclModeSet::readonly); pub static ALL_AVAILABLE_MVIEW_MODES: LazyLock = LazyLock::new(AclModeSet::readonly); -pub static ALL_AVAILABLE_SINK_MODES: LazyLock = LazyLock::new(AclModeSet::empty); +pub static ALL_AVAILABLE_SINK_MODES: LazyLock = LazyLock::new(AclModeSet::readonly); pub static ALL_AVAILABLE_SUBSCRIPTION_MODES: LazyLock = LazyLock::new(AclModeSet::empty); pub static ALL_AVAILABLE_FUNCTION_MODES: LazyLock = diff --git a/src/frontend/src/handler/handle_privilege.rs b/src/frontend/src/handler/handle_privilege.rs index 851797c7265b..c70b638c13e2 100644 --- a/src/frontend/src/handler/handle_privilege.rs +++ b/src/frontend/src/handler/handle_privilege.rs @@ -136,6 +136,20 @@ fn make_prost_privilege( grant_objs.push(PbObject::SourceId(source.id)); } } + GrantObjects::Sinks(sinks) => { + let db_name = session.database(); + let search_path = session.config().search_path(); + let user_name = &session.auth_context().user_name; + + for name in sinks { + let (schema_name, sink_name) = + Binder::resolve_schema_qualified_name(db_name, name)?; + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + + let (sink, _) = reader.get_sink_by_name(db_name, schema_path, &sink_name)?; + grant_objs.push(PbObject::SinkId(sink.id.sink_id)); + } + } GrantObjects::AllSourcesInSchema { schemas } => { for schema in schemas { let schema_name = Binder::resolve_schema_name(schema)?; diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs index 5b539b09e1c2..5e319d93133b 100644 --- a/src/meta/src/controller/user.rs +++ b/src/meta/src/controller/user.rs @@ -34,8 +34,8 @@ use sea_orm::{ use crate::controller::catalog::CatalogController; use crate::controller::utils::{ check_user_name_duplicate, ensure_privileges_not_referred, ensure_user_id, - extract_grant_obj_id, get_internal_tables_by_id, get_object_owner, - get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids, + extract_grant_obj_id, get_index_state_tables_by_table_id, get_internal_tables_by_id, + get_object_owner, get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids, PartialUserPrivilege, }; use crate::manager::{NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -232,6 +232,7 @@ impl CatalogController { for gp in new_grant_privileges { let id = extract_grant_obj_id(gp.get_object()?); let internal_table_ids = get_internal_tables_by_id(id, &txn).await?; + let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?; for action_with_opt in &gp.action_with_opts { let action = action_with_opt.get_action()?.into(); privileges.push(user_privilege::ActiveModel { @@ -242,15 +243,18 @@ impl CatalogController { ..Default::default() }); if action == Action::Select { - privileges.extend(internal_table_ids.iter().map(|&tid| { - user_privilege::ActiveModel { - oid: Set(tid), - granted_by: Set(grantor), - action: Set(Action::Select), - with_grant_option: Set(action_with_opt.with_grant_option), - ..Default::default() - } - })); + privileges.extend( + internal_table_ids + .iter() + .chain(index_state_table_ids.iter()) + .map(|&tid| user_privilege::ActiveModel { + oid: Set(tid), + granted_by: Set(grantor), + action: Set(Action::Select), + with_grant_option: Set(action_with_opt.with_grant_option), + ..Default::default() + }), + ); } } } @@ -366,6 +370,7 @@ impl CatalogController { for privilege in revoke_grant_privileges { let obj = extract_grant_obj_id(privilege.get_object()?); let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?; + let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?; let mut include_select = false; let actions = privilege .action_with_opts @@ -383,6 +388,7 @@ impl CatalogController { revoke_items.extend( internal_table_ids .iter() + .chain(index_state_table_ids.iter()) .map(|&tid| (tid, vec![Action::Select])), ); } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index b76f8af38ebb..84d3d4da6736 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -754,6 +754,40 @@ where Ok(table_ids) } +pub async fn get_index_state_tables_by_table_id( + table_id: TableId, + db: &C, +) -> MetaResult> +where + C: ConnectionTrait, +{ + let mut index_table_ids: Vec = Index::find() + .select_only() + .column(index::Column::IndexTableId) + .filter(index::Column::PrimaryTableId.eq(table_id)) + .into_tuple() + .all(db) + .await?; + + if !index_table_ids.is_empty() { + let internal_table_ids: Vec = Table::find() + .select_only() + .column(table::Column::TableId) + .filter( + table::Column::TableType + .eq(TableType::Internal) + .and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())), + ) + .into_tuple() + .all(db) + .await?; + + index_table_ids.extend(internal_table_ids.into_iter()); + } + + Ok(index_table_ids) +} + #[derive(Clone, DerivePartialModel, FromQueryResult)] #[sea_orm(entity = "UserPrivilege")] pub struct PartialUserPrivilege { diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index d5582f31a64d..7d01a2b35cf6 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -4999,6 +4999,7 @@ impl Parser<'_> { Keyword::SCHEMA, Keyword::TABLE, Keyword::SOURCE, + Keyword::SINK, ]); let objects = self.parse_comma_separated(Parser::parse_object_name); match object_type { @@ -5006,6 +5007,7 @@ impl Parser<'_> { Some(Keyword::SCHEMA) => GrantObjects::Schemas(objects?), Some(Keyword::SEQUENCE) => GrantObjects::Sequences(objects?), Some(Keyword::SOURCE) => GrantObjects::Sources(objects?), + Some(Keyword::SINK) => GrantObjects::Sinks(objects?), Some(Keyword::TABLE) | None => GrantObjects::Tables(objects?), _ => unreachable!(), }