Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use default_granularity to resolve metric_time #1310

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240628-074617.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Use default_granularity to resolve metric_time.
time: 2024-06-28T07:46:17.768805-07:00
custom:
Author: courtneyholcomb
Issue: "1310"
2 changes: 1 addition & 1 deletion extra-hatch-configuration/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Jinja2>=3.1.3
dbt-semantic-interfaces==0.6.1
dbt-semantic-interfaces==0.6.4
more-itertools>=8.10.0, <10.2.0
pydantic>=1.10.0, <3.0
tabulate>=0.8.9
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# dbt Cloud depends on metricflow-semantics (dependency set in dbt-mantle), so DSI must always point to a production version here.
dbt-semantic-interfaces>=0.6.1, <2.0.0
dbt-semantic-interfaces>=0.6.4
graphviz>=0.18.2, <0.21
python-dateutil>=2.9.0, <2.10.0
rapidfuzz>=3.0, <4.0
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ConvertMedianToPercentileRule,
)
from dbt_semantic_interfaces.transformations.cumulative_type_params import SetCumulativeTypeParamsRule
from dbt_semantic_interfaces.transformations.default_granularity import SetDefaultGranularityRule
from dbt_semantic_interfaces.transformations.names import LowerCaseNamesRule
from dbt_semantic_interfaces.transformations.proxy_measure import CreateProxyMeasureRule
from dbt_semantic_interfaces.transformations.semantic_manifest_transformer import (
Expand Down Expand Up @@ -38,6 +39,7 @@ def parse_manifest_from_dbt_generated_manifest(manifest_json_string: str) -> Pyd
ConvertMedianToPercentileRule(),
DedupeMetricInputMeasuresRule(), # Remove once fix is in core
SetCumulativeTypeParamsRule(),
SetDefaultGranularityRule(),
),
)
model = PydanticSemanticManifestTransformer.transform(raw_model, rules)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def filter_by_spec_patterns(self, spec_patterns: Sequence[SpecPattern]) -> Linka
"""
start_time = time.time()

# Spec patterns need all specs to match properly e.g. `BaseTimeGrainPattern`.
# Spec patterns need all specs to match properly e.g. `MinimumTimeGrainPattern`.
matching_specs: Sequence[InstanceSpec] = self.specs

for spec_pattern in spec_patterns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,18 @@ def get_min_queryable_time_granularity(self, metric_reference: MetricReference)
minimum_queryable_granularity = defined_time_granularity

return minimum_queryable_granularity

def get_default_granularity_for_metrics(
    self, metric_references: Sequence[MetricReference]
) -> Optional[TimeGranularity]:
    """When querying a group of metrics, the default granularity will be the largest of the metrics' default granularities.

    Returns None when no metrics are passed in. Granularities are compared via their
    integer ordering (`to_int()`), so "largest" means the coarsest grain (e.g. MONTH > DAY).
    """
    result: Optional[TimeGranularity] = None
    for metric_reference in metric_references:
        default_granularity = self.get_metric(metric_reference).default_granularity
        # Every metric is expected to have a default_granularity set by manifest transformation.
        assert (
            default_granularity
        ), f"No default_granularity set for {metric_reference}. Something has been misconfigured."
        if result is None or default_granularity.to_int() > result.to_int():
            result = default_granularity

    return result
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def get_dimension(self, dimension_reference: DimensionReference) -> Dimension:

def get_time_dimension(self, time_dimension_reference: TimeDimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
return self.get_dimension(dimension_reference=time_dimension_reference.dimension_reference())
return self.get_dimension(dimension_reference=time_dimension_reference.dimension_reference)

@property
def measure_references(self) -> Sequence[MeasureReference]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ def _resolve_specs_for_where_filters(
input_str=group_by_item_in_where_filter.object_builder_str,
spec_pattern=group_by_item_in_where_filter.spec_pattern,
resolution_node=current_node,
filter_location=filter_location,
)
# The paths in the issue set are generated relative to the current node. For error messaging, it seems more
# helpful for those paths to be relative to the query. To do, we have to add nodes from the resolution path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from dbt_semantic_interfaces.call_parameter_sets import TimeDimensionCallParameterSet
from dbt_semantic_interfaces.naming.keywords import METRIC_TIME_ELEMENT_NAME
from dbt_semantic_interfaces.references import SemanticModelReference, TimeDimensionReference
from dbt_semantic_interfaces.references import MetricReference, SemanticModelReference, TimeDimensionReference
from dbt_semantic_interfaces.type_enums import TimeGranularity
from typing_extensions import override

Expand All @@ -20,14 +20,16 @@
PushDownResult,
_PushDownGroupByItemCandidatesVisitor,
)
from metricflow_semantics.query.group_by_item.filter_spec_resolution.filter_location import WhereFilterLocation
from metricflow_semantics.query.group_by_item.resolution_dag.dag import GroupByItemResolutionDag, ResolutionDagSinkNode
from metricflow_semantics.query.group_by_item.resolution_path import MetricFlowQueryResolutionPath
from metricflow_semantics.query.issues.group_by_item_resolver.ambiguous_group_by_item import AmbiguousGroupByItemIssue
from metricflow_semantics.query.issues.issues_base import (
MetricFlowQueryResolutionIssueSet,
)
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator
from metricflow_semantics.specs.patterns.base_time_grain import BaseTimeGrainPattern
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator, QueryPartForSuggestions
from metricflow_semantics.specs.patterns.metric_time_default_granularity import MetricTimeDefaultGranularityPattern
from metricflow_semantics.specs.patterns.min_time_grain import MinimumTimeGrainPattern
from metricflow_semantics.specs.patterns.no_group_by_metric import NoGroupByMetricPattern
from metricflow_semantics.specs.patterns.spec_pattern import SpecPattern
from metricflow_semantics.specs.patterns.typed_patterns import TimeDimensionPattern
Expand Down Expand Up @@ -80,11 +82,15 @@ def resolve_matching_item_for_querying(
self,
spec_pattern: SpecPattern,
suggestion_generator: Optional[QueryItemSuggestionGenerator],
queried_metrics: Sequence[MetricReference],
only_use_minimum_grain: bool = False,
) -> GroupByItemResolution:
"""Returns the spec that corresponds the one described by spec_pattern and is valid for the query.
"""Returns the spec that corresponds to the one described by spec_pattern and is valid for the query.

For queries, if the pattern matches to a spec for the same element at different grains, the spec with the finest
common grain is returned.
common grain is returned, unless the spec is metric_time, in which case the default grain is returned.

If only_use_minimum_grain is True, will use minimum grain instead of default for metric_time, too.
"""
push_down_visitor = _PushDownGroupByItemCandidatesVisitor(
manifest_lookup=self._manifest_lookup,
Expand All @@ -101,9 +107,18 @@ def resolve_matching_item_for_querying(
issue_set=push_down_result.issue_set,
)

push_down_result = push_down_result.filter_candidates_by_pattern(
BaseTimeGrainPattern(),
)
filters_to_use: Tuple[SpecPattern, ...] = (MinimumTimeGrainPattern(),)
if not only_use_minimum_grain:
# Default pattern must come first to avoid removing default grain options prematurely.
filters_to_use = (
MetricTimeDefaultGranularityPattern(
metric_lookup=self._manifest_lookup.metric_lookup, queried_metrics=queried_metrics
),
) + filters_to_use

for filter_to_use in filters_to_use:
push_down_result = push_down_result.filter_candidates_by_pattern(filter_to_use)

logger.info(
f"Spec pattern:\n"
f"{indent(mf_pformat(spec_pattern))}\n"
Expand Down Expand Up @@ -135,6 +150,7 @@ def resolve_matching_item_for_filters(
input_str: str,
spec_pattern: SpecPattern,
resolution_node: ResolutionDagSinkNode,
filter_location: WhereFilterLocation,
) -> GroupByItemResolution:
"""Returns the spec that matches the spec_pattern associated with the filter in the given node.

Expand All @@ -147,12 +163,22 @@ def resolve_matching_item_for_filters(
suggestion_generator = QueryItemSuggestionGenerator(
input_naming_scheme=ObjectBuilderNamingScheme(),
input_str=input_str,
candidate_filters=QueryItemSuggestionGenerator.FILTER_ITEM_CANDIDATE_FILTERS,
query_part=QueryPartForSuggestions.WHERE_FILTER,
metric_lookup=self._manifest_lookup.metric_lookup,
queried_metrics=filter_location.metric_references,
)

push_down_visitor = _PushDownGroupByItemCandidatesVisitor(
manifest_lookup=self._manifest_lookup,
source_spec_patterns=(spec_pattern, BaseTimeGrainPattern()),
source_spec_patterns=(
spec_pattern,
# MetricTimeDefaultGranularityPattern must come before MinimumTimeGrainPattern to ensure we don't remove the
# default grain from candidate set prematurely.
MetricTimeDefaultGranularityPattern(
metric_lookup=self._manifest_lookup.metric_lookup, queried_metrics=filter_location.metric_references
),
MinimumTimeGrainPattern(),
),
suggestion_generator=suggestion_generator,
)

Expand Down Expand Up @@ -210,16 +236,18 @@ def resolve_available_items(
issue_set=push_down_result.issue_set,
)

def resolve_min_metric_time_grain(self) -> TimeGranularity:
def resolve_min_metric_time_grain(self, metrics_in_query: Sequence[MetricReference]) -> TimeGranularity:
"""Returns the finest time grain of metric_time for querying."""
metric_time_grain_resolution = self.resolve_matching_item_for_querying(
spec_pattern=TimeDimensionPattern.from_call_parameter_set(
TimeDimensionCallParameterSet(
entity_path=(),
time_dimension_reference=TimeDimensionReference(element_name=METRIC_TIME_ELEMENT_NAME),
)
),
),
suggestion_generator=None,
queried_metrics=metrics_in_query,
only_use_minimum_grain=True,
)
metric_time_spec_set = (
group_specs_by_type((metric_time_grain_resolution.spec,))
Expand Down
39 changes: 22 additions & 17 deletions metricflow-semantics/metricflow_semantics/query/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from dbt_semantic_interfaces.protocols import SavedQuery
from dbt_semantic_interfaces.protocols.where_filter import WhereFilter
from dbt_semantic_interfaces.references import SemanticModelReference
from dbt_semantic_interfaces.references import MetricReference, SemanticModelReference
from dbt_semantic_interfaces.type_enums import TimeGranularity

from metricflow_semantics.assert_one_arg import assert_at_most_one_arg_set
Expand Down Expand Up @@ -52,8 +52,8 @@
ResolverInputForQuery,
ResolverInputForQueryLevelWhereFilterIntersection,
)
from metricflow_semantics.specs.patterns.base_time_grain import BaseTimeGrainPattern
from metricflow_semantics.specs.patterns.metric_time_pattern import MetricTimePattern
from metricflow_semantics.specs.patterns.min_time_grain import MinimumTimeGrainPattern
from metricflow_semantics.specs.patterns.none_date_part import NoneDatePartPattern
from metricflow_semantics.specs.query_param_implementations import DimensionOrEntityParameter, MetricParameter
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
Expand Down Expand Up @@ -147,13 +147,14 @@ def _get_saved_query(self, saved_query_parameter: SavedQueryParameter) -> SavedQ

return matching_saved_queries[0]

@staticmethod
def _metric_time_granularity(time_dimension_specs: Sequence[TimeDimensionSpec]) -> Optional[TimeGranularity]:
def _get_smallest_requested_metric_time_granularity(
self, time_dimension_specs: Sequence[TimeDimensionSpec]
) -> Optional[TimeGranularity]:
matching_specs: Sequence[InstanceSpec] = time_dimension_specs

for pattern_to_apply in (
MetricTimePattern(),
BaseTimeGrainPattern(),
MinimumTimeGrainPattern(),
NoneDatePartPattern(),
):
matching_specs = pattern_to_apply.match(matching_specs)
Expand All @@ -164,7 +165,7 @@ def _metric_time_granularity(time_dimension_specs: Sequence[TimeDimensionSpec])

assert (
len(time_dimension_specs) == 1
), f"Bug with BaseTimeGrainPattern - should have returned exactly 1 spec but got {time_dimension_specs}"
), f"Bug with MinimumTimeGrainPattern - should have returned exactly 1 spec but got {time_dimension_specs}"

return time_dimension_specs[0].time_granularity

Expand All @@ -173,19 +174,23 @@ def _adjust_time_constraint(
resolution_dag: GroupByItemResolutionDag,
time_dimension_specs_in_query: Sequence[TimeDimensionSpec],
time_constraint: TimeRangeConstraint,
metrics_in_query: Sequence[MetricReference],
) -> TimeRangeConstraint:
metric_time_granularity = MetricFlowQueryParser._metric_time_granularity(time_dimension_specs_in_query)
"""Change the time range so that the ends are at the ends of the requested time granularity windows.

e.g. [2020-01-15, 2020-2-15] with MONTH granularity -> [2020-01-01, 2020-02-29]
"""
metric_time_granularity = self._get_smallest_requested_metric_time_granularity(time_dimension_specs_in_query)
if metric_time_granularity is None:
# This indicates there were no metric time specs in the query, so use smallest available granularity for metric_time.
group_by_item_resolver = GroupByItemResolver(
manifest_lookup=self._manifest_lookup,
resolution_dag=resolution_dag,
)
metric_time_granularity = group_by_item_resolver.resolve_min_metric_time_grain()

"""Change the time range so that the ends are at the ends of the appropriate time granularity windows.
metric_time_granularity = group_by_item_resolver.resolve_min_metric_time_grain(
metrics_in_query=metrics_in_query
)

e.g. [2020-01-15, 2020-2-15] with MONTH granularity -> [2020-01-01, 2020-02-29]
"""
return self._time_period_adjuster.expand_time_constraint_to_fill_granularity(
time_constraint=time_constraint,
granularity=metric_time_granularity,
Expand Down Expand Up @@ -495,13 +500,13 @@ def _parse_and_validate_query(
resolution_dag=query_resolution.resolution_dag,
time_dimension_specs_in_query=query_spec.time_dimension_specs,
time_constraint=time_constraint,
metrics_in_query=tuple(
metric_resolver_input.spec_pattern.metric_reference
for metric_resolver_input in resolver_inputs_for_metrics
),
)
logger.info(f"Time constraint after adjustment is: {time_constraint}")

return ParseQueryResult(
query_spec=query_spec.with_time_range_constraint(time_constraint),
queried_semantic_models=query_resolution.queried_semantic_models,
)
query_spec = query_spec.with_time_range_constraint(time_constraint)

return ParseQueryResult(
query_spec=query_spec,
Expand Down
25 changes: 14 additions & 11 deletions metricflow-semantics/metricflow_semantics/query/query_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@
ResolverInputForQueryLevelWhereFilterIntersection,
ResolverInputForWhereFilterIntersection,
)
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator, QueryPartForSuggestions
from metricflow_semantics.query.validation_rules.query_validator import PostResolutionQueryValidator
from metricflow_semantics.specs.patterns.match_list_pattern import MatchListSpecPattern
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
from metricflow_semantics.specs.spec_classes import (
InstanceSpec,
Expand Down Expand Up @@ -149,25 +148,26 @@ def _resolve_has_metric_or_group_by_inputs(
)
return ResolveMetricOrGroupByItemsResult(input_to_issue_set_mapping=InputToIssueSetMapping.empty_instance())

@staticmethod
def _resolve_group_by_item_input(
self,
group_by_item_input: ResolverInputForGroupByItem,
group_by_item_resolver: GroupByItemResolver,
valid_group_by_item_specs_for_querying: Sequence[LinkableInstanceSpec],
queried_metrics: Sequence[MetricReference],
) -> GroupByItemResolution:
suggestion_generator = QueryItemSuggestionGenerator(
input_naming_scheme=group_by_item_input.input_obj_naming_scheme,
input_str=str(group_by_item_input.input_obj),
candidate_filters=QueryItemSuggestionGenerator.GROUP_BY_ITEM_CANDIDATE_FILTERS
+ (
MatchListSpecPattern(
listed_specs=valid_group_by_item_specs_for_querying,
),
),
query_part=QueryPartForSuggestions.GROUP_BY,
metric_lookup=self._manifest_lookup.metric_lookup,
queried_metrics=queried_metrics,
valid_group_by_item_specs_for_querying=valid_group_by_item_specs_for_querying,
)

return group_by_item_resolver.resolve_matching_item_for_querying(
spec_pattern=group_by_item_input.spec_pattern,
suggestion_generator=suggestion_generator,
queried_metrics=queried_metrics,
)

def _resolve_metric_inputs(
Expand All @@ -190,7 +190,9 @@ def _resolve_metric_inputs(
suggestion_generator = QueryItemSuggestionGenerator(
input_naming_scheme=MetricNamingScheme(),
input_str=str(metric_input.input_obj),
candidate_filters=(),
query_part=QueryPartForSuggestions.METRIC,
metric_lookup=self._manifest_lookup.metric_lookup,
queried_metrics=tuple(metric_input.spec_pattern.metric_reference for metric_input in metric_inputs),
)
metric_suggestions = suggestion_generator.input_suggestions(candidate_specs=available_metric_specs)
input_to_issue_set_mapping_items.append(
Expand Down Expand Up @@ -238,10 +240,11 @@ def _resolve_group_by_items_result(
group_by_item_specs: List[LinkableInstanceSpec] = []
linkable_element_sets: List[LinkableElementSet] = []
for group_by_item_input in group_by_item_inputs:
resolution = MetricFlowQueryResolver._resolve_group_by_item_input(
resolution = self._resolve_group_by_item_input(
group_by_item_resolver=group_by_item_resolver,
group_by_item_input=group_by_item_input,
valid_group_by_item_specs_for_querying=valid_group_by_item_specs_for_querying,
queried_metrics=metric_references,
)
if resolution.issue_set.has_issues:
input_to_issue_set_mapping_items.append(
Expand Down
Loading