-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(planner): logical plan for rcte #16483
Conversation
21ee398
to
e18d11a
Compare
for query with recursive t(a) as (
values (1)
union all
select a + 1 from t where a < 100
)
select * from t; the current logical plan is as below.
|
} | ||
|
||
impl<PlanRef: GenericPlanRef> CteRef<PlanRef> { | ||
pub fn get_cte_ref(&self) -> Option<optimizer::plan_node::PlanRef> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is to serve as the "circle reference" purpose and point back to the (later) constructed & planned RecursiveUnion
, but I'm not pretty sure if this is okay especially for subsequent to_batch
and to_stream
.
Can you add some planner tests? src/frontend/planner_test/README.md |
│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } | ||
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } | ||
└─LogicalFilter { predicate: (1:Int32 < 10:Int32) } | ||
└─LogicalCteRef { share_id: 0 } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the single share_id
here is a reference to the actual PlanRef
stored in OptimizerContext::rcte_cache
, should be used in subsequent batch/stream plans' generation.
cc @chenzl25 @TennyZhuang @xiangjinwu for any potential problem of this approach.
├─LogicalProject { exprs: [1:Int32] } | ||
│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } | ||
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } | ||
└─LogicalFilter { predicate: (1:Int32 < 10:Int32) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plus, I'm afraid currently its hard to handle the rcte without explicit columns being specified (i.e., using casting to implicit achieve recursive property here) - any idea on this?
i.e., we should treat 1 AS a
as input ref (column) rather than a constant.
└─LogicalRecursiveUnion { id: 4 } | ||
├─LogicalProject { exprs: [1:Int32] } | ||
│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } | ||
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it's 1+1
instead of a+1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I raised the same question just above. 😄
ps. maybe due to the on-the-fly constant folding, but I'm not sure.
#[derive(Clone, Debug)] | ||
pub struct CteRef<PlanRef> { | ||
share_id: ShareId, | ||
base: PlanRef, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, this base
shouldn't be used after planning, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the base
here refers to the left-hand side base case for a rcte, and the use here is just to fetch the schema
and the OptimizerContextRef
- since plan_recursive_union
will finish after plan_cte_ref
.
I'm not sure what you exactly mean by "shouldn't be used after planning", but at least the methods for impl GenericPlanRef
need this base
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean once we have rcte_cache_plan
, it should be always true to use rcte_cache_plan
, right? Maybe we should add an assertion to ensure schema, stream_key and functional_dependency are the same between rcte_cache_plan
and the base
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean once we have rcte_cache_plan, it should be always true to use rcte_cache_plan
yep, problem (and reason for base
here) is we need to handle the gap between plan_cte_ref
and plan_recursive_union
.
fn schema(&self) -> Schema { | ||
self.base.schema().clone() | ||
} | ||
|
||
fn stream_key(&self) -> Option<Vec<usize>> { | ||
if let Some(plan_ref) = self.get_cte_ref() { | ||
plan_ref | ||
.stream_key() | ||
.map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) | ||
} else { | ||
self.base | ||
.stream_key() | ||
.map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) | ||
} | ||
} | ||
|
||
fn ctx(&self) -> OptimizerContextRef { | ||
self.base.ctx() | ||
} | ||
|
||
fn functional_dependency(&self) -> FunctionalDependencySet { | ||
self.base.functional_dependency().clone() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why only stream_key
need to check get_cte_ref
and others not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for schema
, I believe it has been aligned during bind_rcte
, ctx
should also be the same since we only have a ref of that.
for functional_dependency
, I'm not sure should we stick to base
's dependency or the later RecursiveUnion
's.
pub(super) fn plan_recursive_union( | ||
&mut self, | ||
base: BoundSetExpr, | ||
recursive: BoundSetExpr, | ||
id: ShareId, | ||
) -> Result<PlanRef> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about change the signature to fn plan_recursive_union(&mut self, recursiveUnion: RecursiveUnion)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the id
here needs to be passed in plan_share
, so it at least needs to include an extra id
field in addition to the RecursiveUnion
- maybe keeping the current signature is clear enough?
let base = self.plan_set_expr(base, vec![], &[])?; | ||
let recursive = self.plan_set_expr(recursive, vec![], &[])?; | ||
let plan = LogicalRecursiveUnion::create(base, recursive); | ||
self.ctx.insert_rcte_cache_plan(id, plan.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find that the mapping here won't change anymore, so the PlanRef
always points to a specific plan, considering that the optimizer performs a column pruning optimization on LogicalRecursiveUnion
, it will generate a different PlanRef
, but the rcte_cache
would be stale.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then we could update the rcte_cache
whenever prune_col
is called for LogicalRecursiveUnion
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I didn't see this logic for LogicalRecursiveUnion
in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's in the latest commit in #16673 - since I can't update this branch anymore.
needs to merge with main, otherwise #16673 is too large for review. |
How about closing the PR and use #16673 directly? |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
after this pr we are able to generate logical plan for
rcte
.to_batch
andto_stream
will be supported in subsequent pr(s).related: #15135.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.