diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index db9c1d03eab4b..174410f7eb83a 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -53,7 +53,7 @@ use risingwave_pb::stream_plan::update_mutation::{MergeUpdate, PbMergeUpdate}; use risingwave_pb::stream_plan::{ PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, }; -use sea_orm::sea_query::{Expr, SimpleExpr}; +use sea_orm::sea_query::{Expr, Query, SimpleExpr}; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, @@ -124,6 +124,39 @@ impl CatalogController { ) .await?; + // check if any dependent relation is in altering status. + let dependent_relations = streaming_job.dependent_relations(); + if !dependent_relations.is_empty() { + let altering_cnt = ObjectDependency::find() + .join( + JoinType::InnerJoin, + object_dependency::Relation::Object1.def(), + ) + .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) + .filter( + object_dependency::Column::Oid + .is_in(dependent_relations.iter().map(|id| *id as ObjectId)) + .and(object::Column::ObjType.eq(ObjectType::Table)) + .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)) + .and( + // It means the referring table is just dummy for altering. + object::Column::Oid.not_in_subquery( + Query::select() + .column(table::Column::TableId) + .from(Table) + .to_owned(), + ), + ), + ) + .count(&txn) + .await?; + if altering_cnt != 0 { + return Err(MetaError::permission_denied( + "some dependent relations are being altered", + )); + } + } + match streaming_job { StreamingJob::MaterializedView(table) => { let job_id = Self::create_streaming_job_obj( @@ -256,7 +289,6 @@ impl CatalogController { } // record object dependency. - let dependent_relations = streaming_job.dependent_relations(); if !dependent_relations.is_empty() { ObjectDependency::insert_many(dependent_relations.into_iter().map(|id| { object_dependency::ActiveModel { @@ -541,12 +573,33 @@ impl CatalogController { return Err(MetaError::permission_denied("table version is stale")); } + // 2. check concurrent replace. + let referring_cnt = ObjectDependency::find() + .join( + JoinType::InnerJoin, + object_dependency::Relation::Object1.def(), + ) + .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) + .filter( + object_dependency::Column::Oid + .eq(id as ObjectId) + .and(object::Column::ObjType.eq(ObjectType::Table)) + .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)), + ) + .count(&txn) + .await?; + if referring_cnt != 0 { + return Err(MetaError::permission_denied( + "table is being altered or referenced by some creating jobs", + )); + } + let parallelism = match specified_parallelism { None => StreamingParallelism::Adaptive, Some(n) => StreamingParallelism::Fixed(n.get() as _), }; - // 2. create streaming object for new replace table. + // 3. create streaming object for new replace table. let obj_id = Self::create_streaming_job_obj( &txn, ObjectType::Table, @@ -559,7 +612,7 @@ impl CatalogController { ) .await?; - // 3. record dependency for new replace table. + // 4. record dependency for new replace table. ObjectDependency::insert(object_dependency::ActiveModel { oid: Set(id as _), used_by: Set(obj_id as _),