From a07fb73c4b756436ec05342a6963794a344824b8 Mon Sep 17 00:00:00 2001 From: August Date: Fri, 3 Nov 2023 15:47:42 +0800 Subject: [PATCH] feat: notify user privilege change for drop ddl and add `create_source` function (#13230) --- src/meta/model_v2/src/source.rs | 25 +++++ src/meta/src/controller/catalog.rs | 164 ++++++++++++++++++++++++++--- src/meta/src/controller/user.rs | 64 +++++------ src/meta/src/controller/utils.rs | 20 +++- 4 files changed, 218 insertions(+), 55 deletions(-) diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model_v2/src/source.rs index 620d002c27b55..d92f11b290456 100644 --- a/src/meta/model_v2/src/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::catalog::source::OptionalAssociatedTableId; +use risingwave_pb::catalog::PbSource; use sea_orm::entity::prelude::*; +use sea_orm::ActiveValue::Set; use crate::{ ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId, @@ -78,3 +81,25 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl From for ActiveModel { + fn from(source: PbSource) -> Self { + let optional_associated_table_id = source.optional_associated_table_id.map(|x| match x { + OptionalAssociatedTableId::AssociatedTableId(id) => id, + }); + Self { + source_id: Set(source.id as _), + name: Set(source.name), + row_id_index: Set(source.row_id_index as _), + columns: Set(ColumnCatalogArray(source.columns)), + pk_column_ids: Set(I32Array(source.pk_column_ids)), + properties: Set(Property(source.properties)), + definition: Set(source.definition), + source_info: Set(source.info.map(StreamSourceInfo)), + watermark_descs: Set(WatermarkDescArray(source.watermark_descs)), + optional_associated_table_id: Set(optional_associated_table_id), + connection_id: Set(source.connection_id), + version: Set(source.version), + } + } +} diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 998af75bf3b3d..1836158e1793d 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -22,8 +22,8 @@ use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ connection, database, function, index, object, object_dependency, schema, sink, source, table, - view, ColumnCatalogArray, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService, - SchemaId, SourceId, TableId, UserId, + user_privilege, view, ColumnCatalogArray, ConnectionId, DatabaseId, FunctionId, ObjectId, + PrivateLinkService, SchemaId, SourceId, TableId, UserId, }; use risingwave_pb::catalog::{ PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, @@ -33,11 +33,11 @@ use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, }; -use risingwave_pb::meta::{PbRelation, PbRelationGroup}; +use risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments}; use sea_orm::ActiveValue::Set; use sea_orm::{ - ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, - QueryFilter, QuerySelect, TransactionTrait, + ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, JoinType, + QueryFilter, QuerySelect, RelationTrait, TransactionTrait, }; use tokio::sync::RwLock; @@ -46,10 +46,10 @@ use crate::controller::utils::{ check_connection_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, get_referring_objects, - get_referring_objects_cascade, PartialObject, + get_referring_objects_cascade, list_user_info_by_ids, PartialObject, }; use crate::controller::ObjectModel; -use crate::manager::{MetaSrvEnv, NotificationVersion}; +use crate::manager::{MetaSrvEnv, NotificationVersion, StreamingJob}; use crate::rpc::ddl_controller::DropMode; use crate::{MetaError, MetaResult}; @@ -213,14 +213,31 @@ impl CatalogController { .map(|conn| conn.info) .collect_vec(); + // Find affect users with privileges on the database and the objects in the database. + let to_update_user_ids: Vec = UserPrivilege::find() + .select_only() + .distinct() + .column(user_privilege::Column::UserId) + .join(JoinType::InnerJoin, user_privilege::Relation::Object.def()) + .filter( + object::Column::DatabaseId + .eq(database_id) + .or(user_privilege::Column::Oid.eq(database_id)), + ) + .into_tuple() + .all(&txn) + .await?; + // The schema and objects in the database will be delete cascade. let res = Object::delete_by_id(database_id).exec(&txn).await?; if res.rows_affected == 0 { return Err(MetaError::catalog_id_not_found("database", database_id)); } + let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; txn.commit().await?; + self.notify_users_update(user_infos).await; let version = self .notify_frontend( NotificationOperation::Delete, @@ -276,25 +293,44 @@ impl CatalogController { drop_mode: DropMode, ) -> MetaResult { let inner = self.inner.write().await; + let txn = inner.db.begin().await?; let schema_obj = Object::find_by_id(schema_id) - .one(&inner.db) + .one(&txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?; if drop_mode == DropMode::Restrict { - ensure_schema_empty(schema_id, &inner.db).await?; + ensure_schema_empty(schema_id, &txn).await?; } + // Find affect users with privileges on the schema and the objects in the schema. + let to_update_user_ids: Vec = UserPrivilege::find() + .select_only() + .distinct() + .column(user_privilege::Column::UserId) + .join(JoinType::InnerJoin, user_privilege::Relation::Object.def()) + .filter( + object::Column::SchemaId + .eq(schema_id) + .or(user_privilege::Column::Oid.eq(schema_id)), + ) + .into_tuple() + .all(&txn) + .await?; + let res = Object::delete(object::ActiveModel { oid: Set(schema_id), ..Default::default() }) - .exec(&inner.db) + .exec(&txn) .await?; if res.rows_affected == 0 { return Err(MetaError::catalog_id_not_found("schema", schema_id)); } + let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; + + txn.commit().await?; - // todo: update user privileges accordingly. + self.notify_users_update(user_infos).await; let version = self .notify_frontend( NotificationOperation::Delete, @@ -308,6 +344,52 @@ impl CatalogController { Ok(version) } + pub fn create_stream_job( + &self, + _stream_job: &StreamingJob, + _table_fragments: &PbTableFragments, + _internal_tables: Vec, + ) -> MetaResult<()> { + todo!() + } + + pub async fn create_source(&self, mut pb_source: PbSource) -> MetaResult { + let inner = self.inner.write().await; + let owner_id = pb_source.owner; + let txn = inner.db.begin().await?; + ensure_user_id(owner_id, &txn).await?; + ensure_object_id(ObjectType::Database, pb_source.database_id, &txn).await?; + ensure_object_id(ObjectType::Schema, pb_source.schema_id, &txn).await?; + check_relation_name_duplicate( + &pb_source.name, + pb_source.database_id, + pb_source.schema_id, + &txn, + ) + .await?; + + let source_obj = Self::create_object( + &txn, + ObjectType::Source, + owner_id, + Some(pb_source.database_id), + Some(pb_source.schema_id), + ) + .await?; + pb_source.id = source_obj.oid; + let source: source::ActiveModel = pb_source.clone().into(); + source.insert(&txn).await?; + txn.commit().await?; + + let version = self + .notify_frontend_relation_info( + NotificationOperation::Add, + PbRelationInfo::Source(pb_source), + ) + .await; + Ok(version) + } + pub async fn create_function( &self, mut pb_function: PbFunction, @@ -344,17 +426,32 @@ impl CatalogController { pub async fn drop_function(&self, function_id: FunctionId) -> MetaResult { let inner = self.inner.write().await; + let txn = inner.db.begin().await?; let function_obj = Object::find_by_id(function_id) - .one(&inner.db) + .one(&txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("function", function_id))?; - ensure_object_not_refer(ObjectType::Function, function_id, &inner.db).await?; + ensure_object_not_refer(ObjectType::Function, function_id, &txn).await?; - let res = Object::delete_by_id(function_id).exec(&inner.db).await?; + // Find affect users with privileges on the function. + let to_update_user_ids: Vec = UserPrivilege::find() + .select_only() + .distinct() + .column(user_privilege::Column::UserId) + .filter(user_privilege::Column::Oid.eq(function_id)) + .into_tuple() + .all(&txn) + .await?; + + let res = Object::delete_by_id(function_id).exec(&txn).await?; if res.rows_affected == 0 { return Err(MetaError::catalog_id_not_found("function", function_id)); } + let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; + txn.commit().await?; + + self.notify_users_update(user_infos).await; let version = self .notify_frontend( NotificationOperation::Delete, @@ -423,17 +520,32 @@ impl CatalogController { connection_id: ConnectionId, ) -> MetaResult { let inner = self.inner.write().await; + let txn = inner.db.begin().await?; let connection_obj = Object::find_by_id(connection_id) - .one(&inner.db) + .one(&txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?; - ensure_object_not_refer(ObjectType::Connection, connection_id, &inner.db).await?; + ensure_object_not_refer(ObjectType::Connection, connection_id, &txn).await?; - let res = Object::delete_by_id(connection_id).exec(&inner.db).await?; + // Find affect users with privileges on the connection. + let to_update_user_ids: Vec = UserPrivilege::find() + .select_only() + .distinct() + .column(user_privilege::Column::UserId) + .filter(user_privilege::Column::Oid.eq(connection_id)) + .into_tuple() + .all(&txn) + .await?; + + let res = Object::delete_by_id(connection_id).exec(&txn).await?; if res.rows_affected == 0 { return Err(MetaError::catalog_id_not_found("connection", connection_id)); } + let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; + + txn.commit().await?; + self.notify_users_update(user_infos).await; let version = self .notify_frontend( NotificationOperation::Delete, @@ -636,6 +748,16 @@ impl CatalogController { .await?; to_drop_objects.extend(to_drop_internal_table_objs); + // Find affect users with privileges on all this objects. + let to_update_user_ids: Vec = UserPrivilege::find() + .select_only() + .distinct() + .column(user_privilege::Column::UserId) + .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid))) + .into_tuple() + .all(&txn) + .await?; + // delete all in to_drop_objects. let res = Object::delete_many() .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid))) @@ -647,8 +769,12 @@ impl CatalogController { object_id, )); } + let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; + + txn.commit().await?; // notify about them. + self.notify_users_update(user_infos).await; let relations = to_drop_objects .into_iter() .map(|obj| match obj.obj_type { @@ -894,6 +1020,10 @@ mod tests { let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap(); mgr.drop_relation(ObjectType::View, view.view_id, DropMode::Cascade) .await?; + assert!(View::find_by_id(view.view_id) + .one(&mgr.inner.read().await.db) + .await? + .is_none()); Ok(()) } diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs index 76e2d6225ca8d..8ae3e29373bb6 100644 --- a/src/meta/src/controller/user.rs +++ b/src/meta/src/controller/user.rs @@ -35,12 +35,25 @@ use crate::controller::catalog::CatalogController; use crate::controller::utils::{ check_user_name_duplicate, ensure_privileges_not_referred, ensure_user_id, extract_grant_obj_id, get_referring_privileges_cascade, get_user_privilege, - PartialUserPrivilege, + list_user_info_by_ids, PartialUserPrivilege, }; use crate::manager::NotificationVersion; use crate::{MetaError, MetaResult}; impl CatalogController { + pub(crate) async fn notify_users_update( + &self, + user_infos: Vec, + ) -> NotificationVersion { + let mut version = 0; + for info in user_infos { + version = self + .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info)) + .await; + } + version + } + async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult { let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -282,25 +295,10 @@ impl CatalogController { .exec(&txn) .await?; } - - let mut user_infos = vec![]; - for user_id in user_ids { - let user = User::find_by_id(user_id) - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?; - let mut user_info: PbUserInfo = user.into(); - user_info.grant_privileges = get_user_privilege(user_info.id, &txn).await?; - user_infos.push(user_info); - } + let user_infos = list_user_info_by_ids(user_ids, &txn).await?; txn.commit().await?; - let mut version = 0; - for info in user_infos { - version = self - .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info)) - .await; - } + let version = self.notify_users_update(user_infos).await; Ok(version) } @@ -406,18 +404,24 @@ impl CatalogController { // check if the user granted any privileges to other users. let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec(); - let (all_privilege_ids, to_update_user_ids): (_, HashSet) = if !cascade { + let (all_privilege_ids, to_update_user_ids) = if !cascade { ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?; ( root_privilege_ids.clone(), - root_user_privileges.iter().map(|ur| ur.user_id).collect(), + root_user_privileges + .iter() + .map(|ur| ur.user_id) + .collect_vec(), ) } else { let all_user_privileges = get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?; ( all_user_privileges.iter().map(|ur| ur.id).collect_vec(), - all_user_privileges.iter().map(|ur| ur.user_id).collect(), + all_user_privileges + .iter() + .map(|ur| ur.user_id) + .collect_vec(), ) }; @@ -442,24 +446,10 @@ impl CatalogController { .await?; } - let mut user_infos = vec![]; - for user_id in to_update_user_ids { - let user = User::find_by_id(user_id) - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?; - let mut user_info: PbUserInfo = user.into(); - user_info.grant_privileges = get_user_privilege(user_info.id, &txn).await?; - user_infos.push(user_info); - } + let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; txn.commit().await?; - let mut version = 0; - for info in user_infos { - version = self - .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info)) - .await; - } + let version = self.notify_users_update(user_infos).await; Ok(version) } } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 9b5ee9e6a2c0b..5934888b00357 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -22,7 +22,7 @@ use risingwave_meta_model_v2::{ }; use risingwave_pb::catalog::{PbConnection, PbFunction}; use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject}; -use risingwave_pb::user::PbGrantPrivilege; +use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo}; use sea_orm::sea_query::{ Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType, WithClause, @@ -373,6 +373,24 @@ where Ok(()) } +/// `list_user_info_by_ids` lists all users' info by their ids. +pub async fn list_user_info_by_ids(user_ids: Vec, db: &C) -> MetaResult> +where + C: ConnectionTrait, +{ + let mut user_infos = vec![]; + for user_id in user_ids { + let user = User::find_by_id(user_id) + .one(db) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?; + let mut user_info: PbUserInfo = user.into(); + user_info.grant_privileges = get_user_privilege(user_info.id, db).await?; + user_infos.push(user_info); + } + Ok(user_infos) +} + /// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one. /// /// # Examples