Skip to content

Commit

Permalink
Test mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Nov 22, 2024
1 parent 52f9322 commit 91cefa7
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
6 changes: 3 additions & 3 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode
from metricflow.dataflow.nodes.read_sql_source import ReadSqlSourceNode
from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
from metricflow.dataflow.nodes.transform_time_dimensions import TransformTimeDimensionsNode
from metricflow.dataflow.nodes.where_filter import WhereConstraintNode
from metricflow.dataflow.nodes.window_reaggregation_node import WindowReaggregationNode
from metricflow.dataflow.nodes.write_to_data_table import WriteToResultDataTableNode
Expand Down Expand Up @@ -1839,7 +1840,6 @@ def _choose_time_spine_read_node(self, required_time_spine_specs: Sequence[TimeD
def _build_time_spine_node(self, required_time_spine_specs: Sequence[TimeDimensionSpec]) -> DataflowPlanNode:
"""Return the time spine node needed to satisfy the specs."""
original_time_spine_node = self._choose_time_spine_read_node(required_time_spine_specs)
# TODO: build this node. Transform columns to the requested ones
return TransformTimeDimensionsNode(
parent_node=original_time_spine_node, required_time_spine_specs=required_time_spine_specs
return TransformTimeDimensionsNode.create(
parent_node=original_time_spine_node, requested_time_dimension_specs=required_time_spine_specs
)
76 changes: 76 additions & 0 deletions metricflow/dataflow/nodes/transform_time_dimensions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import annotations

from abc import ABC
from dataclasses import dataclass
from typing import Sequence

from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DisplayedProperty
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.visitor import VisitorOutputT

fiiiiir
from metricflow.dataflow.dataflow_plan import DataflowPlanNode
from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor


@dataclass(frozen=True, eq=False)
class TransformTimeDimensionsNode(DataflowPlanNode, ABC):
"""Change the columns in the parent node to match the requested time dimension specs.
Args:
requested_time_dimension_specs: The time dimension specs to match in the parent node and transform.
"""

requested_time_dimension_specs: Sequence[TimeDimensionSpec]

def __post_init__(self) -> None: # noqa: D105
super().__post_init__()
assert (
len(self.requested_time_dimension_specs) > 0
), "Must have at least one value in requested_time_dimension_specs for TransformTimeDimensionsNode."

@staticmethod
def create( # noqa: D102
parent_node: DataflowPlanNode, requested_time_dimension_specs: Sequence[TimeDimensionSpec]
) -> TransformTimeDimensionsNode:
return TransformTimeDimensionsNode(
parent_nodes=(parent_node,), requested_time_dimension_specs=requested_time_dimension_specs
)

@classmethod
def id_prefix(cls) -> IdPrefix: # noqa: D102
return StaticIdPrefix.DATAFLOW_NODE_JOIN_TO_CUSTOM_GRANULARITY_ID_PREFIX

def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
sup
return visitor.visit_transform_time_dimensions_node(self)

@property
def description(self) -> str: # noqa: D102
return """Transform Time Dimension Columns"""

@property
def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
return tuple(super().displayed_properties) + (
DisplayedProperty("requested_time_dimension_specs", self.requested_time_dimension_specs),
)

@property
def parent_node(self) -> DataflowPlanNode: # noqa: D102
return self.parent_nodes[0]

def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102
return (
isinstance(other_node, self.__class__)
and other_node.requested_time_dimension_specs == self.requested_time_dimension_specs
)

def with_new_parents( # noqa: D102
self, new_parent_nodes: Sequence[DataflowPlanNode]
) -> TransformTimeDimensionsNode:
assert len(new_parent_nodes) == 1, "TransformTimeDimensionsNode accepts exactly one parent node."
return TransformTimeDimensionsNode.create(
parent_node=new_parent_nodes[0],
requested_time_dimension_specs=self.requested_time_dimension_specs,
)

0 comments on commit 91cefa7

Please sign in to comment.