diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index 13c7a7a4a188..a21a9a601707 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -466,6 +466,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::Name).string().not_null()) .col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer()) .col(ColumnDef::new(Table::TableType).string().not_null()) + .col(ColumnDef::new(Table::BelongsToJobId).integer()) .col(ColumnDef::new(Table::Columns).json().not_null()) .col(ColumnDef::new(Table::Pk).json().not_null()) .col(ColumnDef::new(Table::DistributionKey).json().not_null()) @@ -508,6 +509,14 @@ impl MigrationTrait for Migration { .on_delete(ForeignKeyAction::Cascade) .to_owned(), ) + .foreign_key( + &mut ForeignKey::create() + .name("FK_table_belongs_to_job_id") + .from(Table::Table, Table::BelongsToJobId) + .to(Object::Table, Object::Oid) + .on_delete(ForeignKeyAction::Cascade) + .to_owned(), + ) .foreign_key( &mut ForeignKey::create() .name("FK_table_fragment_id") @@ -914,6 +923,7 @@ enum Table { Name, OptionalAssociatedSourceId, TableType, + BelongsToJobId, Columns, Pk, DistributionKey, diff --git a/src/meta/model_v2/src/object.rs b/src/meta/model_v2/src/object.rs index 39506777068a..117992ff2051 100644 --- a/src/meta/model_v2/src/object.rs +++ b/src/meta/model_v2/src/object.rs @@ -104,6 +104,8 @@ pub enum Relation { Source, #[sea_orm(has_many = "super::table::Entity")] Table, + #[sea_orm(has_many = "super::streaming_job::Entity")] + StreamingJob, #[sea_orm( belongs_to = "super::user::Entity", from = "Column::OwnerId", @@ -172,6 +174,12 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::StreamingJob.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::User.def() diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 2904fd550579..9b1538c059af 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -18,7 +18,7 @@ use sea_orm::entity::prelude::*; use crate::{ Cardinality, ColumnCatalogArray, ColumnOrderArray, CreateType, FragmentId, I32Array, JobStatus, - Property, SourceId, TableId, TableVersion, + ObjectId, Property, SourceId, TableId, TableVersion, }; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] @@ -74,6 +74,7 @@ pub struct Model { pub name: String, pub optional_associated_source_id: Option, pub table_type: TableType, + pub belongs_to_job_id: Option, pub columns: ColumnCatalogArray, pub pk: ColumnOrderArray, pub distribution_key: I32Array, @@ -116,6 +117,14 @@ pub enum Relation { on_delete = "NoAction" )] Fragment1, + #[sea_orm( + belongs_to = "super::object::Entity", + from = "Column::BelongsToJobId", + to = "super::object::Column::Oid", + on_update = "NoAction", + on_delete = "Cascade" + )] + Object2, #[sea_orm( belongs_to = "super::object::Entity", from = "Column::TableId", @@ -123,7 +132,7 @@ pub enum Relation { on_update = "NoAction", on_delete = "Cascade" )] - Object, + Object1, #[sea_orm( belongs_to = "super::source::Entity", from = "Column::OptionalAssociatedSourceId", @@ -136,7 +145,7 @@ pub enum Relation { impl Related for Entity { fn to() -> RelationDef { - Relation::Object.def() + Relation::Object1.def() } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index f7eb35b10f7b..72badbb21c41 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -21,10 +21,11 @@ use risingwave_common::bail; use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::*; +use risingwave_meta_model_v2::table::TableType; use risingwave_meta_model_v2::{ connection, database, function, index, object, object_dependency, schema, sink, source, table, - user_privilege, view, ColumnCatalogArray, ConnectionId, DatabaseId, FunctionId, ObjectId, - PrivateLinkService, SchemaId, SourceId, TableId, UserId, + user_privilege, view, ColumnCatalogArray, ConnectionId, DatabaseId, FunctionId, IndexId, + ObjectId, PrivateLinkService, SchemaId, SourceId, TableId, UserId, }; use risingwave_pb::catalog::{ PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, @@ -35,10 +36,11 @@ use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, }; use risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments}; +use sea_orm::sea_query::SimpleExpr; use sea_orm::ActiveValue::Set; use sea_orm::{ - ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, JoinType, - QueryFilter, QuerySelect, RelationTrait, TransactionTrait, + ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, + IntoActiveModel, JoinType, QueryFilter, QuerySelect, RelationTrait, TransactionTrait, Value, }; use tokio::sync::RwLock; @@ -50,7 +52,7 @@ use crate::controller::utils::{ get_referring_objects_cascade, list_user_info_by_ids, PartialObject, }; use crate::controller::ObjectModel; -use crate::manager::{MetaSrvEnv, NotificationVersion, StreamingJob}; +use crate::manager::{MetaSrvEnv, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION}; use crate::rpc::ddl_controller::DropMode; use crate::{MetaError, MetaResult}; @@ -613,6 +615,219 @@ impl CatalogController { Ok(version) } + pub async fn alter_owner( + &self, + object_type: ObjectType, + object_id: ObjectId, + new_owner: UserId, + ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + ensure_user_id(new_owner, &txn).await?; + + let obj = Object::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?; + if obj.owner_id == new_owner { + return Ok(IGNORED_NOTIFICATION_VERSION); + } + let mut obj = obj.into_active_model(); + obj.owner_id = Set(new_owner); + let obj = obj.update(&txn).await?; + + let mut relations = vec![]; + match object_type { + ObjectType::Database => { + let db = Database::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?; + + txn.commit().await?; + + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::Database(ObjectModel(db, obj).into()), + ) + .await; + return Ok(version); + } + ObjectType::Schema => { + let schema = Schema::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?; + + txn.commit().await?; + + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::Schema(ObjectModel(schema, obj).into()), + ) + .await; + return Ok(version); + } + ObjectType::Table => { + let table = Table::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; + + // associated source. + if let Some(associated_source_id) = table.optional_associated_source_id { + let src_obj = object::ActiveModel { + oid: Set(associated_source_id as _), + owner_id: Set(new_owner), + ..Default::default() + } + .update(&txn) + .await?; + let source = Source::find_by_id(associated_source_id) + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found("source", associated_source_id) + })?; + relations.push(PbRelationInfo::Source(ObjectModel(source, src_obj).into())); + } + + // indexes. + let (index_ids, mut table_ids): (Vec, Vec) = + if table.table_type == TableType::Table { + Index::find() + .select_only() + .columns([index::Column::IndexId, index::Column::IndexTableId]) + .filter(index::Column::PrimaryTableId.eq(object_id)) + .into_tuple::<(IndexId, TableId)>() + .all(&txn) + .await? + .into_iter() + .unzip() + } else { + (vec![], vec![]) + }; + relations.push(PbRelationInfo::Table(ObjectModel(table, obj).into())); + + // internal tables. + let internal_tables: Vec = Table::find() + .select_only() + .column(table::Column::TableId) + .filter( + table::Column::BelongsToJobId + .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))), + ) + .into_tuple() + .all(&txn) + .await?; + table_ids.extend(internal_tables); + + Object::update_many() + .col_expr( + object::Column::OwnerId, + SimpleExpr::Value(Value::Int(Some(new_owner))), + ) + .filter( + object::Column::Oid + .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())), + ) + .exec(&txn) + .await?; + + let index_objs = Index::find() + .find_also_related(Object) + .filter(index::Column::IndexId.is_in(index_ids)) + .all(&txn) + .await?; + for (index, index_obj) in index_objs { + relations.push(PbRelationInfo::Index( + ObjectModel(index, index_obj.unwrap()).into(), + )); + } + let table_objs = Table::find() + .find_also_related(Object) + .filter(table::Column::TableId.is_in(table_ids)) + .all(&txn) + .await?; + for (table, table_obj) in table_objs { + relations.push(PbRelationInfo::Table( + ObjectModel(table, table_obj.unwrap()).into(), + )); + } + } + ObjectType::Source => { + let source = Source::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?; + relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into())); + } + ObjectType::Sink => { + let sink = Sink::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?; + relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into())); + + // internal tables. + let internal_tables: Vec = Table::find() + .select_only() + .column(table::Column::TableId) + .filter(table::Column::BelongsToJobId.eq(object_id)) + .into_tuple() + .all(&txn) + .await?; + + Object::update_many() + .col_expr( + object::Column::OwnerId, + SimpleExpr::Value(Value::Int(Some(new_owner))), + ) + .filter(object::Column::Oid.is_in(internal_tables.clone())) + .exec(&txn) + .await?; + + let table_objs = Table::find() + .find_also_related(Object) + .filter(table::Column::TableId.is_in(internal_tables)) + .all(&txn) + .await?; + for (table, table_obj) in table_objs { + relations.push(PbRelationInfo::Table( + ObjectModel(table, table_obj.unwrap()).into(), + )); + } + } + ObjectType::View => { + let view = View::find_by_id(object_id) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?; + relations.push(PbRelationInfo::View(ObjectModel(view, obj).into())); + } + _ => unreachable!("not supported object type: {:?}", object_type), + }; + + txn.commit().await?; + + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::RelationGroup(PbRelationGroup { + relations: relations + .into_iter() + .map(|relation| PbRelation { + relation_info: Some(relation), + }) + .collect(), + }), + ) + .await; + Ok(version) + } + pub async fn comment_on(&self, comment: PbComment) -> MetaResult { let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -735,7 +950,7 @@ impl CatalogController { .into_partial_model() .all(&txn) .await?; - to_drop_objects.extend(to_drop_source_objs.clone()); + to_drop_objects.extend(to_drop_source_objs); if object_type == ObjectType::Source { to_drop_source_ids.push(object_id); } @@ -749,12 +964,24 @@ impl CatalogController { .all(&txn) .await?; to_drop_streaming_jobs.extend(index_table_ids); - let to_drop_internal_table_objs: Vec = Object::find() - .filter(object::Column::Oid.is_in(to_drop_streaming_jobs.clone())) - .into_partial_model() - .all(&txn) - .await?; - to_drop_objects.extend(to_drop_internal_table_objs); + + if !to_drop_streaming_jobs.is_empty() { + let to_drop_internal_table_objs: Vec = Object::find() + .select_only() + .columns([ + object::Column::Oid, + object::Column::ObjType, + object::Column::SchemaId, + object::Column::DatabaseId, + ]) + .join(JoinType::InnerJoin, object::Relation::Table.def()) + .filter(table::Column::BelongsToJobId.is_in(to_drop_streaming_jobs.clone())) + .into_partial_model() + .all(&txn) + .await?; + + to_drop_objects.extend(to_drop_internal_table_objs); + } // Find affect users with privileges on all this objects. let to_update_user_ids: Vec = UserPrivilege::find()