Skip to content

Commit

Permalink
refactor(binder): update bind_rcte to be more rusty
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Apr 9, 2024
1 parent 249408e commit 8adca63
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 79 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ impl BindContext {
Entry::Occupied(e) => {
// check if this is a merge with recursive cte
if let Some(r) = self.cte_to_relation.get(&k) {
if let BindingCteState::Bound { .. } = r.borrow().state.clone() {
if let BindingCteState::Bound { .. } = r.borrow().state {
// no-op
continue;
}
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ mod tests {
"a",
),
Some(
"?column?",
"a",
),
],
from: Some(
Expand Down Expand Up @@ -702,7 +702,7 @@ mod tests {
),
schema: Schema {
fields: [
?column?:Int32,
a:Int32,
],
},
},
Expand All @@ -718,7 +718,7 @@ mod tests {
schema: Schema {
fields: [
a:Int32,
?column?:Int32,
a:Int32,
],
},
},
Expand Down
14 changes: 2 additions & 12 deletions src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl Binder {
all,
left,
right,
} = body.clone()
} = body
else {
return Err(ErrorCode::BindError(
"`UNION` is required in recursive CTE".to_string(),
Expand Down Expand Up @@ -362,18 +362,8 @@ impl Binder {
// bind the rest of the recursive cte
let mut recursive = self.bind_set_expr(*right)?;

// todo: add validate check here for *bound* `base` and `recursive`
Self::align_schema(&mut base, &mut recursive, SetOperator::Union)?;

// the final schema should be sticked with `recursive`'s
// e.g., <value> union all <select stmt>
let schema = recursive.schema().clone();
// yet another sanity check
assert_eq!(
schema,
recursive.schema().clone(),
"expect `schema` to be the same as recursive's"
);
let schema = base.schema().clone();

let recursive_union = RecursiveUnion {
all,
Expand Down
27 changes: 7 additions & 20 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::hash_map::Entry;
use std::ops::Deref;

use either::Either::{self, Left, Right};
use either::Either;
use itertools::{EitherOrBoth, Itertools};
use risingwave_common::bail;
use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME};
Expand Down Expand Up @@ -383,32 +383,19 @@ impl Binder {
Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id })))
}
BindingCteState::Bound { query } => {
let schema = match query.clone() {
Left(normal) => normal.schema().clone(),
Right(recursive) => recursive.schema.clone(),
let schema = match &query {
Either::Left(normal) => normal.schema(),
Either::Right(recursive) => &recursive.schema,
};
self.bind_table_to_context(
schema
.fields
.iter()
.map(|f| (false, f.clone())),
schema.fields.iter().map(|f| (false, f.clone())),
table_name.clone(),
Some(original_alias),
)?;
let input = match query {
// normal cte with union
Left(query) => Either::Left(query),
// recursive cte
Right(recursive) => Either::Right(recursive),
};
// we could always share the cte,
// no matter it's recursive or not.
let share_relation = Relation::Share(Box::new(BoundShare {
share_id,
// should either be a `BoundQuery` or `RecursiveUnion`
input,
}));
Ok(share_relation)
let input = query;
Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
}
}
} else {
Expand Down
28 changes: 3 additions & 25 deletions src/frontend/src/binder/relation/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use either::Either;

use super::{BoundSubquery, Relation};
use crate::binder::bind_context::RecursiveUnion;
use crate::binder::statement::RewriteExprsRecursive;
use crate::binder::{BoundQuery, ShareId};
Expand All @@ -29,30 +28,9 @@ pub struct BoundShare {

impl RewriteExprsRecursive for BoundShare {
fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
let rewrite = match self.input.clone() {
Either::Left(mut q) => {
q.rewrite_exprs_recursive(rewriter);
Either::Left(q)
}
Either::Right(mut r) => {
r.rewrite_exprs_recursive(rewriter);
Either::Right(r)
}
match &mut self.input {
Either::Left(q) => q.rewrite_exprs_recursive(rewriter),
Either::Right(r) => r.rewrite_exprs_recursive(rewriter),
};
self.input = rewrite;
}
}

/// from inner `BoundQuery` to `Relation::Subquery`
impl From<BoundShare> for Relation {
fn from(value: BoundShare) -> Self {
let Either::Left(q) = value.input else {
// leave it intact
return Self::Share(Box::new(value));
};
Self::Subquery(Box::new(BoundSubquery {
query: q,
lateral: false,
}))
}
}
30 changes: 12 additions & 18 deletions src/frontend/src/planner/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,7 @@ impl Planner {
} => self.plan_table_function(tf, with_ordinality),
Relation::Watermark(tf) => self.plan_watermark(*tf),
// note that rcte (i.e., RecursiveUnion) is included *implicitly* in share.
Relation::Share(share) => {
if let Either::Right(_) = share.input.clone() {
bail_not_implemented!(issue = 15135, "recursive CTE is not supported");
}
self.plan_share(*share)
}
Relation::Share(share) => self.plan_share(*share),
Relation::BackCteRef(..) => {
bail_not_implemented!(issue = 15135, "recursive CTE is not supported")
}
Expand Down Expand Up @@ -223,10 +218,15 @@ impl Planner {
}

pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
let Either::Left(nonrecursive_query) = share.input else {
bail_not_implemented!(issue = 15135, "recursive CTE is not supported");
};
let id = share.share_id;
match self.share_cache.get(&id) {
None => {
let result = self.plan_relation(share.into())?;
let result = self
.plan_query(nonrecursive_query)?
.into_unordered_subplan();
let logical_share = LogicalShare::create(result);
self.share_cache.insert(id, logical_share.clone());
Ok(logical_share)
Expand All @@ -253,17 +253,11 @@ impl Planner {
.iter()
.map(|col| col.data_type().clone())
.collect(),
Relation::Subquery(q) => q
.query
.schema()
.fields
.iter()
.map(|f| f.data_type())
.collect(),
Relation::Share(share) => {
let s = share.clone();
Self::collect_col_data_types_for_tumble_window(&((*s).into()))?
}
Relation::Subquery(q) => q.query.schema().data_types(),
Relation::Share(share) => match &share.input {
Either::Left(nonrecursive) => nonrecursive.schema().data_types(),
Either::Right(recursive) => recursive.schema.data_types(),
},
r => {
return Err(ErrorCode::BindError(format!(
"Invalid input relation to tumble: {r:?}"
Expand Down

0 comments on commit 8adca63

Please sign in to comment.