Skip to content

Commit

Permalink
Add dataflow plan node JoinToCustomGranularityNode
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Sep 5, 2024
1 parent 4af0e56 commit 2aaf239
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
1 change: 1 addition & 0 deletions metricflow-semantics/metricflow_semantics/dag/id_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper):
DATAFLOW_NODE_SET_MEASURE_AGGREGATION_TIME = "sma"
DATAFLOW_NODE_SEMI_ADDITIVE_JOIN_ID_PREFIX = "saj"
DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX = "jts"
DATAFLOW_NODE_JOIN_TO_CUSTOM_GRANULARITY_ID_PREFIX = "jcg"
DATAFLOW_NODE_MIN_MAX_ID_PREFIX = "mm"
DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX = "auid"
DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"
Expand Down
57 changes: 57 additions & 0 deletions metricflow/dataflow/nodes/join_to_custom_granularity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations

from abc import ABC
from dataclasses import dataclass
from typing import Sequence

from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DisplayedProperty
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.visitor import VisitorOutputT

from metricflow.dataflow.dataflow_plan import DataflowPlanNode, DataflowPlanNodeVisitor


@dataclass(frozen=True)
class JoinToCustomGranularityNode(DataflowPlanNode, ABC):
"""Join parent dataset to time spine dataset to convert time dimension to a custom granularity."""

time_dimension_spec: TimeDimensionSpec

@staticmethod
def create( # noqa: D102
parent_node: DataflowPlanNode, time_dimension_spec: TimeDimensionSpec
) -> JoinToCustomGranularityNode:
return JoinToCustomGranularityNode(parent_nodes=(parent_node,), time_dimension_spec=time_dimension_spec)

@classmethod
def id_prefix(cls) -> IdPrefix: # noqa: D102
return StaticIdPrefix.DATAFLOW_NODE_JOIN_TO_CUSTOM_GRANULARITY_ID_PREFIX

def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_join_to_custom_granularity_node(self)

@property
def description(self) -> str: # noqa: D102
return """Join to Custom Granularity Dataset"""

@property
def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
return tuple(super().displayed_properties) + (
DisplayedProperty("time_dimension_spec", self.time_dimension_spec),
)

@property
def parent_node(self) -> DataflowPlanNode: # noqa: D102
return self.parent_nodes[0]

def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102
return isinstance(other_node, self.__class__) and other_node.time_dimension_spec == self.time_dimension_spec

def with_new_parents( # noqa: D102
self, new_parent_nodes: Sequence[DataflowPlanNode]
) -> JoinToCustomGranularityNode:
assert len(new_parent_nodes) == 1, "JoinToCustomGranularity accepts exactly one parent node."
return JoinToCustomGranularityNode.create(
parent_node=new_parent_nodes[0], time_dimension_spec=self.time_dimension_spec
)

0 comments on commit 2aaf239

Please sign in to comment.