-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove Dataflow Plan Node Types #1205
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I love this PR. It is third in the set of cleanup changes I was planning to make after predicate pushdown, but that ordering had more to do with personal preferences than anything else.
I have not really read it because it's, well, hard to read, and I'm trying to get another feature out the door.
The main problem I ran into here is this PR represents, conceptually, three or four changes in one:
- removing useless type marker nodes
- removing SinkNodes
- simplifying the optimizer return types (which is probably fine to consolidate into the SinkNode removal, and is a separate commit here)
- doing the natural consolidation of BaseOutput into DataflowPlanNode.
If this were split along these - or other equally readable - lines it'd be a "scroll through and make sure nothing looks weird" review. As it is now it's quite a bit more daunting.
Similarly, if this causes some weirdness then viewing via git diff or whatever is going to be a headache since we squash and merge, and it'd be more accessible in every piece of tooling if it was broken into more readily addressable chunks.
Anyway, you can do what you will with this. If you're going to break this up let me know and I won't review it until you're done, otherwise I'll take a pass through when I have more time.
def visit_source_node(self, node: ReadSqlSourceNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_join_on_entities_node(self, node: JoinOnEntitiesNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_aggregate_measures_node(self, node: AggregateMeasuresNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_order_by_limit_node(self, node: OrderByLimitNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_where_constraint_node(self, node: WhereConstraintNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_filter_elements_node(self, node: FilterElementsNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_combine_aggregated_outputs_node(self, node: CombineAggregatedOutputsNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_constrain_time_range_node(self, node: ConstrainTimeRangeNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_semi_additive_join_node(self, node: SemiAdditiveJoinNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_metric_time_dimension_transform_node( | ||
self, node: MetricTimeDimensionTransformNode | ||
) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_min_max_node(self, node: MinMaxNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_add_generated_uuid_column_node(self, node: AddGeneratedUuidColumnNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError | ||
|
||
@override | ||
def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> ConvertToExecutionPlanResult: | ||
raise NotImplementedError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Speaking of things looking weird, big blocks of green inside mechanical change diffs always make an impression. Now if anybody attempts to convert a dataflow plan to an execution plan using the wrong node type the runtime will blow up with a NotImplementedError. This seems undesirable. While I was never a fan of the existence of the SinkNodeVisitor interface, its one redeeming feature was preventing this from happening. Indeed, I originally suggested it as a "if you must use a visitor to do this graph level property access then please at least make it a different type"
Do you have a follow-up planned where you get rid of this one way or another? My planned stack was going to involve replacing the visitor itself with a property on the DataflowPlan and pausing on removing the execution plan stuff until later just because I don't want to deal with the MFS changes right now, but removing execution plans (so much for our earlier execution plan aspirations....) altogether would be welcome.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consolidating to the one execution plan type we actually use looks like a win to me. Implementing a DagWalker that does not - and in fact absolutely SHOULD NOT - walk the DAG is confusing. Maybe just update that to be a regular class?
Unfortunately, breaking out that change might be a bit tough, so let's go with the assumption that the commits will be as is. If that changes, I'll let you know. |
7b1693b
to
abd1872
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for splitting out the rename, that made a surprisingly big difference!
At some point we should really reconsider the multiple sink nodes with the runtime errors. I don't think we're likely to need multiple sinks anymore, since all we do is render queries to a single output stream (i.e., the outer SELECT statement), and any forking of that output data stream is probably best handled outside of MetricFlow. Not sure if you're doing that upstack or not, but it's a thing to consider.
raise RuntimeError("Can't create a dataflow plan without sink node(s).") | ||
self._sink_output_nodes = tuple(sink_output_nodes) | ||
def __init__(self, sink_nodes: Sequence[DataflowPlanNode], plan_id: Optional[DagId] = None) -> None: # noqa: D107 | ||
assert len(sink_nodes) == 1, f"Exactly 1 sink node is supported. Got: {sink_nodes}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, are we going to formalize this via the type system?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't had a chance to think about how to simplify this, but yeah, that sounds like a good idea.
metricflow/dataflow/dataflow_plan.py
Outdated
def sink_output_node(self) -> DataflowPlanNode: # noqa: D102 | ||
assert len(self._sink_output_nodes) == 1, f"Only 1 sink node supported. Got: {self._sink_output_nodes}" | ||
return self._sink_output_nodes[0] | ||
def checked_sink_node(self) -> DataflowPlanNode: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this just be sink_node? We already have the assertion in the initializer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, updated.
Description
The original class hierarchy for the
DataflowPlanNodes
included types that described the data that was output by the node. However, those turned out to not be useful in practice (e.g.BaseOutput
was the majority of use cases), so this PR removes them.