diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index e2aaa0da59d51..9d14f8bac7201 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -124,13 +124,12 @@ impl MigrationTrait for Migration { .primary_key() .auto_increment(), ) - .col(ColumnDef::new(User::Name).string().not_null()) + .col(ColumnDef::new(User::Name).string().unique_key().not_null()) .col(ColumnDef::new(User::IsSuper).boolean().not_null()) .col(ColumnDef::new(User::CanCreateDb).boolean().not_null()) .col(ColumnDef::new(User::CanCreateUser).boolean().not_null()) .col(ColumnDef::new(User::CanLogin).boolean().not_null()) - .col(ColumnDef::new(User::AuthType).string()) - .col(ColumnDef::new(User::AuthValue).string()) + .col(ColumnDef::new(User::AuthInfo).json()) .to_owned(), ) .await?; @@ -197,6 +196,7 @@ impl MigrationTrait for Migration { .primary_key() .auto_increment(), ) + .col(ColumnDef::new(UserPrivilege::DependentId).integer()) .col(ColumnDef::new(UserPrivilege::UserId).integer().not_null()) .col(ColumnDef::new(UserPrivilege::Oid).integer().not_null()) .col( @@ -204,12 +204,20 @@ impl MigrationTrait for Migration { .integer() .not_null(), ) - .col(ColumnDef::new(UserPrivilege::Actions).string().not_null()) + .col(ColumnDef::new(UserPrivilege::Action).string().not_null()) .col( ColumnDef::new(UserPrivilege::WithGrantOption) .boolean() .not_null(), ) + .foreign_key( + &mut ForeignKey::create() + .name("FK_user_privilege_dependent_id") + .from(UserPrivilege::Table, UserPrivilege::DependentId) + .to(UserPrivilege::Table, UserPrivilege::Id) + .on_delete(ForeignKeyAction::Cascade) + .to_owned(), + ) .foreign_key( &mut ForeignKey::create() .name("FK_user_privilege_user_id") @@ -230,6 +238,7 @@ impl MigrationTrait for Migration { .name("FK_user_privilege_oid") .from(UserPrivilege::Table, UserPrivilege::Oid) .to(Object::Table, Object::Oid) + .on_delete(ForeignKeyAction::Cascade) .to_owned(), ) .to_owned(), @@ -651,6 +660,19 @@ impl MigrationTrait for Migration { .to_owned(), ) .await?; + manager + .create_index( + MigrationIndex::create() + .table(UserPrivilege::Table) + .name("idx_user_privilege_item") + .unique() + .col(UserPrivilege::UserId) + .col(UserPrivilege::Oid) + .col(UserPrivilege::Action) + .col(UserPrivilege::GrantedBy) + .to_owned(), + ) + .await?; // 4. initialize data. let insert_cluster_id = Query::insert() @@ -799,18 +821,18 @@ enum User { CanCreateDb, CanCreateUser, CanLogin, - AuthType, - AuthValue, + AuthInfo, } #[derive(DeriveIden)] enum UserPrivilege { Table, Id, + DependentId, UserId, Oid, GrantedBy, - Actions, + Action, WithGrantOption, } diff --git a/src/meta/model_v2/src/connection.rs b/src/meta/model_v2/src/connection.rs index 8cff6b2a6025b..a7a7e1ea33952 100644 --- a/src/meta/model_v2/src/connection.rs +++ b/src/meta/model_v2/src/connection.rs @@ -15,7 +15,7 @@ use risingwave_pb::catalog::connection::PbInfo; use risingwave_pb::catalog::PbConnection; use sea_orm::entity::prelude::*; -use sea_orm::ActiveValue; +use sea_orm::ActiveValue::Set; use crate::{ConnectionId, PrivateLinkService}; @@ -71,9 +71,9 @@ impl From for ActiveModel { }; Self { - connection_id: ActiveValue::Set(conn.id as _), - name: ActiveValue::Set(conn.name), - info: ActiveValue::Set(PrivateLinkService(private_link_srv)), + connection_id: Set(conn.id as _), + name: Set(conn.name), + info: Set(PrivateLinkService(private_link_srv)), } } } diff --git a/src/meta/model_v2/src/database.rs b/src/meta/model_v2/src/database.rs index 95ff3a8aee8e6..25f164c45805e 100644 --- a/src/meta/model_v2/src/database.rs +++ b/src/meta/model_v2/src/database.rs @@ -14,7 +14,7 @@ use risingwave_pb::catalog::PbDatabase; use sea_orm::entity::prelude::*; -use sea_orm::ActiveValue; +use sea_orm::ActiveValue::Set; use crate::DatabaseId; @@ -50,8 +50,8 @@ impl ActiveModelBehavior for ActiveModel {} impl From for ActiveModel { fn from(db: PbDatabase) -> Self { Self { - database_id: ActiveValue::Set(db.id), - name: ActiveValue::Set(db.name), + database_id: Set(db.id), + name: Set(db.name), } } } diff --git a/src/meta/model_v2/src/function.rs b/src/meta/model_v2/src/function.rs index 4126dddc0f5ee..c4774b177eabc 100644 --- a/src/meta/model_v2/src/function.rs +++ b/src/meta/model_v2/src/function.rs @@ -15,7 +15,7 @@ use risingwave_pb::catalog::function::Kind; use risingwave_pb::catalog::PbFunction; use sea_orm::entity::prelude::*; -use sea_orm::ActiveValue; +use sea_orm::ActiveValue::Set; use crate::{DataType, DataTypeArray, FunctionId}; @@ -77,14 +77,14 @@ impl From for FunctionKind { impl From for ActiveModel { fn from(function: PbFunction) -> Self { Self { - function_id: ActiveValue::Set(function.id as _), - name: ActiveValue::Set(function.name), - arg_types: ActiveValue::Set(DataTypeArray(function.arg_types)), - return_type: ActiveValue::Set(DataType(function.return_type.unwrap())), - language: ActiveValue::Set(function.language), - link: ActiveValue::Set(function.link), - identifier: ActiveValue::Set(function.identifier), - kind: ActiveValue::Set(function.kind.unwrap().into()), + function_id: Set(function.id as _), + name: Set(function.name), + arg_types: Set(DataTypeArray(function.arg_types)), + return_type: Set(DataType(function.return_type.unwrap())), + language: Set(function.language), + link: Set(function.link), + identifier: Set(function.identifier), + kind: Set(function.kind.unwrap().into()), } } } diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index ba2e114d5060d..0d0e373076703 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -61,6 +61,8 @@ pub type ViewId = ObjectId; pub type FunctionId = ObjectId; pub type ConnectionId = ObjectId; pub type UserId = u32; +pub type PrivilegeId = u32; + pub type HummockVersionId = u64; pub type Epoch = u64; pub type CompactionGroupId = u64; @@ -157,6 +159,7 @@ derive_from_json_struct!( PrivateLinkService, risingwave_pb::catalog::connection::PbPrivateLinkService ); +derive_from_json_struct!(AuthInfo, risingwave_pb::user::PbAuthInfo); derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode); derive_from_json_struct!(Dispatchers, Vec); diff --git a/src/meta/model_v2/src/schema.rs b/src/meta/model_v2/src/schema.rs index 0af2d7fc020c9..6417c974d29fc 100644 --- a/src/meta/model_v2/src/schema.rs +++ b/src/meta/model_v2/src/schema.rs @@ -14,7 +14,7 @@ use risingwave_pb::catalog::PbSchema; use sea_orm::entity::prelude::*; -use sea_orm::ActiveValue; +use sea_orm::ActiveValue::Set; use crate::SchemaId; @@ -49,8 +49,8 @@ impl ActiveModelBehavior for ActiveModel {} impl From for ActiveModel { fn from(schema: PbSchema) -> Self { Self { - schema_id: ActiveValue::Set(schema.id), - name: ActiveValue::Set(schema.name), + schema_id: Set(schema.id), + name: Set(schema.name), } } } diff --git a/src/meta/model_v2/src/user.rs b/src/meta/model_v2/src/user.rs index e9cd36f75fb43..716e8aa309f4d 100644 --- a/src/meta/model_v2/src/user.rs +++ b/src/meta/model_v2/src/user.rs @@ -12,22 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::user::PbUserInfo; use sea_orm::entity::prelude::*; +use sea_orm::ActiveValue::Set; +use sea_orm::NotSet; -use crate::UserId; +use crate::{AuthInfo, UserId}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "user")] pub struct Model { #[sea_orm(primary_key)] pub user_id: UserId, + #[sea_orm(unique)] pub name: String, pub is_super: bool, pub can_create_db: bool, pub can_create_user: bool, pub can_login: bool, - pub auth_type: Option, - pub auth_value: Option, + pub auth_info: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -43,3 +46,33 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl From for ActiveModel { + fn from(user: PbUserInfo) -> Self { + let user_id = if user.id == 0 { NotSet } else { Set(user.id) }; + Self { + user_id, + name: Set(user.name), + is_super: Set(user.is_super), + can_create_db: Set(user.can_create_db), + can_create_user: Set(user.can_create_user), + can_login: Set(user.can_login), + auth_info: Set(user.auth_info.map(AuthInfo)), + } + } +} + +impl From for PbUserInfo { + fn from(val: Model) -> Self { + PbUserInfo { + id: val.user_id, + name: val.name, + is_super: val.is_super, + can_create_db: val.can_create_db, + can_create_user: val.can_create_user, + can_login: val.can_login, + auth_info: val.auth_info.map(|x| x.into_inner()), + grant_privileges: vec![], // fill in later + } + } +} diff --git a/src/meta/model_v2/src/user_privilege.rs b/src/meta/model_v2/src/user_privilege.rs index 7e12af225ed02..f77c146cfa66b 100644 --- a/src/meta/model_v2/src/user_privilege.rs +++ b/src/meta/model_v2/src/user_privilege.rs @@ -12,19 +12,69 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_pb::user::grant_privilege::PbAction; use sea_orm::entity::prelude::*; -use crate::{ObjectId, UserId}; +use crate::{ObjectId, PrivilegeId, UserId}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum Action { + #[sea_orm(string_value = "INSERT")] + Insert, + #[sea_orm(string_value = "SELECT")] + Select, + #[sea_orm(string_value = "UPDATE")] + Update, + #[sea_orm(string_value = "DELETE")] + Delete, + #[sea_orm(string_value = "USAGE")] + Usage, + #[sea_orm(string_value = "CREATE")] + Create, + #[sea_orm(string_value = "CONNECT")] + Connect, +} + +impl From for Action { + fn from(action: PbAction) -> Self { + match action { + PbAction::Unspecified => unreachable!("unspecified action"), + PbAction::Insert => Self::Insert, + PbAction::Select => Self::Select, + PbAction::Update => Self::Update, + PbAction::Delete => Self::Delete, + PbAction::Usage => Self::Usage, + PbAction::Create => Self::Create, + PbAction::Connect => Self::Connect, + } + } +} + +impl From for PbAction { + fn from(action: Action) -> Self { + match action { + Action::Insert => Self::Insert, + Action::Select => Self::Select, + Action::Update => Self::Update, + Action::Delete => Self::Delete, + Action::Usage => Self::Usage, + Action::Create => Self::Create, + Action::Connect => Self::Connect, + } + } +} #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "user_privilege")] pub struct Model { #[sea_orm(primary_key)] - pub id: i32, + pub id: PrivilegeId, + pub dependent_id: Option, pub user_id: UserId, pub oid: ObjectId, pub granted_by: UserId, - pub actions: String, + pub action: Action, pub with_grant_option: bool, } @@ -54,6 +104,14 @@ pub enum Relation { on_delete = "Cascade" )] User1, + #[sea_orm( + belongs_to = "Entity", + from = "Column::DependentId", + to = "Column::Id", + on_update = "NoAction", + on_delete = "Cascade" + )] + SelfRef, } impl Related for Entity { diff --git a/src/meta/model_v2/src/view.rs b/src/meta/model_v2/src/view.rs index 0de9ea64a616e..4547c5de559ec 100644 --- a/src/meta/model_v2/src/view.rs +++ b/src/meta/model_v2/src/view.rs @@ -14,7 +14,7 @@ use risingwave_pb::catalog::PbView; use sea_orm::entity::prelude::*; -use sea_orm::ActiveValue; +use sea_orm::ActiveValue::Set; use crate::{FieldArray, Property, ViewId}; @@ -52,11 +52,11 @@ impl ActiveModelBehavior for ActiveModel {} impl From for ActiveModel { fn from(view: PbView) -> Self { Self { - view_id: ActiveValue::Set(view.id as _), - name: ActiveValue::Set(view.name), - properties: ActiveValue::Set(Property(view.properties)), - definition: ActiveValue::Set(view.sql), - columns: ActiveValue::Set(FieldArray(view.columns)), + view_id: Set(view.id as _), + name: Set(view.name), + properties: Set(Property(view.properties)), + definition: Set(view.sql), + columns: Set(FieldArray(view.columns)), } } } diff --git a/src/meta/model_v2/src/worker.rs b/src/meta/model_v2/src/worker.rs index d164fba62b41e..a18453c67ef5d 100644 --- a/src/meta/model_v2/src/worker.rs +++ b/src/meta/model_v2/src/worker.rs @@ -15,7 +15,7 @@ use risingwave_pb::common::worker_node::PbState; use risingwave_pb::common::{PbWorkerNode, PbWorkerType}; use sea_orm::entity::prelude::*; -use sea_orm::ActiveValue; +use sea_orm::ActiveValue::Set; use crate::{TransactionId, WorkerId}; @@ -91,11 +91,11 @@ impl From<&PbWorkerNode> for ActiveModel { fn from(worker: &PbWorkerNode) -> Self { let host = worker.host.clone().unwrap(); Self { - worker_id: ActiveValue::Set(worker.id), - worker_type: ActiveValue::Set(worker.r#type().into()), - host: ActiveValue::Set(host.host), - port: ActiveValue::Set(host.port), - status: ActiveValue::Set(worker.state().into()), + worker_id: Set(worker.id), + worker_type: Set(worker.r#type().into()), + host: Set(host.host), + port: Set(host.port), + status: Set(worker.state().into()), ..Default::default() } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 0abfd5f4b354f..998af75bf3b3d 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -34,9 +34,10 @@ use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, }; use risingwave_pb::meta::{PbRelation, PbRelationGroup}; +use sea_orm::ActiveValue::Set; use sea_orm::{ - ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, DatabaseTransaction, - EntityTrait, QueryFilter, QuerySelect, TransactionTrait, + ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, + QueryFilter, QuerySelect, TransactionTrait, }; use tokio::sync::RwLock; @@ -84,7 +85,7 @@ pub(crate) struct CatalogControllerInner { } impl CatalogController { - async fn notify_frontend( + pub(crate) async fn notify_frontend( &self, operation: NotificationOperation, info: NotificationInfo, @@ -95,7 +96,7 @@ impl CatalogController { .await } - async fn notify_frontend_relation_info( + pub(crate) async fn notify_frontend_relation_info( &self, operation: NotificationOperation, relation_info: PbRelationInfo, @@ -121,10 +122,10 @@ impl CatalogController { ) -> MetaResult { let active_db = object::ActiveModel { oid: Default::default(), - obj_type: ActiveValue::Set(obj_type), - owner_id: ActiveValue::Set(owner_id), - schema_id: ActiveValue::Set(schema_id), - database_id: ActiveValue::Set(database_id), + obj_type: Set(obj_type), + owner_id: Set(owner_id), + schema_id: Set(schema_id), + database_id: Set(database_id), initialized_at: Default::default(), created_at: Default::default(), }; @@ -139,7 +140,7 @@ impl CatalogController { let db_obj = Self::create_object(&txn, ObjectType::Database, owner_id, None, None).await?; let mut db: database::ActiveModel = db.into(); - db.database_id = ActiveValue::Set(db_obj.oid); + db.database_id = Set(db_obj.oid); let db = db.insert(&txn).await?; let mut schemas = vec![]; @@ -148,8 +149,8 @@ impl CatalogController { Self::create_object(&txn, ObjectType::Schema, owner_id, Some(db_obj.oid), None) .await?; let schema = schema::ActiveModel { - schema_id: ActiveValue::Set(schema_obj.oid), - name: ActiveValue::Set(schema_name.into()), + schema_id: Set(schema_obj.oid), + name: Set(schema_name.into()), }; let schema = schema.insert(&txn).await?; schemas.push(ObjectModel(schema, schema_obj).into()); @@ -256,7 +257,7 @@ impl CatalogController { ) .await?; let mut schema: schema::ActiveModel = schema.into(); - schema.schema_id = ActiveValue::Set(schema_obj.oid); + schema.schema_id = Set(schema_obj.oid); let schema = schema.insert(&txn).await?; txn.commit().await?; @@ -284,7 +285,7 @@ impl CatalogController { } let res = Object::delete(object::ActiveModel { - oid: ActiveValue::Set(schema_id), + oid: Set(schema_id), ..Default::default() }) .exec(&inner.db) @@ -473,8 +474,8 @@ impl CatalogController { // todo: shall we need to check existence of them Or let database handle it by FOREIGN KEY constraint. for obj_id in &pb_view.dependent_relations { object_dependency::ActiveModel { - oid: ActiveValue::Set(*obj_id), - used_by: ActiveValue::Set(view_obj.oid), + oid: Set(*obj_id), + used_by: Set(view_obj.oid), ..Default::default() } .insert(&txn) @@ -523,16 +524,16 @@ impl CatalogController { })?; column_desc.description = comment.description; table::ActiveModel { - table_id: ActiveValue::Set(comment.table_id), - columns: ActiveValue::Set(columns), + table_id: Set(comment.table_id), + columns: Set(columns), ..Default::default() } .update(&txn) .await? } else { table::ActiveModel { - table_id: ActiveValue::Set(comment.table_id), - description: ActiveValue::Set(comment.description), + table_id: Set(comment.table_id), + description: Set(comment.description), ..Default::default() } .update(&txn) @@ -746,9 +747,9 @@ impl CatalogController { relation.name = object_name.into(); relation.definition = alter_relation_rename(&relation.definition, object_name); let active_model = $table::ActiveModel { - $identity: ActiveValue::Set(relation.$identity), - name: ActiveValue::Set(object_name.into()), - definition: ActiveValue::Set(relation.definition.clone()), + $identity: Set(relation.$identity), + name: Set(object_name.into()), + definition: Set(relation.definition.clone()), ..Default::default() }; active_model.update(&txn).await?; @@ -777,8 +778,8 @@ impl CatalogController { // the name of index and its associated table is the same. let active_model = index::ActiveModel { - index_id: ActiveValue::Set(index.index_id), - name: ActiveValue::Set(object_name.into()), + index_id: Set(index.index_id), + name: Set(object_name.into()), ..Default::default() }; active_model.update(&txn).await?; @@ -803,8 +804,8 @@ impl CatalogController { relation.definition = alter_relation_rename_refs(&relation.definition, &old_name, object_name); let active_model = $table::ActiveModel { - $identity: ActiveValue::Set(relation.$identity), - definition: ActiveValue::Set(relation.definition.clone()), + $identity: Set(relation.$identity), + definition: Set(relation.definition.clone()), ..Default::default() }; active_model.update(&txn).await?; @@ -851,8 +852,6 @@ impl CatalogController { #[cfg(test)] #[cfg(not(madsim))] mod tests { - use risingwave_common::catalog::DEFAULT_SUPER_USER_ID; - use super::*; const TEST_DATABASE_ID: DatabaseId = 1; @@ -864,7 +863,7 @@ mod tests { let mgr = CatalogController::new(MetaSrvEnv::for_test().await)?; let db = PbDatabase { name: "test".to_string(), - owner: DEFAULT_SUPER_USER_ID, + owner: TEST_OWNER_ID, ..Default::default() }; mgr.create_database(db).await?; diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 392a0def5d53f..50eb6cd5921a4 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -34,9 +34,10 @@ use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; use sea_orm::prelude::Expr; +use sea_orm::ActiveValue::Set; use sea_orm::{ - ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, - QuerySelect, TransactionTrait, + ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, + TransactionTrait, }; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -536,9 +537,9 @@ impl ClusterControllerInner { let mut property: worker_property::ActiveModel = property.into(); // keep `is_unschedulable` unchanged. - property.is_streaming = ActiveValue::Set(add_property.is_streaming); - property.is_serving = ActiveValue::Set(add_property.is_serving); - property.parallel_unit_ids = ActiveValue::Set(I32Array(current_parallelism)); + property.is_streaming = Set(add_property.is_streaming); + property.is_serving = Set(add_property.is_serving); + property.parallel_unit_ids = Set(I32Array(current_parallelism)); WorkerProperty::update(property).exec(&txn).await?; txn.commit().await?; @@ -553,25 +554,25 @@ impl ClusterControllerInner { let worker = worker::ActiveModel { worker_id: Default::default(), - worker_type: ActiveValue::Set(r#type.into()), - host: ActiveValue::Set(host_address.host), - port: ActiveValue::Set(host_address.port), - status: ActiveValue::Set(WorkerStatus::Starting), - transaction_id: ActiveValue::Set(txn_id), + worker_type: Set(r#type.into()), + host: Set(host_address.host), + port: Set(host_address.port), + status: Set(WorkerStatus::Starting), + transaction_id: Set(txn_id), }; let insert_res = Worker::insert(worker).exec(&txn).await?; let worker_id = insert_res.last_insert_id as WorkerId; if r#type == PbWorkerType::ComputeNode { let property = worker_property::ActiveModel { - worker_id: ActiveValue::Set(worker_id), - parallel_unit_ids: ActiveValue::Set(I32Array(derive_parallel_units( + worker_id: Set(worker_id), + parallel_unit_ids: Set(I32Array(derive_parallel_units( *txn_id.as_ref().unwrap(), 0, add_property.worker_node_parallelism as _, ))), - is_streaming: ActiveValue::Set(add_property.is_streaming), - is_serving: ActiveValue::Set(add_property.is_streaming), - is_unschedulable: ActiveValue::Set(add_property.is_streaming), + is_streaming: Set(add_property.is_streaming), + is_serving: Set(add_property.is_streaming), + is_unschedulable: Set(add_property.is_streaming), }; WorkerProperty::insert(property).exec(&txn).await?; } @@ -588,8 +589,8 @@ impl ClusterControllerInner { pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult { let worker = worker::ActiveModel { - worker_id: ActiveValue::Set(worker_id), - status: ActiveValue::Set(WorkerStatus::Running), + worker_id: Set(worker_id), + status: Set(WorkerStatus::Running), ..Default::default() }; diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 7fe9de46e5742..37ca54dd1a18f 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![expect(dead_code, reason = "WIP")] + use anyhow::anyhow; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model_v2::{ @@ -28,12 +30,12 @@ use sea_orm::{DatabaseConnection, ModelTrait}; use crate::MetaError; -#[allow(dead_code)] pub mod catalog; pub mod cluster; pub mod fragment; pub mod rename; pub mod system_param; +pub mod user; pub mod utils; // todo: refine the error transform. diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index 5c9761a9a119d..e64ddf5b4ab85 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -25,7 +25,8 @@ use risingwave_meta_model_v2::prelude::SystemParameter; use risingwave_meta_model_v2::system_parameter; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PbSystemParams; -use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait}; +use sea_orm::ActiveValue::Set; +use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait}; use tokio::sync::oneshot::Sender; use tokio::sync::RwLock; use tokio::task::JoinHandle; @@ -83,10 +84,10 @@ macro_rules! impl_system_params_to_models { $( let value = params.$field.as_ref().unwrap().to_string(); models.push(system_parameter::ActiveModel { - name: ActiveValue::Set(key_of!($field).to_string()), - value: ActiveValue::Set(value), - is_mutable: ActiveValue::Set($is_mutable), - description: ActiveValue::Set(None), + name: Set(key_of!($field).to_string()), + value: Set(value), + is_mutable: Set($is_mutable), + description: Set(None), }); )* Ok(models) @@ -190,9 +191,8 @@ impl SystemParamsController { }; let mut params = params_guard.clone(); let mut param: system_parameter::ActiveModel = param.into(); - param.value = ActiveValue::Set( - set_system_param(&mut params, name, value).map_err(MetaError::system_param)?, - ); + param.value = + Set(set_system_param(&mut params, name, value).map_err(MetaError::system_param)?); param.update(&self.db).await?; *params_guard = params.clone(); @@ -281,10 +281,10 @@ mod tests { // insert deprecated params. let deprecated_param = system_parameter::ActiveModel { - name: ActiveValue::Set("deprecated_param".into()), - value: ActiveValue::Set("foo".into()), - is_mutable: ActiveValue::Set(true), - description: ActiveValue::Set(None), + name: Set("deprecated_param".into()), + value: Set("foo".into()), + is_mutable: Set(true), + description: Set(None), }; deprecated_param.insert(&system_param_ctl.db).await.unwrap(); diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs new file mode 100644 index 0000000000000..76e2d6225ca8d --- /dev/null +++ b/src/meta/src/controller/user.rs @@ -0,0 +1,695 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use itertools::Itertools; +use risingwave_common::catalog::{DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_FOR_PG}; +use risingwave_meta_model_v2::prelude::{Object, User, UserPrivilege}; +use risingwave_meta_model_v2::user_privilege::Action; +use risingwave_meta_model_v2::{object, user, user_privilege, AuthInfo, PrivilegeId, UserId}; +use risingwave_pb::meta::subscribe_response::{ + Info as NotificationInfo, Operation as NotificationOperation, +}; +use risingwave_pb::user::update_user_request::PbUpdateField; +use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo}; +use sea_orm::sea_query::{OnConflict, SimpleExpr, Value}; +use sea_orm::ActiveValue::Set; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, + QuerySelect, TransactionTrait, +}; + +use crate::controller::catalog::CatalogController; +use crate::controller::utils::{ + check_user_name_duplicate, ensure_privileges_not_referred, ensure_user_id, + extract_grant_obj_id, get_referring_privileges_cascade, get_user_privilege, + PartialUserPrivilege, +}; +use crate::manager::NotificationVersion; +use crate::{MetaError, MetaResult}; + +impl CatalogController { + async fn create_user(&self, pb_user: PbUserInfo) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + check_user_name_duplicate(&pb_user.name, &txn).await?; + + let grant_privileges = pb_user.grant_privileges.clone(); + let user: user::ActiveModel = pb_user.into(); + let user = user.insert(&txn).await?; + + if !grant_privileges.is_empty() { + let mut privileges = vec![]; + for gp in &grant_privileges { + let id = extract_grant_obj_id(gp.get_object()?); + for action_with_opt in &gp.action_with_opts { + privileges.push(user_privilege::ActiveModel { + user_id: Set(user.user_id), + oid: Set(id), + granted_by: Set(action_with_opt.granted_by), + action: Set(action_with_opt.get_action()?.into()), + with_grant_option: Set(action_with_opt.with_grant_option), + ..Default::default() + }); + } + } + UserPrivilege::insert_many(privileges).exec(&txn).await?; + } + txn.commit().await?; + + let mut user_info: PbUserInfo = user.into(); + user_info.grant_privileges = grant_privileges; + let version = self + .notify_frontend( + NotificationOperation::Add, + NotificationInfo::User(user_info), + ) + .await; + + Ok(version) + } + + async fn update_user( + &self, + update_user: PbUserInfo, + update_fields: &[PbUpdateField], + ) -> MetaResult { + let inner = self.inner.write().await; + let rename_flag = update_fields + .iter() + .any(|&field| field == PbUpdateField::Rename); + if rename_flag { + check_user_name_duplicate(&update_user.name, &inner.db).await?; + } + + let user = User::find_by_id(update_user.id) + .one(&inner.db) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", update_user.id))?; + let mut user = user.into_active_model(); + update_fields.iter().for_each(|&field| match field { + PbUpdateField::Unspecified => unreachable!(), + PbUpdateField::Super => user.is_super = Set(update_user.is_super), + PbUpdateField::Login => user.can_login = Set(update_user.can_login), + PbUpdateField::CreateDb => user.can_create_db = Set(update_user.can_create_db), + PbUpdateField::CreateUser => user.can_create_user = Set(update_user.can_create_user), + PbUpdateField::AuthInfo => { + user.auth_info = Set(update_user.auth_info.clone().map(AuthInfo)) + } + PbUpdateField::Rename => user.name = Set(update_user.name.clone()), + }); + + let user = user.update(&inner.db).await?; + let mut user_info: PbUserInfo = user.into(); + user_info.grant_privileges = get_user_privilege(user_info.id, &inner.db).await?; + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::User(user_info), + ) + .await; + + Ok(version) + } + + #[cfg(test)] + pub async fn get_user(&self, id: UserId) -> MetaResult { + let inner = self.inner.read().await; + let user = User::find_by_id(id) + .one(&inner.db) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", id))?; + Ok(user) + } + + #[cfg(test)] + pub async fn get_user_by_name(&self, name: &str) -> MetaResult { + let inner = self.inner.read().await; + let user = User::find() + .filter(user::Column::Name.eq(name)) + .one(&inner.db) + .await? + .ok_or_else(|| anyhow::anyhow!("user {name} not found"))?; + Ok(user) + } + + async fn drop_user(&self, user_id: UserId) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let user = User::find_by_id(user_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?; + if user.name == DEFAULT_SUPER_USER || user.name == DEFAULT_SUPER_USER_FOR_PG { + return Err(MetaError::permission_denied(format!( + "drop default super user {} is not allowed", + user.name + ))); + } + + // check if the user is the owner of any objects. + let count = Object::find() + .filter(object::Column::OwnerId.eq(user_id)) + .count(&txn) + .await?; + if count != 0 { + return Err(MetaError::permission_denied(format!( + "drop user {} is not allowed, because it owns {} objects", + user.name, count + ))); + } + + // check if the user granted any privileges to other users. + let count = UserPrivilege::find() + .filter(user_privilege::Column::GrantedBy.eq(user_id)) + .count(&txn) + .await?; + if count != 0 { + return Err(MetaError::permission_denied(format!( + "drop user {} is not allowed, because it granted {} privileges to others", + user.name, count + ))); + } + + let res = User::delete_by_id(user_id).exec(&txn).await?; + if res.rows_affected != 1 { + return Err(MetaError::catalog_id_not_found("user", user_id)); + } + txn.commit().await?; + + let version = self + .notify_frontend( + NotificationOperation::Delete, + NotificationInfo::User(PbUserInfo { + id: user_id, + ..Default::default() + }), + ) + .await; + + Ok(version) + } + + pub async fn grant_privilege( + &self, + user_ids: Vec, + new_grant_privileges: &[PbGrantPrivilege], + grantor: UserId, + ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + for user_id in &user_ids { + ensure_user_id(*user_id, &txn).await?; + } + + let mut privileges = vec![]; + for gp in new_grant_privileges { + let id = extract_grant_obj_id(gp.get_object()?); + for action_with_opt in &gp.action_with_opts { + privileges.push(user_privilege::ActiveModel { + oid: Set(id), + granted_by: Set(grantor), + action: Set(action_with_opt.get_action()?.into()), + with_grant_option: Set(action_with_opt.with_grant_option), + ..Default::default() + }); + } + } + + // check whether grantor has the privilege to grant the privilege. + let user = User::find_by_id(grantor) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", grantor))?; + if !user.is_super { + for privilege in &mut privileges { + let filter = user_privilege::Column::UserId + .eq(grantor) + .and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref())) + .and(user_privilege::Column::Action.eq(privilege.action.as_ref().clone())) + .and(user_privilege::Column::WithGrantOption.eq(true)); + let privilege_id: Option = UserPrivilege::find() + .select_only() + .column(user_privilege::Column::Id) + .filter(filter) + .into_tuple() + .one(&txn) + .await?; + let Some(privilege_id) = privilege_id else { + return Err(MetaError::permission_denied(format!( + "user {} don't have privilege {:?} or grant option", + grantor, privilege.action, + ))); + }; + privilege.dependent_id = Set(Some(privilege_id)); + } + } + + // insert privileges + let user_privileges = user_ids.iter().flat_map(|user_id| { + privileges.iter().map(|p| { + let mut p = p.clone(); + p.user_id = Set(*user_id); + p + }) + }); + for privilege in user_privileges { + let mut on_conflict = OnConflict::columns([ + user_privilege::Column::UserId, + user_privilege::Column::Oid, + user_privilege::Column::Action, + user_privilege::Column::GrantedBy, + ]); + if *privilege.with_grant_option.as_ref() { + on_conflict.update_columns([user_privilege::Column::WithGrantOption]); + } else { + on_conflict.do_nothing(); + } + UserPrivilege::insert(privilege) + .on_conflict(on_conflict) + .exec(&txn) + .await?; + } + + let mut user_infos = vec![]; + for user_id in user_ids { + let user = User::find_by_id(user_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?; + let mut user_info: PbUserInfo = user.into(); + user_info.grant_privileges = get_user_privilege(user_info.id, &txn).await?; + user_infos.push(user_info); + } + txn.commit().await?; + + let mut version = 0; + for info in user_infos { + version = self + .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info)) + .await; + } + Ok(version) + } + + pub async fn revoke_privilege( + &self, + user_ids: Vec, + revoke_grant_privileges: &[PbGrantPrivilege], + granted_by: Option, + revoke_by: UserId, + revoke_grant_option: bool, + cascade: bool, + ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + for user_id in &user_ids { + ensure_user_id(*user_id, &txn).await?; + } + // check whether revoke has the privilege to grant the privilege. + let revoke_user = User::find_by_id(revoke_by) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", revoke_by))?; + + let granted_by = granted_by.unwrap_or(revoke_by); + // check whether user can revoke the privilege. + if !revoke_user.is_super && granted_by != revoke_by { + let granted_user_name: String = User::find_by_id(granted_by) + .select_only() + .column(user::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", granted_by))?; + return Err(MetaError::permission_denied(format!( + "user {} is not super, can't revoke privileges for {}", + revoke_user.name, granted_user_name + ))); + } + + let mut revoke_items = HashMap::new(); + for privilege in revoke_grant_privileges { + let obj = extract_grant_obj_id(privilege.get_object()?); + let actions = privilege + .action_with_opts + .iter() + .map(|ao| Action::from(ao.get_action().unwrap())) + .collect_vec(); + revoke_items.insert(obj, actions); + } + + let filter = if !revoke_user.is_super { + // ensure user have grant options. + for (obj, actions) in &revoke_items { + let owned_actions: HashSet = UserPrivilege::find() + .select_only() + .column(user_privilege::Column::Action) + .filter( + user_privilege::Column::UserId + .eq(granted_by) + .and(user_privilege::Column::Oid.eq(*obj)) + .and(user_privilege::Column::WithGrantOption.eq(true)), + ) + .into_tuple::() + .all(&txn) + .await? + .into_iter() + .collect(); + if actions.iter().any(|ac| !owned_actions.contains(ac)) { + return Err(MetaError::permission_denied(format!( + "user {} don't have privileges {:?} or grant option", + revoke_user.name, actions, + ))); + } + } + + user_privilege::Column::GrantedBy + .eq(granted_by) + .and(user_privilege::Column::UserId.is_in(user_ids.clone())) + } else { + user_privilege::Column::UserId.is_in(user_ids.clone()) + }; + let mut root_user_privileges: Vec = vec![]; + for (obj, actions) in &revoke_items { + let filter = filter + .clone() + .and(user_privilege::Column::Oid.eq(*obj)) + .and(user_privilege::Column::Action.is_in(actions.clone())); + root_user_privileges.extend( + UserPrivilege::find() + .select_only() + .columns([user_privilege::Column::Id, user_privilege::Column::UserId]) + .filter(filter) + .into_partial_model() + .all(&txn) + .await?, + ); + } + if root_user_privileges.is_empty() { + return Err(MetaError::invalid_parameter( + "no privilege to revoke".to_string(), + )); + } + + // check if the user granted any privileges to other users. + let root_privilege_ids = root_user_privileges.iter().map(|ur| ur.id).collect_vec(); + let (all_privilege_ids, to_update_user_ids): (_, HashSet) = if !cascade { + ensure_privileges_not_referred(root_privilege_ids.clone(), &txn).await?; + ( + root_privilege_ids.clone(), + root_user_privileges.iter().map(|ur| ur.user_id).collect(), + ) + } else { + let all_user_privileges = + get_referring_privileges_cascade(root_privilege_ids.clone(), &txn).await?; + ( + all_user_privileges.iter().map(|ur| ur.id).collect_vec(), + all_user_privileges.iter().map(|ur| ur.user_id).collect(), + ) + }; + + if revoke_grant_option { + UserPrivilege::update_many() + .col_expr( + user_privilege::Column::WithGrantOption, + SimpleExpr::Value(Value::Bool(Some(false))), + ) + .filter( + user_privilege::Column::Id + .is_in(all_privilege_ids) + .and(user_privilege::Column::WithGrantOption.eq(true)), + ) + .exec(&txn) + .await?; + } else { + // The dependent privileges will be deleted cascade. + UserPrivilege::delete_many() + .filter(user_privilege::Column::Id.is_in(root_privilege_ids)) + .exec(&txn) + .await?; + } + + let mut user_infos = vec![]; + for user_id in to_update_user_ids { + let user = User::find_by_id(user_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("user", user_id))?; + let mut user_info: PbUserInfo = user.into(); + user_info.grant_privileges = get_user_privilege(user_info.id, &txn).await?; + user_infos.push(user_info); + } + txn.commit().await?; + + let mut version = 0; + for info in user_infos { + version = self + .notify_frontend(NotificationOperation::Update, NotificationInfo::User(info)) + .await; + } + Ok(version) + } +} + +#[cfg(test)] +#[cfg(not(madsim))] +mod tests { + use risingwave_meta_model_v2::DatabaseId; + use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject}; + + use super::*; + use crate::manager::MetaSrvEnv; + + const TEST_DATABASE_ID: DatabaseId = 1; + const TEST_ROOT_USER_ID: UserId = 1; + + fn make_test_user(name: &str) -> PbUserInfo { + PbUserInfo { + name: name.to_string(), + ..Default::default() + } + } + + fn make_privilege( + object: PbObject, + actions: &[PbAction], + with_grant_option: bool, + ) -> PbGrantPrivilege { + PbGrantPrivilege { + object: Some(object), + action_with_opts: actions + .iter() + .map(|&action| PbActionWithGrantOption { + action: action as _, + with_grant_option, + ..Default::default() + }) + .collect(), + } + } + + #[tokio::test] + async fn test_user_and_privilege() -> MetaResult<()> { + let mgr = CatalogController::new(MetaSrvEnv::for_test().await)?; + mgr.create_user(make_test_user("test_user_1")).await?; + mgr.create_user(make_test_user("test_user_2")).await?; + let user_1 = mgr.get_user_by_name("test_user_1").await?; + let user_2 = mgr.get_user_by_name("test_user_2").await?; + + assert!( + mgr.create_user(make_test_user("test_user_1")) + .await + .is_err(), + "user_1 already exists" + ); + mgr.update_user( + PbUserInfo { + id: user_1.user_id, + name: "test_user_1_new".to_string(), + ..Default::default() + }, + &[PbUpdateField::Rename], + ) + .await?; + let user_1 = mgr.get_user(user_1.user_id).await?; + assert_eq!(user_1.name, "test_user_1_new".to_string()); + + let conn_with_option = make_privilege( + PbObject::DatabaseId(TEST_DATABASE_ID), + &[PbAction::Connect], + true, + ); + let create_without_option = make_privilege( + PbObject::DatabaseId(TEST_DATABASE_ID), + &[PbAction::Create], + false, + ); + // ROOT grant CONN with grant option to user_1. + mgr.grant_privilege( + vec![user_1.user_id], + &[conn_with_option.clone()], + TEST_ROOT_USER_ID, + ) + .await?; + // ROOT grant CREATE without grant option to user_1. + mgr.grant_privilege( + vec![user_1.user_id], + &[create_without_option.clone()], + TEST_ROOT_USER_ID, + ) + .await?; + // user_1 grant CONN with grant option to user_2. + mgr.grant_privilege( + vec![user_2.user_id], + &[conn_with_option.clone()], + user_1.user_id, + ) + .await?; + // user_1 grant CREATE without grant option to user_2. + assert!( + mgr.grant_privilege( + vec![user_2.user_id], + &[create_without_option.clone()], + user_1.user_id + ) + .await + .is_err(), + "user_1 don't have grant option" + ); + + assert!( + mgr.drop_user(user_1.user_id).await.is_err(), + "user_1 can't be dropped" + ); + + let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?; + assert_eq!(privilege_1.len(), 2); + assert!(privilege_1.iter().all(|gp| gp.object + == Some(PbObject::DatabaseId(TEST_DATABASE_ID)) + && gp.action_with_opts[0].granted_by == TEST_ROOT_USER_ID)); + + let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?; + assert_eq!(privilege_2.len(), 1); + assert!(privilege_2.iter().all(|gp| gp.object + == Some(PbObject::DatabaseId(TEST_DATABASE_ID)) + && gp.action_with_opts[0].granted_by == user_1.user_id + && gp.action_with_opts[0].with_grant_option)); + + // revoke privilege for others by non-super user. + assert!( + mgr.revoke_privilege( + vec![user_1.user_id], + &[conn_with_option.clone()], + Some(TEST_ROOT_USER_ID), + user_2.user_id, + false, + false + ) + .await + .is_err(), + "user_2 can't revoke for user_1" + ); + + // revoke privilege without grant option. + assert!( + mgr.revoke_privilege( + vec![user_2.user_id], + &[create_without_option.clone()], + None, + user_1.user_id, + false, + false + ) + .await + .is_err(), + "user_2 don't have grant option for CREATE" + ); + + // revoke referred privilege in restrict mode. + assert!( + mgr.revoke_privilege( + vec![user_1.user_id], + &[conn_with_option.clone()], + None, + TEST_ROOT_USER_ID, + false, + false + ) + .await + .is_err(), + "permission deny in restrict mode, CONN granted to user_2" + ); + + // revoke non-referred privilege in restrict mode. + mgr.revoke_privilege( + vec![user_1.user_id], + &[create_without_option.clone()], + None, + TEST_ROOT_USER_ID, + false, + false, + ) + .await?; + + let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?; + assert_eq!(privilege_1.len(), 1); + assert!(privilege_1.iter().all(|gp| gp.object + == Some(PbObject::DatabaseId(TEST_DATABASE_ID)) + && gp.action_with_opts[0].action == PbAction::Connect as i32)); + + // revoke grant option for referred privilege in cascade mode. + mgr.revoke_privilege( + vec![user_1.user_id], + &[conn_with_option.clone()], + None, + TEST_ROOT_USER_ID, + true, + true, + ) + .await?; + let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?; + assert_eq!(privilege_1.len(), 1); + assert!(privilege_1.iter().all(|gp| gp.object + == Some(PbObject::DatabaseId(TEST_DATABASE_ID)) + && gp.action_with_opts[0].action == PbAction::Connect as i32 + && !gp.action_with_opts[0].with_grant_option)); + let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?; + assert_eq!(privilege_2.len(), 1); + assert!(privilege_2.iter().all(|gp| gp.object + == Some(PbObject::DatabaseId(TEST_DATABASE_ID)) + && gp.action_with_opts[0].action == PbAction::Connect as i32 + && !gp.action_with_opts[0].with_grant_option)); + + // revoke referred privilege in cascade mode. + mgr.revoke_privilege( + vec![user_1.user_id], + &[conn_with_option.clone()], + None, + TEST_ROOT_USER_ID, + false, + true, + ) + .await?; + let privilege_1 = get_user_privilege(user_1.user_id, &mgr.inner.read().await.db).await?; + assert!(privilege_1.is_empty()); + let privilege_2 = get_user_privilege(user_2.user_id, &mgr.inner.read().await.db).await?; + assert!(privilege_2.is_empty()); + + mgr.drop_user(user_1.user_id).await?; + mgr.drop_user(user_2.user_id).await?; + Ok(()) + } +} diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 2dbd89ac92423..9b5ee9e6a2c0b 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -17,10 +17,12 @@ use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ - connection, function, index, object, object_dependency, schema, sink, source, table, view, - DataTypeArray, DatabaseId, ObjectId, SchemaId, UserId, + connection, function, index, object, object_dependency, schema, sink, source, table, user, + user_privilege, view, DataTypeArray, DatabaseId, ObjectId, PrivilegeId, SchemaId, UserId, }; use risingwave_pb::catalog::{PbConnection, PbFunction}; +use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject}; +use risingwave_pb::user::PbGrantPrivilege; use sea_orm::sea_query::{ Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType, WithClause, @@ -220,6 +222,22 @@ where Ok(()) } +/// `check_user_name_duplicate` checks whether the user is already existed in the cluster. +pub async fn check_user_name_duplicate(name: &str, db: &C) -> MetaResult<()> +where + C: ConnectionTrait, +{ + let count = User::find() + .filter(user::Column::Name.eq(name)) + .count(db) + .await?; + if count > 0 { + assert_eq!(count, 1); + return Err(MetaError::catalog_duplicated("user", name)); + } + Ok(()) +} + /// `check_relation_name_duplicate` checks whether the relation name is already used in the target namespace. pub async fn check_relation_name_duplicate( name: &str, @@ -354,3 +372,169 @@ where Ok(()) } + +/// `construct_privilege_dependency_query` constructs a query to find all privileges that are dependent on the given one. +/// +/// # Examples +/// +/// ``` +/// use risingwave_meta::controller::utils::construct_privilege_dependency_query; +/// use sea_orm::sea_query::*; +/// use sea_orm::*; +/// +/// let query = construct_privilege_dependency_query(vec![1, 2, 3]); +/// +/// assert_eq!( +/// query.to_string(MysqlQueryBuilder), +/// r#"WITH RECURSIVE `granted_privilege_ids` (`id`, `user_id`) AS (SELECT `id`, `user_id` FROM `user_privilege` WHERE `user_privilege`.`id` IN (1, 2, 3) UNION ALL (SELECT `user_privilege`.`id`, `user_privilege`.`user_id` FROM `user_privilege` INNER JOIN `granted_privilege_ids` ON `granted_privilege_ids`.`id` = `dependent_id`)) SELECT `id`, `user_id` FROM `granted_privilege_ids`"# +/// ); +/// assert_eq!( +/// query.to_string(PostgresQueryBuilder), +/// r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL (SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id")) SELECT "id", "user_id" FROM "granted_privilege_ids""# +/// ); +/// assert_eq!( +/// query.to_string(SqliteQueryBuilder), +/// r#"WITH RECURSIVE "granted_privilege_ids" ("id", "user_id") AS (SELECT "id", "user_id" FROM "user_privilege" WHERE "user_privilege"."id" IN (1, 2, 3) UNION ALL SELECT "user_privilege"."id", "user_privilege"."user_id" FROM "user_privilege" INNER JOIN "granted_privilege_ids" ON "granted_privilege_ids"."id" = "dependent_id") SELECT "id", "user_id" FROM "granted_privilege_ids""# +/// ); +/// ``` +pub fn construct_privilege_dependency_query(ids: Vec) -> WithQuery { + let cte_alias = Alias::new("granted_privilege_ids"); + let cte_return_privilege_alias = Alias::new("id"); + let cte_return_user_alias = Alias::new("user_id"); + + let mut base_query = SelectStatement::new() + .columns([user_privilege::Column::Id, user_privilege::Column::UserId]) + .from(UserPrivilege) + .and_where(user_privilege::Column::Id.is_in(ids)) + .to_owned(); + + let cte_referencing = Query::select() + .columns([ + (UserPrivilege, user_privilege::Column::Id), + (UserPrivilege, user_privilege::Column::UserId), + ]) + .from(UserPrivilege) + .inner_join( + cte_alias.clone(), + Expr::col((cte_alias.clone(), cte_return_privilege_alias.clone())) + .equals(user_privilege::Column::DependentId), + ) + .to_owned(); + + let common_table_expr = CommonTableExpression::new() + .query(base_query.union(UnionType::All, cte_referencing).to_owned()) + .columns([ + cte_return_privilege_alias.clone(), + cte_return_user_alias.clone(), + ]) + .table_name(cte_alias.clone()) + .to_owned(); + + SelectStatement::new() + .columns([cte_return_privilege_alias, cte_return_user_alias]) + .from(cte_alias.clone()) + .to_owned() + .with( + WithClause::new() + .recursive(true) + .cte(common_table_expr) + .to_owned(), + ) + .to_owned() +} + +#[derive(Clone, DerivePartialModel, FromQueryResult)] +#[sea_orm(entity = "UserPrivilege")] +pub struct PartialUserPrivilege { + pub id: PrivilegeId, + pub user_id: UserId, +} + +pub async fn get_referring_privileges_cascade( + ids: Vec, + db: &C, +) -> MetaResult> +where + C: ConnectionTrait, +{ + let query = construct_privilege_dependency_query(ids); + let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder()); + let privileges = PartialUserPrivilege::find_by_statement(Statement::from_sql_and_values( + db.get_database_backend(), + sql, + values, + )) + .all(db) + .await?; + + Ok(privileges) +} + +/// `ensure_privileges_not_referred` ensures that the privileges are not granted to any other users. +pub async fn ensure_privileges_not_referred(ids: Vec, db: &C) -> MetaResult<()> +where + C: ConnectionTrait, +{ + let count = UserPrivilege::find() + .filter(user_privilege::Column::DependentId.is_in(ids)) + .count(db) + .await?; + if count != 0 { + return Err(MetaError::permission_denied(format!( + "privileges granted to {} other ones.", + count + ))); + } + Ok(()) +} + +/// `get_user_privilege` returns the privileges of the given user. +pub async fn get_user_privilege(user_id: UserId, db: &C) -> MetaResult> +where + C: ConnectionTrait, +{ + let user_privileges = UserPrivilege::find() + .find_also_related(Object) + .filter(user_privilege::Column::UserId.eq(user_id)) + .all(db) + .await?; + Ok(user_privileges + .into_iter() + .map(|(privilege, object)| { + let object = object.unwrap(); + let obj = match object.obj_type { + ObjectType::Database => PbObject::DatabaseId(object.oid), + ObjectType::Schema => PbObject::SchemaId(object.oid), + ObjectType::Table => PbObject::TableId(object.oid), + ObjectType::Source => PbObject::SourceId(object.oid), + ObjectType::Sink => PbObject::SinkId(object.oid), + ObjectType::View => PbObject::ViewId(object.oid), + ObjectType::Function => PbObject::FunctionId(object.oid), + ObjectType::Index => unreachable!("index is not supported yet"), + ObjectType::Connection => unreachable!("connection is not supported yet"), + }; + PbGrantPrivilege { + action_with_opts: vec![PbActionWithGrantOption { + action: PbAction::from(privilege.action) as _, + with_grant_option: privilege.with_grant_option, + granted_by: privilege.granted_by, + }], + object: Some(obj), + } + }) + .collect()) +} + +// todo: remove it after migrated to sql backend. +pub fn extract_grant_obj_id(object: &PbObject) -> ObjectId { + match object { + PbObject::DatabaseId(id) + | PbObject::SchemaId(id) + | PbObject::TableId(id) + | PbObject::SourceId(id) + | PbObject::SinkId(id) + | PbObject::ViewId(id) + | PbObject::FunctionId(id) => *id, + _ => unreachable!("invalid object type: {:?}", object), + } +}