Skip to content

Commit

Permalink
feat(frontend): support user alter (risingwavelabs#4261)
Browse files Browse the repository at this point in the history
* alter user

* fix

* fix

* fix typo in e2e test

* fmt

* fix

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and nasnoisaac committed Aug 9, 2022
1 parent 98a3303 commit c6da9b3
Show file tree
Hide file tree
Showing 17 changed files with 423 additions and 48 deletions.
12 changes: 10 additions & 2 deletions e2e_test/ddl/user.slt
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
# Create a user.
statement ok
CREATE USER ddl_user WITH NOSUPERUSER CREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b';
CREATE USER user WITH SUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b';

# Create another user with duplicate name.
statement error
CREATE USER ddl_user;
CREATE USER user;

# Alter user name.
statement ok
ALTER USER user RENAME TO ddl_user;

# Alter user properties.
statement ok
ALTER USER ddl_user WITH NOSUPERUSER CREATEDB PASSWORD 'md59f2fa6a30871a92249bdd2f1eeee4ef6';

# Drop the user if exists.
statement ok
Expand Down
19 changes: 19 additions & 0 deletions proto/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ message DropUserResponse {
uint64 version = 2;
}

message UpdateUserRequest {
enum UpdateField {
UNKNOWN = 0;
SUPER = 1;
LOGIN = 2;
CREATE_DB = 3;
AUTH_INFO = 4;
RENAME = 5;
}
UserInfo user = 1;
repeated UpdateField update_fields = 2;
}

message UpdateUserResponse {
common.Status status = 1;
uint64 version = 2;
}

message GrantPrivilegeRequest {
repeated uint32 user_ids = 1;
repeated GrantPrivilege privileges = 2;
Expand Down Expand Up @@ -111,6 +129,7 @@ service UserService {
// https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/FieldMask.html.
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc DropUser(DropUserRequest) returns (DropUserResponse);
rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);

// GrantPrivilege grants a privilege to a user.
rpc GrantPrivilege(GrantPrivilegeRequest) returns (GrantPrivilegeResponse);
Expand Down
154 changes: 154 additions & 0 deletions src/frontend/src/handler/alter_user.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::Result;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{UpdateUserRequest, UserInfo};
use risingwave_sqlparser::ast::{AlterUserStatement, UserOption, UserOptions};

use crate::binder::Binder;
use crate::catalog::CatalogError;
use crate::session::OptimizerContext;
use crate::user::user_authentication::{encrypt_default, encrypted_password};

fn alter_prost_user_info(
mut user_info: UserInfo,
options: &UserOptions,
) -> Result<UpdateUserRequest> {
let mut update_fields = Vec::new();
for option in &options.0 {
match option {
UserOption::SuperUser => {
user_info.is_supper = true;
update_fields.push(UpdateField::Super as i32);
}
UserOption::NoSuperUser => {
user_info.is_supper = false;
update_fields.push(UpdateField::Super as i32);
}
UserOption::CreateDB => {
user_info.can_create_db = true;
update_fields.push(UpdateField::CreateDb as i32);
}
UserOption::NoCreateDB => {
user_info.can_create_db = false;
update_fields.push(UpdateField::CreateDb as i32);
}
UserOption::Login => {
user_info.can_login = true;
update_fields.push(UpdateField::Login as i32);
}
UserOption::NoLogin => {
user_info.can_login = false;
update_fields.push(UpdateField::Login as i32);
}
UserOption::EncryptedPassword(p) => {
if !p.0.is_empty() {
user_info.auth_info = Some(encrypt_default(&user_info.name, &p.0));
update_fields.push(UpdateField::AuthInfo as i32);
}
}
UserOption::Password(opt) => {
if let Some(password) = opt {
user_info.auth_info = encrypted_password(&user_info.name, &password.0);
update_fields.push(UpdateField::AuthInfo as i32);
}
}
}
}
let request = UpdateUserRequest {
user: Some(user_info),
update_fields,
};
Ok(request)
}

pub async fn handle_alter_user(
context: OptimizerContext,
stmt: AlterUserStatement,
) -> Result<PgResponse> {
let session = context.session_ctx;
let user_name = Binder::resolve_user_name(stmt.user_name.clone())?;
let mut old_info = {
let user_reader = session.env().user_info_reader();
let reader = user_reader.read_guard();
if let Some(origin_info) = reader.get_user_by_name(&user_name) {
origin_info.clone()
} else {
return Err(CatalogError::NotFound("user", user_name).into());
}
};
let request = match stmt.mode {
risingwave_sqlparser::ast::AlterUserMode::Options(options) => {
alter_prost_user_info(old_info, &options)?
}
risingwave_sqlparser::ast::AlterUserMode::Rename(new_name) => {
old_info.name = Binder::resolve_user_name(new_name)?;
UpdateUserRequest {
user: Some(old_info),
update_fields: vec![UpdateField::Rename as i32],
}
}
};
let user_info_writer = session.env().user_info_writer();
user_info_writer.update_user(request).await?;
Ok(PgResponse::empty_result(StatementType::UPDATE_USER))
}

#[cfg(test)]
mod tests {
use risingwave_pb::user::auth_info::EncryptionType;
use risingwave_pb::user::AuthInfo;

use crate::test_utils::LocalFrontend;

#[tokio::test]
async fn test_alter_user() {
let frontend = LocalFrontend::new(Default::default()).await;
let session = frontend.session_ref();
let user_info_reader = session.env().user_info_reader();

frontend.run_sql("CREATE USER userB WITH SUPERUSER NOCREATEDB PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'").await.unwrap();
frontend
.run_sql("ALTER USER userB RENAME TO user")
.await
.unwrap();
assert!(user_info_reader
.read_guard()
.get_user_by_name("userB")
.is_none());
assert!(user_info_reader
.read_guard()
.get_user_by_name("user")
.is_some());

frontend.run_sql("ALTER USER user WITH NOSUPERUSER CREATEDB PASSWORD 'md59f2fa6a30871a92249bdd2f1eeee4ef6'").await.unwrap();

let user_info = user_info_reader
.read_guard()
.get_user_by_name("user")
.cloned()
.unwrap();
assert!(!user_info.is_supper);
assert!(user_info.can_create_db);
assert_eq!(
user_info.auth_info,
Some(AuthInfo {
encryption_type: EncryptionType::Md5 as i32,
encrypted_value: b"9f2fa6a30871a92249bdd2f1eeee4ef6".to_vec()
})
);
}
}
22 changes: 10 additions & 12 deletions src/frontend/src/handler/create_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::Result;
use risingwave_pb::user::UserInfo;
use risingwave_sqlparser::ast::{
CreateUserOption, CreateUserStatement, CreateUserWithOptions, ObjectName,
};
use risingwave_sqlparser::ast::{CreateUserStatement, ObjectName, UserOption, UserOptions};

use crate::binder::Binder;
use crate::catalog::CatalogError;
use crate::session::OptimizerContext;
use crate::user::user_authentication::{encrypt_default, encrypted_password};

fn make_prost_user_info(name: ObjectName, options: &CreateUserWithOptions) -> Result<UserInfo> {
pub(crate) fn make_prost_user_info(name: ObjectName, options: &UserOptions) -> Result<UserInfo> {
let mut user_info = UserInfo {
name: Binder::resolve_user_name(name)?,
// the LOGIN option is implied if it is not explicitly specified.
Expand All @@ -33,18 +31,18 @@ fn make_prost_user_info(name: ObjectName, options: &CreateUserWithOptions) -> Re
};
for option in &options.0 {
match option {
CreateUserOption::SuperUser => user_info.is_supper = true,
CreateUserOption::NoSuperUser => user_info.is_supper = false,
CreateUserOption::CreateDB => user_info.can_create_db = true,
CreateUserOption::NoCreateDB => user_info.can_create_db = false,
CreateUserOption::Login => user_info.can_login = true,
CreateUserOption::NoLogin => user_info.can_login = false,
CreateUserOption::EncryptedPassword(p) => {
UserOption::SuperUser => user_info.is_supper = true,
UserOption::NoSuperUser => user_info.is_supper = false,
UserOption::CreateDB => user_info.can_create_db = true,
UserOption::NoCreateDB => user_info.can_create_db = false,
UserOption::Login => user_info.can_login = true,
UserOption::NoLogin => user_info.can_login = false,
UserOption::EncryptedPassword(p) => {
if !p.0.is_empty() {
user_info.auth_info = Some(encrypt_default(&user_info.name, &p.0));
}
}
CreateUserOption::Password(opt) => {
UserOption::Password(opt) => {
if let Some(password) = opt {
user_info.auth_info = encrypted_password(&user_info.name, &password.0);
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_sqlparser::ast::{DropStatement, ObjectType, Statement, WithProper

use crate::session::{OptimizerContext, SessionImpl};

pub mod alter_user;
mod create_database;
pub mod create_index;
pub mod create_mv;
Expand Down Expand Up @@ -84,6 +85,7 @@ pub async fn handle(
..
} => create_schema::handle_create_schema(context, schema_name, if_not_exists).await,
Statement::CreateUser(stmt) => create_user::handle_create_user(context, stmt).await,
Statement::AlterUser(stmt) => alter_user::handle_alter_user(context, stmt).await,
Statement::Grant { .. } => handle_privilege::handle_grant_privilege(context, stmt).await,
Statement::Revoke { .. } => handle_privilege::handle_revoke_privilege(context, stmt).await,
Statement::Describe { name } => describe::handle_describe(context, name),
Expand Down
26 changes: 25 additions & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use risingwave_pb::catalog::{
use risingwave_pb::common::ParallelUnitMapping;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UpdateUserRequest, UserInfo};
use risingwave_rpc_client::error::Result as RpcResult;
use risingwave_sqlparser::ast::Statement;
use risingwave_sqlparser::parser::Parser;
Expand Down Expand Up @@ -402,6 +403,29 @@ impl UserInfoWriter for MockUserInfoWriter {
Ok(())
}

async fn update_user(&self, request: UpdateUserRequest) -> Result<()> {
let mut lock = self.user_info.write();
let update_user = request.user.unwrap();
let id = update_user.get_id();
let old_name = lock.get_user_name_by_id(id).unwrap();
let mut user_info = lock.get_user_by_name(&old_name).unwrap().clone();
request.update_fields.into_iter().for_each(|field| {
if field == UpdateField::Super as i32 {
user_info.is_supper = update_user.is_supper;
} else if field == UpdateField::Login as i32 {
user_info.can_login = update_user.can_login;
} else if field == UpdateField::CreateDb as i32 {
user_info.can_create_db = update_user.can_create_db;
} else if field == UpdateField::AuthInfo as i32 {
user_info.auth_info = update_user.auth_info.clone();
} else if field == UpdateField::Rename as i32 {
user_info.name = update_user.name.clone();
}
});
lock.update_user(update_user);
Ok(())
}

/// In `MockUserInfoWriter`, we don't support expand privilege with `GrantAllTables` and
/// `GrantAllSources` when grant privilege to user.
async fn grant_privilege(
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/user/user_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ impl UserInfoManager {
pub fn update_user(&mut self, user_info: UserInfo) {
let id = user_info.id;
let name = user_info.name.clone();
self.user_by_name.insert(name.clone(), user_info).unwrap();
if let Some(old_name) = self.get_user_name_by_id(id) {
self.user_by_name.remove(&old_name);
self.user_by_name.insert(name.clone(), user_info);
} else {
self.user_by_name.insert(name.clone(), user_info).unwrap();
}
self.user_name_by_id.insert(id, name).unwrap();
}

Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/user/user_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use parking_lot::lock_api::ArcRwLockReadGuard;
use parking_lot::{RawRwLock, RwLock};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_pb::user::{GrantPrivilege, UserInfo};
use risingwave_pb::user::{GrantPrivilege, UpdateUserRequest, UserInfo};
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;

Expand All @@ -45,6 +45,8 @@ pub trait UserInfoWriter: Send + Sync {

async fn drop_user(&self, id: UserId) -> Result<()>;

async fn update_user(&self, request: UpdateUserRequest) -> Result<()>;

async fn grant_privilege(
&self,
users: Vec<UserId>,
Expand Down Expand Up @@ -82,6 +84,11 @@ impl UserInfoWriter for UserInfoWriterImpl {
self.wait_version(version).await
}

async fn update_user(&self, request: UpdateUserRequest) -> Result<()> {
let version = self.meta_client.update_user(request).await?;
self.wait_version(version).await
}

async fn grant_privilege(
&self,
users: Vec<UserId>,
Expand Down
Loading

0 comments on commit c6da9b3

Please sign in to comment.