Skip to content

Commit

Permalink
add doctest
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Sep 26, 2023
1 parent 6b9c067 commit d1846b5
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 71 deletions.
77 changes: 7 additions & 70 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,20 @@ use risingwave_pb::catalog::{PbDatabase, PbSchema};
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Operation as NotificationOperation,
};
use sea_orm::sea_query::{
Alias, CommonTableExpression, Expr, Query, QueryStatementBuilder, SelectStatement, UnionType,
WithClause,
};
use sea_orm::sea_query::{Expr, Query, QueryStatementBuilder, UnionType};
use sea_orm::{
ActiveModelTrait, ActiveValue, ColumnTrait, ConnectionTrait, DatabaseConnection,
DatabaseTransaction, EntityTrait, Iden, JoinType, ModelTrait, Order, QueryFilter, Statement,
TransactionTrait,
DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter, Statement, TransactionTrait,
};
use tokio::sync::RwLock;

use crate::controller::utils::construct_obj_dependency_query;
use crate::controller::ObjectModel;
use crate::manager::{DatabaseId, MetaSrvEnv, NotificationVersion, UserId};
use crate::model_v2::object::ObjectType;
use crate::model_v2::prelude::*;
use crate::model_v2::{
connection, database, function, index, object, object_dependency, schema, sink, source, table,
view,
connection, database, function, index, object, schema, sink, source, table, view,
};
use crate::{MetaError, MetaResult};

Expand Down Expand Up @@ -142,70 +138,11 @@ impl CatalogController {
Ok(version)
}

/// This function will list all the objects that are using the given one. It runs a recursive CTE to find all the dependencies.
/// The cte and the query is as follows:
/// ```sql
/// WITH RECURSIVE used_by_object_ids (used_by) AS
/// (
/// SELECT used_by FROM object_dependency WHERE object_dependency.oid = $1
/// UNION ALL
/// (
/// SELECT object_dependency.used_by
/// FROM object_dependency
/// INNER JOIN used_by_object_ids
/// ON used_by_object_ids.used_by = oid
/// )
/// )
/// SELECT DISTINCT used_by FROM used_by_object_ids ORDER BY used_by DESC;
/// ```
/// List all objects that are using the given one. It runs a recursive CTE to find all the dependencies.
async fn list_used_by(&self, obj_id: i32) -> MetaResult<Vec<i32>> {
let inner = self.inner.read().await;

let cte_alias = Alias::new("used_by_object_ids");
let cte_return_alias = Alias::new("used_by");

let base_query = SelectStatement::new()
.column(object_dependency::Column::UsedBy)
.from(ObjectDependency)
.and_where(object_dependency::Column::Oid.eq(obj_id))
.to_owned();

let cte_referencing = Query::select()
.column((ObjectDependency, object_dependency::Column::UsedBy))
.from(ObjectDependency)
.join(
JoinType::InnerJoin,
cte_alias.clone(),
Expr::col((cte_alias.clone(), cte_return_alias.clone()))
.equals(object_dependency::Column::Oid),
)
.to_owned();

let common_table_expr = CommonTableExpression::new()
.query(
base_query
.clone()
.union(UnionType::All, cte_referencing)
.to_owned(),
)
.column(cte_return_alias.clone())
.table_name(cte_alias.clone())
.to_owned();

let query = SelectStatement::new()
.distinct()
.column(cte_return_alias.clone())
.from(cte_alias)
.order_by(cte_return_alias.clone(), Order::Desc)
.to_owned()
.with(
WithClause::new()
.recursive(true)
.cte(common_table_expr)
.to_owned(),
)
.to_owned();

let query = construct_obj_dependency_query(obj_id, "used_by");
let (sql, values) = query.build_any(&*inner.db.get_database_backend().get_query_builder());
let res = inner
.db
Expand All @@ -218,7 +155,7 @@ impl CatalogController {

let ids: Vec<i32> = res
.into_iter()
.map(|row| row.try_get("", &cte_return_alias.to_string()).unwrap())
.map(|row| row.try_get("", "user_by").unwrap())
.collect_vec();
Ok(ids)
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use crate::model_v2::{database, object, schema};
use crate::MetaError;

#[allow(dead_code)]
mod catalog;
pub mod catalog;
pub mod system_param;
pub mod utils;

// todo: refine the error transform.
impl From<sea_orm::DbErr> for MetaError {
Expand Down
93 changes: 93 additions & 0 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use model_migration::WithQuery;
use sea_orm::sea_query::{
Alias, CommonTableExpression, Expr, Query, SelectStatement, UnionType, WithClause,
};
use sea_orm::{ColumnTrait, JoinType, Order};

use crate::model_v2::object_dependency;
use crate::model_v2::prelude::*;

/// This function will construct a query using recursive cte to find all objects that are used by the given object.
///
/// # Examples
///
/// ```
/// use risingwave_meta::controller::utils::construct_obj_dependency_query;
/// use sea_orm::sea_query::*;
/// use sea_orm::*;
///
/// let query = construct_obj_dependency_query(1, "used_by");
///
/// assert_eq!(
/// query.to_string(MysqlQueryBuilder),
/// r#"WITH RECURSIVE `used_by_object_ids` (`used_by`) AS (SELECT `used_by` FROM `object_dependency` WHERE `object_dependency`.`oid` = 1 UNION ALL (SELECT `object_dependency`.`used_by` FROM `object_dependency` INNER JOIN `used_by_object_ids` ON `used_by_object_ids`.`used_by` = `oid`)) SELECT DISTINCT `used_by` FROM `used_by_object_ids` ORDER BY `used_by` DESC"#
/// );
/// assert_eq!(
/// query.to_string(PostgresQueryBuilder),
/// r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL (SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid")) SELECT DISTINCT "used_by" FROM "used_by_object_ids" ORDER BY "used_by" DESC"#
/// );
/// assert_eq!(
/// query.to_string(SqliteQueryBuilder),
/// r#"WITH RECURSIVE "used_by_object_ids" ("used_by") AS (SELECT "used_by" FROM "object_dependency" WHERE "object_dependency"."oid" = 1 UNION ALL SELECT "object_dependency"."used_by" FROM "object_dependency" INNER JOIN "used_by_object_ids" ON "used_by_object_ids"."used_by" = "oid") SELECT DISTINCT "used_by" FROM "used_by_object_ids" ORDER BY "used_by" DESC"#
/// );
/// ```
pub fn construct_obj_dependency_query(obj_id: i32, column: &str) -> WithQuery {
let cte_alias = Alias::new("used_by_object_ids");
let cte_return_alias = Alias::new(column);

let base_query = SelectStatement::new()
.column(object_dependency::Column::UsedBy)
.from(ObjectDependency)
.and_where(object_dependency::Column::Oid.eq(obj_id))
.to_owned();

let cte_referencing = Query::select()
.column((ObjectDependency, object_dependency::Column::UsedBy))
.from(ObjectDependency)
.join(
JoinType::InnerJoin,
cte_alias.clone(),
Expr::col((cte_alias.clone(), cte_return_alias.clone()))
.equals(object_dependency::Column::Oid),
)
.to_owned();

let common_table_expr = CommonTableExpression::new()
.query(
base_query
.clone()
.union(UnionType::All, cte_referencing)
.to_owned(),
)
.column(cte_return_alias.clone())
.table_name(cte_alias.clone())
.to_owned();

SelectStatement::new()
.distinct()
.column(cte_return_alias.clone())
.from(cte_alias)
.order_by(cte_return_alias.clone(), Order::Desc)
.to_owned()
.with(
WithClause::new()
.recursive(true)
.cte(common_table_expr)
.to_owned(),
)
.to_owned()
}

0 comments on commit d1846b5

Please sign in to comment.