Skip to content

Commit

Permalink
feat: notify user privilege change for drop ddl and add `create_sourc…
Browse files Browse the repository at this point in the history
…e` function (#13230)
  • Loading branch information
yezizp2012 authored Nov 3, 2023
1 parent 31bf68e commit a07fb73
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 55 deletions.
25 changes: 25 additions & 0 deletions src/meta/model_v2/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::PbSource;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;

use crate::{
ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId,
Expand Down Expand Up @@ -78,3 +81,25 @@ impl Related<super::table::Entity> for Entity {
}

impl ActiveModelBehavior for ActiveModel {}

impl From<PbSource> for ActiveModel {
fn from(source: PbSource) -> Self {
let optional_associated_table_id = source.optional_associated_table_id.map(|x| match x {
OptionalAssociatedTableId::AssociatedTableId(id) => id,
});
Self {
source_id: Set(source.id as _),
name: Set(source.name),
row_id_index: Set(source.row_id_index as _),
columns: Set(ColumnCatalogArray(source.columns)),
pk_column_ids: Set(I32Array(source.pk_column_ids)),
properties: Set(Property(source.properties)),
definition: Set(source.definition),
source_info: Set(source.info.map(StreamSourceInfo)),
watermark_descs: Set(WatermarkDescArray(source.watermark_descs)),
optional_associated_table_id: Set(optional_associated_table_id),
connection_id: Set(source.connection_id),
version: Set(source.version),
}
}
}
164 changes: 147 additions & 17 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::{
connection, database, function, index, object, object_dependency, schema, sink, source, table,
view, ColumnCatalogArray, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService,
SchemaId, SourceId, TableId, UserId,
user_privilege, view, ColumnCatalogArray, ConnectionId, DatabaseId, FunctionId, ObjectId,
PrivateLinkService, SchemaId, SourceId, TableId, UserId,
};
use risingwave_pb::catalog::{
PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable,
Expand All @@ -33,11 +33,11 @@ use risingwave_pb::meta::relation::PbRelationInfo;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Operation as NotificationOperation,
};
use risingwave_pb::meta::{PbRelation, PbRelationGroup};
use risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments};
use sea_orm::ActiveValue::Set;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
QueryFilter, QuerySelect, TransactionTrait,
ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, JoinType,
QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use tokio::sync::RwLock;

Expand All @@ -46,10 +46,10 @@ use crate::controller::utils::{
check_connection_name_duplicate, check_function_signature_duplicate,
check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id,
ensure_object_not_refer, ensure_schema_empty, ensure_user_id, get_referring_objects,
get_referring_objects_cascade, PartialObject,
get_referring_objects_cascade, list_user_info_by_ids, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{MetaSrvEnv, NotificationVersion};
use crate::manager::{MetaSrvEnv, NotificationVersion, StreamingJob};
use crate::rpc::ddl_controller::DropMode;
use crate::{MetaError, MetaResult};

Expand Down Expand Up @@ -213,14 +213,31 @@ impl CatalogController {
.map(|conn| conn.info)
.collect_vec();

// Find affect users with privileges on the database and the objects in the database.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
.select_only()
.distinct()
.column(user_privilege::Column::UserId)
.join(JoinType::InnerJoin, user_privilege::Relation::Object.def())
.filter(
object::Column::DatabaseId
.eq(database_id)
.or(user_privilege::Column::Oid.eq(database_id)),
)
.into_tuple()
.all(&txn)
.await?;

// The schema and objects in the database will be delete cascade.
let res = Object::delete_by_id(database_id).exec(&txn).await?;
if res.rows_affected == 0 {
return Err(MetaError::catalog_id_not_found("database", database_id));
}
let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;

txn.commit().await?;

self.notify_users_update(user_infos).await;
let version = self
.notify_frontend(
NotificationOperation::Delete,
Expand Down Expand Up @@ -276,25 +293,44 @@ impl CatalogController {
drop_mode: DropMode,
) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let schema_obj = Object::find_by_id(schema_id)
.one(&inner.db)
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
if drop_mode == DropMode::Restrict {
ensure_schema_empty(schema_id, &inner.db).await?;
ensure_schema_empty(schema_id, &txn).await?;
}

// Find affect users with privileges on the schema and the objects in the schema.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
.select_only()
.distinct()
.column(user_privilege::Column::UserId)
.join(JoinType::InnerJoin, user_privilege::Relation::Object.def())
.filter(
object::Column::SchemaId
.eq(schema_id)
.or(user_privilege::Column::Oid.eq(schema_id)),
)
.into_tuple()
.all(&txn)
.await?;

let res = Object::delete(object::ActiveModel {
oid: Set(schema_id),
..Default::default()
})
.exec(&inner.db)
.exec(&txn)
.await?;
if res.rows_affected == 0 {
return Err(MetaError::catalog_id_not_found("schema", schema_id));
}
let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;

txn.commit().await?;

// todo: update user privileges accordingly.
self.notify_users_update(user_infos).await;
let version = self
.notify_frontend(
NotificationOperation::Delete,
Expand All @@ -308,6 +344,52 @@ impl CatalogController {
Ok(version)
}

pub fn create_stream_job(
&self,
_stream_job: &StreamingJob,
_table_fragments: &PbTableFragments,
_internal_tables: Vec<PbTable>,
) -> MetaResult<()> {
todo!()
}

pub async fn create_source(&self, mut pb_source: PbSource) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
let owner_id = pb_source.owner;
let txn = inner.db.begin().await?;
ensure_user_id(owner_id, &txn).await?;
ensure_object_id(ObjectType::Database, pb_source.database_id, &txn).await?;
ensure_object_id(ObjectType::Schema, pb_source.schema_id, &txn).await?;
check_relation_name_duplicate(
&pb_source.name,
pb_source.database_id,
pb_source.schema_id,
&txn,
)
.await?;

let source_obj = Self::create_object(
&txn,
ObjectType::Source,
owner_id,
Some(pb_source.database_id),
Some(pb_source.schema_id),
)
.await?;
pb_source.id = source_obj.oid;
let source: source::ActiveModel = pb_source.clone().into();
source.insert(&txn).await?;
txn.commit().await?;

let version = self
.notify_frontend_relation_info(
NotificationOperation::Add,
PbRelationInfo::Source(pb_source),
)
.await;
Ok(version)
}

pub async fn create_function(
&self,
mut pb_function: PbFunction,
Expand Down Expand Up @@ -344,17 +426,32 @@ impl CatalogController {

pub async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let function_obj = Object::find_by_id(function_id)
.one(&inner.db)
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("function", function_id))?;
ensure_object_not_refer(ObjectType::Function, function_id, &inner.db).await?;
ensure_object_not_refer(ObjectType::Function, function_id, &txn).await?;

let res = Object::delete_by_id(function_id).exec(&inner.db).await?;
// Find affect users with privileges on the function.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
.select_only()
.distinct()
.column(user_privilege::Column::UserId)
.filter(user_privilege::Column::Oid.eq(function_id))
.into_tuple()
.all(&txn)
.await?;

let res = Object::delete_by_id(function_id).exec(&txn).await?;
if res.rows_affected == 0 {
return Err(MetaError::catalog_id_not_found("function", function_id));
}
let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;

txn.commit().await?;

self.notify_users_update(user_infos).await;
let version = self
.notify_frontend(
NotificationOperation::Delete,
Expand Down Expand Up @@ -423,17 +520,32 @@ impl CatalogController {
connection_id: ConnectionId,
) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let connection_obj = Object::find_by_id(connection_id)
.one(&inner.db)
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("connection", connection_id))?;
ensure_object_not_refer(ObjectType::Connection, connection_id, &inner.db).await?;
ensure_object_not_refer(ObjectType::Connection, connection_id, &txn).await?;

let res = Object::delete_by_id(connection_id).exec(&inner.db).await?;
// Find affect users with privileges on the connection.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
.select_only()
.distinct()
.column(user_privilege::Column::UserId)
.filter(user_privilege::Column::Oid.eq(connection_id))
.into_tuple()
.all(&txn)
.await?;

let res = Object::delete_by_id(connection_id).exec(&txn).await?;
if res.rows_affected == 0 {
return Err(MetaError::catalog_id_not_found("connection", connection_id));
}
let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;

txn.commit().await?;

self.notify_users_update(user_infos).await;
let version = self
.notify_frontend(
NotificationOperation::Delete,
Expand Down Expand Up @@ -636,6 +748,16 @@ impl CatalogController {
.await?;
to_drop_objects.extend(to_drop_internal_table_objs);

// Find affect users with privileges on all this objects.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
.select_only()
.distinct()
.column(user_privilege::Column::UserId)
.filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
.into_tuple()
.all(&txn)
.await?;

// delete all in to_drop_objects.
let res = Object::delete_many()
.filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
Expand All @@ -647,8 +769,12 @@ impl CatalogController {
object_id,
));
}
let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;

txn.commit().await?;

// notify about them.
self.notify_users_update(user_infos).await;
let relations = to_drop_objects
.into_iter()
.map(|obj| match obj.obj_type {
Expand Down Expand Up @@ -894,6 +1020,10 @@ mod tests {
let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
mgr.drop_relation(ObjectType::View, view.view_id, DropMode::Cascade)
.await?;
assert!(View::find_by_id(view.view_id)
.one(&mgr.inner.read().await.db)
.await?
.is_none());

Ok(())
}
Expand Down
Loading

0 comments on commit a07fb73

Please sign in to comment.