Skip to content

Commit

Permalink
refactor(frontend): rewrite 3 system functions using context (#14960)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Feb 18, 2024
1 parent dfb7989 commit 1b1d2f1
Show file tree
Hide file tree
Showing 17 changed files with 262 additions and 453 deletions.
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ message ExprNode {
PG_GET_INDEXDEF = 2400;
COL_DESCRIPTION = 2401;
PG_GET_VIEWDEF = 2402;
PG_GET_USERBYID = 2403;
PG_INDEXES_SIZE = 2404;
PG_RELATION_SIZE = 2405;

// EXTERNAL
ICEBERG_TRANSFORM = 2201;
Expand Down
17 changes: 4 additions & 13 deletions src/frontend/planner_test/tests/testdata/output/pg_catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,11 @@
- sql: |
select pg_catalog.pg_get_userbyid(1)
logical_plan: |-
LogicalProject { exprs: [rw_users.name] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
└─LogicalProject { exprs: [rw_users.name] }
└─LogicalFilter { predicate: (1:Int32 = rw_users.id) }
└─LogicalShare { id: 2 }
└─LogicalProject { exprs: [rw_users.id, rw_users.name, rw_users.create_db, rw_users.is_super, '********':Varchar] }
└─LogicalSysScan { table: rw_users, columns: [rw_users.id, rw_users.name, rw_users.is_super, rw_users.create_db, rw_users.create_user, rw_users.can_login] }
LogicalProject { exprs: [PgGetUserbyid(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |-
BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchProject { exprs: [rw_users.name] }
└─BatchFilter { predicate: (1:Int32 = rw_users.id) }
└─BatchScan { table: rw_users, columns: [rw_users.name, rw_users.id], distribution: Single }
BatchProject { exprs: [PgGetUserbyid(1:Int32) as $expr1] }
└─BatchValues { rows: [[]] }
- sql: |
select 'pg_namespace'::regclass
logical_plan: |-
Expand Down
141 changes: 60 additions & 81 deletions src/frontend/planner_test/tests/testdata/output/subquery.yaml

Large diffs are not rendered by default.

125 changes: 20 additions & 105 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ use risingwave_sqlparser::parser::ParserError;
use thiserror_ext::AsReport;

use crate::binder::bind_context::Clause;
use crate::binder::{Binder, BoundQuery, BoundSetExpr, UdfContext};
use crate::binder::{Binder, UdfContext};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{
AggCall, Expr, ExprImpl, ExprType, FunctionCall, FunctionCallWithLambda, Literal, Now, OrderBy,
Subquery, SubqueryKind, TableFunction, TableFunctionType, UserDefinedFunction, WindowFunction,
TableFunction, TableFunctionType, UserDefinedFunction, WindowFunction,
};
use crate::utils::Condition;

Expand Down Expand Up @@ -1230,112 +1230,27 @@ impl Binder {
("current_role", current_user()),
("current_user", current_user()),
("user", current_user()),
("pg_get_userbyid", guard_by_len(1, raw(|binder, inputs|{
let input = &inputs[0];
let bound_query = binder.bind_get_user_by_id_select(input)?;
Ok(ExprImpl::Subquery(Box::new(Subquery::new(
BoundQuery {
body: BoundSetExpr::Select(Box::new(bound_query)),
order: vec![],
limit: None,
offset: None,
with_ties: false,
extra_order_exprs: vec![],
},
SubqueryKind::Scalar,
))))
}
))),
("pg_get_userbyid", raw_call(ExprType::PgGetUserbyid)),
("pg_get_indexdef", raw_call(ExprType::PgGetIndexdef)),
("pg_get_viewdef", raw_call(ExprType::PgGetViewdef)),
("pg_relation_size", dispatch_by_len(vec![
(1, raw(|binder, inputs|{
let table_name = &inputs[0];
let bound_query = binder.bind_get_table_size_select("pg_relation_size", table_name)?;
Ok(ExprImpl::Subquery(Box::new(Subquery::new(
BoundQuery {
body: BoundSetExpr::Select(Box::new(bound_query)),
order: vec![],
limit: None,
offset: None,
with_ties: false,
extra_order_exprs: vec![],
},
SubqueryKind::Scalar,
))))
})),
(2, raw(|binder, inputs|{
let table_name = &inputs[0];
match inputs[1].as_literal() {
Some(literal) if literal.return_type() == DataType::Varchar => {
match literal
.get_data()
.as_ref()
.expect("ExprImpl value is a Literal but cannot get ref to data")
.as_utf8()
.as_ref() {
"main" => {
let bound_query = binder.bind_get_table_size_select("pg_relation_size", table_name)?;
Ok(ExprImpl::Subquery(Box::new(Subquery::new(
BoundQuery {
body: BoundSetExpr::Select(Box::new(bound_query)),
order: vec![],
limit: None,
offset: None,
with_ties: false,
extra_order_exprs: vec![],
},
SubqueryKind::Scalar,
))))
},
// These options are invalid in RW so we return 0 value as the result
"fsm"|"vm"|"init" => {
Ok(ExprImpl::literal_int(0))
},
_ => Err(ErrorCode::InvalidInputSyntax(
"invalid fork name. Valid fork names are \"main\", \"fsm\", \"vm\", and \"init\"".into()).into())
}
},
_ => Err(ErrorCode::ExprError(
"The 2nd argument of `pg_relation_size` must be string literal.".into(),
)
.into())
}
})),
]
)),
("pg_table_size", guard_by_len(1, raw(|binder, inputs|{
let input = &inputs[0];
let bound_query = binder.bind_get_table_size_select("pg_table_size", input)?;
Ok(ExprImpl::Subquery(Box::new(Subquery::new(
BoundQuery {
body: BoundSetExpr::Select(Box::new(bound_query)),
order: vec![],
limit: None,
offset: None,
with_ties: false,
extra_order_exprs: vec![],
},
SubqueryKind::Scalar,
))))
}
))),
("pg_indexes_size", guard_by_len(1, raw(|binder, inputs|{
let input = &inputs[0];
let bound_query = binder.bind_get_indexes_size_select(input)?;
Ok(ExprImpl::Subquery(Box::new(Subquery::new(
BoundQuery {
body: BoundSetExpr::Select(Box::new(bound_query)),
order: vec![],
limit: None,
offset: None,
with_ties: false,
extra_order_exprs: vec![],
},
SubqueryKind::Scalar,
))))
("pg_relation_size", raw(|_binder, mut inputs|{
if inputs.is_empty() {
return Err(ErrorCode::ExprError(
"function pg_relation_size() does not exist".into(),
)
.into());
}
))),
inputs[0].cast_to_regclass_mut()?;
Ok(FunctionCall::new(ExprType::PgRelationSize, inputs)?.into())
})),
("pg_table_size", guard_by_len(1, raw(|_binder, mut inputs|{
inputs[0].cast_to_regclass_mut()?;
Ok(FunctionCall::new(ExprType::PgRelationSize, inputs)?.into())
}))),
("pg_indexes_size", guard_by_len(1, raw(|_binder, mut inputs|{
inputs[0].cast_to_regclass_mut()?;
Ok(FunctionCall::new(ExprType::PgIndexesSize, inputs)?.into())
}))),
("pg_get_expr", raw(|_binder, inputs|{
if inputs.len() == 2 || inputs.len() == 3 {
// TODO: implement pg_get_expr rather than just return empty as an workaround.
Expand Down
16 changes: 1 addition & 15 deletions src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,23 +612,9 @@ impl Binder {
match &data_type {
// Casting to Regclass type means getting the oid of expr.
// See https://www.postgresql.org/docs/current/datatype-oid.html.
// Currently only string liter expr is supported since we cannot handle subquery in join
// on condition: https://github.com/risingwavelabs/risingwave/issues/6852
// TODO: Add generic expr support when needed
AstDataType::Regclass => {
let input = self.bind_expr_inner(expr)?;
match input.return_type() {
DataType::Varchar => Ok(ExprImpl::FunctionCall(Box::new(
FunctionCall::new_unchecked(
ExprType::CastRegclass,
vec![input],
DataType::Int32,
),
))),
DataType::Int32 => Ok(input),
dt if dt.is_int() => Ok(input.cast_explicit(DataType::Int32)?),
_ => Err(ErrorCode::BindError("Unsupported input type".to_string()).into()),
}
Ok(input.cast_to_regclass()?)
}
AstDataType::Regproc => {
let lhs = self.bind_expr_inner(expr)?;
Expand Down
Loading

0 comments on commit 1b1d2f1

Please sign in to comment.