-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(planner): logical plan for rcte #16483
Changes from all commits
c9a8d63
7a43527
52148e8
e18d11a
c51f77a
1580c26
6740299
09c2231
799bd43
86545f8
61fe892
4ae584f
6a2fe60
35b6d61
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,16 @@ | ||
- name: basic | ||
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT * FROM t1; | ||
expected_outputs: | ||
- planner_error | ||
- logical_plan | ||
- name: output column follows lhs | ||
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a FROM t1; | ||
expected_outputs: | ||
- planner_error | ||
- logical_plan | ||
- name: with normal column | ||
sql: WITH RECURSIVE t(a) AS (VALUES(1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t; | ||
expected_outputs: | ||
- logical_plan | ||
- name: name a is leaked outside | ||
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a; | ||
expected_outputs: | ||
- binder_error | ||
- binder_error |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,33 @@ | ||
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. | ||
- name: basic | ||
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT * FROM t1; | ||
planner_error: |- | ||
Feature is not yet implemented: recursive CTE is not supported | ||
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/15135 | ||
logical_plan: |- | ||
LogicalProject { exprs: [$expr1] } | ||
└─LogicalRecursiveUnion { id: 4 } | ||
├─LogicalProject { exprs: [1:Int32] } | ||
│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } | ||
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } | ||
└─LogicalFilter { predicate: (1:Int32 < 10:Int32) } | ||
└─LogicalCteRef { share_id: 0 } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the single cc @chenzl25 @TennyZhuang @xiangjinwu for any potential problem of this approach. |
||
- name: output column follows lhs | ||
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a FROM t1; | ||
planner_error: |- | ||
Feature is not yet implemented: recursive CTE is not supported | ||
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/15135 | ||
logical_plan: |- | ||
LogicalProject { exprs: [$expr1] } | ||
└─LogicalRecursiveUnion { id: 4 } | ||
├─LogicalProject { exprs: [1:Int32] } | ||
│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } | ||
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } | ||
└─LogicalFilter { predicate: (1:Int32 < 10:Int32) } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plus, I'm afraid currently its hard to handle the rcte without explicit columns being specified (i.e., using casting to implicit achieve recursive property here) - any idea on this? i.e., we should treat |
||
└─LogicalCteRef { share_id: 0 } | ||
- name: with normal column | ||
sql: WITH RECURSIVE t(a) AS (VALUES(1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t; | ||
logical_plan: |- | ||
LogicalProject { exprs: [$expr1] } | ||
└─LogicalRecursiveUnion { id: 3 } | ||
├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32] } } | ||
└─LogicalProject { exprs: [(*VALUES*_0.column_0 + 1:Int32) as $expr1] } | ||
└─LogicalFilter { predicate: (*VALUES*_0.column_0 < 100:Int32) } | ||
└─LogicalCteRef { share_id: 0 } | ||
- name: name a is leaked outside | ||
sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a; | ||
binder_error: | | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
// Copyright 2024 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::hash::Hash; | ||
|
||
use itertools::Itertools; | ||
use pretty_xmlish::{Pretty, StrAssocArr}; | ||
use risingwave_common::catalog::Schema; | ||
|
||
use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; | ||
use crate::binder::ShareId; | ||
use crate::optimizer::property::FunctionalDependencySet; | ||
use crate::{optimizer, OptimizerContextRef}; | ||
|
||
#[derive(Clone, Debug)] | ||
pub struct CteRef<PlanRef> { | ||
share_id: ShareId, | ||
base: PlanRef, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the I'm not sure what you exactly mean by "shouldn't be used after planning", but at least the methods for impl There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean once we have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
yep, problem (and reason for |
||
} | ||
|
||
impl<PlanRef> PartialEq for CteRef<PlanRef> { | ||
fn eq(&self, other: &Self) -> bool { | ||
self.share_id == other.share_id | ||
} | ||
} | ||
|
||
impl<PlanRef> Eq for CteRef<PlanRef> {} | ||
|
||
impl<PlanRef> Hash for CteRef<PlanRef> { | ||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) { | ||
self.share_id.hash(state); | ||
} | ||
} | ||
|
||
impl<PlanRef> CteRef<PlanRef> { | ||
pub fn new(share_id: ShareId, base: PlanRef) -> Self { | ||
Self { share_id, base } | ||
} | ||
} | ||
|
||
impl<PlanRef: GenericPlanRef> CteRef<PlanRef> { | ||
pub fn get_cte_ref(&self) -> Option<optimizer::plan_node::PlanRef> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is to serve as the "circle reference" purpose and point back to the (later) constructed & planned |
||
self.base.ctx().get_rcte_cache_plan(&self.share_id) | ||
} | ||
} | ||
|
||
impl<PlanRef: GenericPlanRef> GenericPlanNode for CteRef<PlanRef> { | ||
fn schema(&self) -> Schema { | ||
self.base.schema().clone() | ||
} | ||
|
||
fn stream_key(&self) -> Option<Vec<usize>> { | ||
if let Some(plan_ref) = self.get_cte_ref() { | ||
plan_ref | ||
.stream_key() | ||
.map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) | ||
} else { | ||
self.base | ||
.stream_key() | ||
.map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) | ||
} | ||
} | ||
|
||
fn ctx(&self) -> OptimizerContextRef { | ||
self.base.ctx() | ||
} | ||
|
||
fn functional_dependency(&self) -> FunctionalDependencySet { | ||
self.base.functional_dependency().clone() | ||
} | ||
Comment on lines
+59
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for for |
||
} | ||
|
||
impl<PlanRef: GenericPlanRef> CteRef<PlanRef> { | ||
pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> { | ||
vec![("share_id", Pretty::debug(&self.share_id))] | ||
} | ||
} | ||
|
||
impl_distill_unit_from_fields! {CteRef, GenericPlanRef} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
// Copyright 2024 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use pretty_xmlish::StrAssocArr; | ||
use risingwave_common::catalog::Schema; | ||
|
||
use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; | ||
use crate::optimizer::property::FunctionalDependencySet; | ||
use crate::OptimizerContextRef; | ||
|
||
/// `RecursiveUnion` returns the union of the rows of its inputs. | ||
/// note: if `all` is false, it needs to eliminate duplicates. | ||
#[derive(Debug, Clone, PartialEq, Eq, Hash)] | ||
pub struct RecursiveUnion<PlanRef> { | ||
pub base: PlanRef, | ||
pub recursive: PlanRef, | ||
} | ||
|
||
impl<PlanRef: GenericPlanRef> GenericPlanNode for RecursiveUnion<PlanRef> { | ||
fn functional_dependency(&self) -> FunctionalDependencySet { | ||
self.recursive.functional_dependency().clone() | ||
} | ||
|
||
fn schema(&self) -> Schema { | ||
self.recursive.schema().clone() | ||
} | ||
|
||
fn stream_key(&self) -> Option<Vec<usize>> { | ||
let fields_len = self.base.schema().len(); | ||
let base = self.base.stream_key(); | ||
if let Some(base) = base { | ||
let mut base = base.to_vec(); | ||
base.push(fields_len); | ||
Some(base) | ||
} else { | ||
None | ||
} | ||
} | ||
|
||
fn ctx(&self) -> OptimizerContextRef { | ||
self.recursive.ctx() | ||
} | ||
} | ||
|
||
impl<PlanRef: GenericPlanRef> RecursiveUnion<PlanRef> { | ||
pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> { | ||
vec![] | ||
} | ||
} | ||
|
||
impl_distill_unit_from_fields!(RecursiveUnion, GenericPlanRef); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it's
1+1
instead ofa+1
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I raised the same question just above. 😄
ps. maybe due to the on-the-fly constant folding, but I'm not sure.