From 2aaf239817ea4153330ad1d88f63e22e632d5560 Mon Sep 17 00:00:00 2001
From: Courtney Holcomb
Date: Tue, 27 Aug 2024 10:56:15 -0700
Subject: [PATCH] Add dataflow plan node JoinToCustomGranularityNode

---
Reviewer notes (placed after the "---" marker, so they are not part of the
commit message):

  * id_prefix.py: registers the node-ID prefix "jcg" for the new node type.
  * join_to_custom_granularity.py: adds JoinToCustomGranularityNode, a frozen
    dataclass node that carries a single TimeDimensionSpec and whose
    `parent_node` property returns `parent_nodes[0]`.
  * accept() dispatches to visitor.visit_join_to_custom_granularity_node(),
    but neither file in this patch defines that method on
    DataflowPlanNodeVisitor (imported from metricflow.dataflow.dataflow_plan).
    TODO(review): confirm it is added by a companion change.
  * with_new_parents() enforces the one-parent rule with `assert`; create()
    enforces it through its signature (a single `parent_node` argument).

 .../metricflow_semantics/dag/id_prefix.py     |  1 +
 .../nodes/join_to_custom_granularity.py       | 57 +++++++++++++++++++
 2 files changed, 58 insertions(+)
 create mode 100644 metricflow/dataflow/nodes/join_to_custom_granularity.py

diff --git a/metricflow-semantics/metricflow_semantics/dag/id_prefix.py b/metricflow-semantics/metricflow_semantics/dag/id_prefix.py
index fac0fd9ccf..b6d1b2791b 100644
--- a/metricflow-semantics/metricflow_semantics/dag/id_prefix.py
+++ b/metricflow-semantics/metricflow_semantics/dag/id_prefix.py
@@ -50,6 +50,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper):
     DATAFLOW_NODE_SET_MEASURE_AGGREGATION_TIME = "sma"
     DATAFLOW_NODE_SEMI_ADDITIVE_JOIN_ID_PREFIX = "saj"
     DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX = "jts"
+    DATAFLOW_NODE_JOIN_TO_CUSTOM_GRANULARITY_ID_PREFIX = "jcg"
     DATAFLOW_NODE_MIN_MAX_ID_PREFIX = "mm"
     DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX = "auid"
     DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"
diff --git a/metricflow/dataflow/nodes/join_to_custom_granularity.py b/metricflow/dataflow/nodes/join_to_custom_granularity.py
new file mode 100644
index 0000000000..14e5008c0e
--- /dev/null
+++ b/metricflow/dataflow/nodes/join_to_custom_granularity.py
@@ -0,0 +1,57 @@
+from __future__ import annotations
+
+from abc import ABC
+from dataclasses import dataclass
+from typing import Sequence
+
+from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
+from metricflow_semantics.dag.mf_dag import DisplayedProperty
+from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
+from metricflow_semantics.visitor import VisitorOutputT
+
+from metricflow.dataflow.dataflow_plan import DataflowPlanNode, DataflowPlanNodeVisitor
+
+
+@dataclass(frozen=True)
+class JoinToCustomGranularityNode(DataflowPlanNode, ABC):
+    """Join parent dataset to time spine dataset to convert time dimension to a custom granularity."""
+
+    time_dimension_spec: TimeDimensionSpec
+
+    @staticmethod
+    def create(  # noqa: D102
+        parent_node: DataflowPlanNode, time_dimension_spec: TimeDimensionSpec
+    ) -> JoinToCustomGranularityNode:
+        return JoinToCustomGranularityNode(parent_nodes=(parent_node,), time_dimension_spec=time_dimension_spec)
+
+    @classmethod
+    def id_prefix(cls) -> IdPrefix:  # noqa: D102
+        return StaticIdPrefix.DATAFLOW_NODE_JOIN_TO_CUSTOM_GRANULARITY_ID_PREFIX
+
+    def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT:  # noqa: D102
+        return visitor.visit_join_to_custom_granularity_node(self)
+
+    @property
+    def description(self) -> str:  # noqa: D102
+        return """Join to Custom Granularity Dataset"""
+
+    @property
+    def displayed_properties(self) -> Sequence[DisplayedProperty]:  # noqa: D102
+        return tuple(super().displayed_properties) + (
+            DisplayedProperty("time_dimension_spec", self.time_dimension_spec),
+        )
+
+    @property
+    def parent_node(self) -> DataflowPlanNode:  # noqa: D102
+        return self.parent_nodes[0]
+
+    def functionally_identical(self, other_node: DataflowPlanNode) -> bool:  # noqa: D102
+        return isinstance(other_node, self.__class__) and other_node.time_dimension_spec == self.time_dimension_spec
+
+    def with_new_parents(  # noqa: D102
+        self, new_parent_nodes: Sequence[DataflowPlanNode]
+    ) -> JoinToCustomGranularityNode:
+        assert len(new_parent_nodes) == 1, "JoinToCustomGranularity accepts exactly one parent node."
+        return JoinToCustomGranularityNode.create(
+            parent_node=new_parent_nodes[0], time_dimension_spec=self.time_dimension_spec
+        )