From 3a4eb80f3b88ba7b3d9b6aa52d384e4f6a98cf78 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 30 May 2022 14:13:21 +0800 Subject: [PATCH] feat(meta): introduce user service in meta, to support user and privilege management (#2745) --- proto/user.proto | 53 ++- src/common/src/catalog/mod.rs | 3 + src/meta/src/manager/catalog.rs | 28 ++ src/meta/src/manager/mod.rs | 2 + src/meta/src/manager/user.rs | 456 +++++++++++++++++++++++ src/meta/src/model/mod.rs | 2 + src/meta/src/model/user.rs | 42 +++ src/meta/src/rpc/server.rs | 17 +- src/meta/src/rpc/service/mod.rs | 1 + src/meta/src/rpc/service/user_service.rs | 190 ++++++++++ 10 files changed, 770 insertions(+), 24 deletions(-) create mode 100644 src/meta/src/manager/user.rs create mode 100644 src/meta/src/model/user.rs create mode 100644 src/meta/src/rpc/service/user_service.rs diff --git a/proto/user.proto b/proto/user.proto index 43b9a9c6f99ba..0b4af6186b4c2 100644 --- a/proto/user.proto +++ b/proto/user.proto @@ -20,15 +20,14 @@ message AuthInfo { /// User defines a user in the system. message UserInfo { - uint32 id = 1; - string name = 2; - bool is_supper = 3; - bool can_create_db = 4; - bool can_login = 5; - AuthInfo auth_info = 6; + string name = 1; + bool is_supper = 2; + bool can_create_db = 3; + bool can_login = 4; + AuthInfo auth_info = 5; /// Granted privileges will be only updated through the command of GRANT/REVOKE. - repeated GrantPrivilege privileges = 7; + repeated GrantPrivilege grant_privileges = 6; } /// GrantPrivilege defines a privilege granted to a user. @@ -48,12 +47,23 @@ message GrantPrivilege { uint32 table_id = 3; } + message GrantSource { + uint32 database_id = 1; + uint32 schema_id = 2; + uint32 source_id = 3; + } + /// To support grant privilege on ALL TABLES IN SCHEMA schema_name. message GrantAllTables { uint32 database_id = 1; uint32 schema_id = 2; } + message GrantAllSources { + uint32 database_id = 1; + uint32 schema_id = 2; + } + enum Privilege { UNKNOWN = 0; SELECT = 1; @@ -62,16 +72,22 @@ message GrantPrivilege { DELETE = 4; CREATE = 5; CONNECT = 6; - ALL = 20; } + + message PrivilegeWithGrantOption { + Privilege privilege = 1; + bool with_grant_option = 2; + } + oneof target { GrantDatabase grant_database = 1; GrantSchema grant_schema = 2; GrantTable grant_table = 3; - GrantAllTables grant_all_tables = 4; + GrantSource grant_source = 4; + GrantAllTables grant_all_tables = 5; + GrantAllSources grant_all_sources = 6; } - repeated Privilege privileges = 5; - bool with_grant_option = 6; + repeated PrivilegeWithGrantOption privilege_with_opts = 7; } message CreateUserRequest { @@ -80,12 +96,11 @@ message CreateUserRequest { message CreateUserResponse { common.Status status = 1; - uint32 user_id = 2; - uint64 version = 3; + uint64 version = 2; } message DropUserRequest { - uint32 user_id = 1; + string name = 1; } message DropUserResponse { @@ -94,8 +109,9 @@ message DropUserResponse { } message GrantPrivilegeRequest { - uint32 user_id = 1; - GrantPrivilege privilege = 2; + string user_name = 1; + repeated GrantPrivilege privileges = 2; + bool with_grant_option = 3; } message GrantPrivilegeResponse { @@ -104,8 +120,9 @@ message GrantPrivilegeResponse { } message RevokePrivilegeRequest { - uint32 user_id = 1; - GrantPrivilege privilege = 2; + string user_name = 1; + repeated GrantPrivilege privileges = 2; + bool revoke_grant_option = 3; } message RevokePrivilegeResponse { diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 1dfaee129a7f0..666ec432d41bd 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -25,6 +25,9 @@ pub use schema::{test_utils as schema_test_utils, Field, Schema}; pub const DEFAULT_DATABASE_NAME: &str = "dev"; pub const DEFAULT_SCHEMA_NAME: &str = "dev"; +pub const DEFAULT_SUPPER_USER: &str = "risingwave"; +pub const DEFAULT_SUPPER_USER_PASSWORD: &str = "risingwave"; + pub type CatalogVersion = u64; pub enum CatalogId { diff --git a/src/meta/src/manager/catalog.rs b/src/meta/src/manager/catalog.rs index 3ce2d9e388c3a..287ebb08f52bc 100644 --- a/src/meta/src/manager/catalog.rs +++ b/src/meta/src/manager/catalog.rs @@ -570,6 +570,34 @@ where ))), } } + + pub async fn list_tables( + &self, + database_id: DatabaseId, + schema_id: SchemaId, + ) -> Result> { + let core = self.core.lock().await; + let tables = Table::list(core.env.meta_store()).await?; + Ok(tables + .iter() + .filter(|t| t.database_id == database_id && t.schema_id == schema_id) + .map(|t| t.id) + .collect()) + } + + pub async fn list_sources( + &self, + database_id: DatabaseId, + schema_id: SchemaId, + ) -> Result> { + let core = self.core.lock().await; + let sources = Source::list(core.env.meta_store()).await?; + Ok(sources + .iter() + .filter(|s| s.database_id == database_id && s.schema_id == schema_id) + .map(|s| s.id) + .collect()) + } } type DatabaseKey = String; diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index ca497ac1b9121..74d7e61678d7e 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -17,9 +17,11 @@ mod env; mod hash_mapping; mod id; mod notification; +mod user; pub use catalog::*; pub use env::*; pub use hash_mapping::*; pub use id::*; pub use notification::*; +pub use user::*; diff --git a/src/meta/src/manager/user.rs b/src/meta/src/manager/user.rs new file mode 100644 index 0000000000000..892da8ddfb794 --- /dev/null +++ b/src/meta/src/manager/user.rs @@ -0,0 +1,456 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use risingwave_common::catalog::{DEFAULT_SUPPER_USER, DEFAULT_SUPPER_USER_PASSWORD}; +use risingwave_common::error::ErrorCode::InternalError; +use risingwave_common::error::{Result, RwError}; +use risingwave_pb::user::auth_info::EncryptionType; +use risingwave_pb::user::grant_privilege::{PrivilegeWithGrantOption, Target}; +use risingwave_pb::user::{AuthInfo, GrantPrivilege, UserInfo}; +use tokio::sync::Mutex; + +use crate::manager::MetaSrvEnv; +use crate::model::{MetadataModel, Transactional}; +use crate::storage::{MetaStore, Transaction}; + +/// We use `UserName` as the key for the user info. +type UserName = String; + +/// `UserManager` managers the user info, including authentication and privileges. It only responds +/// to manager the user info and some basic validation. Other authorization relate to the current +/// session user should be done in Frontend before passing to Meta. +pub struct UserManager { + env: MetaSrvEnv, + core: Mutex>, +} + +impl UserManager { + pub async fn new(env: MetaSrvEnv) -> Result { + let users = UserInfo::list(env.meta_store()).await?; + let user_manager = Self { + env, + core: Mutex::new(HashMap::from_iter( + users.into_iter().map(|user| (user.name.clone(), user)), + )), + }; + user_manager.init().await?; + Ok(user_manager) + } + + async fn init(&self) -> Result<()> { + let mut core = self.core.lock().await; + if !core.contains_key(DEFAULT_SUPPER_USER) { + let default_user = UserInfo { + name: DEFAULT_SUPPER_USER.to_string(), + is_supper: true, + can_create_db: true, + can_login: true, + auth_info: Some(AuthInfo { + encryption_type: EncryptionType::Plaintext as i32, + encrypted_value: Vec::from(DEFAULT_SUPPER_USER_PASSWORD.as_bytes()), + }), + ..Default::default() + }; + + default_user.insert(self.env.meta_store()).await?; + core.insert(DEFAULT_SUPPER_USER.to_string(), default_user); + } + + Ok(()) + } + + pub async fn list_users(&self) -> Result> { + let core = self.core.lock().await; + Ok(core.values().cloned().collect()) + } + + pub async fn create_user(&self, user: &UserInfo) -> Result<()> { + let mut core = self.core.lock().await; + if core.contains_key(&user.name) { + return Err(RwError::from(InternalError(format!( + "User {} already exists", + user.name + )))); + } + user.insert(self.env.meta_store()).await?; + core.insert(user.name.clone(), user.clone()); + // TODO: add notification support. + Ok(()) + } + + pub async fn get_user(&self, user_name: &UserName) -> Result { + let core = self.core.lock().await; + + core.get(user_name) + .cloned() + .ok_or_else(|| RwError::from(InternalError(format!("User {} not found", user_name)))) + } + + pub async fn drop_user(&self, user_name: &UserName) -> Result<()> { + let mut core = self.core.lock().await; + if !core.contains_key(user_name) { + return Err(RwError::from(InternalError(format!( + "User {} does not exist", + user_name + )))); + } + if user_name == DEFAULT_SUPPER_USER { + return Err(RwError::from(InternalError(format!( + "Cannot drop default super user {}", + user_name + )))); + } + if !core.get(user_name).unwrap().grant_privileges.is_empty() { + return Err(RwError::from(InternalError(format!( + "Cannot drop user {} with privileges", + user_name + )))); + } + + // TODO: add more check, like whether he owns any database/schema/table/source. + UserInfo::delete(self.env.meta_store(), user_name).await?; + core.remove(user_name); + + // TODO: add notification support. + Ok(()) + } +} + +/// Defines privilege grant for a user. +impl UserManager { + // Merge new granted privilege. + #[inline(always)] + fn merge_privilege(origin_privilege: &mut GrantPrivilege, new_privilege: &GrantPrivilege) { + assert_eq!(origin_privilege.target, new_privilege.target); + + let mut privilege_map = HashMap::::from_iter( + origin_privilege + .privilege_with_opts + .iter() + .map(|po| (po.privilege, po.with_grant_option)), + ); + for npo in &new_privilege.privilege_with_opts { + if let Some(po) = privilege_map.get_mut(&npo.privilege) { + *po |= npo.with_grant_option; + } else { + privilege_map.insert(npo.privilege, npo.with_grant_option); + } + } + origin_privilege.privilege_with_opts = privilege_map + .into_iter() + .map(|(privilege, with_grant_option)| PrivilegeWithGrantOption { + privilege, + with_grant_option, + }) + .collect(); + } + + pub async fn grant_privilege( + &self, + user_name: &UserName, + new_grant_privileges: &[GrantPrivilege], + ) -> Result<()> { + let mut core = self.core.lock().await; + let mut user = core + .get(user_name) + .ok_or_else(|| InternalError(format!("User {} does not exist", user_name))) + .cloned()?; + + if user.is_supper { + return Err(RwError::from(InternalError(format!( + "Cannot grant privilege to supper user {}", + user_name + )))); + } + + new_grant_privileges.iter().for_each(|new_grant_privilege| { + if let Some(privilege) = user + .grant_privileges + .iter_mut() + .find(|p| p.target == new_grant_privilege.target) + { + Self::merge_privilege(privilege, new_grant_privilege); + } else { + user.grant_privileges.push(new_grant_privilege.clone()); + } + }); + + user.insert(self.env.meta_store()).await?; + core.insert(user_name.clone(), user); + Ok(()) + } + + // Revoke privilege from target. + #[inline(always)] + fn revoke_privilege_inner( + origin_privilege: &mut GrantPrivilege, + revoke_grant_privilege: &GrantPrivilege, + revoke_grant_option: bool, + ) { + assert_eq!(origin_privilege.target, revoke_grant_privilege.target); + + if revoke_grant_option { + // Only revoke with grant option. + origin_privilege + .privilege_with_opts + .iter_mut() + .for_each(|po| { + if revoke_grant_privilege + .privilege_with_opts + .iter() + .any(|ro| ro.privilege == po.privilege) + { + po.with_grant_option = false; + } + }) + } else { + // Revoke all privileges matched with revoke_grant_privilege. + origin_privilege.privilege_with_opts.retain(|po| { + !revoke_grant_privilege + .privilege_with_opts + .iter() + .any(|ro| ro.privilege == po.privilege) + }); + } + } + + pub async fn revoke_privilege( + &self, + user_name: &UserName, + revoke_grant_privileges: &[GrantPrivilege], + revoke_grant_option: bool, + ) -> Result<()> { + let mut core = self.core.lock().await; + let mut user = core + .get(user_name) + .ok_or_else(|| InternalError(format!("User {} does not exist", user_name))) + .cloned()?; + + if user.is_supper { + return Err(RwError::from(InternalError(format!( + "Cannot revoke privilege from supper user {}", + user_name + )))); + } + + let mut empty_privilege = false; + revoke_grant_privileges + .iter() + .for_each(|revoke_grant_privilege| { + for privilege in &mut user.grant_privileges { + if privilege.target == revoke_grant_privilege.target { + Self::revoke_privilege_inner( + privilege, + revoke_grant_privilege, + revoke_grant_option, + ); + empty_privilege |= privilege.privilege_with_opts.is_empty(); + break; + } + } + }); + + if empty_privilege { + user.grant_privileges + .retain(|privilege| !privilege.privilege_with_opts.is_empty()); + } + + user.insert(self.env.meta_store()).await?; + core.insert(user_name.clone(), user); + Ok(()) + } + + /// `release_privileges` removes the privileges with given target from all users, it will be + /// called when a database/schema/table/source is dropped. + pub async fn release_privileges(&self, target: &Target) -> Result<()> { + let mut core = self.core.lock().await; + let mut transaction = Transaction::default(); + let mut users_need_update = vec![]; + for user in core.values() { + let cnt = user.grant_privileges.len(); + let mut user = user.clone(); + user.grant_privileges + .retain(|p| p.target.as_ref().unwrap() != target); + if cnt != user.grant_privileges.len() { + user.upsert_in_transaction(&mut transaction)?; + users_need_update.push(user); + } + } + self.env.meta_store().txn(transaction).await?; + for user in users_need_update { + core.insert(user.name.clone(), user); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use risingwave_pb::user::grant_privilege::{GrantTable, Privilege}; + + use super::*; + + fn make_test_user(name: &str) -> UserInfo { + UserInfo { + name: name.to_string(), + ..Default::default() + } + } + + fn make_privilege( + target: Target, + privileges: &[Privilege], + with_grant_option: bool, + ) -> GrantPrivilege { + GrantPrivilege { + target: Some(target), + privilege_with_opts: privileges + .iter() + .map(|&p| PrivilegeWithGrantOption { + privilege: p as i32, + with_grant_option, + }) + .collect(), + } + } + + #[tokio::test] + async fn test_user_manager() -> Result<()> { + let user_manager = UserManager::new(MetaSrvEnv::for_test().await).await?; + let test_user = "test_user"; + user_manager.create_user(&make_test_user(test_user)).await?; + assert!(user_manager + .create_user(&make_test_user(DEFAULT_SUPPER_USER)) + .await + .is_err()); + + let users = user_manager.list_users().await?; + assert_eq!(users.len(), 2); + + let target = Target::GrantTable(GrantTable { + database_id: 0, + schema_id: 0, + table_id: 0, + }); + // Grant Select/Insert without grant option. + user_manager + .grant_privilege( + &test_user.to_string(), + &[make_privilege( + target.clone(), + &[Privilege::Select, Privilege::Insert], + false, + )], + ) + .await?; + let user = user_manager.get_user(&test_user.to_string()).await?; + assert_eq!(user.grant_privileges.len(), 1); + assert_eq!(user.grant_privileges[0].target, Some(target.clone())); + assert_eq!(user.grant_privileges[0].privilege_with_opts.len(), 2); + assert!(user.grant_privileges[0] + .privilege_with_opts + .iter() + .all(|p| !p.with_grant_option)); + + // Grant Select/Insert with grant option. + user_manager + .grant_privilege( + &test_user.to_string(), + &[make_privilege( + target.clone(), + &[Privilege::Select, Privilege::Insert], + true, + )], + ) + .await?; + let user = user_manager.get_user(&test_user.to_string()).await?; + assert_eq!(user.grant_privileges.len(), 1); + assert_eq!(user.grant_privileges[0].target, Some(target.clone())); + assert_eq!(user.grant_privileges[0].privilege_with_opts.len(), 2); + assert!(user.grant_privileges[0] + .privilege_with_opts + .iter() + .all(|p| p.with_grant_option)); + + // Grant Select/Update/Delete with grant option, while Select is duplicated. + user_manager + .grant_privilege( + &test_user.to_string(), + &[make_privilege( + target.clone(), + &[Privilege::Select, Privilege::Update, Privilege::Delete], + true, + )], + ) + .await?; + let user = user_manager.get_user(&test_user.to_string()).await?; + assert_eq!(user.grant_privileges.len(), 1); + assert_eq!(user.grant_privileges[0].target, Some(target.clone())); + assert_eq!(user.grant_privileges[0].privilege_with_opts.len(), 4); + assert!(user.grant_privileges[0] + .privilege_with_opts + .iter() + .all(|p| p.with_grant_option)); + + // Revoke Select/Update/Delete/Insert with grant option. + user_manager + .revoke_privilege( + &test_user.to_string(), + &[make_privilege( + target.clone(), + &[ + Privilege::Select, + Privilege::Insert, + Privilege::Delete, + Privilege::Update, + ], + false, + )], + true, + ) + .await?; + let user = user_manager.get_user(&test_user.to_string()).await?; + assert_eq!(user.grant_privileges[0].privilege_with_opts.len(), 4); + assert!(user.grant_privileges[0] + .privilege_with_opts + .iter() + .all(|p| !p.with_grant_option)); + + // Revoke Select/Delete/Insert. + user_manager + .revoke_privilege( + &test_user.to_string(), + &[make_privilege( + target.clone(), + &[Privilege::Select, Privilege::Insert, Privilege::Delete], + false, + )], + false, + ) + .await?; + let user = user_manager.get_user(&test_user.to_string()).await?; + assert_eq!(user.grant_privileges.len(), 1); + assert_eq!(user.grant_privileges[0].privilege_with_opts.len(), 1); + + // Release all privileges with target. + user_manager.release_privileges(&target).await?; + let user = user_manager.get_user(&test_user.to_string()).await?; + assert!(user.grant_privileges.is_empty()); + + Ok(()) + } +} diff --git a/src/meta/src/model/mod.rs b/src/meta/src/model/mod.rs index fae3f4ef8a3f1..eab79db146025 100644 --- a/src/meta/src/model/mod.rs +++ b/src/meta/src/model/mod.rs @@ -16,6 +16,7 @@ mod barrier; mod catalog; mod cluster; mod stream; +mod user; use std::collections::BTreeMap; use std::ops::{Deref, DerefMut}; @@ -27,6 +28,7 @@ pub use cluster::*; use prost::Message; use risingwave_common::error::Result; pub use stream::*; +pub use user::*; use crate::storage::{self, MetaStore, Transaction}; diff --git a/src/meta/src/model/user.rs b/src/meta/src/model/user.rs new file mode 100644 index 0000000000000..df13314f1ef3d --- /dev/null +++ b/src/meta/src/model/user.rs @@ -0,0 +1,42 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::user::UserInfo; + +use crate::model::MetadataModel; + +/// Column family name for user info. +const USER_INFO_CF_NAME: &str = "cf/user_info"; + +/// `UserInfo` stores the user information. +impl MetadataModel for UserInfo { + type KeyType = String; + type ProstType = UserInfo; + + fn cf_name() -> String { + USER_INFO_CF_NAME.to_string() + } + + fn to_protobuf(&self) -> Self::ProstType { + self.clone() + } + + fn from_protobuf(prost: Self::ProstType) -> Self { + prost + } + + fn key(&self) -> risingwave_common::error::Result { + Ok(self.name.clone()) + } +} diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 737f2be1daac6..5211c958ecbf4 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -25,6 +25,7 @@ use risingwave_pb::meta::cluster_service_server::ClusterServiceServer; use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer; use risingwave_pb::meta::notification_service_server::NotificationServiceServer; use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer; +use risingwave_pb::user::user_service_server::UserServiceServer; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; @@ -36,12 +37,13 @@ use crate::cluster::ClusterManager; use crate::dashboard::DashboardService; use crate::hummock; use crate::hummock::CompactionScheduler; -use crate::manager::{CatalogManager, MetaOpts, MetaSrvEnv}; +use crate::manager::{CatalogManager, MetaOpts, MetaSrvEnv, UserManager}; use crate::rpc::metrics::MetaMetrics; use crate::rpc::service::cluster_service::ClusterServiceImpl; use crate::rpc::service::heartbeat_service::HeartbeatServiceImpl; use crate::rpc::service::hummock_service::HummockServiceImpl; use crate::rpc::service::stream_service::StreamServiceImpl; +use crate::rpc::service::user_service::UserServiceImpl; use crate::storage::{EtcdMetaStore, MemStore, MetaStore}; use crate::stream::{FragmentManager, GlobalStreamManager, SourceManager}; @@ -136,12 +138,13 @@ pub async fn rpc_serve_with_store( tokio::spawn(dashboard_service.serve(ui_path)); } - let catalog_manager_v2 = Arc::new(CatalogManager::new(env.clone()).await.unwrap()); + let catalog_manager = Arc::new(CatalogManager::new(env.clone()).await.unwrap()); + let user_manager = UserManager::new(env.clone()).await.unwrap(); let barrier_manager = Arc::new(GlobalBarrierManager::new( env.clone(), cluster_manager.clone(), - catalog_manager_v2.clone(), + catalog_manager.clone(), fragment_manager.clone(), hummock_manager.clone(), meta_metrics.clone(), @@ -152,7 +155,7 @@ pub async fn rpc_serve_with_store( env.clone(), cluster_manager.clone(), barrier_manager.clone(), - catalog_manager_v2.clone(), + catalog_manager.clone(), ) .await .unwrap(), @@ -189,12 +192,13 @@ pub async fn rpc_serve_with_store( let heartbeat_srv = HeartbeatServiceImpl::new(cluster_manager.clone()); let ddl_srv = DdlServiceImpl::::new( env.clone(), - catalog_manager_v2.clone(), + catalog_manager.clone(), stream_manager.clone(), source_manager, cluster_manager.clone(), fragment_manager.clone(), ); + let user_srv = UserServiceImpl::::new(catalog_manager.clone(), user_manager); let cluster_srv = ClusterServiceImpl::::new(cluster_manager.clone()); let stream_srv = StreamServiceImpl::::new(stream_manager); let hummock_srv = HummockServiceImpl::new( @@ -204,7 +208,7 @@ pub async fn rpc_serve_with_store( ); let notification_manager = env.notification_manager_ref(); let notification_srv = - NotificationServiceImpl::new(env, catalog_manager_v2, cluster_manager.clone()); + NotificationServiceImpl::new(env, catalog_manager, cluster_manager.clone()); if let Some(prometheus_addr) = prometheus_addr { meta_metrics.boot_metrics_service(prometheus_addr); @@ -239,6 +243,7 @@ pub async fn rpc_serve_with_store( .add_service(HummockManagerServiceServer::new(hummock_srv)) .add_service(NotificationServiceServer::new(notification_srv)) .add_service(DdlServiceServer::new(ddl_srv)) + .add_service(UserServiceServer::new(user_srv)) .serve_with_shutdown(addr, async move { tokio::select! { _ = tokio::signal::ctrl_c() => {}, diff --git a/src/meta/src/rpc/service/mod.rs b/src/meta/src/rpc/service/mod.rs index 255e3b9253b29..3230b34c8aa09 100644 --- a/src/meta/src/rpc/service/mod.rs +++ b/src/meta/src/rpc/service/mod.rs @@ -18,6 +18,7 @@ pub mod heartbeat_service; pub mod hummock_service; pub mod notification_service; pub mod stream_service; +pub mod user_service; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/src/meta/src/rpc/service/user_service.rs b/src/meta/src/rpc/service/user_service.rs new file mode 100644 index 0000000000000..be1c468a9aaf6 --- /dev/null +++ b/src/meta/src/rpc/service/user_service.rs @@ -0,0 +1,190 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::error::{tonic_err, Result as RwResult}; +use risingwave_pb::user::grant_privilege::{GrantSource, GrantTable, Target}; +use risingwave_pb::user::user_service_server::UserService; +use risingwave_pb::user::{ + CreateUserRequest, CreateUserResponse, DropUserRequest, DropUserResponse, GrantPrivilege, + GrantPrivilegeRequest, GrantPrivilegeResponse, RevokePrivilegeRequest, RevokePrivilegeResponse, +}; +use tonic::{Request, Response, Status}; + +use crate::manager::{CatalogManagerRef, UserManager}; +use crate::storage::MetaStore; + +// TODO: Change user manager as a part of the catalog manager, to ensure that operations on Catalog +// and User are transactional. +pub struct UserServiceImpl { + catalog_manager: CatalogManagerRef, + user_manager: UserManager, +} + +impl UserServiceImpl +where + S: MetaStore, +{ + pub fn new(catalog_manager: CatalogManagerRef, user_manager: UserManager) -> Self { + Self { + catalog_manager, + user_manager, + } + } + + /// Expands `GrantPrivilege` with target `GrantAllTables` or `GrantAllSources` to specific + /// tables and sources, and set `with_grant_option` inside when grant privilege to a user. + async fn expand_privilege( + &self, + privileges: &[GrantPrivilege], + with_grant_option: Option, + ) -> RwResult> { + let mut expanded_privileges = Vec::new(); + for privilege in privileges { + if let Some(Target::GrantAllTables(target)) = &privilege.target { + let tables = self + .catalog_manager + .list_tables(target.database_id, target.schema_id) + .await?; + for table_id in tables { + let mut privilege = privilege.clone(); + privilege.target = Some(Target::GrantTable(GrantTable { + database_id: target.database_id, + schema_id: target.schema_id, + table_id, + })); + if let Some(true) = with_grant_option { + privilege + .privilege_with_opts + .iter_mut() + .for_each(|p| p.with_grant_option = true); + } + expanded_privileges.push(privilege); + } + } else if let Some(Target::GrantAllSources(target)) = &privilege.target { + let sources = self + .catalog_manager + .list_sources(target.database_id, target.schema_id) + .await?; + for source_id in sources { + let mut privilege = privilege.clone(); + privilege.target = Some(Target::GrantSource(GrantSource { + database_id: target.database_id, + schema_id: target.schema_id, + source_id, + })); + if let Some(with_grant_option) = with_grant_option { + privilege.privilege_with_opts.iter_mut().for_each(|p| { + p.with_grant_option = with_grant_option; + }); + } + expanded_privileges.push(privilege); + } + } else { + let mut privilege = privilege.clone(); + if let Some(with_grant_option) = with_grant_option { + privilege.privilege_with_opts.iter_mut().for_each(|p| { + p.with_grant_option = with_grant_option; + }); + } + expanded_privileges.push(privilege); + } + } + + Ok(expanded_privileges) + } +} + +#[async_trait::async_trait] +impl UserService for UserServiceImpl { + #[cfg_attr(coverage, no_coverage)] + async fn create_user( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let user = req.get_user().map_err(tonic_err)?; + self.user_manager + .create_user(user) + .await + .map_err(tonic_err)?; + + Ok(Response::new(CreateUserResponse { + status: None, + version: 0, // TODO: fill version when supported in notification manager. + })) + } + + #[cfg_attr(coverage, no_coverage)] + async fn drop_user( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let user_name = req.name; + self.user_manager + .drop_user(&user_name) + .await + .map_err(tonic_err)?; + + Ok(Response::new(DropUserResponse { + status: None, + version: 0, // TODO: fill version when supported in notification manager. + })) + } + + #[cfg_attr(coverage, no_coverage)] + async fn grant_privilege( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let user_name = req.get_user_name(); + let new_privileges = self + .expand_privilege(req.get_privileges(), Some(req.with_grant_option)) + .await + .map_err(tonic_err)?; + self.user_manager + .grant_privilege(user_name, &new_privileges) + .await + .map_err(tonic_err)?; + + Ok(Response::new(GrantPrivilegeResponse { + status: None, + version: 0, // TODO: fill version when supported in notification manager. + })) + } + + #[cfg_attr(coverage, no_coverage)] + async fn revoke_privilege( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let user_name = req.get_user_name(); + let privileges = self + .expand_privilege(req.get_privileges(), None) + .await + .map_err(tonic_err)?; + let revoke_grant_option = req.revoke_grant_option; + self.user_manager + .revoke_privilege(user_name, &privileges, revoke_grant_option) + .await + .map_err(tonic_err)?; + + Ok(Response::new(RevokePrivilegeResponse { + status: None, + version: 0, // TODO: fill version when supported in notification manager. + })) + } +}