From 0ce550c4d114ac3057c8bb07c3018dd884ddfd02 Mon Sep 17 00:00:00 2001 From: tlento Date: Wed, 15 May 2024 14:59:47 -0700 Subject: [PATCH] Add source_semantic_models property to DataflowPlanNode This commit adds the source_semantic_models property to the DataflowPlanNode, with a base case implementation of returning the recursively-collected set of all source semantic models fed into the node parents, and the single necessary override in ReadSqlSourceNode to populate the semantic model reference as appropriate. We currently have two use-cases for this, one in the cloud product that needs the semantic model inputs for a dataflow plan, and the upcoming predicate pushdown evaluation which needs the semantic model inputs for a given DataflowPlanNode. This is presented as a stand-alone change for discussion, as it represents a departure from our previous de facto approach of limiting all traversal operations to visitor classes. There are, in fact, three alternatives to adding this property which would all suffice for the purposes of predicate pushdown: 1. Write a separate class to do the traversal and fetch the property only from the ReadSqlSourceNodes. 2. Use an existing property accessor to collect this information, either via restricting return types from the SourceNodeSet or by using the existing resolver and consolidating the column-level semantic model references 3. Add this property as seen here. The second option will not address the case on the cloud side, so I chose between the traversal and the approach seen here. I went with this for the following reasons: 1. This is, conceptually, a property of the DataflowPlanNode. Up until now the existence of this property has been implicit - at some point, at the root of all executable DataflowPlan DAGs, is at least one ReadSqlSourceNode sourced from a semantic model. Adding an explicit accessor makes it more obvious that this is our baseline expectation. 
The fact that it can be an empty set is a shortcoming, but not a terrible one. 2. This limits the scope of what we need to import across module (or even repository) boundaries in order to access this property information. With a separate visitor, we'd have to always import a separate class that implements the DataflowPlanNodeVisitor interface and pulls in those other elements. 3. The property return type is always obvious to the caller, who likely does not care how we got the information. The disadvantages here are: 1. This is the first break in our current "traversals are always visitors" model, which is a bit annoying as now we need to look in two places to find traversal operations. However, this is already true on the other side - we have a number of implicit property accessors which we fetch via visitors - and I believe we'll end up in a better place if we move those to be properties instead. 2. There may be other recursive property access operations we need to make, and these may be scattered around across multiple node objects. There are ways to consolidate this - including allowing a visitor-style class to operate on these nodes and encapsulating it within the property accessor itself - but that is not necessary at this juncture. 
--- metricflow/dataflow/dataflow_plan.py | 9 ++- metricflow/dataflow/nodes/read_sql_source.py | 13 ++++- .../dataflow/test_dataflow_plan.py | 57 +++++++++++++++++++ 3 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 tests_metricflow/dataflow/test_dataflow_plan.py diff --git a/metricflow/dataflow/dataflow_plan.py b/metricflow/dataflow/dataflow_plan.py index 34ba99fee7..c8229e0c1f 100644 --- a/metricflow/dataflow/dataflow_plan.py +++ b/metricflow/dataflow/dataflow_plan.py @@ -2,16 +2,18 @@ from __future__ import annotations +import itertools import logging import typing from abc import ABC, abstractmethod -from typing import Generic, Optional, Sequence, Set, Type, TypeVar +from typing import FrozenSet, Generic, Optional, Sequence, Set, Type, TypeVar from metricflow_semantics.dag.id_prefix import StaticIdPrefix from metricflow_semantics.dag.mf_dag import DagId, DagNode, MetricFlowDag, NodeId from metricflow_semantics.visitor import Visitable, VisitorOutputT if typing.TYPE_CHECKING: + from dbt_semantic_interfaces.references import SemanticModelReference from metricflow_semantics.specs.spec_classes import LinkableInstanceSpec from metricflow.dataflow.nodes.add_generated_uuid import AddGeneratedUuidColumnNode @@ -61,6 +63,11 @@ def parent_nodes(self) -> Sequence[DataflowPlanNode]: """Return the nodes where data for this node comes from.""" return self._parent_nodes + @property + def source_semantic_models(self) -> FrozenSet[SemanticModelReference]: + """Return the complete set of source semantic models for this node, collected recursively across all parents.""" + return frozenset(itertools.chain.from_iterable([parent.source_semantic_models for parent in self.parent_nodes])) + @abstractmethod def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: """Called when a visitor needs to visit this node.""" diff --git a/metricflow/dataflow/nodes/read_sql_source.py b/metricflow/dataflow/nodes/read_sql_source.py index 
8c0ce2cc5d..92d4613dd7 100644 --- a/metricflow/dataflow/nodes/read_sql_source.py +++ b/metricflow/dataflow/nodes/read_sql_source.py @@ -1,12 +1,14 @@ from __future__ import annotations import textwrap -from typing import Sequence +from typing import FrozenSet, Sequence import jinja2 +from dbt_semantic_interfaces.references import SemanticModelReference from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix from metricflow_semantics.dag.mf_dag import DisplayedProperty from metricflow_semantics.visitor import VisitorOutputT +from typing_extensions import override from metricflow.dataflow.dataflow_plan import BaseOutput, DataflowPlanNode, DataflowPlanNodeVisitor from metricflow.dataset.sql_dataset import SqlDataSet @@ -31,6 +33,15 @@ def id_prefix(cls) -> IdPrefix: # noqa: D102 def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102 return visitor.visit_source_node(self) + @override + @property + def source_semantic_models(self) -> FrozenSet[SemanticModelReference]: + return ( + frozenset([self.data_set.semantic_model_reference]) + if self.data_set.semantic_model_reference + else frozenset() + ) + @property def data_set(self) -> SqlDataSet: """Return the data set that this source represents and is passed to the child nodes.""" diff --git a/tests_metricflow/dataflow/test_dataflow_plan.py b/tests_metricflow/dataflow/test_dataflow_plan.py new file mode 100644 index 0000000000..eec55b9750 --- /dev/null +++ b/tests_metricflow/dataflow/test_dataflow_plan.py @@ -0,0 +1,57 @@ +"""Tests for operations on dataflow plans and dataflow plan nodes.""" + +from __future__ import annotations + +from dbt_semantic_interfaces.references import EntityReference, SemanticModelReference +from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec +from metricflow_semantics.specs.spec_classes import ( + DimensionSpec, + MetricSpec, +) + +from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder + + 
+def test_source_semantic_models_accessor( + dataflow_plan_builder: DataflowPlanBuilder, +) -> None: + """Tests source semantic models access for a simple query plan.""" + dataflow_plan = dataflow_plan_builder.build_plan( + MetricFlowQuerySpec( + metric_specs=(MetricSpec(element_name="bookings"),), + ) + ) + + assert len(dataflow_plan.sink_nodes) == 1, "Dataflow plan should have exactly one sink node." + assert dataflow_plan.sink_nodes[0].source_semantic_models == frozenset( + [SemanticModelReference(semantic_model_name="bookings_source")] + ) + + +def test_multi_hop_joined_source_semantic_models_accessor( + dataflow_plan_builder: DataflowPlanBuilder, +) -> None: + """Tests source semantic models access for a multi-hop join plan.""" + dataflow_plan = dataflow_plan_builder.build_plan( + MetricFlowQuerySpec( + metric_specs=(MetricSpec(element_name="bookings"),), + dimension_specs=( + DimensionSpec( + element_name="home_state_latest", + entity_links=( + EntityReference(element_name="listing"), + EntityReference(element_name="user"), + ), + ), + ), + ) + ) + + assert len(dataflow_plan.sink_nodes) == 1, "Dataflow plan should have exactly one sink node." + assert dataflow_plan.sink_nodes[0].source_semantic_models == frozenset( + [ + SemanticModelReference(semantic_model_name="bookings_source"), + SemanticModelReference(semantic_model_name="listings_latest"), + SemanticModelReference(semantic_model_name="users_latest"), + ] + )