feat(planner): logical plan for rcte #16483

Closed
wants to merge 14 commits into from

Conversation

xzhseh
Contributor

@xzhseh xzhseh commented Apr 25, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

after this PR, we are able to generate the logical plan for a recursive CTE (rcte).

to_batch and to_stream will be supported in subsequent PR(s).

related: #15135.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@xzhseh xzhseh requested a review from TennyZhuang April 25, 2024 02:47
@xzhseh xzhseh self-assigned this Apr 25, 2024
@xzhseh xzhseh force-pushed the xzhseh/plan-rcte branch from 21ee398 to e18d11a Compare April 25, 2024 03:08
@xzhseh xzhseh marked this pull request as ready for review April 25, 2024 22:15
@xzhseh
Contributor Author

xzhseh commented Apr 25, 2024

for query

with recursive t(a) as (
values (1)
union all
select a + 1 from t where a < 100
)
select * from t;

the current logical plan is as below.

after plan relation: PlanRef(
    LogicalRecursiveUnion {
        base: PlanBase {
            id: PlanNodeId(
                10006,
            ),
            ctx: QueryContext { next_plan_node_id = 10006, sql = EXPLAIN WITH RECURSIVE t (a) AS (VALUES (1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t, explain_options = , next_correlated_id = 0, with_options = WithOptions { inner: {} } },
            schema: Schema {
                fields: [
                    $expr10001:Int32,
                ],
            },
            stream_key: None,
            functional_dependency: FunctionalDependencySet {
                column_count: 1,
                strict: [],
            },
            extra: NoExtra,
        },
        core: RecursiveUnion {
            base: PlanRef(
                LogicalValues {
                    base: PlanBase {
                        id: PlanNodeId(
                            10001,
                        ),
                        ctx: QueryContext { next_plan_node_id = 10006, sql = EXPLAIN WITH RECURSIVE t (a) AS (VALUES (1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t, explain_options = , next_correlated_id = 0, with_options = WithOptions { inner: {} } },
                        schema: Schema {
                            fields: [
                                *VALUES*_0.column_0:Int32,
                            ],
                        },
                        stream_key: None,
                        functional_dependency: FunctionalDependencySet {
                            column_count: 1,
                            strict: [],
                        },
                        extra: NoExtra,
                    },
                    rows: [
                        [
                            Literal(
                                Literal {
                                    data: Some(
                                        Int32(
                                            1,
                                        ),
                                    ),
                                    data_type: Some(
                                        Int32,
                                    ),
                                },
                            ),
                        ],
                    ],
                },
            ),
            recursive: PlanRef(
                LogicalProject {
                    base: PlanBase {
                        id: PlanNodeId(
                            10005,
                        ),
                        ctx: QueryContext { next_plan_node_id = 10006, sql = EXPLAIN WITH RECURSIVE t (a) AS (VALUES (1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t, explain_options = , next_correlated_id = 0, with_options = WithOptions { inner: {} } },
                        schema: Schema {
                            fields: [
                                $expr10001:Int32,
                            ],
                        },
                        stream_key: None,
                        functional_dependency: FunctionalDependencySet {
                            column_count: 1,
                            strict: [],
                        },
                        extra: NoExtra,
                    },
                    core: Project {
                        exprs: [
                            FunctionCall(
                                FunctionCall {
                                    func_type: Add,
                                    return_type: Int32,
                                    inputs: [
                                        InputRef(
                                            InputRef {
                                                index: 0,
                                                data_type: Int32,
                                            },
                                        ),
                                        Literal(
                                            Literal {
                                                data: Some(
                                                    Int32(
                                                        1,
                                                    ),
                                                ),
                                                data_type: Some(
                                                    Int32,
                                                ),
                                            },
                                        ),
                                    ],
                                },
                            ),
                        ],
                        field_names: {},
                        input: PlanRef(
                            LogicalFilter {
                                base: PlanBase {
                                    id: PlanNodeId(
                                        10004,
                                    ),
                                    ctx: QueryContext { next_plan_node_id = 10006, sql = EXPLAIN WITH RECURSIVE t (a) AS (VALUES (1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t, explain_options = , next_correlated_id = 0, with_options = WithOptions { inner: {} } },
                                    schema: Schema {
                                        fields: [
                                            *VALUES*_0.column_0:Int32,
                                        ],
                                    },
                                    stream_key: None,
                                    functional_dependency: FunctionalDependencySet {
                                        column_count: 1,
                                        strict: [],
                                    },
                                    extra: NoExtra,
                                },
                                core: Filter {
                                    predicate: Condition {
                                        conjunctions: [
                                            FunctionCall(
                                                FunctionCall {
                                                    func_type: LessThan,
                                                    return_type: Boolean,
                                                    inputs: [
                                                        InputRef(
                                                            InputRef {
                                                                index: 0,
                                                                data_type: Int32,
                                                            },
                                                        ),
                                                        Literal(
                                                            Literal {
                                                                data: Some(
                                                                    Int32(
                                                                        100,
                                                                    ),
                                                                ),
                                                                data_type: Some(
                                                                    Int32,
                                                                ),
                                                            },
                                                        ),
                                                    ],
                                                },
                                            ),
                                        ],
                                    },
                                    input: PlanRef(
                                        LogicalCteRef {
                                            base: PlanBase {
                                                id: PlanNodeId(
                                                    10003,
                                                ),
                                                ctx: QueryContext { next_plan_node_id = 10006, sql = EXPLAIN WITH RECURSIVE t (a) AS (VALUES (1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t, explain_options = , next_correlated_id = 0, with_options = WithOptions { inner: {} } },
                                                schema: Schema {
                                                    fields: [
                                                        *VALUES*_0.column_0:Int32,
                                                    ],
                                                },
                                                stream_key: None,
                                                functional_dependency: FunctionalDependencySet {
                                                    column_count: 1,
                                                    strict: [],
                                                },
                                                extra: NoExtra,
                                            },
                                            core: CteRef {
                                                share_id: 0,
                                                base: PlanRef(
                                                    LogicalValues {
                                                        base: PlanBase {
                                                            id: PlanNodeId(
                                                                10002,
                                                            ),
                                                            ctx: QueryContext { next_plan_node_id = 10006, sql = EXPLAIN WITH RECURSIVE t (a) AS (VALUES (1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t, explain_options = , next_correlated_id = 0, with_options = WithOptions { inner: {} } },
                                                            schema: Schema {
                                                                fields: [
                                                                    *VALUES*_0.column_0:Int32,
                                                                ],
                                                            },
                                                            stream_key: None,
                                                            functional_dependency: FunctionalDependencySet {
                                                                column_count: 1,
                                                                strict: [],
                                                            },
                                                            extra: NoExtra,
                                                        },
                                                        rows: [
                                                            [
                                                                Literal(
                                                                    Literal {
                                                                        data: Some(
                                                                            Int32(
                                                                                1,
                                                                            ),
                                                                        ),
                                                                        data_type: Some(
                                                                            Int32,
                                                                        ),
                                                                    },
                                                                ),
                                                            ],
                                                        ],
                                                    },
                                                ),
                                            },
                                        },
                                    ),
                                },
                            },
                        ),
                        _private: (),
                    },
                },
            ),
        },
    },
)
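
For reference, the semantics that a base/recursive pair like the one in the plan above has to capture can be sketched as a fixpoint loop. This is only an illustration of how `WITH RECURSIVE ... UNION ALL` is conventionally evaluated, not code from this PR; `eval_recursive_union` and its closure parameter are hypothetical names, and rows are simplified to plain `i32` values.

```rust
/// Evaluate `base UNION ALL recursive(t)` by iterating until no new rows appear.
/// `step` plays the role of the recursive term: it maps the rows produced by
/// the previous iteration (the working table) to the next batch of rows.
fn eval_recursive_union<F>(base: Vec<i32>, step: F) -> Vec<i32>
where
    F: Fn(&[i32]) -> Vec<i32>,
{
    let mut result = base.clone();
    let mut working = base;
    while !working.is_empty() {
        let next = step(&working);
        result.extend_from_slice(&next);
        // Only the newly produced rows feed the next iteration.
        working = next;
    }
    result
}

fn main() {
    // values (1) union all select a + 1 from t where a < 100
    let rows = eval_recursive_union(vec![1], |t| {
        t.iter().filter(|a| **a < 100).map(|a| a + 1).collect()
    });
    assert_eq!(rows.len(), 100);
    assert_eq!(rows.first(), Some(&1));
    assert_eq!(rows.last(), Some(&100));
}
```

The recursive term only ever sees the rows from the previous iteration, which is why the reference to `t` inside it cannot simply be the base case.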

}

impl<PlanRef: GenericPlanRef> CteRef<PlanRef> {
pub fn get_cte_ref(&self) -> Option<optimizer::plan_node::PlanRef> {
Contributor Author

@xzhseh xzhseh Apr 25, 2024

this serves as the "circular reference": it points back to the RecursiveUnion that is constructed and planned later. I'm not sure whether this is okay, especially for the subsequent to_batch and to_stream.

cc @TennyZhuang @xiangjinwu.

@TennyZhuang
Contributor

Can you add some planner tests?

src/frontend/planner_test/README.md

│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] }
  └─LogicalFilter { predicate: (1:Int32 < 10:Int32) }
    └─LogicalCteRef { share_id: 0 }
Contributor Author

the single share_id here is a reference to the actual PlanRef stored in OptimizerContext::rcte_cache, and it should be used when generating the subsequent batch/stream plans.

cc @chenzl25 @TennyZhuang @xiangjinwu for any potential problem of this approach.
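
A minimal sketch of the share_id indirection described above, using toy types rather than the real RisingWave ones. `insert_rcte_cache_plan` is the name used in this PR; `Plan`, `Ctx`, `resolve`, and the getter `get_rcte_cache_plan` are assumptions made up for illustration.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

type ShareId = usize;

// Toy plan nodes; the real nodes carry much more (schema, ctx, ...).
#[allow(dead_code)]
#[derive(Debug)]
enum Plan {
    Values,
    // Only the id is stored, so the plan tree itself contains no reference cycle.
    CteRef { share_id: ShareId },
    RecursiveUnion { base: Rc<Plan>, recursive: Rc<Plan> },
}

#[derive(Default)]
struct Ctx {
    rcte_cache: RefCell<HashMap<ShareId, Rc<Plan>>>,
}

impl Ctx {
    fn insert_rcte_cache_plan(&self, id: ShareId, plan: Rc<Plan>) {
        self.rcte_cache.borrow_mut().insert(id, plan);
    }

    // Hypothetical getter name, not taken from the PR.
    fn get_rcte_cache_plan(&self, id: ShareId) -> Option<Rc<Plan>> {
        self.rcte_cache.borrow().get(&id).cloned()
    }
}

/// Resolve a `CteRef` to the plan it points at, if that plan is cached yet.
fn resolve(ctx: &Ctx, plan: &Plan) -> Option<Rc<Plan>> {
    match plan {
        Plan::CteRef { share_id } => ctx.get_rcte_cache_plan(*share_id),
        _ => None,
    }
}

fn main() {
    let ctx = Ctx::default();
    let cte_ref = Rc::new(Plan::CteRef { share_id: 0 });
    // The reference is planned first, so nothing is cached yet.
    assert!(resolve(&ctx, &cte_ref).is_none());

    let union = Rc::new(Plan::RecursiveUnion {
        base: Rc::new(Plan::Values),
        recursive: cte_ref.clone(),
    });
    ctx.insert_rcte_cache_plan(0, union.clone());

    let resolved = resolve(&ctx, &cte_ref).expect("cached after planning the union");
    assert!(Rc::ptr_eq(&resolved, &union));
}
```

Keeping only the id inside the node means the lookup can fail until the union has been planned, which is the gap the `base` field is meant to cover.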

├─LogicalProject { exprs: [1:Int32] }
│ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
└─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] }
  └─LogicalFilter { predicate: (1:Int32 < 10:Int32) }
Contributor Author

@xzhseh xzhseh Apr 29, 2024

plus, I'm afraid it's currently hard to handle an rcte without explicit columns being specified (i.e., using casting to implicitly achieve the recursive property here) - any idea on this?

i.e., we should treat 1 AS a as an input ref (column) rather than a constant.

└─LogicalRecursiveUnion { id: 4 }
  ├─LogicalProject { exprs: [1:Int32] }
  │ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
  └─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] }
Contributor

Why is it 1+1 instead of a+1?

Contributor Author

I raised the same question just above. 😄

ps. maybe due to the on-the-fly constant folding, but I'm not sure.

#[derive(Clone, Debug)]
pub struct CteRef<PlanRef> {
share_id: ShareId,
base: PlanRef,
Contributor

IIUC, this base shouldn't be used after planning, right?

Contributor Author

the base here refers to the left-hand side base case of an rcte, and it is only used to fetch the schema and the OptimizerContextRef - since plan_recursive_union finishes after plan_cte_ref.

I'm not sure what exactly you mean by "shouldn't be used after planning", but at least the methods for impl GenericPlanRef need this base.

Contributor

I mean once we have rcte_cache_plan, it should be always true to use rcte_cache_plan, right? Maybe we should add an assertion to ensure schema, stream_key and functional_dependency are the same between rcte_cache_plan and the base.
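
The suggested assertion could look roughly like the sketch below. It assumes a toy `PlanProps` struct standing in for the three properties named above; `PlanProps` and `first_mismatch` are hypothetical and do not exist in the codebase, and the real `Schema` and `FunctionalDependencySet` types are replaced by plain vectors.

```rust
// Toy stand-in for the properties exposed by a plan node.
#[derive(Debug, Clone, PartialEq)]
struct PlanProps {
    schema: Vec<String>, // column types, e.g. "Int32"
    stream_key: Option<Vec<usize>>,
    functional_dependency: Vec<(Vec<usize>, Vec<usize>)>,
}

/// Name the first property on which `base` and the cached plan disagree,
/// or `None` if they agree on all three.
fn first_mismatch(base: &PlanProps, cached: &PlanProps) -> Option<&'static str> {
    if base.schema != cached.schema {
        Some("schema")
    } else if base.stream_key != cached.stream_key {
        Some("stream_key")
    } else if base.functional_dependency != cached.functional_dependency {
        Some("functional_dependency")
    } else {
        None
    }
}

fn main() {
    let base = PlanProps {
        schema: vec!["Int32".to_string()],
        stream_key: None,
        functional_dependency: vec![],
    };
    // A cached plan with identical properties passes the check.
    let cached = base.clone();
    assert_eq!(first_mismatch(&base, &cached), None);

    // A cached plan whose stream key differs is reported.
    let mut drifted = base.clone();
    drifted.stream_key = Some(vec![0]);
    assert_eq!(first_mismatch(&base, &drifted), Some("stream_key"));
}
```

Reporting which property differs, rather than a bare boolean, would make a failing assertion easier to read.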

Contributor Author

I mean once we have rcte_cache_plan, it should be always true to use rcte_cache_plan

yep, the problem (and the reason for base here) is that we need to handle the gap between plan_cte_ref and plan_recursive_union.

Comment on lines +59 to +81
fn schema(&self) -> Schema {
    self.base.schema().clone()
}

fn stream_key(&self) -> Option<Vec<usize>> {
    if let Some(plan_ref) = self.get_cte_ref() {
        plan_ref
            .stream_key()
            .map(|s| s.iter().map(|i| i.to_owned()).collect_vec())
    } else {
        self.base
            .stream_key()
            .map(|s| s.iter().map(|i| i.to_owned()).collect_vec())
    }
}

fn ctx(&self) -> OptimizerContextRef {
    self.base.ctx()
}

fn functional_dependency(&self) -> FunctionalDependencySet {
    self.base.functional_dependency().clone()
}
Contributor

Why does only stream_key need to check get_cte_ref, and not the others?

Contributor Author

@xzhseh xzhseh May 9, 2024

for schema, I believe it has been aligned during bind_rcte; ctx should also be the same since we only hold a ref to it.

for functional_dependency, I'm not sure whether we should stick to base's dependency or the later RecursiveUnion's.

Comment on lines +21 to +26
pub(super) fn plan_recursive_union(
    &mut self,
    base: BoundSetExpr,
    recursive: BoundSetExpr,
    id: ShareId,
) -> Result<PlanRef> {
Contributor

What about changing the signature to fn plan_recursive_union(&mut self, recursiveUnion: RecursiveUnion)?

Contributor Author

the id here needs to be passed in plan_share, so it at least needs to include an extra id field in addition to the RecursiveUnion - maybe keeping the current signature is clear enough?

let base = self.plan_set_expr(base, vec![], &[])?;
let recursive = self.plan_set_expr(recursive, vec![], &[])?;
let plan = LogicalRecursiveUnion::create(base, recursive);
self.ctx.insert_rcte_cache_plan(id, plan.clone());
Contributor

I find that the mapping here won't change anymore, so the PlanRef always points to one specific plan. If the optimizer performs column pruning on LogicalRecursiveUnion, it will generate a different PlanRef, and the rcte_cache would then be stale.

Contributor Author

then we could update the rcte_cache whenever prune_col is called for LogicalRecursiveUnion.
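
To make the staleness concern and the proposed fix concrete, here is a small sketch with toy types. It assumes a plan node reduced to a list of column indices; `RecursiveUnion`, `RcteCache`, `prune_col`, and `prune_col_and_refresh` below are simplified stand-ins written for illustration, not the signatures used in RisingWave.

```rust
use std::collections::HashMap;
use std::rc::Rc;

type ShareId = usize;

// Toy node: only the output columns matter for this illustration.
#[derive(Debug, PartialEq)]
struct RecursiveUnion {
    columns: Vec<usize>,
}

type RcteCache = HashMap<ShareId, Rc<RecursiveUnion>>;

/// Column pruning builds a new node instead of mutating the old one.
fn prune_col(plan: &RecursiveUnion, required: &[usize]) -> Rc<RecursiveUnion> {
    let columns = plan
        .columns
        .iter()
        .copied()
        .filter(|c| required.contains(c))
        .collect();
    Rc::new(RecursiveUnion { columns })
}

/// Prune the cached plan for `id` and re-register the result under the same id.
fn prune_col_and_refresh(
    cache: &mut RcteCache,
    id: ShareId,
    required: &[usize],
) -> Option<Rc<RecursiveUnion>> {
    let old = cache.get(&id)?.clone();
    let pruned = prune_col(&old, required);
    cache.insert(id, pruned.clone());
    Some(pruned)
}

fn main() {
    let original = Rc::new(RecursiveUnion { columns: vec![0, 1, 2] });
    let mut cache = RcteCache::new();
    cache.insert(0, original.clone());

    // Pruning alone leaves the cache pointing at the old node: it is stale.
    let pruned = prune_col(&original, &[0, 2]);
    assert!(Rc::ptr_eq(&cache[&0], &original));
    assert!(!Rc::ptr_eq(&cache[&0], &pruned));

    // Refreshing on prune keeps the id pointing at the current plan.
    let refreshed = prune_col_and_refresh(&mut cache, 0, &[0, 2]).unwrap();
    assert!(Rc::ptr_eq(&cache[&0], &refreshed));
    assert_eq!(refreshed.columns, vec![0, 2]);
}
```

Any other rewrite that replaces the union node would need the same refresh, so this only covers the column pruning case raised here.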

Contributor

But I didn't see this logic for LogicalRecursiveUnion in this PR.

Contributor Author

@xzhseh xzhseh May 10, 2024

It's in the latest commit in #16673 - since I can't update this branch anymore.

@xzhseh
Contributor Author

xzhseh commented May 9, 2024

needs to merge with main, otherwise #16673 is too large for review.

cc @chenzl25 @TennyZhuang.

@TennyZhuang
Contributor

How about closing this PR and using #16673 directly?

@xzhseh xzhseh closed this May 10, 2024