From b05575798bf5b3ad4f75cf50be16d7ec75806511 Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Wed, 21 Aug 2024 17:25:24 -0700 Subject: [PATCH] WIP - incorporate JoinToCustomGranularityNode in to DataflowPlanBuilder --- .../dataflow/builder/dataflow_plan_builder.py | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 9538f1c87f..f92c5b9813 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -74,6 +74,7 @@ from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode from metricflow.dataflow.nodes.join_to_base import JoinDescription, JoinOnEntitiesNode +from metricflow.dataflow.nodes.join_to_custom_granularity import JoinToCustomGranularityNode from metricflow.dataflow.nodes.join_to_time_spine import JoinToTimeSpineNode from metricflow.dataflow.nodes.min_max import MinMaxNode from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode @@ -1517,6 +1518,14 @@ def _build_aggregated_measure_from_measure_source_node( else: unaggregated_measure_node = filtered_measure_source_node + # TODO: duplicate this logic for no-metric queries + for time_dimension_spec in queried_linkable_specs.time_dimension_specs: + # TODO: Add conditional when avail - if the time dimension spec has a custom granularity + unaggregated_measure_node = JoinToCustomGranularityNode.create( + parent_node=unaggregated_measure_node, + time_dimension_spec=time_dimension_spec, + ) + # If time constraint was previously adjusted for cumulative window or grain, apply original time constraint # here. Can skip if metric is being aggregated over all time. cumulative_metric_constrained_node: Optional[ConstrainTimeRangeNode] = None @@ -1543,12 +1552,12 @@ def _build_aggregated_measure_from_measure_source_node( if non_additive_dimension_spec is not None: # Apply semi additive join on the node agg_time_dimension = measure_properties.agg_time_dimension - queried_time_dimension_spec: Optional[ - TimeDimensionSpec - ] = self._find_non_additive_dimension_in_linkable_specs( - agg_time_dimension=agg_time_dimension, - linkable_specs=queried_linkable_specs.as_tuple, - non_additive_dimension_spec=non_additive_dimension_spec, + queried_time_dimension_spec: Optional[TimeDimensionSpec] = ( + self._find_non_additive_dimension_in_linkable_specs( + agg_time_dimension=agg_time_dimension, + linkable_specs=queried_linkable_specs.as_tuple, + non_additive_dimension_spec=non_additive_dimension_spec, + ) ) time_dimension_spec = TimeDimensionSpec.from_name(non_additive_dimension_spec.name) window_groupings = tuple(