
Incorporate default_grain #1291

Closed · wants to merge 26 commits
7fc74c0
Refactor _make_time_spine_data_set() for readability & simplicity
courtneyholcomb Jun 12, 2024
ad7c2bc
Refactor JoinOverTimeRangeNode SQL rendering for readability & simpli…
courtneyholcomb Jun 12, 2024
a92ef2f
Write tests for cumulative metrics queried with multiple agg time dim…
courtneyholcomb Jun 12, 2024
1c0af81
Allow multiple agg_time_dimension_specs in JoinOverTimeRangeNode
courtneyholcomb Jun 12, 2024
f5acfab
Support for multiple queried agg time dimensions in JoinOverTimeRange…
courtneyholcomb Jun 12, 2024
8bdb7d9
Update snapshots to reflect code changes
courtneyholcomb Jun 12, 2024
946cbae
Changelog
courtneyholcomb Jun 12, 2024
317e64b
Update SQL engine snapshots
courtneyholcomb Jun 13, 2024
0cf0f7a
Add requires_ordering property to SqlWindowFunction
courtneyholcomb Jun 13, 2024
bc3f0d3
Add WindowReaggregationNode
courtneyholcomb Jun 13, 2024
1500f9a
Visitor methods for WindowReaggregation node - primarily SQL renderin…
courtneyholcomb Jun 13, 2024
dcae9f4
Changelog
courtneyholcomb Jun 13, 2024
f3d478e
Only render SelectStatementNode description if it's not an empty string
courtneyholcomb Jun 13, 2024
57e5464
Don't optimize subqueries if it will put a window function into the g…
courtneyholcomb Jun 13, 2024
7a25047
Fix type for WindowReaggregationNode.partition_by_specs
courtneyholcomb Jun 14, 2024
22d9c06
Helpers to get min queryable granularity for metric
courtneyholcomb Jun 14, 2024
09e5d8e
Build DataflowPlan for cumulative metrics queried with non-default gr…
courtneyholcomb Jun 14, 2024
ce72cf1
Changelog
courtneyholcomb Jun 14, 2024
644bb5d
Remove base time grain restriction for cumulative metrics
courtneyholcomb Jun 14, 2024
9fe0996
Changelog
courtneyholcomb Jun 14, 2024
ae72a15
Include source_spec_patterns in log
courtneyholcomb Jun 14, 2024
9891bbf
Dataflow plan tests
courtneyholcomb Jun 14, 2024
2177713
Check query tests
courtneyholcomb Jun 14, 2024
571e03c
SQL rendering tests
courtneyholcomb Jun 14, 2024
72912f4
SQL engine snapshots
courtneyholcomb Jun 14, 2024
34149ae
Incorporate default_grain
courtneyholcomb Jun 18, 2024
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20240613-160758.yaml
@@ -0,0 +1,7 @@
kind: Features
body: Adds a new dataflow plan node to re-aggregate metrics using window functions.
Needed to calculate cumulative metrics at non-default granularities.
time: 2024-06-13T16:07:58.767447-07:00
custom:
Author: courtneyholcomb
Issue: "1274"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240613-172315.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Build dataflow plan for cumulative metrics queried with non-default granularity.
time: 2024-06-13T17:23:15.095339-07:00
custom:
Author: courtneyholcomb
Issue: "1281"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240614-071108.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Remove restriction on querying non-default granularities with cumulative metrics.
time: 2024-06-14T07:11:08.765605-07:00
custom:
Author: courtneyholcomb
Issue: "1282"
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20240612-161605.yaml
@@ -0,0 +1,7 @@
kind: Fixes
body: When querying multiple agg time or metric time dimensions with a cumulative
metric, select all of them from the time spine table.
time: 2024-06-12T16:16:05.678697-07:00
custom:
Author: courtneyholcomb
Issue: "1271"
1 change: 1 addition & 0 deletions metricflow-semantics/metricflow_semantics/dag/id_prefix.py
@@ -53,6 +53,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper):
DATAFLOW_NODE_MIN_MAX_ID_PREFIX = "mm"
DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX = "auid"
DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"
DATAFLOW_NODE_WINDOW_REAGGREGATION_ID_PREFIX = "wr"

SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr"
SQL_EXPR_COMPARISON_ID_PREFIX = "cmp"
12 changes: 12 additions & 0 deletions metricflow-semantics/metricflow_semantics/instances.py
@@ -216,3 +216,15 @@ def spec_set(self) -> InstanceSpecSet:  # noqa: D102
metric_specs=tuple(x.spec for x in self.metric_instances),
metadata_specs=tuple(x.spec for x in self.metadata_instances),
)

@property
def as_tuple(self) -> Tuple[MdoInstance, ...]: # noqa: D102
return (
self.measure_instances
+ self.dimension_instances
+ self.time_dimension_instances
+ self.entity_instances
+ self.group_by_metric_instances
+ self.metric_instances
+ self.metadata_instances
)
@@ -10,6 +10,7 @@
from dbt_semantic_interfaces.transformations.convert_median import (
ConvertMedianToPercentileRule,
)
from dbt_semantic_interfaces.transformations.default_grain import SetDefaultGrainRule
from dbt_semantic_interfaces.transformations.names import LowerCaseNamesRule
from dbt_semantic_interfaces.transformations.proxy_measure import CreateProxyMeasureRule
from dbt_semantic_interfaces.transformations.semantic_manifest_transformer import (
@@ -35,6 +36,7 @@ def parse_manifest_from_dbt_generated_manifest(manifest_json_string: str) -> Pyd
ConvertCountToSumRule(),
ConvertMedianToPercentileRule(),
DedupeMetricInputMeasuresRule(), # Remove once fix is in core
SetDefaultGrainRule(),
),
)
model = PydanticSemanticManifestTransformer.transform(raw_model, rules)
@@ -9,6 +9,7 @@
from dbt_semantic_interfaces.protocols.metric import Metric, MetricInputMeasure, MetricType
from dbt_semantic_interfaces.protocols.semantic_manifest import SemanticManifest
from dbt_semantic_interfaces.references import MeasureReference, MetricReference
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity

from metricflow_semantics.errors.error_classes import DuplicateMetricError, MetricNotFoundError, NonExistentMeasureError
from metricflow_semantics.model.linkable_element_property import LinkableElementProperty
@@ -186,3 +187,26 @@ def get_valid_agg_time_dimensions_for_metric(
entity_links=agg_time_dimension_entity_links,
)
return valid_agg_time_dimension_specs

def get_min_queryable_time_granularity(self, metric_reference: MetricReference) -> TimeGranularity:
"""The minimum grain that can be queried with this metric.

Maps to the largest granularity defined for any of the metric's agg_time_dimensions.
"""
agg_time_dimension_specs = self._get_agg_time_dimension_specs_for_metric(metric_reference)
assert (
agg_time_dimension_specs
), f"No agg_time_dimension found for metric {metric_reference}. Something has been misconfigured."

minimum_queryable_granularity = self._semantic_model_lookup.get_defined_time_granularity(
agg_time_dimension_specs[0].reference
)
if len(agg_time_dimension_specs) > 1:
for agg_time_dimension_spec in agg_time_dimension_specs[1:]:
defined_time_granularity = self._semantic_model_lookup.get_defined_time_granularity(
agg_time_dimension_spec.reference
)
if defined_time_granularity.to_int() > minimum_queryable_granularity.to_int():
minimum_queryable_granularity = defined_time_granularity

return minimum_queryable_granularity
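
The helper above can be sketched in isolation. Note that despite the name, the "minimum queryable" granularity is the *coarsest* granularity defined across the metric's agg time dimensions, since a metric cannot be queried at a grain finer than any of its agg time dimensions. This is an illustrative stand-in, not MetricFlow's actual classes:

```python
from enum import Enum


class TimeGranularity(Enum):
    """Simplified stand-in for dbt_semantic_interfaces' TimeGranularity enum."""

    DAY = 1
    WEEK = 2
    MONTH = 3
    QUARTER = 4
    YEAR = 5

    def to_int(self) -> int:
        return self.value


def min_queryable_granularity(defined_granularities: list) -> TimeGranularity:
    """Return the coarsest granularity defined for any agg_time_dimension.

    Mirrors get_min_queryable_time_granularity: the loop over specs reduces
    to a max() keyed on to_int().
    """
    if not defined_granularities:
        raise ValueError("No agg_time_dimension found for metric.")
    return max(defined_granularities, key=lambda g: g.to_int())


print(min_queryable_granularity([TimeGranularity.DAY, TimeGranularity.MONTH]).name)
```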
@@ -18,14 +18,14 @@
SemanticModelReference,
TimeDimensionReference,
)
from dbt_semantic_interfaces.type_enums import DimensionType, EntityType
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType
from dbt_semantic_interfaces.type_enums import AggregationType, DimensionType, EntityType, TimeGranularity

from metricflow_semantics.errors.error_classes import InvalidSemanticModelError
from metricflow_semantics.mf_logging.pretty_print import mf_pformat
from metricflow_semantics.model.semantics.element_group import ElementGrouper
from metricflow_semantics.model.spec_converters import MeasureConverter
from metricflow_semantics.specs.spec_classes import (
DEFAULT_TIME_GRANULARITY,
DimensionSpec,
EntitySpec,
LinkableInstanceSpec,
@@ -85,10 +85,13 @@ def get_dimension_from_semantic_model(

def get_dimension(self, dimension_reference: DimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
# If the reference passed is a TimeDimensionReference, convert to DimensionReference.
dimension_reference = DimensionReference(dimension_reference.element_name)

semantic_models = self._dimension_index.get(dimension_reference)
if not semantic_models:
raise ValueError(
f"Could not find dimension with name ({dimension_reference.element_name}) in configured semantic models"
f"Could not find dimension with name '{dimension_reference.element_name}' in configured semantic models"
)

dimension = SemanticModelLookup.get_dimension_from_semantic_model(
@@ -366,3 +369,13 @@ def get_agg_time_dimension_specs_for_measure(
time_dimension_reference=agg_time_dimension,
entity_links=(entity_link,),
)

def get_defined_time_granularity(self, time_dimension_reference: TimeDimensionReference) -> TimeGranularity:
"""Time granularity from the time dimension's YAML definition. If not set, defaults to DAY."""
time_dimension = self.get_dimension(time_dimension_reference)

defined_time_granularity = DEFAULT_TIME_GRANULARITY
if time_dimension.type_params and time_dimension.type_params.time_granularity:
defined_time_granularity = time_dimension.type_params.time_granularity

return defined_time_granularity
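
The fallback behavior in `get_defined_time_granularity` can be shown with a minimal sketch (the dataclasses below are hypothetical simplifications of the semantic-interface protocols):

```python
from dataclasses import dataclass
from typing import Optional

# MetricFlow's DEFAULT_TIME_GRANULARITY is DAY; a plain string stands in here.
DEFAULT_TIME_GRANULARITY = "day"


@dataclass
class TypeParams:
    time_granularity: Optional[str] = None


@dataclass
class TimeDimension:
    type_params: Optional[TypeParams] = None


def get_defined_time_granularity(dim: TimeDimension) -> str:
    """Prefer the YAML-defined granularity; fall back to the default (DAY)."""
    if dim.type_params and dim.type_params.time_granularity:
        return dim.type_params.time_granularity
    return DEFAULT_TIME_GRANULARITY


print(get_defined_time_granularity(TimeDimension()))  # day
```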
@@ -46,7 +46,6 @@
MetricFlowQueryResolutionIssueSet,
)
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator
from metricflow_semantics.specs.patterns.base_time_grain import BaseTimeGrainPattern
from metricflow_semantics.specs.patterns.none_date_part import NoneDatePartPattern
from metricflow_semantics.specs.patterns.spec_pattern import SpecPattern

@@ -171,19 +170,8 @@ def visit_measure_node(self, node: MeasureGroupByItemSourceNode) -> PushDownResu
elif metric.type is MetricType.RATIO or metric.type is MetricType.DERIVED:
assert False, f"A measure should have a simple or cumulative metric as a child, but got {metric.type}"
elif metric.type is MetricType.CUMULATIVE:
# To handle the restriction that cumulative metrics can only be queried at the base grain, it's
# easiest to handle that by applying the pattern to remove non-base grain time dimension specs at the
# measure node and generate the issue here if there's nothing that matches. Generating the issue here
# allows for creation of a more specific issue (i.e. include the measure) vs. generating the issue
# at a higher level. This can be more cleanly handled once we add additional context to the
# LinkableInstanceSpec.
patterns_to_apply = (
# From comment in ValidLinkableSpecResolver:
# It's possible to aggregate measures to coarser time granularities
# (except with cumulative metrics).
BaseTimeGrainPattern(only_apply_for_metric_time=True),
# From comment in previous query parser:
# Cannot extract date part for cumulative metrics.
# Date part doesn't make clear sense with cumulative metrics, so we don't allow it.
NoneDatePartPattern(),
)
else:
@@ -198,7 +186,7 @@ def visit_measure_node(self, node: MeasureGroupByItemSourceNode) -> PushDownResu
f"For {node.ui_description}:\n"
+ indent(
"After applying patterns:\n"
+ indent(mf_pformat(patterns_to_apply))
+ indent(mf_pformat(patterns_to_apply + self._source_spec_patterns))
+ "\n"
+ "to inputs, matches are:\n"
+ indent(mf_pformat(matching_items.specs))
@@ -1,5 +1,7 @@
[
"TimeDimension('metric_time', 'month')",
"TimeDimension('metric_time', 'quarter')",
"TimeDimension('metric_time', 'year')",
"TimeDimension('monthly_measure_entity__creation_time', 'month')",
"TimeDimension('monthly_measure_entity__creation_time', 'quarter')",
"TimeDimension('monthly_measure_entity__creation_time', 'year')",
Expand Down
74 changes: 67 additions & 7 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -79,6 +79,7 @@
from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode
from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
from metricflow.dataflow.nodes.where_filter import WhereConstraintNode
from metricflow.dataflow.nodes.window_reaggregation_node import WindowReaggregationNode
from metricflow.dataflow.nodes.write_to_data_table import WriteToResultDataTableNode
from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode
from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer
@@ -411,6 +412,60 @@ def _build_conversion_metric_output_node(
aggregated_to_elements=set(queried_linkable_specs.as_tuple),
)

def _build_cumulative_metric_output_node(
self,
metric_spec: MetricSpec,
queried_linkable_specs: LinkableSpecSet,
filter_spec_factory: WhereSpecFactory,
predicate_pushdown_state: PredicatePushdownState,
for_group_by_source_node: bool = False,
) -> DataflowPlanNode:
# TODO: use default grain
# What is the expected behavior if you query with default grain?
# What if you query with a grain smaller than default? And larger?
# TODO elsewhere: use default grain for metric_time resolution
default_granularity = self._metric_lookup.get_min_queryable_time_granularity(metric_spec.reference)

queried_agg_time_dimensions = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
)
query_includes_agg_time_dimension_with_default_granularity = False
for time_dimension_spec in queried_agg_time_dimensions:
if time_dimension_spec.time_granularity == default_granularity:
query_includes_agg_time_dimension_with_default_granularity = True
break

if query_includes_agg_time_dimension_with_default_granularity or not queried_agg_time_dimensions:
return self._build_base_metric_output_node(
metric_spec=metric_spec,
queried_linkable_specs=queried_linkable_specs,
filter_spec_factory=filter_spec_factory,
predicate_pushdown_state=predicate_pushdown_state,
for_group_by_source_node=for_group_by_source_node,
)

# If a cumulative metric is queried without default granularity, it will need to be aggregated twice -
# once as a normal metric, and again using a window function to narrow down to one row per granularity period.
# In this case, add metric time at the default granularity to the linkable specs. It will be used in the order by
# clause of the window function and later excluded from the output selections.
default_metric_time = DataSet.metric_time_dimension_spec(default_granularity)
include_linkable_specs = queried_linkable_specs.merge(
LinkableSpecSet(time_dimension_specs=(default_metric_time,))
)
compute_metrics_node = self._build_base_metric_output_node(
metric_spec=metric_spec,
queried_linkable_specs=include_linkable_specs,
filter_spec_factory=filter_spec_factory,
predicate_pushdown_state=predicate_pushdown_state,
for_group_by_source_node=for_group_by_source_node,
)
return WindowReaggregationNode(
parent_node=compute_metrics_node,
metric_spec=metric_spec,
order_by_spec=default_metric_time,
partition_by_specs=queried_linkable_specs.as_tuple,
)
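
The reaggregation the `WindowReaggregationNode` performs can be sketched in plain Python. The real node renders a SQL window function (partition by the queried coarser grain, order by default-grain metric_time); the data and function below are purely illustrative:

```python
# Cumulative metric values already computed at the default (base) grain:
# (coarse_period, base_period, running_total) tuples.
rows = [
    ("2024-01", "2024-01-15", 10),
    ("2024-01", "2024-01-31", 25),
    ("2024-02", "2024-02-10", 30),
    ("2024-02", "2024-02-29", 42),
]


def reaggregate_to_coarser_grain(rows: list) -> dict:
    """Keep one row per coarse period, mimicking the window function.

    A cumulative value must not be summed again when rolling up to a
    coarser grain (that would double count); instead, take the last
    cumulative value within each coarse period.
    """
    latest = {}
    for coarse_period, base_period, value in sorted(rows, key=lambda r: r[1]):
        latest[coarse_period] = value  # last value per partition wins
    return latest


print(reaggregate_to_coarser_grain(rows))
```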

def _build_base_metric_output_node(
self,
metric_spec: MetricSpec,
@@ -585,7 +640,7 @@ def _build_any_metric_output_node(
"""Builds a node to compute a metric of any type."""
metric = self._metric_lookup.get_metric(metric_spec.reference)

if metric.type is MetricType.SIMPLE or metric.type is MetricType.CUMULATIVE:
if metric.type is MetricType.SIMPLE:
return self._build_base_metric_output_node(
metric_spec=metric_spec,
queried_linkable_specs=queried_linkable_specs,
Expand All @@ -594,6 +649,15 @@ def _build_any_metric_output_node(
for_group_by_source_node=for_group_by_source_node,
)

elif metric.type is MetricType.CUMULATIVE:
return self._build_cumulative_metric_output_node(
metric_spec=metric_spec,
queried_linkable_specs=queried_linkable_specs,
filter_spec_factory=filter_spec_factory,
predicate_pushdown_state=predicate_pushdown_state,
for_group_by_source_node=for_group_by_source_node,
)

elif metric.type is MetricType.RATIO or metric.type is MetricType.DERIVED:
return self._build_derived_metric_output_node(
metric_spec=metric_spec,
@@ -1351,17 +1415,13 @@ def _build_aggregated_measure_from_measure_source_node(
measure_reference=measure_spec.reference, semantic_model_lookup=self._semantic_model_lookup
)

# If a cumulative metric is queried with metric_time, join over time range.
# If a cumulative metric is queried with metric_time / agg_time_dimension, join over time range.
# Otherwise, the measure will be aggregated over all time.
time_range_node: Optional[JoinOverTimeRangeNode] = None
if cumulative and queried_agg_time_dimension_specs:
# Use the time dimension spec with the smallest granularity.
agg_time_dimension_spec_for_join = sorted(
queried_agg_time_dimension_specs, key=lambda spec: spec.time_granularity.to_int()
)[0]
time_range_node = JoinOverTimeRangeNode(
parent_node=measure_recipe.source_node,
time_dimension_spec_for_join=agg_time_dimension_spec_for_join,
queried_agg_time_dimension_specs=tuple(queried_agg_time_dimension_specs),
window=cumulative_window,
grain_to_date=cumulative_grain_to_date,
# Note: we use the original constraint here because the JoinOverTimeRangeNode will eventually get
5 changes: 5 additions & 0 deletions metricflow/dataflow/dataflow_plan.py
@@ -32,6 +32,7 @@
from metricflow.dataflow.nodes.read_sql_source import ReadSqlSourceNode
from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
from metricflow.dataflow.nodes.where_filter import WhereConstraintNode
from metricflow.dataflow.nodes.window_reaggregation_node import WindowReaggregationNode
from metricflow.dataflow.nodes.write_to_data_table import WriteToResultDataTableNode
from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode

@@ -137,6 +138,10 @@ def visit_aggregate_measures_node(self, node: AggregateMeasuresNode) -> VisitorO
def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> VisitorOutputT: # noqa: D102
pass

@abstractmethod
def visit_window_reaggregation_node(self, node: WindowReaggregationNode) -> VisitorOutputT: # noqa: D102
pass

@abstractmethod
def visit_order_by_limit_node(self, node: OrderByLimitNode) -> VisitorOutputT: # noqa: D102
pass