diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py
index ccf1ea271..804b491a1 100644
--- a/metricflow/dataflow/builder/dataflow_plan_builder.py
+++ b/metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -92,6 +92,7 @@
 from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode
 from metricflow.dataflow.nodes.read_sql_source import ReadSqlSourceNode
 from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
+from metricflow.dataflow.nodes.transform_time_dimensions import TransformTimeDimensionsNode
 from metricflow.dataflow.nodes.where_filter import WhereConstraintNode
 from metricflow.dataflow.nodes.window_reaggregation_node import WindowReaggregationNode
 from metricflow.dataflow.nodes.write_to_data_table import WriteToResultDataTableNode
@@ -1839,7 +1840,6 @@ def _choose_time_spine_read_node(self, required_time_spine_specs: Sequence[TimeD
     def _build_time_spine_node(self, required_time_spine_specs: Sequence[TimeDimensionSpec]) -> DataflowPlanNode:
         """Return the time spine node needed to satisfy the specs."""
         original_time_spine_node = self._choose_time_spine_read_node(required_time_spine_specs)
-        # TODO: build this node. Transform columns to the requested ones
-        return TransformTimeDimensionsNode(
-            parent_node=original_time_spine_node, required_time_spine_specs=required_time_spine_specs
+        return TransformTimeDimensionsNode.create(
+            parent_node=original_time_spine_node, requested_time_dimension_specs=required_time_spine_specs
         )
diff --git a/metricflow/dataflow/nodes/transform_time_dimensions.py b/metricflow/dataflow/nodes/transform_time_dimensions.py
new file mode 100644
index 000000000..be836b86f
--- /dev/null
+++ b/metricflow/dataflow/nodes/transform_time_dimensions.py
@@ -0,0 +1,77 @@
+from __future__ import annotations
+
+from abc import ABC
+from dataclasses import dataclass
+from typing import Sequence
+
+from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
+from metricflow_semantics.dag.mf_dag import DisplayedProperty
+from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
+from metricflow_semantics.visitor import VisitorOutputT
+
+from metricflow.dataflow.dataflow_plan import DataflowPlanNode
+from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor
+
+
+@dataclass(frozen=True, eq=False)
+class TransformTimeDimensionsNode(DataflowPlanNode, ABC):
+    """Change the columns in the parent node to match the requested time dimension specs.
+
+    Args:
+        requested_time_dimension_specs: The time dimension specs to match in the parent node and transform.
+    """
+
+    requested_time_dimension_specs: Sequence[TimeDimensionSpec]
+
+    def __post_init__(self) -> None:  # noqa: D105
+        super().__post_init__()
+        assert (
+            len(self.requested_time_dimension_specs) > 0
+        ), "Must have at least one value in requested_time_dimension_specs for TransformTimeDimensionsNode."
+
+    @staticmethod
+    def create(  # noqa: D102
+        parent_node: DataflowPlanNode, requested_time_dimension_specs: Sequence[TimeDimensionSpec]
+    ) -> TransformTimeDimensionsNode:
+        # The node takes exactly one parent; it is stored as a 1-tuple in `parent_nodes`.
+        return TransformTimeDimensionsNode(
+            parent_nodes=(parent_node,), requested_time_dimension_specs=requested_time_dimension_specs
+        )
+
+    @classmethod
+    def id_prefix(cls) -> IdPrefix:  # noqa: D102
+        # NOTE(review): reuses the join-to-custom-granularity prefix; confirm a dedicated one is not needed.
+        return StaticIdPrefix.DATAFLOW_NODE_JOIN_TO_CUSTOM_GRANULARITY_ID_PREFIX
+
+    def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT:  # noqa: D102
+        return visitor.visit_transform_time_dimensions_node(self)
+
+    @property
+    def description(self) -> str:  # noqa: D102
+        return """Transform Time Dimension Columns"""
+
+    @property
+    def displayed_properties(self) -> Sequence[DisplayedProperty]:  # noqa: D102
+        return tuple(super().displayed_properties) + (
+            DisplayedProperty("requested_time_dimension_specs", self.requested_time_dimension_specs),
+        )
+
+    @property
+    def parent_node(self) -> DataflowPlanNode:  # noqa: D102
+        return self.parent_nodes[0]
+
+    def functionally_identical(self, other_node: DataflowPlanNode) -> bool:  # noqa: D102
+        # Only the node type and the requested specs are compared here; parents are not inspected.
+        return (
+            isinstance(other_node, self.__class__)
+            and other_node.requested_time_dimension_specs == self.requested_time_dimension_specs
+        )
+
+    def with_new_parents(  # noqa: D102
+        self, new_parent_nodes: Sequence[DataflowPlanNode]
+    ) -> TransformTimeDimensionsNode:
+        assert len(new_parent_nodes) == 1, "TransformTimeDimensionsNode accepts exactly one parent node."
+        return TransformTimeDimensionsNode.create(
+            parent_node=new_parent_nodes[0],
+            requested_time_dimension_specs=self.requested_time_dimension_specs,
+        )