diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index ff19892d516b5..fe22d6dc079d6 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -118,6 +118,107 @@ pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery { .to_owned() } +/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table. +/// +/// # Examples +/// +/// ``` +/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query; +/// use sea_orm::sea_query::*; +/// use sea_orm::*; +/// +/// let query = construct_sink_cycle_check_query(1, vec![2, 3]); +/// +/// assert_eq!( +/// query.to_string(MysqlQueryBuilder), +/// r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"# +/// ); +/// assert_eq!( +/// query.to_string(PostgresQueryBuilder), +/// r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"# +/// ); +/// assert_eq!( +/// query.to_string(SqliteQueryBuilder), +/// r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"# +/// ); +/// ``` +pub fn construct_sink_cycle_check_query( + target_table: ObjectId, + dependent_objects: Vec, +) -> WithQuery { + let cte_alias = Alias::new("used_by_object_ids_with_sink"); + let depend_alias = Alias::new("obj_dependency_with_sink"); + + let mut base_query = SelectStatement::new() + .columns([ + object_dependency::Column::Oid, + object_dependency::Column::UsedBy, + ]) + .from(ObjectDependency) + .and_where(object_dependency::Column::Oid.eq(target_table)) + .to_owned(); + + let query_sink_deps = SelectStatement::new() + .columns([sink::Column::SinkId, sink::Column::TargetTable]) + .from(Sink) + .and_where(sink::Column::TargetTable.is_not_null()) + .to_owned(); + + let cte_referencing = Query::select() + .column((depend_alias.clone(), object_dependency::Column::Oid)) + .column((depend_alias.clone(), object_dependency::Column::UsedBy)) + .from_subquery( + SelectStatement::new() + .columns([ + object_dependency::Column::Oid, + object_dependency::Column::UsedBy, + ]) + .from(ObjectDependency) + .union(UnionType::All, query_sink_deps) + .to_owned(), + depend_alias.clone(), + ) + .inner_join( + cte_alias.clone(), + Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col(( + depend_alias.clone(), + object_dependency::Column::Oid, + ))), + ) + .and_where( + Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col(( + cte_alias.clone(), + object_dependency::Column::Oid, + ))), + ) + .to_owned(); + + let common_table_expr = CommonTableExpression::new() + .query(base_query.union(UnionType::All, cte_referencing).to_owned()) + .columns([ + object_dependency::Column::Oid, + object_dependency::Column::UsedBy, + ]) + .table_name(cte_alias.clone()) + .to_owned(); + + SelectStatement::new() + .expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count()) + .from(cte_alias.clone()) + .and_where( + Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)) + .is_in(dependent_objects), + ) + .to_owned() + .with( + WithClause::new() + .recursive(true) + .cte(common_table_expr) + .to_owned(), + ) + .to_owned() +} + #[derive(Clone, DerivePartialModel, FromQueryResult)] #[sea_orm(entity = "Object")] pub struct PartialObject { @@ -175,6 +276,35 @@ where Ok(objects) } +/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will. +pub async fn check_sink_into_table_cycle( + target_table: ObjectId, + dependent_objs: Vec, + db: &C, +) -> MetaResult +where + C: ConnectionTrait, +{ + if dependent_objs.is_empty() { + return Ok(false); + } + + let query = construct_sink_cycle_check_query(target_table, dependent_objs); + let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder()); + + let res = db + .query_one(Statement::from_sql_and_values( + db.get_database_backend(), + sql, + values, + )) + .await? + .unwrap(); + let cnt: i32 = res.try_get_by(0)?; + + Ok(cnt != 0) +} + /// `ensure_object_id` ensures the existence of target object in the cluster. pub async fn ensure_object_id( object_type: ObjectType,