From a3129fc2680bd5beaa55c66ac357856ae7759e05 Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Mon, 9 Dec 2024 16:53:05 -0800 Subject: [PATCH] WIP --- .../metricflow_semantics/dag/id_prefix.py | 1 + .../simple_manifest/metrics.yaml | 21 ++++ .../dataflow/builder/dataflow_plan_builder.py | 52 ++++---- .../nodes/custom_granularity_bounds.py | 65 ++++++++++ .../nodes/offset_by_custom_granularity.py | 117 ------------------ metricflow/plan_conversion/dataflow_to_sql.py | 6 +- .../test_custom_granularity.py | 24 ++++ 7 files changed, 141 insertions(+), 145 deletions(-) create mode 100644 metricflow/dataflow/nodes/custom_granularity_bounds.py delete mode 100644 metricflow/dataflow/nodes/offset_by_custom_granularity.py diff --git a/metricflow-semantics/metricflow_semantics/dag/id_prefix.py b/metricflow-semantics/metricflow_semantics/dag/id_prefix.py index a9ae0df0e..8e497385c 100644 --- a/metricflow-semantics/metricflow_semantics/dag/id_prefix.py +++ b/metricflow-semantics/metricflow_semantics/dag/id_prefix.py @@ -56,6 +56,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper): DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce" DATAFLOW_NODE_WINDOW_REAGGREGATION_ID_PREFIX = "wr" DATAFLOW_NODE_ALIAS_SPECS_ID_PREFIX = "as" + DATAFLOW_NODE_CUSTOM_GRANULARITY_BOUNDS_ID_PREFIX = "cgb" SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr" SQL_EXPR_COMPARISON_ID_PREFIX = "cmp" diff --git a/metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml b/metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml index 7e34bebef..4cdad77e5 100644 --- a/metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml +++ b/metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml @@ -860,3 +860,24 @@ metric: - name: instant_bookings alias: shared_alias --- +metric: + name: 
bookings_offset_one_martian_day + description: bookings offset by one martian_day + type: derived + type_params: + expr: bookings + metrics: + - name: bookings + offset_window: 1 martian_day +--- +metric: + name: bookings_martian_day_over_martian_day + description: bookings growth martian day over martian day + type: derived + type_params: + expr: (bookings - bookings_offset) / NULLIF(bookings_offset, 0) + metrics: + - name: bookings + offset_window: 1 martian_day + alias: bookings_offset + - name: bookings diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index e8c4124f4..ace24e467 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -84,6 +84,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -1898,33 +1899,34 @@ def _build_time_spine_node( # TODO: make sure this is checking the correct granularity type once DSI is updated if {spec.time_granularity for spec in queried_time_spine_specs} == {offset_window.granularity}: # If querying with only the same grain as is used in the offset_window, can use a simpler plan. 
- offset_node = OffsetCustomGranularityNode.create( - parent_node=time_spine_read_node, offset_window=offset_window - ) - time_spine_node: DataflowPlanNode = JoinToTimeSpineNode.create( - parent_node=offset_node, - # TODO: need to make sure we apply both agg time and metric time - requested_agg_time_dimension_specs=queried_time_spine_specs, - time_spine_node=time_spine_read_node, - join_type=SqlJoinType.INNER, - join_on_time_dimension_spec=custom_grain_metric_time_spec, - ) + # offset_node = OffsetCustomGranularityNode.create( + # parent_node=time_spine_read_node, offset_window=offset_window + # ) + # time_spine_node: DataflowPlanNode = JoinToTimeSpineNode.create( + # parent_node=offset_node, + # # TODO: need to make sure we apply both agg time and metric time + # requested_agg_time_dimension_specs=queried_time_spine_specs, + # time_spine_node=time_spine_read_node, + # join_type=SqlJoinType.INNER, + # join_on_time_dimension_spec=custom_grain_metric_time_spec, + # ) + pass else: - bounds_node = CustomGranularityBoundsNode.create( - parent_node=time_spine_read_node, offset_window=offset_window - ) - # need to add a property to these specs to indicate that they are offset or bounds or something - filtered_bounds_node = FilterElementsNode.create( - parent_node=bounds_node, include_specs=bounds_node.specs, distinct=True + time_spine_node: DataflowPlanNode = CustomGranularityBoundsNode.create( + parent_node=time_spine_read_node, custom_granularity_name=offset_window.granularity ) - offset_bounds_node = OffsetCustomGranularityBoundsNode.create(parent_node=filtered_bounds_node) - time_spine_node = OffsetByCustomGranularityNode( - parent_node=offset_bounds_node, offset_window=offset_window - ) - if queried_standard_specs: - time_spine_node = ApplyStandardGranularityNode.create( - parent_node=time_spine_node, time_dimension_specs=queried_standard_specs - ) + # # need to add a property to these specs to indicate that they are offset or bounds or something + # 
filtered_bounds_node = FilterElementsNode.create( + # parent_node=bounds_node, include_specs=bounds_node.specs, distinct=True + # ) + # offset_bounds_node = OffsetCustomGranularityBoundsNode.create(parent_node=filtered_bounds_node) + # time_spine_node = OffsetByCustomGranularityNode( + # parent_node=offset_bounds_node, offset_window=offset_window + # ) + # if queried_standard_specs: + # time_spine_node = ApplyStandardGranularityNode.create( + # parent_node=time_spine_node, time_dimension_specs=queried_standard_specs + # ) # TODO: check if this join is needed for the same grain as is used in offset window. Later for custom_spec in queried_custom_specs: time_spine_node = JoinToCustomGranularityNode.create( diff --git a/metricflow/dataflow/nodes/custom_granularity_bounds.py b/metricflow/dataflow/nodes/custom_granularity_bounds.py new file mode 100644 index 000000000..03529d35e --- /dev/null +++ b/metricflow/dataflow/nodes/custom_granularity_bounds.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from abc import ABC +from dataclasses import dataclass +from typing import Sequence + +from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix +from metricflow_semantics.dag.mf_dag import DisplayedProperty +from metricflow_semantics.visitor import VisitorOutputT + +from metricflow.dataflow.dataflow_plan import DataflowPlanNode +from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor + + +@dataclass(frozen=True, eq=False) +class CustomGranularityBoundsNode(DataflowPlanNode, ABC): + """Calculate the start and end of a custom granularity period and each row number within that period.""" + + custom_granularity_name: str + + def __post_init__(self) -> None: # noqa: D105 + super().__post_init__() + assert len(self.parent_nodes) == 1 + + @staticmethod + def create( # noqa: D102 + parent_node: DataflowPlanNode, custom_granularity_name: str + ) -> CustomGranularityBoundsNode: + return 
CustomGranularityBoundsNode(parent_nodes=(parent_node,), custom_granularity_name=custom_granularity_name) + + @classmethod + def id_prefix(cls) -> IdPrefix: # noqa: D102 + return StaticIdPrefix.DATAFLOW_NODE_CUSTOM_GRANULARITY_BOUNDS_ID_PREFIX + + def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102 + # Type checking not working here + return visitor.visit_custom_granularity_bounds_node(self) + + @property + def description(self) -> str: # noqa: D102 + return """Calculate Custom Granularity Bounds""" + + @property + def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 + return tuple(super().displayed_properties) + ( + DisplayedProperty("custom_granularity_name", self.custom_granularity_name), + ) + + @property + def parent_node(self) -> DataflowPlanNode: # noqa: D102 + return self.parent_nodes[0] + + def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102 + return ( + isinstance(other_node, self.__class__) + and other_node.custom_granularity_name == self.custom_granularity_name + ) + + def with_new_parents( # noqa: D102 + self, new_parent_nodes: Sequence[DataflowPlanNode] + ) -> CustomGranularityBoundsNode: + assert len(new_parent_nodes) == 1 + return CustomGranularityBoundsNode.create( + parent_node=new_parent_nodes[0], custom_granularity_name=self.custom_granularity_name + ) diff --git a/metricflow/dataflow/nodes/offset_by_custom_granularity.py b/metricflow/dataflow/nodes/offset_by_custom_granularity.py deleted file mode 100644 index e5d66e286..000000000 --- a/metricflow/dataflow/nodes/offset_by_custom_granularity.py +++ /dev/null @@ -1,117 +0,0 @@ -from __future__ import annotations - -from abc import ABC -from dataclasses import dataclass -from typing import Optional, Sequence - -from dbt_semantic_interfaces.protocols import MetricTimeWindow -from dbt_semantic_interfaces.type_enums import TimeGranularity -from metricflow_semantics.dag.id_prefix import IdPrefix, 
StaticIdPrefix -from metricflow_semantics.dag.mf_dag import DisplayedProperty -from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec -from metricflow_semantics.sql.sql_join_type import SqlJoinType -from metricflow_semantics.visitor import VisitorOutputT - -from metricflow.dataflow.dataflow_plan import DataflowPlanNode -from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor - - -@dataclass(frozen=True, eq=False) -class OffsetByCustomGranularityNode(DataflowPlanNode, ABC): - """Offset. - - Attributes: - offset_window: Time dimensions requested in the query. - join_type: Join type to use when joining to time spine. - join_on_time_dimension_spec: The time dimension to use in the join ON condition. - offset_window: Time window to offset the parent dataset by when joining to time spine. - offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine. - """ - - time_spine_node: DataflowPlanNode - requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec] - join_on_time_dimension_spec: TimeDimensionSpec - join_type: SqlJoinType - offset_window: Optional[MetricTimeWindow] - offset_to_grain: Optional[TimeGranularity] - - def __post_init__(self) -> None: # noqa: D105 - super().__post_init__() - assert len(self.parent_nodes) == 1 - - assert not ( - self.offset_window and self.offset_to_grain - ), "Can't set both offset_window and offset_to_grain when joining to time spine. Choose one or the other." - assert ( - len(self.requested_agg_time_dimension_specs) > 0 - ), "Must have at least one value in requested_agg_time_dimension_specs for JoinToTimeSpineNode." 
- - @staticmethod - def create( # noqa: D102 - parent_node: DataflowPlanNode, - time_spine_node: DataflowPlanNode, - requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec], - join_on_time_dimension_spec: TimeDimensionSpec, - join_type: SqlJoinType, - offset_window: Optional[MetricTimeWindow] = None, - offset_to_grain: Optional[TimeGranularity] = None, - ) -> JoinToTimeSpineNode: - return JoinToTimeSpineNode( - parent_nodes=(parent_node,), - time_spine_node=time_spine_node, - requested_agg_time_dimension_specs=tuple(requested_agg_time_dimension_specs), - join_on_time_dimension_spec=join_on_time_dimension_spec, - join_type=join_type, - offset_window=offset_window, - offset_to_grain=offset_to_grain, - ) - - @classmethod - def id_prefix(cls) -> IdPrefix: # noqa: D102 - return StaticIdPrefix.DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX - - def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102 - return visitor.visit_join_to_time_spine_node(self) - - @property - def description(self) -> str: # noqa: D102 - return """Join to Time Spine Dataset""" - - @property - def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 - props = tuple(super().displayed_properties) + ( - DisplayedProperty("requested_agg_time_dimension_specs", self.requested_agg_time_dimension_specs), - DisplayedProperty("join_on_time_dimension_spec", self.join_on_time_dimension_spec), - DisplayedProperty("join_type", self.join_type), - ) - if self.offset_window: - props += (DisplayedProperty("offset_window", self.offset_window),) - if self.offset_to_grain: - props += (DisplayedProperty("offset_to_grain", self.offset_to_grain),) - return props - - @property - def parent_node(self) -> DataflowPlanNode: # noqa: D102 - return self.parent_nodes[0] - - def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102 - return ( - isinstance(other_node, self.__class__) - and other_node.offset_window == self.offset_window - and 
other_node.offset_to_grain == self.offset_to_grain - and other_node.requested_agg_time_dimension_specs == self.requested_agg_time_dimension_specs - and other_node.join_on_time_dimension_spec == self.join_on_time_dimension_spec - and other_node.join_type == self.join_type - ) - - def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> JoinToTimeSpineNode: # noqa: D102 - assert len(new_parent_nodes) == 1 - return JoinToTimeSpineNode.create( - parent_node=new_parent_nodes[0], - time_spine_node=self.time_spine_node, - requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs, - offset_window=self.offset_window, - offset_to_grain=self.offset_to_grain, - join_type=self.join_type, - join_on_time_dimension_spec=self.join_on_time_dimension_spec, - ) diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index 60f245360..ee2571deb 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -1208,9 +1208,9 @@ def visit_metric_time_dimension_transform_node(self, node: MetricTimeDimensionTr spec=metric_time_dimension_spec, ) ) - output_column_to_input_column[metric_time_dimension_column_association.column_name] = ( - matching_time_dimension_instance.associated_column.column_name - ) + output_column_to_input_column[ + metric_time_dimension_column_association.column_name + ] = matching_time_dimension_instance.associated_column.column_name output_instance_set = InstanceSet( measure_instances=tuple(output_measure_instances), diff --git a/tests_metricflow/query_rendering/test_custom_granularity.py b/tests_metricflow/query_rendering/test_custom_granularity.py index 4043c7b97..a87cbc620 100644 --- a/tests_metricflow/query_rendering/test_custom_granularity.py +++ b/tests_metricflow/query_rendering/test_custom_granularity.py @@ -610,3 +610,27 @@ def test_join_to_timespine_metric_with_custom_granularity_filter_not_in_group_by 
dataflow_plan_builder=dataflow_plan_builder, query_spec=query_spec, ) + + +@pytest.mark.sql_engine_snapshot +def test_custom_offset_window( # noqa: D103 + request: FixtureRequest, + mf_test_configuration: MetricFlowTestConfiguration, + dataflow_plan_builder: DataflowPlanBuilder, + dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter, + sql_client: SqlClient, + query_parser: MetricFlowQueryParser, +) -> None: + query_spec = query_parser.parse_and_validate_query( + metric_names=("bookings_offset_one_martian_day",), + group_by_names=("metric_time__day",), + ).query_spec + + render_and_check( + request=request, + mf_test_configuration=mf_test_configuration, + dataflow_to_sql_converter=dataflow_to_sql_converter, + sql_client=sql_client, + dataflow_plan_builder=dataflow_plan_builder, + query_spec=query_spec, + )