Add source_semantic_models property to DataflowPlanNode
This commit adds the source_semantic_models property to the DataflowPlanNode,
with a base case implementation of returning the recursively-collected set
of all source semantic models fed into the node parents, and the single
necessary override in ReadSqlSourceNode to populate the semantic model
reference as appropriate.
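
As a rough illustration of the shape described above, here is a minimal sketch using hypothetical stand-in classes (ModelRef, Node and SourceNode are not the real MetricFlow types). It assumes only what the message states: the base class unions whatever its parents report, and the leaf node overrides the property to report its own reference.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import FrozenSet, Optional, Sequence


@dataclass(frozen=True)
class ModelRef:
    """Hypothetical stand-in for SemanticModelReference."""

    semantic_model_name: str


class Node:
    """Hypothetical stand-in for DataflowPlanNode."""

    def __init__(self, parent_nodes: Sequence[Node] = ()) -> None:
        self._parent_nodes = tuple(parent_nodes)

    @property
    def parent_nodes(self) -> Sequence[Node]:
        return self._parent_nodes

    @property
    def source_semantic_models(self) -> FrozenSet[ModelRef]:
        # Base case: the union of whatever each parent reports.
        return frozenset(ref for parent in self.parent_nodes for ref in parent.source_semantic_models)


class SourceNode(Node):
    """Hypothetical stand-in for ReadSqlSourceNode: a leaf with no parents."""

    def __init__(self, model_ref: Optional[ModelRef]) -> None:
        super().__init__()
        self._model_ref = model_ref

    @property
    def source_semantic_models(self) -> FrozenSet[ModelRef]:
        # The single override: report this node's own reference, if it has one.
        return frozenset([self._model_ref]) if self._model_ref is not None else frozenset()


bookings = SourceNode(ModelRef("bookings_source"))
listings = SourceNode(ModelRef("listings_latest"))
join = Node([bookings, listings])
sink = Node([join, bookings])  # bookings is reachable by two paths
```

Because the result is a frozenset, a source reached through more than one path appears only once, and a source without a reference contributes nothing.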

We currently have two use-cases for this, one in the cloud product
that needs the semantic model inputs for a dataflow plan, and the
upcoming predicate pushdown evaluation which needs the semantic model
inputs for a given DataflowPlanNode.

This is presented as a stand-alone change for discussion, as it
represents a departure from our previous de facto approach of limiting
all traversal operations to visitor classes. There are, in fact,
three alternatives to adding this property which would all suffice
for the purposes of predicate pushdown:

1. Write a separate class to do the traversal and fetch the property
only from the ReadSqlSourceNodes.
2. Use an existing property accessor to collect this information,
either via restricting return types from the SourceNodeSet or by
using the existing resolver and consolidating the column-level
semantic model references.
3. Add this property as seen here.
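
For comparison, the first alternative (a separate traversal class) might look roughly like the sketch below. Everything in it is hypothetical: PlanNodeVisitor, PlanNode, JoinNode and SourceModelCollector are simplified stand-ins, and the real DataflowPlanNodeVisitor interface has many more visit methods than shown here.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import FrozenSet, Generic, Optional, Sequence, TypeVar

T = TypeVar("T")


class PlanNodeVisitor(Generic[T], ABC):
    """Hypothetical two-method visitor interface."""

    @abstractmethod
    def visit_source_node(self, node: SourceNode) -> T: ...

    @abstractmethod
    def visit_join_node(self, node: JoinNode) -> T: ...


class PlanNode(ABC):
    def __init__(self, parent_nodes: Sequence[PlanNode] = ()) -> None:
        self.parent_nodes = tuple(parent_nodes)

    @abstractmethod
    def accept(self, visitor: PlanNodeVisitor[T]) -> T: ...


class SourceNode(PlanNode):
    def __init__(self, model_name: Optional[str]) -> None:
        super().__init__()
        self.model_name = model_name

    def accept(self, visitor: PlanNodeVisitor[T]) -> T:
        return visitor.visit_source_node(self)


class JoinNode(PlanNode):
    def accept(self, visitor: PlanNodeVisitor[T]) -> T:
        return visitor.visit_join_node(self)


class SourceModelCollector(PlanNodeVisitor[FrozenSet[str]]):
    """The separate class every caller would need to import."""

    def visit_source_node(self, node: SourceNode) -> FrozenSet[str]:
        return frozenset([node.model_name]) if node.model_name is not None else frozenset()

    def visit_join_node(self, node: JoinNode) -> FrozenSet[str]:
        # Recurse into every parent and merge the results.
        return frozenset().union(*(parent.accept(self) for parent in node.parent_nodes))


plan = JoinNode([JoinNode([SourceNode("bookings_source"), SourceNode("listings_latest")]), SourceNode("users_latest")])
models = plan.accept(SourceModelCollector())
```

The result is the same set, but the caller has to know about, import, and instantiate the collector class.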

The second option will not address the case on the cloud side, so
I chose between the traversal and the approach seen here. I went
with this for the following reasons:

1. This is, conceptually, a property of the DataflowPlanNode. Up until
now the existence of this property has been implicit: at the root of
every executable DataflowPlan DAG there is at least one ReadSqlSourceNode
sourced from a semantic model. Adding an explicit accessor makes it more
obvious that this is our baseline expectation. The fact that it can be
an empty set is a shortcoming, but not a terrible one.
2. This limits the scope of what we need to import across module (or
even repository) boundaries in order to access this property information.
With a separate visitor, we'd have to always import a separate class
that implements the DataflowPlanNodeVisitor interface and pulls in
those other elements.
3. The property return type is always obvious to the caller, who likely
does not care how we got the information.

The disadvantages here are:

1. This is the first break in our current "traversals are always visitors"
model, which is a bit annoying as now we need to look in two places to
find traversal operations. However, this is already true in the other
direction: we have a number of implicit property accessors which we fetch
via visitors, and I believe we'll end up in a better place if we move those
to be properties instead.
2. There may be other recursive property access operations we need to make,
and these may be scattered across multiple node objects. There are ways to
consolidate this (including allowing a visitor-style class to operate on
these nodes and encapsulating it within the property accessor itself), but
that is not necessary at this juncture.
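
One possible shape for the consolidation mentioned in the last point, sketched with hypothetical names (_SourceModelCollector, PlanNode, SourceNode) and not something this commit implements: the property stays the public entry point and delegates to a private visitor-style helper, so callers never import the helper.

```python
from __future__ import annotations

from typing import FrozenSet, Optional, Sequence


class _SourceModelCollector:
    """Hypothetical visitor-style helper kept private to the module."""

    def visit_source(self, node: SourceNode) -> FrozenSet[str]:
        return frozenset([node.model_name]) if node.model_name is not None else frozenset()

    def visit_default(self, node: PlanNode) -> FrozenSet[str]:
        collected = set()
        for parent in node.parent_nodes:
            collected |= parent.accept(self)
        return frozenset(collected)


class PlanNode:
    def __init__(self, parent_nodes: Sequence[PlanNode] = ()) -> None:
        self.parent_nodes = tuple(parent_nodes)

    def accept(self, visitor: _SourceModelCollector) -> FrozenSet[str]:
        return visitor.visit_default(self)

    @property
    def source_semantic_models(self) -> FrozenSet[str]:
        # Callers use the property; the traversal class is an implementation detail.
        return self.accept(_SourceModelCollector())


class SourceNode(PlanNode):
    def __init__(self, model_name: Optional[str]) -> None:
        super().__init__()
        self.model_name = model_name

    def accept(self, visitor: _SourceModelCollector) -> FrozenSet[str]:
        return visitor.visit_source(self)


plan = PlanNode([SourceNode("bookings_source"), PlanNode([SourceNode("listings_latest"), SourceNode(None)])])
```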
tlento committed May 15, 2024
1 parent 26dd729 commit 0ce550c
Showing 3 changed files with 77 additions and 2 deletions.
9 changes: 8 additions & 1 deletion metricflow/dataflow/dataflow_plan.py
@@ -2,16 +2,18 @@

 from __future__ import annotations

+import itertools
 import logging
 import typing
 from abc import ABC, abstractmethod
-from typing import Generic, Optional, Sequence, Set, Type, TypeVar
+from typing import FrozenSet, Generic, Optional, Sequence, Set, Type, TypeVar

 from metricflow_semantics.dag.id_prefix import StaticIdPrefix
 from metricflow_semantics.dag.mf_dag import DagId, DagNode, MetricFlowDag, NodeId
 from metricflow_semantics.visitor import Visitable, VisitorOutputT

 if typing.TYPE_CHECKING:
+    from dbt_semantic_interfaces.references import SemanticModelReference
     from metricflow_semantics.specs.spec_classes import LinkableInstanceSpec

     from metricflow.dataflow.nodes.add_generated_uuid import AddGeneratedUuidColumnNode
@@ -61,6 +63,11 @@ def parent_nodes(self) -> Sequence[DataflowPlanNode]:
         """Return the nodes where data for this node comes from."""
         return self._parent_nodes

+    @property
+    def source_semantic_models(self) -> FrozenSet[SemanticModelReference]:
+        """Return the complete set of source semantic models for this node, collected recursively across all parents."""
+        return frozenset(itertools.chain.from_iterable([parent.source_semantic_models for parent in self.parent_nodes]))
+
     @abstractmethod
     def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT:
         """Called when a visitor needs to visit this node."""
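
The flatten-and-deduplicate expression in the new property can be tried in isolation. The sketch below uses plain strings in place of SemanticModelReference objects, and the parent_sets list is a made-up stand-in for what each parent's property might return.

```python
import itertools

# Made-up per-parent results; two parents share listings_latest and one is empty.
parent_sets = [
    frozenset({"bookings_source", "listings_latest"}),
    frozenset({"listings_latest", "users_latest"}),
    frozenset(),
]

# chain.from_iterable flattens the per-parent sets; frozenset removes duplicates.
combined = frozenset(itertools.chain.from_iterable(parent_sets))

# A node with no parents yields an empty frozenset rather than raising.
no_parents = frozenset(itertools.chain.from_iterable([]))
```

This is why a node with no parents and no override reports an empty set, which is the shortcoming the commit message acknowledges.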
13 changes: 12 additions & 1 deletion metricflow/dataflow/nodes/read_sql_source.py
@@ -1,12 +1,14 @@
 from __future__ import annotations

 import textwrap
-from typing import Sequence
+from typing import FrozenSet, Sequence

 import jinja2
+from dbt_semantic_interfaces.references import SemanticModelReference
 from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
 from metricflow_semantics.dag.mf_dag import DisplayedProperty
 from metricflow_semantics.visitor import VisitorOutputT
+from typing_extensions import override

 from metricflow.dataflow.dataflow_plan import BaseOutput, DataflowPlanNode, DataflowPlanNodeVisitor
 from metricflow.dataset.sql_dataset import SqlDataSet
@@ -31,6 +33,15 @@ def id_prefix(cls) -> IdPrefix: # noqa: D102
     def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
         return visitor.visit_source_node(self)

+    @override
+    @property
+    def source_semantic_models(self) -> FrozenSet[SemanticModelReference]:
+        return (
+            frozenset([self.data_set.semantic_model_reference])
+            if self.data_set.semantic_model_reference
+            else frozenset()
+        )
+
     @property
     def data_set(self) -> SqlDataSet:
         """Return the data set that this source represents and is passed to the child nodes."""
57 changes: 57 additions & 0 deletions tests_metricflow/dataflow/test_dataflow_plan.py
@@ -0,0 +1,57 @@
+"""Tests for operations on dataflow plans and dataflow plan nodes."""
+
+from __future__ import annotations
+
+from dbt_semantic_interfaces.references import EntityReference, SemanticModelReference
+from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
+from metricflow_semantics.specs.spec_classes import (
+    DimensionSpec,
+    MetricSpec,
+)
+
+from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder
+
+
+def test_source_semantic_models_accessor(
+    dataflow_plan_builder: DataflowPlanBuilder,
+) -> None:
+    """Tests source semantic models access for a simple query plan."""
+    dataflow_plan = dataflow_plan_builder.build_plan(
+        MetricFlowQuerySpec(
+            metric_specs=(MetricSpec(element_name="bookings"),),
+        )
+    )
+
+    assert len(dataflow_plan.sink_nodes) == 1, "Dataflow plan should have exactly one sink node."
+    assert dataflow_plan.sink_nodes[0].source_semantic_models == frozenset(
+        [SemanticModelReference(semantic_model_name="bookings_source")]
+    )
+
+
+def test_multi_hop_joined_source_semantic_models_accessor(
+    dataflow_plan_builder: DataflowPlanBuilder,
+) -> None:
+    """Tests source semantic models access for a multi-hop join plan."""
+    dataflow_plan = dataflow_plan_builder.build_plan(
+        MetricFlowQuerySpec(
+            metric_specs=(MetricSpec(element_name="bookings"),),
+            dimension_specs=(
+                DimensionSpec(
+                    element_name="home_state_latest",
+                    entity_links=(
+                        EntityReference(element_name="listing"),
+                        EntityReference(element_name="user"),
+                    ),
+                ),
+            ),
+        )
+    )
+
+    assert len(dataflow_plan.sink_nodes) == 1, "Dataflow plan should have exactly one sink node."
+    assert dataflow_plan.sink_nodes[0].source_semantic_models == frozenset(
+        [
+            SemanticModelReference(semantic_model_name="bookings_source"),
+            SemanticModelReference(semantic_model_name="listings_latest"),
+            SemanticModelReference(semantic_model_name="users_latest"),
+        ]
+    )
