Skip to content

Commit

Permalink
DataflowPlan for cumulative metrics queried with non-default granul…
Browse files Browse the repository at this point in the history
…arity (#1281)

When a cumulative metric is queried at a granularity other than its default,
build a dataflow plan that includes a `WindowReaggregationNode`.

E2E tests coming soon for this feature!
  • Loading branch information
courtneyholcomb authored Jun 19, 2024
1 parent 8f715d9 commit 91a0b94
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240613-172315.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Build dataflow plan for cumulative metrics queried with non-default granularity.
time: 2024-06-13T17:23:15.095339-07:00
custom:
Author: courtneyholcomb
Issue: "1281"
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dbt_semantic_interfaces.protocols.metric import Metric, MetricInputMeasure, MetricType
from dbt_semantic_interfaces.protocols.semantic_manifest import SemanticManifest
from dbt_semantic_interfaces.references import MeasureReference, MetricReference
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity

from metricflow_semantics.errors.error_classes import DuplicateMetricError, MetricNotFoundError, NonExistentMeasureError
from metricflow_semantics.model.linkable_element_property import LinkableElementProperty
Expand Down Expand Up @@ -186,3 +187,26 @@ def get_valid_agg_time_dimensions_for_metric(
entity_links=agg_time_dimension_entity_links,
)
return valid_agg_time_dimension_specs

def get_min_queryable_time_granularity(self, metric_reference: MetricReference) -> TimeGranularity:
    """Return the minimum (finest) grain that can be queried with this metric.

    This is the largest granularity defined for any of the metric's agg_time_dimensions, i.e. the one
    whose `to_int()` value is highest. If several agg_time_dimensions share that granularity, the one
    belonging to the first such agg_time_dimension is returned.

    Args:
        metric_reference: reference to the metric being queried.

    Returns:
        The defined granularity with the highest `to_int()` value among the metric's agg_time_dimensions.

    Raises:
        AssertionError: if no agg_time_dimension is found for the metric.
    """
    agg_time_dimension_specs = self._get_agg_time_dimension_specs_for_metric(metric_reference)
    assert (
        agg_time_dimension_specs
    ), f"No agg_time_dimension found for metric {metric_reference}. Something has been misconfigured."

    defined_time_granularities = [
        self._semantic_model_lookup.get_defined_time_granularity(agg_time_dimension_spec.reference)
        for agg_time_dimension_spec in agg_time_dimension_specs
    ]
    # `max` returns the first maximal element, so ties resolve to the earliest agg_time_dimension.
    return max(defined_time_granularities, key=lambda granularity: granularity.to_int())
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
SemanticModelReference,
TimeDimensionReference,
)
from dbt_semantic_interfaces.type_enums import DimensionType, EntityType
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType
from dbt_semantic_interfaces.type_enums import AggregationType, DimensionType, EntityType, TimeGranularity

from metricflow_semantics.errors.error_classes import InvalidSemanticModelError
from metricflow_semantics.mf_logging.pretty_print import mf_pformat
from metricflow_semantics.model.semantics.element_group import ElementGrouper
from metricflow_semantics.model.spec_converters import MeasureConverter
from metricflow_semantics.specs.spec_classes import (
DEFAULT_TIME_GRANULARITY,
DimensionSpec,
EntitySpec,
LinkableInstanceSpec,
Expand Down Expand Up @@ -85,10 +85,13 @@ def get_dimension_from_semantic_model(

def get_dimension(self, dimension_reference: DimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
# If the reference passed is a TimeDimensionReference, convert to DimensionReference.
dimension_reference = DimensionReference(dimension_reference.element_name)

semantic_models = self._dimension_index.get(dimension_reference)
if not semantic_models:
raise ValueError(
f"Could not find dimension with name ({dimension_reference.element_name}) in configured semantic models"
f"Could not find dimension with name '{dimension_reference.element_name}' in configured semantic models"
)

dimension = SemanticModelLookup.get_dimension_from_semantic_model(
Expand Down Expand Up @@ -366,3 +369,13 @@ def get_agg_time_dimension_specs_for_measure(
time_dimension_reference=agg_time_dimension,
entity_links=(entity_link,),
)

def get_defined_time_granularity(self, time_dimension_reference: TimeDimensionReference) -> TimeGranularity:
    """Return the granularity set in the time dimension's type params, else `DEFAULT_TIME_GRANULARITY`.

    The dimension is looked up through `get_dimension`. The fallback applies when the dimension has no
    type params or when its type params carry no time granularity.
    """
    type_params = self.get_dimension(time_dimension_reference).type_params
    if type_params and type_params.time_granularity:
        return type_params.time_granularity
    return DEFAULT_TIME_GRANULARITY
65 changes: 63 additions & 2 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode
from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
from metricflow.dataflow.nodes.where_filter import WhereConstraintNode
from metricflow.dataflow.nodes.window_reaggregation_node import WindowReaggregationNode
from metricflow.dataflow.nodes.write_to_data_table import WriteToResultDataTableNode
from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode
from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer
Expand Down Expand Up @@ -411,6 +412,57 @@ def _build_conversion_metric_output_node(
aggregated_to_elements=set(queried_linkable_specs.as_tuple),
)

def _build_cumulative_metric_output_node(
    self,
    metric_spec: MetricSpec,
    queried_linkable_specs: LinkableSpecSet,
    filter_spec_factory: WhereSpecFactory,
    predicate_pushdown_state: PredicatePushdownState,
    for_group_by_source_node: bool = False,
) -> DataflowPlanNode:
    """Builds a node to compute a cumulative metric.

    Two plan shapes are possible:

    * If the query includes none of the metric's agg time dimensions, or includes at least one of them at
      the metric's default granularity, the result of `_build_base_metric_output_node` is returned as is.
    * Otherwise the base metric node is built with metric time at the default granularity added to the
      queried specs, and that node is wrapped in a `WindowReaggregationNode`.

    All arguments are forwarded unchanged to `_build_base_metric_output_node`, except that
    `queried_linkable_specs` is extended in the second case.
    """
    # TODO: replace with default_grain once added to YAML spec
    default_granularity = self._metric_lookup.get_min_queryable_time_granularity(metric_spec.reference)

    queried_agg_time_dimensions = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
        metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
    )
    query_includes_agg_time_dimension_with_default_granularity = any(
        time_dimension_spec.time_granularity == default_granularity
        for time_dimension_spec in queried_agg_time_dimensions
    )

    if query_includes_agg_time_dimension_with_default_granularity or not queried_agg_time_dimensions:
        return self._build_base_metric_output_node(
            metric_spec=metric_spec,
            queried_linkable_specs=queried_linkable_specs,
            filter_spec_factory=filter_spec_factory,
            predicate_pushdown_state=predicate_pushdown_state,
            for_group_by_source_node=for_group_by_source_node,
        )

    # If a cumulative metric is queried without default granularity, it will need to be aggregated twice -
    # once as a normal metric, and again using a window function to narrow down to one row per granularity period.
    # In this case, add metric time at the default granularity to the linkable specs. It will be used in the order by
    # clause of the window function and later excluded from the output selections.
    default_metric_time = DataSet.metric_time_dimension_spec(default_granularity)
    include_linkable_specs = queried_linkable_specs.merge(
        LinkableSpecSet(time_dimension_specs=(default_metric_time,))
    )
    compute_metrics_node = self._build_base_metric_output_node(
        metric_spec=metric_spec,
        queried_linkable_specs=include_linkable_specs,
        filter_spec_factory=filter_spec_factory,
        predicate_pushdown_state=predicate_pushdown_state,
        for_group_by_source_node=for_group_by_source_node,
    )
    # Partition by the originally queried specs only; the added default-grain metric time is the ordering key.
    return WindowReaggregationNode(
        parent_node=compute_metrics_node,
        metric_spec=metric_spec,
        order_by_spec=default_metric_time,
        partition_by_specs=queried_linkable_specs.as_tuple,
    )

def _build_base_metric_output_node(
self,
metric_spec: MetricSpec,
Expand Down Expand Up @@ -585,7 +637,7 @@ def _build_any_metric_output_node(
"""Builds a node to compute a metric of any type."""
metric = self._metric_lookup.get_metric(metric_spec.reference)

if metric.type is MetricType.SIMPLE or metric.type is MetricType.CUMULATIVE:
if metric.type is MetricType.SIMPLE:
return self._build_base_metric_output_node(
metric_spec=metric_spec,
queried_linkable_specs=queried_linkable_specs,
Expand All @@ -594,6 +646,15 @@ def _build_any_metric_output_node(
for_group_by_source_node=for_group_by_source_node,
)

elif metric.type is MetricType.CUMULATIVE:
return self._build_cumulative_metric_output_node(
metric_spec=metric_spec,
queried_linkable_specs=queried_linkable_specs,
filter_spec_factory=filter_spec_factory,
predicate_pushdown_state=predicate_pushdown_state,
for_group_by_source_node=for_group_by_source_node,
)

elif metric.type is MetricType.RATIO or metric.type is MetricType.DERIVED:
return self._build_derived_metric_output_node(
metric_spec=metric_spec,
Expand Down Expand Up @@ -1351,7 +1412,7 @@ def _build_aggregated_measure_from_measure_source_node(
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
)

# If a cumulative metric is queried with metric_time, join over time range.
# If a cumulative metric is queried with metric_time / agg_time_dimension, join over time range.
# Otherwise, the measure will be aggregated over all time.
time_range_node: Optional[JoinOverTimeRangeNode] = None
if cumulative and queried_agg_time_dimension_specs:
Expand Down

0 comments on commit 91a0b94

Please sign in to comment.