Skip to content

Commit

Permalink
feat: add a helper function to detect sink into table cycle using RCTE
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Feb 20, 2024
1 parent 4197ad5 commit 2c52c64
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,107 @@ pub fn construct_obj_dependency_query(obj_id: ObjectId) -> WithQuery {
.to_owned()
}

/// This function will construct a query using recursive cte to find if dependent objects are already relying on the target table.
///
/// # Examples
///
/// ```
/// use risingwave_meta::controller::utils::construct_sink_cycle_check_query;
/// use sea_orm::sea_query::*;
/// use sea_orm::*;
///
/// let query = construct_sink_cycle_check_query(1, vec![2, 3]);
///
/// assert_eq!(
/// query.to_string(MysqlQueryBuilder),
/// r#"WITH RECURSIVE `used_by_object_ids_with_sink` (`oid`, `used_by`) AS (SELECT `oid`, `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `obj_dependency_with_sink`.`oid`, `obj_dependency_with_sink`.`used_by` FROM (SELECT `oid`, `used_by` FROM `object_dependency` UNION ALL (SELECT `sink_id`, `target_table` FROM `sink` WHERE `sink`.`target_table` IS NOT NULL)) AS `obj_dependency_with_sink` INNER JOIN `used_by_object_ids_with_sink` ON `used_by_object_ids_with_sink`.`used_by` = `obj_dependency_with_sink`.`oid` WHERE `used_by_object_ids_with_sink`.`used_by` <> `used_by_object_ids_with_sink`.`oid`)) SELECT COUNT(`used_by_object_ids_with_sink`.`used_by`) FROM `used_by_object_ids_with_sink` WHERE `used_by_object_ids_with_sink`.`used_by` IN (2, 3)"#
/// );
/// assert_eq!(
/// query.to_string(PostgresQueryBuilder),
/// r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL (SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL)) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid")) SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
/// );
/// assert_eq!(
/// query.to_string(SqliteQueryBuilder),
/// r#"WITH RECURSIVE "used_by_object_ids_with_sink" ("oid", "used_by") AS (SELECT "oid", "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "obj_dependency_with_sink"."oid", "obj_dependency_with_sink"."used_by" FROM (SELECT "oid", "used_by" FROM "object_dependency" UNION ALL SELECT "sink_id", "target_table" FROM "sink" WHERE "sink"."target_table" IS NOT NULL) AS "obj_dependency_with_sink" INNER JOIN "used_by_object_ids_with_sink" ON "used_by_object_ids_with_sink"."used_by" = "obj_dependency_with_sink"."oid" WHERE "used_by_object_ids_with_sink"."used_by" <> "used_by_object_ids_with_sink"."oid") SELECT COUNT("used_by_object_ids_with_sink"."used_by") FROM "used_by_object_ids_with_sink" WHERE "used_by_object_ids_with_sink"."used_by" IN (2, 3)"#
/// );
/// ```
pub fn construct_sink_cycle_check_query(
target_table: ObjectId,
dependent_objects: Vec<ObjectId>,
) -> WithQuery {
let cte_alias = Alias::new("used_by_object_ids_with_sink");
let depend_alias = Alias::new("obj_dependency_with_sink");

let mut base_query = SelectStatement::new()
.columns([
object_dependency::Column::Oid,
object_dependency::Column::UsedBy,
])
.from(ObjectDependency)
.and_where(object_dependency::Column::Oid.eq(target_table))
.to_owned();

let query_sink_deps = SelectStatement::new()
.columns([sink::Column::SinkId, sink::Column::TargetTable])
.from(Sink)
.and_where(sink::Column::TargetTable.is_not_null())
.to_owned();

let cte_referencing = Query::select()
.column((depend_alias.clone(), object_dependency::Column::Oid))
.column((depend_alias.clone(), object_dependency::Column::UsedBy))
.from_subquery(
SelectStatement::new()
.columns([
object_dependency::Column::Oid,
object_dependency::Column::UsedBy,
])
.from(ObjectDependency)
.union(UnionType::All, query_sink_deps)
.to_owned(),
depend_alias.clone(),
)
.inner_join(
cte_alias.clone(),
Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).eq(Expr::col((
depend_alias.clone(),
object_dependency::Column::Oid,
))),
)
.and_where(
Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).ne(Expr::col((
cte_alias.clone(),
object_dependency::Column::Oid,
))),
)
.to_owned();

let common_table_expr = CommonTableExpression::new()
.query(base_query.union(UnionType::All, cte_referencing).to_owned())
.columns([
object_dependency::Column::Oid,
object_dependency::Column::UsedBy,
])
.table_name(cte_alias.clone())
.to_owned();

SelectStatement::new()
.expr(Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy)).count())
.from(cte_alias.clone())
.and_where(
Expr::col((cte_alias.clone(), object_dependency::Column::UsedBy))
.is_in(dependent_objects),
)
.to_owned()
.with(
WithClause::new()
.recursive(true)
.cte(common_table_expr)
.to_owned(),
)
.to_owned()
}

#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Object")]
pub struct PartialObject {
Expand Down Expand Up @@ -175,6 +276,35 @@ where
Ok(objects)
}

/// Check if create a sink with given dependent objects into the target table will cause a cycle, return true if it will.
pub async fn check_sink_into_table_cycle<C>(
target_table: ObjectId,
dependent_objs: Vec<ObjectId>,
db: &C,
) -> MetaResult<bool>
where
C: ConnectionTrait,
{
if dependent_objs.is_empty() {
return Ok(false);
}

let query = construct_sink_cycle_check_query(target_table, dependent_objs);
let (sql, values) = query.build_any(&*db.get_database_backend().get_query_builder());

let res = db
.query_one(Statement::from_sql_and_values(
db.get_database_backend(),
sql,
values,
))
.await?
.unwrap();
let cnt: i32 = res.try_get_by(0)?;

Ok(cnt != 0)
}

/// `ensure_object_id` ensures the existence of target object in the cluster.
pub async fn ensure_object_id<C>(
object_type: ObjectType,
Expand Down

0 comments on commit 2c52c64

Please sign in to comment.