-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a method to figure out common nodes in a dataflow plan #1520
Conversation
Thank you for your pull request! We could not find a changelog entry for this change. For details on how to document a change, see the contributing guide. |
5e3fbd4
to
030886e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few small comments inline, but overall looks great! 🚀 🚀 🚀
@@ -81,6 +86,12 @@ def aggregated_to_elements(self) -> Set[LinkableInstanceSpec]: | |||
"""Indicates that the node has been aggregated to these specs, guaranteeing uniqueness in all combinations.""" | |||
return set() | |||
|
|||
def __lt__(self, other: ComparisonAnyType) -> bool: # noqa: D105 | |||
if not isinstance(other, DataflowPlanNode): | |||
raise NotImplementedError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add an error message here in case this somehow gets hit? Seems similar to a bare AssertionError
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually a special value that is supposed to be returned for __lt__
when you try to compare two objects that aren't comparable:
|
||
|
||
class DataflowPlanAnalyzer: | ||
"""CLass to determine more complex properties of the dataflow plan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Class*
return tuple(sorted(dataflow_plan.sink_node.accept(common_branches_visitor))) | ||
|
||
|
||
class _CountCommonDataflowNodeVisitor(DataflowPlanNodeVisitorWithDefaultHandler[None]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This isn't counting just common nodes, it's counting all nodes (i.e., you'll get some nodes that are just used once, so they're not common). Could change the name of the class and related variables to reflect that - e.g., _CountDataflowNodeVisitor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
|
||
|
||
class _CountCommonDataflowNodeVisitor(DataflowPlanNodeVisitorWithDefaultHandler[None]): | ||
"""Helper visitor to build a dict from a node in the plan to the number of times it appears in the plans.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only traverses one dataflow plan at a time, right? If so, plan*
instead of plans
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
raise NotImplementedError | ||
|
||
|
||
class DataflowPlanNodeVisitorWithDefaultHandler(DataflowPlanNodeVisitor[VisitorOutputT], Generic[VisitorOutputT]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Appreciate this reduction in boilerplate! 🙏
|
||
@override | ||
def _default_handler(self, node: DataflowPlanNode) -> FrozenSet[DataflowPlanNode]: | ||
if node in self._common_nodes: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It took me a minute to understand why this gets the largest common nodes! Mind adding a comment or a docstring to explain that this early return works because we're traversing from largest to smallest?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
docstring: | ||
For a known case, test that a metric computation node is identified as a common branch. | ||
|
||
A query for `bookings` and `bookings_per_booker` should have the computation for `bookings` as a common branch in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an interesting case! The ComputeMetricsNode
for bookings
is common, but within that node there is another common node that could also be used as a CTE - the ReadSqlSourceNode
for the bookings_source
table, which is also used in the bookers
branch. Curious if you plan to add that optimization later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct, and the ReadSqlSourceNode
is included in the result snapshot under common_branch_1
below. I should probably add newlines because the different items are easy to miss.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh nice! Yep I missed that
This PR: * Adds `SqlGenerationOptionSet` to encapsulate the options for how SQL should be generated from the dataflow plan. * Adds `O5` level that uses all previous optimizers and generates CTEs. * Updates `DataflowToSqlQueryPlanConverter` to use `SqlGenerationOptionSet`. When the `allow_cte` option it set, converts the common nodes (as implemented in #1520) in a dataflow plan to map to a CTE instead of a subquery. Since CTEs are not generated by default, the generated SQL is the same for test cases and there are no snapshot changes (aside from the ones that specifically test this feature).
This PR adds a method to figure out nodes in a dataflow plan that appear more than once. i.e. a node that is the parent of multiple nodes. These common nodes indicate operations where a computation is reused, e.g. a metric that is used in the computation of multiple derived metrics in a query. These nodes will be later used to generate CTEs.