Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use MetricTimeDefaultGranularityPattern to resolve metric_time granularity #1324

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240628-074617.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Use default_granularity to resolve metric_time.
time: 2024-06-28T07:46:17.768805-07:00
custom:
Author: courtneyholcomb
Issue: "1310"
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def filter_by_spec_patterns(self, spec_patterns: Sequence[SpecPattern]) -> Linka
"""
start_time = time.time()

# Spec patterns need all specs to match properly e.g. `BaseTimeGrainPattern`.
# Spec patterns need all specs to match properly e.g. `MinimumTimeGrainPattern`.
matching_specs: Sequence[InstanceSpec] = self.specs

for spec_pattern in spec_patterns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from dbt_semantic_interfaces.call_parameter_sets import TimeDimensionCallParameterSet
from dbt_semantic_interfaces.naming.keywords import METRIC_TIME_ELEMENT_NAME
from dbt_semantic_interfaces.references import SemanticModelReference, TimeDimensionReference
from dbt_semantic_interfaces.references import MetricReference, SemanticModelReference, TimeDimensionReference
from dbt_semantic_interfaces.type_enums import TimeGranularity
from typing_extensions import override

Expand All @@ -27,7 +27,8 @@
MetricFlowQueryResolutionIssueSet,
)
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator
from metricflow_semantics.specs.patterns.base_time_grain import BaseTimeGrainPattern
from metricflow_semantics.specs.patterns.metric_time_default_granularity import MetricTimeDefaultGranularityPattern
from metricflow_semantics.specs.patterns.minimum_time_grain import MinimumTimeGrainPattern
from metricflow_semantics.specs.patterns.no_group_by_metric import NoGroupByMetricPattern
from metricflow_semantics.specs.patterns.spec_pattern import SpecPattern
from metricflow_semantics.specs.patterns.typed_patterns import TimeDimensionPattern
Expand Down Expand Up @@ -80,11 +81,15 @@ def resolve_matching_item_for_querying(
self,
spec_pattern: SpecPattern,
suggestion_generator: Optional[QueryItemSuggestionGenerator],
queried_metrics: Sequence[MetricReference],
only_use_minimum_grain: bool = False,
) -> GroupByItemResolution:
"""Returns the spec that corresponds the one described by spec_pattern and is valid for the query.
"""Returns the spec that corresponds to the one described by spec_pattern and is valid for the query.

For queries, if the pattern matches to a spec for the same element at different grains, the spec with the finest
common grain is returned.
common grain is returned, unless the spec is metric_time, in which case the default grain is returned.

If only_use_minimum_grain is True, will use minimum grain instead of default for metric_time, too.
"""
push_down_visitor = _PushDownGroupByItemCandidatesVisitor(
manifest_lookup=self._manifest_lookup,
Expand All @@ -101,9 +106,18 @@ def resolve_matching_item_for_querying(
issue_set=push_down_result.issue_set,
)

push_down_result = push_down_result.filter_candidates_by_pattern(
BaseTimeGrainPattern(),
)
filters_to_use: Tuple[SpecPattern, ...] = (MinimumTimeGrainPattern(),)
if not only_use_minimum_grain:
# Default pattern must come first to avoid removing default grain options prematurely.
filters_to_use = (
MetricTimeDefaultGranularityPattern(
metric_lookup=self._manifest_lookup.metric_lookup, queried_metrics=queried_metrics
),
) + filters_to_use

for filter_to_use in filters_to_use:
push_down_result = push_down_result.filter_candidates_by_pattern(filter_to_use)

logger.info(
f"Spec pattern:\n"
f"{indent(mf_pformat(spec_pattern))}\n"
Expand Down Expand Up @@ -152,7 +166,15 @@ def resolve_matching_item_for_filters(

push_down_visitor = _PushDownGroupByItemCandidatesVisitor(
manifest_lookup=self._manifest_lookup,
source_spec_patterns=(spec_pattern, BaseTimeGrainPattern()),
source_spec_patterns=(
spec_pattern,
# MetricTimeDefaultGranularityPattern must come before MinimumTimeGrainPattern to ensure we don't remove the
# default grain from candiate set prematurely.
MetricTimeDefaultGranularityPattern(
metric_lookup=self._manifest_lookup.metric_lookup, queried_metrics=filter_location.metric_references
),
MinimumTimeGrainPattern(),
),
suggestion_generator=suggestion_generator,
)

Expand Down Expand Up @@ -210,16 +232,18 @@ def resolve_available_items(
issue_set=push_down_result.issue_set,
)

def resolve_min_metric_time_grain(self) -> TimeGranularity:
def resolve_min_metric_time_grain(self, metrics_in_query: Sequence[MetricReference]) -> TimeGranularity:
"""Returns the finest time grain of metric_time for querying."""
metric_time_grain_resolution = self.resolve_matching_item_for_querying(
spec_pattern=TimeDimensionPattern.from_call_parameter_set(
TimeDimensionCallParameterSet(
entity_path=(),
time_dimension_reference=TimeDimensionReference(element_name=METRIC_TIME_ELEMENT_NAME),
)
),
),
suggestion_generator=None,
queried_metrics=metrics_in_query,
only_use_minimum_grain=True,
)
metric_time_spec_set = (
group_specs_by_type((metric_time_grain_resolution.spec,))
Expand Down
39 changes: 22 additions & 17 deletions metricflow-semantics/metricflow_semantics/query/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from dbt_semantic_interfaces.protocols import SavedQuery
from dbt_semantic_interfaces.protocols.where_filter import WhereFilter
from dbt_semantic_interfaces.references import SemanticModelReference
from dbt_semantic_interfaces.references import MetricReference, SemanticModelReference
from dbt_semantic_interfaces.type_enums import TimeGranularity

from metricflow_semantics.assert_one_arg import assert_at_most_one_arg_set
Expand Down Expand Up @@ -52,8 +52,8 @@
ResolverInputForQuery,
ResolverInputForQueryLevelWhereFilterIntersection,
)
from metricflow_semantics.specs.patterns.base_time_grain import BaseTimeGrainPattern
from metricflow_semantics.specs.patterns.metric_time_pattern import MetricTimePattern
from metricflow_semantics.specs.patterns.min_time_grain import MinimumTimeGrainPattern
from metricflow_semantics.specs.patterns.none_date_part import NoneDatePartPattern
from metricflow_semantics.specs.query_param_implementations import DimensionOrEntityParameter, MetricParameter
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
Expand Down Expand Up @@ -147,13 +147,14 @@ def _get_saved_query(self, saved_query_parameter: SavedQueryParameter) -> SavedQ

return matching_saved_queries[0]

@staticmethod
def _metric_time_granularity(time_dimension_specs: Sequence[TimeDimensionSpec]) -> Optional[TimeGranularity]:
def _get_smallest_requested_metric_time_granularity(
self, time_dimension_specs: Sequence[TimeDimensionSpec]
) -> Optional[TimeGranularity]:
matching_specs: Sequence[InstanceSpec] = time_dimension_specs

for pattern_to_apply in (
MetricTimePattern(),
BaseTimeGrainPattern(),
MinimumTimeGrainPattern(),
NoneDatePartPattern(),
):
matching_specs = pattern_to_apply.match(matching_specs)
Expand All @@ -164,7 +165,7 @@ def _metric_time_granularity(time_dimension_specs: Sequence[TimeDimensionSpec])

assert (
len(time_dimension_specs) == 1
), f"Bug with BaseTimeGrainPattern - should have returned exactly 1 spec but got {time_dimension_specs}"
), f"Bug with MinimumTimeGrainPattern - should have returned exactly 1 spec but got {time_dimension_specs}"

return time_dimension_specs[0].time_granularity

Expand All @@ -173,19 +174,23 @@ def _adjust_time_constraint(
resolution_dag: GroupByItemResolutionDag,
time_dimension_specs_in_query: Sequence[TimeDimensionSpec],
time_constraint: TimeRangeConstraint,
metrics_in_query: Sequence[MetricReference],
) -> TimeRangeConstraint:
metric_time_granularity = MetricFlowQueryParser._metric_time_granularity(time_dimension_specs_in_query)
"""Change the time range so that the ends are at the ends of the requested time granularity windows.

e.g. [2020-01-15, 2020-2-15] with MONTH granularity -> [2020-01-01, 2020-02-29]
"""
metric_time_granularity = self._get_smallest_requested_metric_time_granularity(time_dimension_specs_in_query)
if metric_time_granularity is None:
# This indicates there were no metric time specs in the query, so use smallest available granularity for metric_time.
group_by_item_resolver = GroupByItemResolver(
manifest_lookup=self._manifest_lookup,
resolution_dag=resolution_dag,
)
metric_time_granularity = group_by_item_resolver.resolve_min_metric_time_grain()

"""Change the time range so that the ends are at the ends of the appropriate time granularity windows.
metric_time_granularity = group_by_item_resolver.resolve_min_metric_time_grain(
metrics_in_query=metrics_in_query
)

e.g. [2020-01-15, 2020-2-15] with MONTH granularity -> [2020-01-01, 2020-02-29]
"""
return self._time_period_adjuster.expand_time_constraint_to_fill_granularity(
time_constraint=time_constraint,
granularity=metric_time_granularity,
Expand Down Expand Up @@ -495,13 +500,13 @@ def _parse_and_validate_query(
resolution_dag=query_resolution.resolution_dag,
time_dimension_specs_in_query=query_spec.time_dimension_specs,
time_constraint=time_constraint,
metrics_in_query=tuple(
metric_resolver_input.spec_pattern.metric_reference
for metric_resolver_input in resolver_inputs_for_metrics
),
)
logger.info(f"Time constraint after adjustment is: {time_constraint}")

return ParseQueryResult(
query_spec=query_spec.with_time_range_constraint(time_constraint),
queried_semantic_models=query_resolution.queried_semantic_models,
)
query_spec = query_spec.with_time_range_constraint(time_constraint)

return ParseQueryResult(
query_spec=query_spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ def _resolve_group_by_item_input(
),
),
)

return group_by_item_resolver.resolve_matching_item_for_querying(
spec_pattern=group_by_item_input.spec_pattern,
suggestion_generator=suggestion_generator,
queried_metrics=queried_metrics,
)

def _resolve_metric_inputs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from metricflow_semantics.naming.naming_scheme import QueryItemNamingScheme
from metricflow_semantics.query.similarity import top_fuzzy_matches
from metricflow_semantics.specs.patterns.base_time_grain import BaseTimeGrainPattern
from metricflow_semantics.specs.patterns.minimum_time_grain import MinimumTimeGrainPattern
from metricflow_semantics.specs.patterns.no_group_by_metric import NoGroupByMetricPattern
from metricflow_semantics.specs.patterns.none_date_part import NoneDatePartPattern
from metricflow_semantics.specs.patterns.spec_pattern import SpecPattern
Expand All @@ -24,9 +24,9 @@ class QueryItemSuggestionGenerator:

# Adding these filters so that we don't get multiple suggestions that are similar, but with different
# grains. Some additional thought is needed to tweak this as the base grain may not be the best suggestion.
FILTER_ITEM_CANDIDATE_FILTERS: Tuple[SpecPattern, ...] = (BaseTimeGrainPattern(), NoneDatePartPattern())
FILTER_ITEM_CANDIDATE_FILTERS: Tuple[SpecPattern, ...] = (MinimumTimeGrainPattern(), NoneDatePartPattern())
GROUP_BY_ITEM_CANDIDATE_FILTERS: Tuple[SpecPattern, ...] = (
BaseTimeGrainPattern(),
MinimumTimeGrainPattern(),
NoneDatePartPattern(),
NoGroupByMetricPattern(),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations

from collections import defaultdict
from typing import Dict, Sequence, Set, Tuple

from dbt_semantic_interfaces.references import MetricReference
from dbt_semantic_interfaces.type_enums import TimeGranularity
from typing_extensions import override

from metricflow_semantics.model.semantics.metric_lookup import MetricLookup
from metricflow_semantics.specs.patterns.spec_pattern import SpecPattern
from metricflow_semantics.specs.spec_classes import (
InstanceSpec,
LinkableInstanceSpec,
TimeDimensionSpec,
TimeDimensionSpecComparisonKey,
TimeDimensionSpecField,
)
from metricflow_semantics.specs.spec_set import group_specs_by_type


class MetricTimeDefaultGranularityPattern(SpecPattern):
"""A pattern that matches metric_time specs if they have the default granularity for the requested metrics.

This is used to determine the granularity that should be used for metric_time if no granularity is specified.
Spec passes through if granularity is already selected or if no metrics were queried, since no default is needed.
All non-metric_time specs are passed through.

e.g., if a metric with default_granularity MONTH is queried

inputs:
[
TimeDimensionSpec('metric_time', 'day'),
TimeDimensionSpec('metric_time', 'week'),
TimeDimensionSpec('metric_time', 'month'),
DimensionSpec('listing__country'),
]

matches:
[
TimeDimensionSpec('metric_time', 'month'),
DimensionSpec('listing__country'),
]
"""

def __init__(self, metric_lookup: MetricLookup, queried_metrics: Sequence[MetricReference] = ()) -> None:
"""Match only time dimensions with the default granularity for a given query.

Only affects time dimensions. All other items pass through.
"""
self._metric_lookup = metric_lookup
self._queried_metrics = queried_metrics

@override
def match(self, candidate_specs: Sequence[InstanceSpec]) -> Sequence[InstanceSpec]:
# TODO: using placeholder grain option for now
max_metric_default_time_granularity = max((TimeGranularity.DAY,))
spec_set = group_specs_by_type(candidate_specs)

# TODO: is this needed?
# If there are no metrics or metric_time specs in the query, skip this filter.
if not (max_metric_default_time_granularity and spec_set.metric_time_specs):
return candidate_specs

spec_key_to_grains: Dict[TimeDimensionSpecComparisonKey, Set[TimeGranularity]] = defaultdict(set)
spec_key_to_specs: Dict[TimeDimensionSpecComparisonKey, Tuple[TimeDimensionSpec, ...]] = defaultdict(tuple)
for metric_time_spec in spec_set.metric_time_specs:
spec_key = metric_time_spec.comparison_key(exclude_fields=(TimeDimensionSpecField.TIME_GRANULARITY,))
spec_key_to_grains[spec_key].add(metric_time_spec.time_granularity)
spec_key_to_specs[spec_key] += (metric_time_spec,)

matched_metric_time_specs: Tuple[TimeDimensionSpec, ...] = ()
for spec_key, time_grains in spec_key_to_grains.items():
if max_metric_default_time_granularity in time_grains:
matched_metric_time_specs += (
spec_key_to_specs[spec_key][0].with_grain(max_metric_default_time_granularity),
)
else:
# If default_granularity is not in the available options, then time granularity was specified in the request
# and a default is not needed here. Pass all options through for this spec key.
matched_metric_time_specs += spec_key_to_specs[spec_key]

matching_specs: Sequence[LinkableInstanceSpec] = (
spec_set.dimension_specs
+ matched_metric_time_specs
+ tuple(spec for spec in spec_set.time_dimension_specs if not spec.is_metric_time)
+ spec_set.entity_specs
+ spec_set.group_by_metric_specs
)

return matching_specs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from dbt_semantic_interfaces.type_enums import TimeGranularity
from typing_extensions import override

from metricflow_semantics.specs.patterns.metric_time_pattern import MetricTimePattern
from metricflow_semantics.specs.patterns.spec_pattern import SpecPattern
from metricflow_semantics.specs.spec_classes import (
InstanceSpec,
Expand All @@ -18,7 +17,7 @@
from metricflow_semantics.specs.spec_set import group_specs_by_type


class BaseTimeGrainPattern(SpecPattern):
class MinimumTimeGrainPattern(SpecPattern):
"""A pattern that matches linkable specs, but for time dimension specs, only the one with the finest grain.

e.g.
Expand Down Expand Up @@ -46,25 +45,8 @@ class BaseTimeGrainPattern(SpecPattern):
by the base grain of metric_time.
"""

def __init__(self, only_apply_for_metric_time: bool = False) -> None:
"""Initializer.

Args:
only_apply_for_metric_time: If set, only remove time dimension specs with a non-base grain if it's for
metric_time. This parameter is useful for implementing restrictions on cumulative metrics as they can only
be queried by the base grain of metric_time.
TODO: This is a little odd. This can be replaced once composite patterns are supported.
"""
self._only_apply_for_metric_time = only_apply_for_metric_time

@override
def match(self, candidate_specs: Sequence[InstanceSpec]) -> Sequence[InstanceSpec]:
if self._only_apply_for_metric_time:
metric_time_specs = MetricTimePattern().match(candidate_specs)
other_specs = tuple(spec for spec in candidate_specs if spec not in metric_time_specs)

return other_specs + tuple(BaseTimeGrainPattern(only_apply_for_metric_time=False).match(metric_time_specs))

spec_set = group_specs_by_type(candidate_specs)

spec_key_to_grains: Dict[TimeDimensionSpecComparisonKey, Set[TimeGranularity]] = defaultdict(set)
Expand Down
Loading
Loading