From 91a0b94952d7b5e1b26017819d640ce6fa9cf952 Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Tue, 18 Jun 2024 21:26:02 -0700 Subject: [PATCH] `DataflowPlan` for cumulative metrics queried with non-default granularity (#1281) When querying cumulative metrics without default granularity, build a dataflow plan that includes `WindowReaggregationNode`. E2E tests coming soon for this feature! --- .../unreleased/Features-20240613-172315.yaml | 6 ++ .../model/semantics/metric_lookup.py | 24 +++++++ .../model/semantics/semantic_model_lookup.py | 19 +++++- .../dataflow/builder/dataflow_plan_builder.py | 65 ++++++++++++++++++- 4 files changed, 109 insertions(+), 5 deletions(-) create mode 100644 .changes/unreleased/Features-20240613-172315.yaml diff --git a/.changes/unreleased/Features-20240613-172315.yaml b/.changes/unreleased/Features-20240613-172315.yaml new file mode 100644 index 0000000000..401eb1359a --- /dev/null +++ b/.changes/unreleased/Features-20240613-172315.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Build dataflow plan for cumulative metrics queried with non-default granularity. +time: 2024-06-13T17:23:15.095339-07:00 +custom: + Author: courtneyholcomb + Issue: "1281" diff --git a/metricflow-semantics/metricflow_semantics/model/semantics/metric_lookup.py b/metricflow-semantics/metricflow_semantics/model/semantics/metric_lookup.py index ca2b01eadb..21e5fb8ac1 100644 --- a/metricflow-semantics/metricflow_semantics/model/semantics/metric_lookup.py +++ b/metricflow-semantics/metricflow_semantics/model/semantics/metric_lookup.py @@ -9,6 +9,7 @@ from dbt_semantic_interfaces.protocols.metric import Metric, MetricInputMeasure, MetricType from dbt_semantic_interfaces.protocols.semantic_manifest import SemanticManifest from dbt_semantic_interfaces.references import MeasureReference, MetricReference +from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity from metricflow_semantics.errors.error_classes import DuplicateMetricError, MetricNotFoundError, NonExistentMeasureError from metricflow_semantics.model.linkable_element_property import LinkableElementProperty @@ -186,3 +187,26 @@ def get_valid_agg_time_dimensions_for_metric( entity_links=agg_time_dimension_entity_links, ) return valid_agg_time_dimension_specs + + def get_min_queryable_time_granularity(self, metric_reference: MetricReference) -> TimeGranularity: + """The minimum grain that can be queried with this metric. + + Maps to the largest granularity defined for any of the metric's agg_time_dimensions. + """ + agg_time_dimension_specs = self._get_agg_time_dimension_specs_for_metric(metric_reference) + assert ( + agg_time_dimension_specs + ), f"No agg_time_dimension found for metric {metric_reference}. Something has been misconfigured." + + minimum_queryable_granularity = self._semantic_model_lookup.get_defined_time_granularity( + agg_time_dimension_specs[0].reference + ) + if len(agg_time_dimension_specs) > 1: + for agg_time_dimension_spec in agg_time_dimension_specs[1:]: + defined_time_granularity = self._semantic_model_lookup.get_defined_time_granularity( + agg_time_dimension_spec.reference + ) + if defined_time_granularity.to_int() > minimum_queryable_granularity.to_int(): + minimum_queryable_granularity = defined_time_granularity + + return minimum_queryable_granularity diff --git a/metricflow-semantics/metricflow_semantics/model/semantics/semantic_model_lookup.py b/metricflow-semantics/metricflow_semantics/model/semantics/semantic_model_lookup.py index a16783a55b..d820efb9a0 100644 --- a/metricflow-semantics/metricflow_semantics/model/semantics/semantic_model_lookup.py +++ b/metricflow-semantics/metricflow_semantics/model/semantics/semantic_model_lookup.py @@ -18,14 +18,14 @@ SemanticModelReference, TimeDimensionReference, ) -from dbt_semantic_interfaces.type_enums import DimensionType, EntityType -from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType +from dbt_semantic_interfaces.type_enums import AggregationType, DimensionType, EntityType, TimeGranularity from metricflow_semantics.errors.error_classes import InvalidSemanticModelError from metricflow_semantics.mf_logging.pretty_print import mf_pformat from metricflow_semantics.model.semantics.element_group import ElementGrouper from metricflow_semantics.model.spec_converters import MeasureConverter from metricflow_semantics.specs.spec_classes import ( + DEFAULT_TIME_GRANULARITY, DimensionSpec, EntitySpec, LinkableInstanceSpec, @@ -85,10 +85,13 @@ def get_dimension_from_semantic_model( def get_dimension(self, dimension_reference: DimensionReference) -> Dimension: """Retrieves a full dimension object by name.""" + # If the reference passed is a TimeDimensionReference, convert to DimensionReference. + dimension_reference = DimensionReference(dimension_reference.element_name) + semantic_models = self._dimension_index.get(dimension_reference) if not semantic_models: raise ValueError( - f"Could not find dimension with name ({dimension_reference.element_name}) in configured semantic models" + f"Could not find dimension with name '{dimension_reference.element_name}' in configured semantic models" ) dimension = SemanticModelLookup.get_dimension_from_semantic_model( @@ -366,3 +369,13 @@ def get_agg_time_dimension_specs_for_measure( time_dimension_reference=agg_time_dimension, entity_links=(entity_link,), ) + + def get_defined_time_granularity(self, time_dimension_reference: TimeDimensionReference) -> TimeGranularity: + """Time granularity from the time dimension's YAML definition. If not set, defaults to DAY.""" + time_dimension = self.get_dimension(time_dimension_reference) + + defined_time_granularity = DEFAULT_TIME_GRANULARITY + if time_dimension.type_params and time_dimension.type_params.time_granularity: + defined_time_granularity = time_dimension.type_params.time_granularity + + return defined_time_granularity diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 9298fd8478..ef1913c06f 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -79,6 +79,7 @@ from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode from metricflow.dataflow.nodes.where_filter import WhereConstraintNode +from metricflow.dataflow.nodes.window_reaggregation_node import WindowReaggregationNode from metricflow.dataflow.nodes.write_to_data_table import WriteToResultDataTableNode from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer @@ -411,6 +412,57 @@ def _build_conversion_metric_output_node( aggregated_to_elements=set(queried_linkable_specs.as_tuple), ) + def _build_cumulative_metric_output_node( + self, + metric_spec: MetricSpec, + queried_linkable_specs: LinkableSpecSet, + filter_spec_factory: WhereSpecFactory, + predicate_pushdown_state: PredicatePushdownState, + for_group_by_source_node: bool = False, + ) -> DataflowPlanNode: + # TODO: replace with default_grain once added to YAML spec + default_granularity = self._metric_lookup.get_min_queryable_time_granularity(metric_spec.reference) + + queried_agg_time_dimensions = queried_linkable_specs.included_agg_time_dimension_specs_for_metric( + metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup + ) + query_includes_agg_time_dimension_with_default_granularity = False + for time_dimension_spec in queried_agg_time_dimensions: + if time_dimension_spec.time_granularity == default_granularity: + query_includes_agg_time_dimension_with_default_granularity = True + break + + if query_includes_agg_time_dimension_with_default_granularity or not queried_agg_time_dimensions: + return self._build_base_metric_output_node( + metric_spec=metric_spec, + queried_linkable_specs=queried_linkable_specs, + filter_spec_factory=filter_spec_factory, + predicate_pushdown_state=predicate_pushdown_state, + for_group_by_source_node=for_group_by_source_node, + ) + + # If a cumulative metric is queried without default granularity, it will need to be aggregated twice - + # once as a normal metric, and again using a window function to narrow down to one row per granularity period. + # In this case, add metric time at the default granularity to the linkable specs. It will be used in the order by + # clause of the window function and later excluded from the output selections. + default_metric_time = DataSet.metric_time_dimension_spec(default_granularity) + include_linkable_specs = queried_linkable_specs.merge( + LinkableSpecSet(time_dimension_specs=(default_metric_time,)) + ) + compute_metrics_node = self._build_base_metric_output_node( + metric_spec=metric_spec, + queried_linkable_specs=include_linkable_specs, + filter_spec_factory=filter_spec_factory, + predicate_pushdown_state=predicate_pushdown_state, + for_group_by_source_node=for_group_by_source_node, + ) + return WindowReaggregationNode( + parent_node=compute_metrics_node, + metric_spec=metric_spec, + order_by_spec=default_metric_time, + partition_by_specs=queried_linkable_specs.as_tuple, + ) + def _build_base_metric_output_node( self, metric_spec: MetricSpec, @@ -585,7 +637,7 @@ def _build_any_metric_output_node( """Builds a node to compute a metric of any type.""" metric = self._metric_lookup.get_metric(metric_spec.reference) - if metric.type is MetricType.SIMPLE or metric.type is MetricType.CUMULATIVE: + if metric.type is MetricType.SIMPLE: return self._build_base_metric_output_node( metric_spec=metric_spec, queried_linkable_specs=queried_linkable_specs, @@ -594,6 +646,15 @@ def _build_any_metric_output_node( for_group_by_source_node=for_group_by_source_node, ) + elif metric.type is MetricType.CUMULATIVE: + return self._build_cumulative_metric_output_node( + metric_spec=metric_spec, + queried_linkable_specs=queried_linkable_specs, + filter_spec_factory=filter_spec_factory, + predicate_pushdown_state=predicate_pushdown_state, + for_group_by_source_node=for_group_by_source_node, + ) + elif metric.type is MetricType.RATIO or metric.type is MetricType.DERIVED: return self._build_derived_metric_output_node( metric_spec=metric_spec, @@ -1351,7 +1412,7 @@ def _build_aggregated_measure_from_measure_source_node( measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup ) - # If a cumulative metric is queried with metric_time, join over time range. + # If a cumulative metric is queried with metric_time / agg_time_dimension, join over time range. # Otherwise, the measure will be aggregated over all time. time_range_node: Optional[JoinOverTimeRangeNode] = None if cumulative and queried_agg_time_dimension_specs: