Commit: WIP

courtneyholcomb committed Dec 11, 2024
1 parent e7a2458 commit a3129fc
Showing 7 changed files with 141 additions and 145 deletions.
1 change: 1 addition & 0 deletions metricflow-semantics/metricflow_semantics/dag/id_prefix.py
@@ -56,6 +56,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper):
DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"
DATAFLOW_NODE_WINDOW_REAGGREGATION_ID_PREFIX = "wr"
DATAFLOW_NODE_ALIAS_SPECS_ID_PREFIX = "as"
DATAFLOW_NODE_CUSTOM_GRANULARITY_BOUNDS_ID_PREFIX = "cgb"

SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr"
SQL_EXPR_COMPARISON_ID_PREFIX = "cmp"
@@ -860,3 +860,24 @@ metric:
- name: instant_bookings
alias: shared_alias
---
metric:
name: bookings_offset_one_martian_day
description: bookings offset by one martian_day
type: derived
type_params:
expr: bookings
metrics:
- name: bookings
offset_window: 1 martian_day
---
metric:
name: bookings_martian_day_over_martian_day
description: bookings growth martian day over martian day
type: derived
type_params:
expr: bookings - bookings_offset / NULLIF(bookings_offset, 0)
metrics:
- name: bookings
offset_window: 1 martian_day
alias: bookings_offset
- name: bookings
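
These two fixture metrics exercise offset_window values expressed in the custom martian_day granularity. For orientation, an offset_window string pairs an integer count with a grain name; the sketch below is plain Python with hypothetical OffsetWindow/parse_offset_window names rather than MetricFlow's actual API, and only illustrates the shape of that parse.

# Illustrative only: how "1 martian_day" decomposes into a count and a grain name.
from dataclasses import dataclass


@dataclass(frozen=True)
class OffsetWindow:
    count: int
    granularity: str  # standard grain ("day") or custom grain ("martian_day")


def parse_offset_window(raw: str) -> OffsetWindow:
    count_str, granularity = raw.strip().split(" ", 1)
    return OffsetWindow(count=int(count_str), granularity=granularity)


assert parse_offset_window("1 martian_day") == OffsetWindow(count=1, granularity="martian_day")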
52 changes: 27 additions & 25 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -84,6 +84,7 @@
from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode
from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode
from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode
from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode
from metricflow.dataflow.nodes.filter_elements import FilterElementsNode
from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode
from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode
Expand Down Expand Up @@ -1898,33 +1899,34 @@ def _build_time_spine_node(
# TODO: make sure this is checking the correct granularity type once DSI is updated
if {spec.time_granularity for spec in queried_time_spine_specs} == {offset_window.granularity}:
# If querying with only the same grain as is used in the offset_window, can use a simpler plan.
offset_node = OffsetCustomGranularityNode.create(
parent_node=time_spine_read_node, offset_window=offset_window
)
time_spine_node: DataflowPlanNode = JoinToTimeSpineNode.create(
parent_node=offset_node,
# TODO: need to make sure we apply both agg time and metric time
requested_agg_time_dimension_specs=queried_time_spine_specs,
time_spine_node=time_spine_read_node,
join_type=SqlJoinType.INNER,
join_on_time_dimension_spec=custom_grain_metric_time_spec,
)
# offset_node = OffsetCustomGranularityNode.create(
# parent_node=time_spine_read_node, offset_window=offset_window
# )
# time_spine_node: DataflowPlanNode = JoinToTimeSpineNode.create(
# parent_node=offset_node,
# # TODO: need to make sure we apply both agg time and metric time
# requested_agg_time_dimension_specs=queried_time_spine_specs,
# time_spine_node=time_spine_read_node,
# join_type=SqlJoinType.INNER,
# join_on_time_dimension_spec=custom_grain_metric_time_spec,
# )
pass
else:
bounds_node = CustomGranularityBoundsNode.create(
parent_node=time_spine_read_node, offset_window=offset_window
)
# need to add a property to these specs to indicate that they are offset or bounds or something
filtered_bounds_node = FilterElementsNode.create(
parent_node=bounds_node, include_specs=bounds_node.specs, distinct=True
time_spine_node: DataflowPlanNode = CustomGranularityBoundsNode.create(
parent_node=time_spine_read_node, custom_granularity_name=offset_window.granularity
)
offset_bounds_node = OffsetCustomGranularityBoundsNode.create(parent_node=filtered_bounds_node)
time_spine_node = OffsetByCustomGranularityNode(
parent_node=offset_bounds_node, offset_window=offset_window
)
if queried_standard_specs:
time_spine_node = ApplyStandardGranularityNode.create(
parent_node=time_spine_node, time_dimension_specs=queried_standard_specs
)
# # need to add a property to these specs to indicate that they are offset or bounds or something
# filtered_bounds_node = FilterElementsNode.create(
# parent_node=bounds_node, include_specs=bounds_node.specs, distinct=True
# )
# offset_bounds_node = OffsetCustomGranularityBoundsNode.create(parent_node=filtered_bounds_node)
# time_spine_node = OffsetByCustomGranularityNode(
# parent_node=offset_bounds_node, offset_window=offset_window
# )
# if queried_standard_specs:
# time_spine_node = ApplyStandardGranularityNode.create(
# parent_node=time_spine_node, time_dimension_specs=queried_standard_specs
# )
# TODO: check if this join is needed for the same grain as is used in offset window. Later
for custom_spec in queried_custom_specs:
time_spine_node = JoinToCustomGranularityNode.create(
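
In the hunk above, the WIP plan builder distinguishes two cases: when the query is at exactly the offset window's custom grain a simpler plan is intended (currently commented out), and otherwise the plan starts from a CustomGranularityBoundsNode over the time spine. As a hedged sketch of the intended offset semantics, the plain Python below assumes that offsetting by one custom period means matching a row's position within its period, clamped to the prior period's length; the function and parameter names are hypothetical and this is not MetricFlow code.

# Hypothetical illustration: offset a base-grain date by one custom-grain period by
# matching its row position within the period, clamping when the prior period is shorter.
from datetime import date
from typing import Dict, List


def offset_by_one_custom_period(
    day: date,
    period_for_day: Dict[date, str],        # base-grain date -> custom period key
    days_in_period: Dict[str, List[date]],  # custom period key -> dates in that period, in order
    prior_period: Dict[str, str],           # custom period key -> the previous period's key
) -> date:
    period = period_for_day[day]
    position = days_in_period[period].index(day)      # row number within the current period
    prior_days = days_in_period[prior_period[period]]
    return prior_days[min(position, len(prior_days) - 1)]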
65 changes: 65 additions & 0 deletions metricflow/dataflow/nodes/custom_granularity_bounds.py
@@ -0,0 +1,65 @@
from __future__ import annotations

from abc import ABC
from dataclasses import dataclass
from typing import Sequence

from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DisplayedProperty
from metricflow_semantics.visitor import VisitorOutputT

from metricflow.dataflow.dataflow_plan import DataflowPlanNode
from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor


@dataclass(frozen=True, eq=False)
class CustomGranularityBoundsNode(DataflowPlanNode, ABC):
"""Calculate the start and end of a custom granularity period and each row number within that period."""

custom_granularity_name: str

def __post_init__(self) -> None: # noqa: D105
super().__post_init__()
assert len(self.parent_nodes) == 1

@staticmethod
def create( # noqa: D102
parent_node: DataflowPlanNode, custom_granularity_name: str
) -> CustomGranularityBoundsNode:
return CustomGranularityBoundsNode(parent_nodes=(parent_node,), custom_granularity_name=custom_granularity_name)

@classmethod
def id_prefix(cls) -> IdPrefix: # noqa: D102
return StaticIdPrefix.DATAFLOW_NODE_CUSTOM_GRANULARITY_BOUNDS_ID_PREFIX

def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
# Type checking not working here
return visitor.visit_custom_granularity_bounds_node(self)

@property
def description(self) -> str: # noqa: D102
return """Calculate Custom Granularity Bounds"""

@property
def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
return tuple(super().displayed_properties) + (
DisplayedProperty("custom_granularity_name", self.custom_granularity_name),
)

@property
def parent_node(self) -> DataflowPlanNode: # noqa: D102
return self.parent_nodes[0]

def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102
return (
isinstance(other_node, self.__class__)
and other_node.custom_granularity_name == self.custom_granularity_name
)

def with_new_parents( # noqa: D102
self, new_parent_nodes: Sequence[DataflowPlanNode]
) -> CustomGranularityBoundsNode:
assert len(new_parent_nodes) == 1
return CustomGranularityBoundsNode.create(
parent_node=new_parent_nodes[0], custom_granularity_name=self.custom_granularity_name
)
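
For intuition about the new node: the "bounds" in its docstring are the first and last base-grain rows of each custom-granularity period, plus each row's ordinal position within that period, which is the information a SQL engine would produce with MIN/MAX and ROW_NUMBER window functions partitioned by the custom grain. The self-contained sketch below is plain Python over a toy time spine with an invented three-day "martian_day" grouping; it is not the SQL this node ultimately renders.

from collections import defaultdict
from datetime import date, timedelta

# Toy time spine: nine consecutive days, grouped into three-day "martian_day" periods.
time_spine = [(date(2020, 1, 1) + timedelta(days=i), f"martian_day_{i // 3}") for i in range(9)]

days_by_period = defaultdict(list)
for day, period in time_spine:
    days_by_period[period].append(day)

# For each row: period start, period end, and row number within the period.
for period, days in days_by_period.items():
    for row_number, day in enumerate(sorted(days), start=1):
        print(day, period, min(days), max(days), row_number)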
117 changes: 0 additions & 117 deletions metricflow/dataflow/nodes/offset_by_custom_granularity.py

This file was deleted.

6 changes: 3 additions & 3 deletions metricflow/plan_conversion/dataflow_to_sql.py
@@ -1208,9 +1208,9 @@ def visit_metric_time_dimension_transform_node(self, node: MetricTimeDimensionTr
                         spec=metric_time_dimension_spec,
                     )
                 )
-                output_column_to_input_column[metric_time_dimension_column_association.column_name] = (
-                    matching_time_dimension_instance.associated_column.column_name
-                )
+                output_column_to_input_column[
+                    metric_time_dimension_column_association.column_name
+                ] = matching_time_dimension_instance.associated_column.column_name
 
         output_instance_set = InstanceSet(
             measure_instances=tuple(output_measure_instances),
24 changes: 24 additions & 0 deletions tests_metricflow/query_rendering/test_custom_granularity.py
@@ -610,3 +610,27 @@ def test_join_to_timespine_metric_with_custom_granularity_filter_not_in_group_by
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_offset_one_martian_day",),
group_by_names=("metric_time__day",),
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)
