Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make source_semantic_models property accessible from a DataflowPlanNode #1218

Merged
merged 5 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240516-144603.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Make source semantic models available from DataflowPlanNode instances
time: 2024-05-16T14:46:03.707367-07:00
custom:
Author: tlento
Issue: "1218"
1 change: 1 addition & 0 deletions metricflow-semantics/metricflow_semantics/dag/id_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper):
VALUES_GROUP_BY_ITEM_RESOLUTION_NODE = "vr"

DATAFLOW_PLAN_PREFIX = "dfp"
DATAFLOW_PLAN_SUBGRAPH_PREFIX = "dfpsub"
OPTIMIZED_DATAFLOW_PLAN_PREFIX = "dfpo"
SQL_QUERY_PLAN_PREFIX = "sqp"
EXEC_PLAN_PREFIX = "ep"
Expand Down
48 changes: 47 additions & 1 deletion metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
import logging
import typing
from abc import ABC, abstractmethod
from typing import Generic, Optional, Sequence, Set, Type, TypeVar
from typing import FrozenSet, Generic, Optional, Sequence, Set, Type, TypeVar

import more_itertools
from metricflow_semantics.dag.id_prefix import StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DagId, DagNode, MetricFlowDag, NodeId
from metricflow_semantics.specs.spec_classes import LinkableInstanceSpec
from metricflow_semantics.visitor import Visitable, VisitorOutputT

if typing.TYPE_CHECKING:
from dbt_semantic_interfaces.references import SemanticModelReference
from metricflow_semantics.specs.spec_classes import LinkableInstanceSpec

from metricflow.dataflow.nodes.add_generated_uuid import AddGeneratedUuidColumnNode
from metricflow.dataflow.nodes.aggregate_measures import AggregateMeasuresNode
from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode
Expand Down Expand Up @@ -60,6 +64,24 @@ def parent_nodes(self) -> Sequence[DataflowPlanNode]:
"""Return the nodes where data for this node comes from."""
return self._parent_nodes

@property
def _input_semantic_model(self) -> Optional[SemanticModelReference]:
    """Return the semantic model serving as direct input for this node, if one exists.

    This base implementation always returns None. Node types that read directly from a semantic model
    override this property to return the corresponding reference.
    """
    return None

def as_plan(self) -> DataflowPlan:
    """Wrap this node in a DataflowPlan whose only sink node is this node.

    Some properties describe a whole subgraph (this node together with everything upstream of it) rather than
    a single node - for example, the total number of nodes in that subgraph. Those properties are defined once
    on DataflowPlan instead of being re-implemented as recursive walks inside each node type. This converter
    makes them reachable from places in the call stack that only hold a node, not a complete plan.

    The returned plan is given a freshly generated ID built from the subgraph-specific ID prefix.
    """
    subgraph_plan_id = DagId.from_id_prefix(id_prefix=StaticIdPrefix.DATAFLOW_PLAN_SUBGRAPH_PREFIX)
    return DataflowPlan(sink_nodes=(self,), plan_id=subgraph_plan_id)

@abstractmethod
def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT:
"""Called when a visitor needs to visit this node."""
Expand Down Expand Up @@ -188,3 +210,27 @@ def __init__(self, sink_nodes: Sequence[DataflowPlanNode], plan_id: Optional[Dag
@property
def sink_node(self) -> DataflowPlanNode: # noqa: D102
return self._sink_nodes[0]

@staticmethod
def __all_nodes_in_subgraph(node: DataflowPlanNode) -> Sequence[DataflowPlanNode]:
    """Return a flat sequence holding the input node followed by every node upstream of it.

    Nodes are emitted in pre-order: a node comes first, then the full subgraph of each of its parents, in
    parent order. A node that is reachable through more than one path appears once per path. The result is
    meant for subtype-agnostic operations such as common property access or simple counts.
    """
    visited_in_order = []
    # Explicit stack in place of recursion; the top of the stack is the next node to emit.
    pending = [node]
    while pending:
        current = pending.pop()
        visited_in_order.append(current)
        # Push parents in reverse so the first parent is popped (and fully expanded) first.
        pending.extend(reversed(current.parent_nodes))
    return tuple(visited_in_order)

@property
def source_semantic_models(self) -> FrozenSet[SemanticModelReference]:
    """Return the set of semantic models that directly feed any node in this DataflowPlan.

    Every node in the subgraph rooted at the sink node is inspected; nodes without a direct input semantic
    model (those reporting None) contribute nothing to the result.
    """
    upstream_nodes = DataflowPlan.__all_nodes_in_subgraph(self.sink_node)
    candidate_models = (plan_node._input_semantic_model for plan_node in upstream_nodes)
    return frozenset(model for model in candidate_models if model is not None)
10 changes: 9 additions & 1 deletion metricflow/dataflow/nodes/read_sql_source.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

import textwrap
from typing import Sequence
from typing import Optional, Sequence

import jinja2
from dbt_semantic_interfaces.references import SemanticModelReference
from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DisplayedProperty
from metricflow_semantics.visitor import VisitorOutputT
from typing_extensions import override

from metricflow.dataflow.dataflow_plan import DataflowPlanNode, DataflowPlanNodeVisitor
from metricflow.dataset.sql_dataset import SqlDataSet
Expand All @@ -31,6 +33,12 @@ def id_prefix(cls) -> IdPrefix: # noqa: D102
def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_source_node(self)

@override
@property
def _input_semantic_model(self) -> Optional[SemanticModelReference]:
    """Return the semantic model serving as direct input for this node, if one exists.

    The value is the semantic model reference carried by this node's data set.
    """
    return self.data_set.semantic_model_reference

@property
def data_set(self) -> SqlDataSet:
"""Return the data set that this source represents and is passed to the child nodes."""
Expand Down
55 changes: 55 additions & 0 deletions tests_metricflow/dataflow/test_dataflow_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Tests for operations on dataflow plans and dataflow plan nodes."""

from __future__ import annotations

from dbt_semantic_interfaces.references import EntityReference, SemanticModelReference
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
from metricflow_semantics.specs.spec_classes import (
DimensionSpec,
MetricSpec,
)

from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder


def test_source_semantic_models_accessor(
    dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
    """Check source semantic model access on a plan built for a single-metric query."""
    query_spec = MetricFlowQuerySpec(
        metric_specs=(MetricSpec(element_name="bookings"),),
    )
    expected_models = frozenset({SemanticModelReference(semantic_model_name="bookings_source")})

    dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

    assert dataflow_plan.source_semantic_models == expected_models


def test_multi_hop_joined_source_semantic_models_accessor(
    dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
    """Check source semantic model access on a plan whose dimension is reached through two entity links."""
    # The dimension is linked through listing and then user, so the plan needs a multi-hop join.
    home_state_dimension = DimensionSpec(
        element_name="home_state_latest",
        entity_links=(
            EntityReference(element_name="listing"),
            EntityReference(element_name="user"),
        ),
    )
    query_spec = MetricFlowQuerySpec(
        metric_specs=(MetricSpec(element_name="bookings"),),
        dimension_specs=(home_state_dimension,),
    )
    expected_model_names = ("bookings_source", "listings_latest", "users_latest")
    expected_models = frozenset(
        SemanticModelReference(semantic_model_name=model_name) for model_name in expected_model_names
    )

    dataflow_plan = dataflow_plan_builder.build_plan(query_spec)

    assert dataflow_plan.source_semantic_models == expected_models
Loading