Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(planner): logical plan for rcte #16483

Closed
wants to merge 14 commits into from
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
- name: basic
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT * FROM t1;
expected_outputs:
- planner_error
- logical_plan
- name: output column follows lhs
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a FROM t1;
expected_outputs:
- planner_error
- logical_plan
- name: with normal column
sql: WITH RECURSIVE t(a) AS (VALUES(1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t;
expected_outputs:
- logical_plan
- name: name a is leaked outside
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a;
expected_outputs:
- binder_error
- binder_error
31 changes: 25 additions & 6 deletions src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: basic
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT * FROM t1;
planner_error: |-
Feature is not yet implemented: recursive CTE is not supported
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/15135
logical_plan: |-
LogicalProject { exprs: [$expr1] }
└─LogicalRecursiveUnion { id: 4 }
├─LogicalProject { exprs: [1:Int32] }
│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's 1+1 instead of a+1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I raised the same question just above. 😄

ps. maybe due to the on-the-fly constant folding, but I'm not sure.

└─LogicalFilter { predicate: (1:Int32 < 10:Int32) }
└─LogicalCteRef { share_id: 0 }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the single share_id here is a reference to the actual PlanRef stored in OptimizerContext::rcte_cache, should be used in subsequent batch/stream plans' generation.

cc @chenzl25 @TennyZhuang @xiangjinwu for any potential problem of this approach.

- name: output column follows lhs
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a FROM t1;
planner_error: |-
Feature is not yet implemented: recursive CTE is not supported
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/15135
logical_plan: |-
LogicalProject { exprs: [$expr1] }
└─LogicalRecursiveUnion { id: 4 }
├─LogicalProject { exprs: [1:Int32] }
│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] }
└─LogicalFilter { predicate: (1:Int32 < 10:Int32) }
Copy link
Contributor Author

@xzhseh xzhseh Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plus, I'm afraid currently its hard to handle the rcte without explicit columns being specified (i.e., using casting to implicit achieve recursive property here) - any idea on this?

i.e., we should treat 1 AS a as input ref (column) rather than a constant.

└─LogicalCteRef { share_id: 0 }
- name: with normal column
sql: WITH RECURSIVE t(a) AS (VALUES(1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t;
logical_plan: |-
LogicalProject { exprs: [$expr1] }
└─LogicalRecursiveUnion { id: 3 }
├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32] } }
└─LogicalProject { exprs: [(*VALUES*_0.column_0 + 1:Int32) as $expr1] }
└─LogicalFilter { predicate: (*VALUES*_0.column_0 < 100:Int32) }
└─LogicalCteRef { share_id: 0 }
- name: name a is leaked outside
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a;
binder_error: |
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub enum BindingCteState {
#[default]
Init,
/// We know the schema form after the base term resolved.
BaseResolved { schema: Schema },
BaseResolved { base: BoundSetExpr },
/// We get the whole bound result of the (recursive) CTE.
Bound {
query: Either<BoundQuery, RecursiveUnion>,
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ pub use insert::BoundInsert;
use pgwire::pg_server::{Session, SessionId};
pub use query::BoundQuery;
pub use relation::{
BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, BoundWatermark,
BoundWindowTableFunction, Relation, ResolveQualifiedNameError, WindowTableFunctionKind,
BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable,
BoundWatermark, BoundWindowTableFunction, Relation, ResolveQualifiedNameError,
WindowTableFunctionKind,
};
pub use select::{BoundDistinct, BoundSelect};
pub use set_expr::*;
Expand Down
4 changes: 1 addition & 3 deletions src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,7 @@ impl Binder {
// reference: <https://www.postgresql.org/docs/16/sql-select.html#:~:text=the%20recursive%20self%2Dreference%20must%20appear%20on%20the%20right%2Dhand%20side%20of%20the%20UNION>
let mut base = self.bind_set_expr(left)?;

entry.borrow_mut().state = BindingCteState::BaseResolved {
schema: base.schema().clone(),
};
entry.borrow_mut().state = BindingCteState::BaseResolved { base: base.clone() };

// Reset context for right side, but keep `cte_to_relation`.
let new_context = std::mem::take(&mut self.context);
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/binder/relation/cte_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
// limitations under the License.

use crate::binder::statement::RewriteExprsRecursive;
use crate::binder::ShareId;
use crate::binder::{BoundSetExpr, ShareId};

/// A CTE reference, currently only used in the back reference of recursive CTE.
/// For the non-recursive one, see [`BoundShare`](super::BoundShare).
#[derive(Debug, Clone)]
pub struct BoundBackCteRef {
#[expect(dead_code)]
pub(crate) share_id: ShareId,
pub(crate) base: BoundSetExpr,
}

impl RewriteExprsRecursive for BoundBackCteRef {
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use risingwave_sqlparser::ast::{
use thiserror::Error;
use thiserror_ext::AsReport;

use self::cte_ref::BoundBackCteRef;
use super::bind_context::ColumnBinding;
use super::statement::RewriteExprsRecursive;
use crate::binder::bind_context::{BindingCte, BindingCteState};
Expand All @@ -43,6 +42,7 @@ mod table_or_source;
mod watermark;
mod window_table_function;

pub use cte_ref::BoundBackCteRef;
pub use join::BoundJoin;
pub use share::BoundShare;
pub use subquery::BoundSubquery;
Expand Down Expand Up @@ -374,13 +374,13 @@ impl Binder {
BindingCteState::Init => {
Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_string()).into())
}
BindingCteState::BaseResolved { schema } => {
BindingCteState::BaseResolved { base } => {
self.bind_table_to_context(
schema.fields.iter().map(|f| (false, f.clone())),
base.schema().fields.iter().map(|f| (false, f.clone())),
table_name.clone(),
Some(original_alias),
)?;
Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id })))
Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base })))
}
BindingCteState::Bound { query } => {
let schema = match &query {
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/optimizer/optimizer_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
use core::convert::Into;
use core::fmt::Formatter;
use std::cell::{RefCell, RefMut};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;

use risingwave_sqlparser::ast::{ExplainOptions, ExplainType};

use crate::binder::ShareId;
use crate::expr::{CorrelatedId, SessionTimezone};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::PlanNodeId;
use crate::session::SessionImpl;
use crate::utils::{OverwriteOptions, WithOptions};
use crate::PlanRef;

const RESERVED_ID_NUM: u16 = 10000;

Expand Down Expand Up @@ -58,6 +61,9 @@ pub struct OptimizerContext {
/// Store the configs can be overwritten in with clause
/// if not specified, use the value from session variable.
overwrite_options: OverwriteOptions,
/// Store the mapping between `share_id` and the corresponding
/// `PlanRef`, used by rcte's planning. (e.g., in `LogicalCteRef`)
rcte_cache: RefCell<HashMap<ShareId, PlanRef>>,

_phantom: PhantomUnsend,
}
Expand Down Expand Up @@ -91,6 +97,7 @@ impl OptimizerContext {
next_expr_display_id: RefCell::new(RESERVED_ID_NUM.into()),
total_rule_applied: RefCell::new(0),
overwrite_options,
rcte_cache: RefCell::new(HashMap::new()),
_phantom: Default::default(),
}
}
Expand All @@ -113,6 +120,7 @@ impl OptimizerContext {
next_expr_display_id: RefCell::new(0),
total_rule_applied: RefCell::new(0),
overwrite_options: OverwriteOptions::default(),
rcte_cache: RefCell::new(HashMap::new()),
_phantom: Default::default(),
}
.into()
Expand Down Expand Up @@ -230,6 +238,14 @@ impl OptimizerContext {
pub fn get_session_timezone(&self) -> String {
self.session_timezone.borrow().timezone()
}

pub fn get_rcte_cache_plan(&self, id: &ShareId) -> Option<PlanRef> {
self.rcte_cache.borrow().get(id).cloned()
}

pub fn insert_rcte_cache_plan(&self, id: ShareId, plan: PlanRef) {
self.rcte_cache.borrow_mut().insert(id, plan);
}
}

impl std::fmt::Debug for OptimizerContext {
Expand Down
90 changes: 90 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/cte_ref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hash;

use itertools::Itertools;
use pretty_xmlish::{Pretty, StrAssocArr};
use risingwave_common::catalog::Schema;

use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef};
use crate::binder::ShareId;
use crate::optimizer::property::FunctionalDependencySet;
use crate::{optimizer, OptimizerContextRef};

#[derive(Clone, Debug)]
pub struct CteRef<PlanRef> {
share_id: ShareId,
base: PlanRef,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this base shouldn't be used after planning, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the base here refers to the left-hand side base case for a rcte, and the use here is just to fetch the schema and the OptimizerContextRef - since plan_recursive_union will finish after plan_cte_ref.

I'm not sure what you exactly mean by "shouldn't be used after planning", but at least the methods for impl GenericPlanRef need this base.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean once we have rcte_cache_plan, it should be always true to use rcte_cache_plan, right? Maybe we should add an assertion to ensure schema, stream_key and functional_dependency are the same between rcte_cache_plan and the base.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean once we have rcte_cache_plan, it should be always true to use rcte_cache_plan

yep, problem (and reason for base here) is we need to handle the gap between plan_cte_ref and plan_recursive_union.

}

impl<PlanRef> PartialEq for CteRef<PlanRef> {
fn eq(&self, other: &Self) -> bool {
self.share_id == other.share_id
}
}

impl<PlanRef> Eq for CteRef<PlanRef> {}

impl<PlanRef> Hash for CteRef<PlanRef> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.share_id.hash(state);
}
}

impl<PlanRef> CteRef<PlanRef> {
pub fn new(share_id: ShareId, base: PlanRef) -> Self {
Self { share_id, base }
}
}

impl<PlanRef: GenericPlanRef> CteRef<PlanRef> {
pub fn get_cte_ref(&self) -> Option<optimizer::plan_node::PlanRef> {
Copy link
Contributor Author

@xzhseh xzhseh Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to serve as the "circle reference" purpose and point back to the (later) constructed & planned RecursiveUnion, but I'm not pretty sure if this is okay especially for subsequent to_batch and to_stream.

cc @TennyZhuang @xiangjinwu.

self.base.ctx().get_rcte_cache_plan(&self.share_id)
}
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for CteRef<PlanRef> {
fn schema(&self) -> Schema {
self.base.schema().clone()
}

fn stream_key(&self) -> Option<Vec<usize>> {
if let Some(plan_ref) = self.get_cte_ref() {
plan_ref
.stream_key()
.map(|s| s.iter().map(|i| i.to_owned()).collect_vec())
} else {
self.base
.stream_key()
.map(|s| s.iter().map(|i| i.to_owned()).collect_vec())
}
}

fn ctx(&self) -> OptimizerContextRef {
self.base.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
self.base.functional_dependency().clone()
}
Comment on lines +59 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only stream_key need to check get_cte_ref and others not?

Copy link
Contributor Author

@xzhseh xzhseh May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for schema, I believe it has been aligned during bind_rcte, ctx should also be the same since we only have a ref of that.

for functional_dependency, I'm not sure should we stick to base's dependency or the later RecursiveUnion's.

}

impl<PlanRef: GenericPlanRef> CteRef<PlanRef> {
pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
vec![("share_id", Pretty::debug(&self.share_id))]
}
}

impl_distill_unit_from_fields! {CteRef, GenericPlanRef}
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ mod limit;
pub use limit::*;
mod max_one_row;
pub use max_one_row::*;
mod cte_ref;
pub use cte_ref::*;
mod recursive_union;
pub use recursive_union::*;

pub trait DistillUnit {
fn distill_with_name<'a>(&self, name: impl Into<Cow<'a, str>>) -> XmlNode<'a>;
Expand Down
62 changes: 62 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/recursive_union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::StrAssocArr;
use risingwave_common::catalog::Schema;

use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef};
use crate::optimizer::property::FunctionalDependencySet;
use crate::OptimizerContextRef;

/// `RecursiveUnion` returns the union of the rows of its inputs.
/// note: if `all` is false, it needs to eliminate duplicates.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RecursiveUnion<PlanRef> {
pub base: PlanRef,
pub recursive: PlanRef,
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for RecursiveUnion<PlanRef> {
fn functional_dependency(&self) -> FunctionalDependencySet {
self.recursive.functional_dependency().clone()
}

fn schema(&self) -> Schema {
self.recursive.schema().clone()
}

fn stream_key(&self) -> Option<Vec<usize>> {
let fields_len = self.base.schema().len();
let base = self.base.stream_key();
if let Some(base) = base {
let mut base = base.to_vec();
base.push(fields_len);
Some(base)
} else {
None
}
}

fn ctx(&self) -> OptimizerContextRef {
self.recursive.ctx()
}
}

impl<PlanRef: GenericPlanRef> RecursiveUnion<PlanRef> {
pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
vec![]
}
}

impl_distill_unit_from_fields!(RecursiveUnion, GenericPlanRef);
Loading
Loading