From a3c65db0e5119b92224a18bc107251a83e95de91 Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Tue, 5 Nov 2024 22:04:44 -0800 Subject: [PATCH] Align Dataflow Plans - why did snapshots change? --- .../dataflow/builder/dataflow_plan_builder.py | 144 +++++++++--------- ...dimensions_with_time_constraint__dfp_0.xml | 8 +- ..._distinct_values_plan_with_join__dfp_0.xml | 8 +- ...tric_time_with_other_dimensions__dfp_0.xml | 14 +- ...rsion_metric_predicate_pushdown__dfp_0.xml | 10 +- ...sion_metric_predicate_pushdown__dfpo_0.xml | 18 +-- 6 files changed, 100 insertions(+), 102 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 1d5f08619d..b326b4b003 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -84,7 +84,7 @@ from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode -from metricflow.dataflow.nodes.join_to_base import JoinOnEntitiesNode +from metricflow.dataflow.nodes.join_to_base import JoinDescription, JoinOnEntitiesNode from metricflow.dataflow.nodes.join_to_custom_granularity import JoinToCustomGranularityNode from metricflow.dataflow.nodes.join_to_time_spine import JoinToTimeSpineNode from metricflow.dataflow.nodes.min_max import MinMaxNode @@ -327,26 +327,15 @@ def _build_aggregated_conversion_node( ) # Build the unaggregated base measure node for computing conversions - unaggregated_base_measure_node = base_measure_recipe.source_node - if base_measure_recipe.join_targets: - unaggregated_base_measure_node = JoinOnEntitiesNode.create( - left_node=unaggregated_base_measure_node, join_targets=base_measure_recipe.join_targets - ) - for time_dimension_spec in base_required_linkable_specs.time_dimension_specs: - if time_dimension_spec.time_granularity.is_custom_granularity: - unaggregated_base_measure_node = JoinToCustomGranularityNode.create( - parent_node=unaggregated_base_measure_node, time_dimension_spec=time_dimension_spec - ) - if len(base_measure_spec.filter_spec_set.all_filter_specs) > 0: - unaggregated_base_measure_node = WhereConstraintNode.create( - parent_node=unaggregated_base_measure_node, - where_specs=base_measure_spec.filter_spec_set.all_filter_specs, - ) - filtered_unaggregated_base_node = FilterElementsNode.create( - parent_node=unaggregated_base_measure_node, - include_specs=group_specs_by_type(required_local_specs) + unaggregated_base_measure_node = self._build_pre_aggregation_plan( + source_node=base_measure_recipe.source_node, + join_targets=base_measure_recipe.join_targets, + filter_to_specs=group_specs_by_type(required_local_specs) .merge(base_required_linkable_specs.as_instance_spec_set) .dedupe(), + custom_granularity_specs=base_required_linkable_specs.time_dimension_specs_with_custom_grain, + time_range_constraint=None, + where_filter_specs=base_measure_spec.filter_spec_set.all_filter_specs, ) # Gets the successful conversions using JoinConversionEventsNode @@ -354,7 +343,7 @@ def _build_aggregated_conversion_node( # be still be constrained, where we adjust the time range to the window size similar to cumulative, but # adjusted in the opposite direction. join_conversion_node = JoinConversionEventsNode.create( - base_node=filtered_unaggregated_base_node, + base_node=unaggregated_base_measure_node, base_time_dimension_spec=metric_time_dimension_spec, conversion_node=unaggregated_conversion_measure_node, conversion_measure_spec=conversion_measure_spec.measure_spec, @@ -829,26 +818,13 @@ def _build_plan_for_distinct_values( if not dataflow_recipe: raise UnableToSatisfyQueryError(f"Unable to join all items in request: {required_linkable_specs}") - output_node = dataflow_recipe.source_node - if dataflow_recipe.join_targets: - output_node = JoinOnEntitiesNode.create(left_node=output_node, join_targets=dataflow_recipe.join_targets) - - for time_dimension_spec in required_linkable_specs.time_dimension_specs: - if time_dimension_spec.time_granularity.is_custom_granularity: - output_node = JoinToCustomGranularityNode.create( - parent_node=output_node, time_dimension_spec=time_dimension_spec - ) - - if len(query_level_filter_specs) > 0: - output_node = WhereConstraintNode.create(parent_node=output_node, where_specs=query_level_filter_specs) - if query_spec.time_range_constraint: - output_node = ConstrainTimeRangeNode.create( - parent_node=output_node, time_range_constraint=query_spec.time_range_constraint - ) - - output_node = FilterElementsNode.create( - parent_node=output_node, - include_specs=InstanceSpecSet.create_from_specs(query_spec.linkable_specs.as_tuple), + output_node = self._build_pre_aggregation_plan( + source_node=dataflow_recipe.source_node, + join_targets=dataflow_recipe.join_targets, + filter_to_specs=InstanceSpecSet.create_from_specs(query_spec.linkable_specs.as_tuple), + custom_granularity_specs=required_linkable_specs.time_dimension_specs_with_custom_grain, + where_filter_specs=query_level_filter_specs, + time_range_constraint=query_spec.time_range_constraint, distinct=True, ) @@ -1643,43 +1619,30 @@ def _build_aggregated_measure_from_measure_source_node( join_type=before_aggregation_time_spine_join_description.join_type, ) - join_targets = measure_recipe.join_targets - if len(join_targets) > 0: - unaggregated_measure_node = JoinOnEntitiesNode.create( - left_node=unaggregated_measure_node, join_targets=join_targets - ) - - for time_dimension_spec in required_linkable_specs.time_dimension_specs: - if ( - time_dimension_spec.time_granularity.is_custom_granularity - # If this is the second layer of aggregation for a conversion metric, we have already joined the custom granularity. - and time_dimension_spec not in measure_recipe.all_linkable_specs_required_for_source_nodes.as_tuple - ): - unaggregated_measure_node = JoinToCustomGranularityNode.create( - parent_node=unaggregated_measure_node, time_dimension_spec=time_dimension_spec - ) - + custom_granularity_specs_to_join = [ + spec + for spec in required_linkable_specs.time_dimension_specs_with_custom_grain + # If this is the second layer of aggregation for a conversion metric, we have already joined the custom granularity. + if spec not in measure_recipe.all_linkable_specs_required_for_source_nodes.as_tuple + ] # If time constraint was previously adjusted for cumulative window or grain, apply original time constraint # here. Can skip if metric is being aggregated over all time. # TODO - Pushdown: Encapsulate all of this window sliding bookkeeping in the pushdown params object - if ( - cumulative_metric_adjusted_time_constraint is not None - and predicate_pushdown_state.time_range_constraint is not None - ): - assert ( - queried_linkable_specs.contains_metric_time - ), "Using time constraints currently requires querying with metric_time." - unaggregated_measure_node = ConstrainTimeRangeNode.create( - parent_node=unaggregated_measure_node, - time_range_constraint=predicate_pushdown_state.time_range_constraint, - ) - - if len(metric_input_measure_spec.filter_spec_set.all_filter_specs) > 0: - # Apply where constraint on the node - unaggregated_measure_node = WhereConstraintNode.create( - parent_node=unaggregated_measure_node, - where_specs=metric_input_measure_spec.filter_spec_set.all_filter_specs, + time_range_constraint_to_apply = ( + predicate_pushdown_state.time_range_constraint + if ( + cumulative_metric_adjusted_time_constraint is not None + and predicate_pushdown_state.time_range_constraint is not None ) + else None + ) + unaggregated_measure_node = self._build_pre_aggregation_plan( + source_node=unaggregated_measure_node, + join_targets=measure_recipe.join_targets, + custom_granularity_specs=custom_granularity_specs_to_join, + where_filter_specs=metric_input_measure_spec.filter_spec_set.all_filter_specs, + time_range_constraint=time_range_constraint_to_apply, + ) non_additive_dimension_spec = measure_properties.non_additive_dimension_spec if non_additive_dimension_spec is not None: @@ -1779,3 +1742,38 @@ def _build_aggregated_measure_from_measure_source_node( return output_node return aggregate_measures_node + + def _build_pre_aggregation_plan( + self, + source_node: DataflowPlanNode, + join_targets: List[JoinDescription], + custom_granularity_specs: Sequence[TimeDimensionSpec], + where_filter_specs: Sequence[WhereFilterSpec], + time_range_constraint: Optional[TimeRangeConstraint], + filter_to_specs: Optional[InstanceSpecSet] = None, + distinct: bool = False, + ) -> DataflowPlanNode: + # TODO: docstring + output_node = source_node + if join_targets: + output_node = JoinOnEntitiesNode.create(left_node=output_node, join_targets=join_targets) + + for custom_granularity_spec in custom_granularity_specs: + output_node = JoinToCustomGranularityNode.create( + parent_node=output_node, time_dimension_spec=custom_granularity_spec + ) + + if len(where_filter_specs) > 0: + output_node = WhereConstraintNode.create(parent_node=output_node, where_specs=where_filter_specs) + + if time_range_constraint: + output_node = ConstrainTimeRangeNode.create( + parent_node=output_node, time_range_constraint=time_range_constraint + ) + + if filter_to_specs: + output_node = FilterElementsNode.create( + parent_node=output_node, include_specs=filter_to_specs, distinct=distinct + ) + + return output_node diff --git a/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_dimensions_with_time_constraint__dfp_0.xml b/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_dimensions_with_time_constraint__dfp_0.xml index b83cb223ff..b1b8501a25 100644 --- a/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_dimensions_with_time_constraint__dfp_0.xml +++ b/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_dimensions_with_time_constraint__dfp_0.xml @@ -4,7 +4,7 @@ - + @@ -21,8 +21,8 @@ - - + + @@ -30,7 +30,7 @@ - + diff --git a/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml b/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml index 30f666de96..00125db532 100644 --- a/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml +++ b/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml @@ -16,7 +16,7 @@ - + @@ -73,9 +73,9 @@ - + - + @@ -86,7 +86,7 @@ - + diff --git a/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_metric_time_with_other_dimensions__dfp_0.xml b/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_metric_time_with_other_dimensions__dfp_0.xml index 98dbbba896..4bcb8d432c 100644 --- a/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_metric_time_with_other_dimensions__dfp_0.xml +++ b/tests_metricflow/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_metric_time_with_other_dimensions__dfp_0.xml @@ -6,7 +6,7 @@ - + @@ -25,11 +25,11 @@ - - - + + + - + @@ -40,7 +40,7 @@ - + @@ -65,7 +65,7 @@ - + diff --git a/tests_metricflow/snapshots/test_predicate_pushdown_optimizer.py/DataflowPlan/test_conversion_metric_predicate_pushdown__dfp_0.xml b/tests_metricflow/snapshots/test_predicate_pushdown_optimizer.py/DataflowPlan/test_conversion_metric_predicate_pushdown__dfp_0.xml index ed5d968b08..9472dba119 100644 --- a/tests_metricflow/snapshots/test_predicate_pushdown_optimizer.py/DataflowPlan/test_conversion_metric_predicate_pushdown__dfp_0.xml +++ b/tests_metricflow/snapshots/test_predicate_pushdown_optimizer.py/DataflowPlan/test_conversion_metric_predicate_pushdown__dfp_0.xml @@ -157,7 +157,7 @@ - + @@ -190,7 +190,7 @@ - + @@ -254,9 +254,9 @@ - + - + @@ -272,7 +272,7 @@ - + diff --git a/tests_metricflow/snapshots/test_predicate_pushdown_optimizer.py/DataflowPlan/test_conversion_metric_predicate_pushdown__dfpo_0.xml b/tests_metricflow/snapshots/test_predicate_pushdown_optimizer.py/DataflowPlan/test_conversion_metric_predicate_pushdown__dfpo_0.xml index 0eb01cf6bd..72ac971202 100644 --- a/tests_metricflow/snapshots/test_predicate_pushdown_optimizer.py/DataflowPlan/test_conversion_metric_predicate_pushdown__dfpo_0.xml +++ b/tests_metricflow/snapshots/test_predicate_pushdown_optimizer.py/DataflowPlan/test_conversion_metric_predicate_pushdown__dfpo_0.xml @@ -62,7 +62,7 @@ - + @@ -78,9 +78,9 @@ - + - + @@ -139,7 +139,7 @@ - + @@ -157,7 +157,7 @@ - + @@ -190,7 +190,7 @@ - + @@ -212,9 +212,9 @@ - + - + @@ -273,7 +273,7 @@ - +