Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Jul 2, 2024
1 parent 10c4007 commit aac89e0
Show file tree
Hide file tree
Showing 13 changed files with 248 additions and 425 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,18 @@ def get_min_queryable_time_granularity(self, metric_reference: MetricReference)
minimum_queryable_granularity = defined_time_granularity

return minimum_queryable_granularity

def get_default_granularity_for_metrics(
    self, metric_references: Sequence[MetricReference]
) -> Optional[TimeGranularity]:
    """Return the largest default granularity among the given metrics.

    When querying a group of metrics, the default granularity will be the largest of the metrics'
    default granularities, where granularities are ordered by their `to_int()` value.

    Args:
        metric_references: The metrics being queried together.

    Returns:
        The default granularity with the greatest `to_int()` value, or None if `metric_references` is
        empty. On ties, the granularity of the earliest such metric is kept.

    Raises:
        AssertionError: If any referenced metric has no default granularity set.
    """
    max_default_granularity: Optional[TimeGranularity] = None
    for metric_reference in metric_references:
        default_granularity = self.get_metric(metric_reference).default_granularity
        # Raise explicitly rather than via an `assert` statement so the check is not stripped when running
        # with `python -O`, which would otherwise surface as an AttributeError on `None.to_int()` below.
        if default_granularity is None:
            raise AssertionError(
                f"No default_granularity set for {metric_reference}. Something has been misconfigured."
            )
        # Compare against None explicitly instead of relying on the truthiness of the granularity object.
        if max_default_granularity is None or default_granularity.to_int() > max_default_granularity.to_int():
            max_default_granularity = default_granularity

    return max_default_granularity
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ def _resolve_specs_for_where_filters(
input_str=group_by_item_in_where_filter.object_builder_str,
spec_pattern=group_by_item_in_where_filter.spec_pattern,
resolution_node=current_node,
filter_location=filter_location,
)
# The paths in the issue set are generated relative to the current node. For error messaging, it seems more
# helpful for those paths to be relative to the query. To do so, we have to add nodes from the resolution path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from dbt_semantic_interfaces.call_parameter_sets import TimeDimensionCallParameterSet
from dbt_semantic_interfaces.naming.keywords import METRIC_TIME_ELEMENT_NAME
from dbt_semantic_interfaces.references import SemanticModelReference, TimeDimensionReference
from dbt_semantic_interfaces.references import MetricReference, SemanticModelReference, TimeDimensionReference
from dbt_semantic_interfaces.type_enums import TimeGranularity
from typing_extensions import override

Expand All @@ -20,13 +20,14 @@
PushDownResult,
_PushDownGroupByItemCandidatesVisitor,
)
from metricflow_semantics.query.group_by_item.filter_spec_resolution.filter_location import WhereFilterLocation
from metricflow_semantics.query.group_by_item.resolution_dag.dag import GroupByItemResolutionDag, ResolutionDagSinkNode
from metricflow_semantics.query.group_by_item.resolution_path import MetricFlowQueryResolutionPath
from metricflow_semantics.query.issues.group_by_item_resolver.ambiguous_group_by_item import AmbiguousGroupByItemIssue
from metricflow_semantics.query.issues.issues_base import (
MetricFlowQueryResolutionIssueSet,
)
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator, QueryPartForSuggestions
from metricflow_semantics.specs.patterns.base_time_grain import DefaultTimeGranularityPattern
from metricflow_semantics.specs.patterns.no_group_by_metric import NoGroupByMetricPattern
from metricflow_semantics.specs.patterns.spec_pattern import SpecPattern
Expand Down Expand Up @@ -80,6 +81,7 @@ def resolve_matching_item_for_querying(
self,
spec_pattern: SpecPattern,
suggestion_generator: Optional[QueryItemSuggestionGenerator],
queried_metrics: Sequence[MetricReference],
) -> GroupByItemResolution:
"""Returns the spec that corresponds the one described by spec_pattern and is valid for the query.
Expand All @@ -102,7 +104,11 @@ def resolve_matching_item_for_querying(
)

push_down_result = push_down_result.filter_candidates_by_pattern(
DefaultTimeGranularityPattern(),
DefaultTimeGranularityPattern(
metric_lookup=self._manifest_lookup.metric_lookup,
only_apply_for_metric_time=False,
queried_metrics=queried_metrics,
),
)
logger.info(
f"Spec pattern:\n"
Expand Down Expand Up @@ -135,6 +141,7 @@ def resolve_matching_item_for_filters(
input_str: str,
spec_pattern: SpecPattern,
resolution_node: ResolutionDagSinkNode,
filter_location: WhereFilterLocation,
) -> GroupByItemResolution:
"""Returns the spec that matches the spec_pattern associated with the filter in the given node.
Expand All @@ -147,12 +154,21 @@ def resolve_matching_item_for_filters(
suggestion_generator = QueryItemSuggestionGenerator(
input_naming_scheme=ObjectBuilderNamingScheme(),
input_str=input_str,
candidate_filters=QueryItemSuggestionGenerator.FILTER_ITEM_CANDIDATE_FILTERS,
query_part=QueryPartForSuggestions.WHERE_FILTER,
metric_lookup=self._manifest_lookup.metric_lookup,
queried_metrics=filter_location.metric_references,
)

push_down_visitor = _PushDownGroupByItemCandidatesVisitor(
manifest_lookup=self._manifest_lookup,
source_spec_patterns=(spec_pattern, DefaultTimeGranularityPattern()),
source_spec_patterns=(
spec_pattern,
DefaultTimeGranularityPattern(
metric_lookup=self._manifest_lookup.metric_lookup,
only_apply_for_metric_time=False,
queried_metrics=filter_location.metric_references,
),
),
suggestion_generator=suggestion_generator,
)

Expand Down Expand Up @@ -210,8 +226,8 @@ def resolve_available_items(
issue_set=push_down_result.issue_set,
)

def resolve_min_metric_time_grain(self) -> TimeGranularity:
"""Returns the finest time grain of metric_time for querying."""
def resolve_default_metric_time_grain(self, metrics_in_query: Sequence[MetricReference]) -> TimeGranularity:
"""Returns the default time grain of metric_time for querying."""
metric_time_grain_resolution = self.resolve_matching_item_for_querying(
spec_pattern=TimeDimensionPattern.from_call_parameter_set(
TimeDimensionCallParameterSet(
Expand All @@ -220,6 +236,7 @@ def resolve_min_metric_time_grain(self) -> TimeGranularity:
)
),
suggestion_generator=None,
queried_metrics=metrics_in_query,
)
metric_time_spec_set = (
group_specs_by_type((metric_time_grain_resolution.spec,))
Expand Down
24 changes: 18 additions & 6 deletions metricflow-semantics/metricflow_semantics/query/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from dbt_semantic_interfaces.protocols import SavedQuery
from dbt_semantic_interfaces.protocols.where_filter import WhereFilter
from dbt_semantic_interfaces.references import SemanticModelReference
from dbt_semantic_interfaces.references import MetricReference, SemanticModelReference
from dbt_semantic_interfaces.type_enums import TimeGranularity

from metricflow_semantics.assert_one_arg import assert_at_most_one_arg_set
Expand Down Expand Up @@ -147,13 +147,16 @@ def _get_saved_query(self, saved_query_parameter: SavedQueryParameter) -> SavedQ

return matching_saved_queries[0]

@staticmethod
def _metric_time_granularity(time_dimension_specs: Sequence[TimeDimensionSpec]) -> Optional[TimeGranularity]:
def _metric_time_granularity(
self, time_dimension_specs: Sequence[TimeDimensionSpec], queried_metrics: Sequence[MetricReference]
) -> Optional[TimeGranularity]:
matching_specs: Sequence[InstanceSpec] = time_dimension_specs

for pattern_to_apply in (
MetricTimePattern(),
DefaultTimeGranularityPattern(),
DefaultTimeGranularityPattern(
metric_lookup=self._manifest_lookup.metric_lookup, queried_metrics=queried_metrics
),
NoneDatePartPattern(),
):
matching_specs = pattern_to_apply.match(matching_specs)
Expand All @@ -173,14 +176,19 @@ def _adjust_time_constraint(
resolution_dag: GroupByItemResolutionDag,
time_dimension_specs_in_query: Sequence[TimeDimensionSpec],
time_constraint: TimeRangeConstraint,
metrics_in_query: Sequence[MetricReference],
) -> TimeRangeConstraint:
metric_time_granularity = MetricFlowQueryParser._metric_time_granularity(time_dimension_specs_in_query)
metric_time_granularity = self._metric_time_granularity(
time_dimension_specs=time_dimension_specs_in_query, queried_metrics=metrics_in_query
)
if metric_time_granularity is None:
group_by_item_resolver = GroupByItemResolver(
manifest_lookup=self._manifest_lookup,
resolution_dag=resolution_dag,
)
metric_time_granularity = group_by_item_resolver.resolve_min_metric_time_grain()
metric_time_granularity = group_by_item_resolver.resolve_default_metric_time_grain(
metrics_in_query=metrics_in_query
)

"""Change the time range so that the ends are at the ends of the appropriate time granularity windows.
Expand Down Expand Up @@ -495,6 +503,10 @@ def _parse_and_validate_query(
resolution_dag=query_resolution.resolution_dag,
time_dimension_specs_in_query=query_spec.time_dimension_specs,
time_constraint=time_constraint,
metrics_in_query=tuple(
metric_resolver_input.spec_pattern.metric_reference
for metric_resolver_input in resolver_inputs_for_metrics
),
)
logger.info(f"Time constraint after adjustment is: {time_constraint}")

Expand Down
25 changes: 14 additions & 11 deletions metricflow-semantics/metricflow_semantics/query/query_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@
ResolverInputForQueryLevelWhereFilterIntersection,
ResolverInputForWhereFilterIntersection,
)
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator
from metricflow_semantics.query.suggestion_generator import QueryItemSuggestionGenerator, QueryPartForSuggestions
from metricflow_semantics.query.validation_rules.query_validator import PostResolutionQueryValidator
from metricflow_semantics.specs.patterns.match_list_pattern import MatchListSpecPattern
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
from metricflow_semantics.specs.spec_classes import (
InstanceSpec,
Expand Down Expand Up @@ -149,25 +148,26 @@ def _resolve_has_metric_or_group_by_inputs(
)
return ResolveMetricOrGroupByItemsResult(input_to_issue_set_mapping=InputToIssueSetMapping.empty_instance())

@staticmethod
def _resolve_group_by_item_input(
self,
group_by_item_input: ResolverInputForGroupByItem,
group_by_item_resolver: GroupByItemResolver,
valid_group_by_item_specs_for_querying: Sequence[LinkableInstanceSpec],
queried_metrics: Sequence[MetricReference],
) -> GroupByItemResolution:
suggestion_generator = QueryItemSuggestionGenerator(
input_naming_scheme=group_by_item_input.input_obj_naming_scheme,
input_str=str(group_by_item_input.input_obj),
candidate_filters=QueryItemSuggestionGenerator.GROUP_BY_ITEM_CANDIDATE_FILTERS
+ (
MatchListSpecPattern(
listed_specs=valid_group_by_item_specs_for_querying,
),
),
query_part=QueryPartForSuggestions.GROUP_BY,
metric_lookup=self._manifest_lookup.metric_lookup,
queried_metrics=queried_metrics,
valid_group_by_item_specs_for_querying=valid_group_by_item_specs_for_querying,
)

return group_by_item_resolver.resolve_matching_item_for_querying(
spec_pattern=group_by_item_input.spec_pattern,
suggestion_generator=suggestion_generator,
queried_metrics=queried_metrics,
)

def _resolve_metric_inputs(
Expand All @@ -190,7 +190,9 @@ def _resolve_metric_inputs(
suggestion_generator = QueryItemSuggestionGenerator(
input_naming_scheme=MetricNamingScheme(),
input_str=str(metric_input.input_obj),
candidate_filters=(),
query_part=QueryPartForSuggestions.METRIC,
metric_lookup=self._manifest_lookup.metric_lookup,
queried_metrics=tuple(metric_input.spec_pattern.metric_reference for metric_input in metric_inputs),
)
metric_suggestions = suggestion_generator.input_suggestions(candidate_specs=available_metric_specs)
input_to_issue_set_mapping_items.append(
Expand Down Expand Up @@ -238,10 +240,11 @@ def _resolve_group_by_items_result(
group_by_item_specs: List[LinkableInstanceSpec] = []
linkable_element_sets: List[LinkableElementSet] = []
for group_by_item_input in group_by_item_inputs:
resolution = MetricFlowQueryResolver._resolve_group_by_item_input(
resolution = self._resolve_group_by_item_input(
group_by_item_resolver=group_by_item_resolver,
group_by_item_input=group_by_item_input,
valid_group_by_item_specs_for_querying=valid_group_by_item_specs_for_querying,
queried_metrics=metric_references,
)
if resolution.issue_set.has_issues:
input_to_issue_set_mapping_items.append(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
from __future__ import annotations

import logging
from typing import Sequence, Tuple
from enum import Enum
from typing import Optional, Sequence, Tuple

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.references import MetricReference

from metricflow_semantics.model.semantics.metric_lookup import MetricLookup
from metricflow_semantics.naming.naming_scheme import QueryItemNamingScheme
from metricflow_semantics.query.similarity import top_fuzzy_matches
from metricflow_semantics.specs.patterns.base_time_grain import DefaultTimeGranularityPattern
from metricflow_semantics.specs.patterns.match_list_pattern import MatchListSpecPattern
from metricflow_semantics.specs.patterns.no_group_by_metric import NoGroupByMetricPattern
from metricflow_semantics.specs.patterns.none_date_part import NoneDatePartPattern
from metricflow_semantics.specs.patterns.spec_pattern import SpecPattern
from metricflow_semantics.specs.spec_classes import InstanceSpec
from metricflow_semantics.specs.spec_classes import InstanceSpec, LinkableInstanceSpec

logger = logging.getLogger(__name__)


class QueryPartForSuggestions(Enum):
    """Identifies the part of a query for which input suggestions are being generated."""

    # Suggestions for an item referenced in a where filter.
    WHERE_FILTER = "where_filter"
    # Suggestions for a group-by item.
    GROUP_BY = "group_by"
    # Suggestions for a metric.
    METRIC = "metric"


class QueryItemSuggestionGenerator:
"""Returns specs that partially match a spec pattern created from user input. Used for suggestions in errors.
Expand All @@ -22,29 +36,66 @@ class QueryItemSuggestionGenerator:
a candidate filter is not needed as any available spec at a resolution node can be used.
"""

# Adding these filters so that we don't get multiple suggestions that are similar, but with different
# grains. Some additional thought is needed to tweak this as the base grain may not be the best suggestion.
FILTER_ITEM_CANDIDATE_FILTERS: Tuple[SpecPattern, ...] = (DefaultTimeGranularityPattern(), NoneDatePartPattern())
GROUP_BY_ITEM_CANDIDATE_FILTERS: Tuple[SpecPattern, ...] = (
DefaultTimeGranularityPattern(),
NoneDatePartPattern(),
NoGroupByMetricPattern(),
)

def __init__( # noqa: D107
self, input_naming_scheme: QueryItemNamingScheme, input_str: str, candidate_filters: Sequence[SpecPattern]
self,
input_naming_scheme: QueryItemNamingScheme,
input_str: str,
query_part: QueryPartForSuggestions,
metric_lookup: MetricLookup,
queried_metrics: Sequence[MetricReference],
valid_group_by_item_specs_for_querying: Optional[Sequence[LinkableInstanceSpec]] = None,
) -> None:
self._input_naming_scheme = input_naming_scheme
self._input_str = input_str
self._candidate_filters = candidate_filters
self._query_part = query_part
self._metric_lookup = metric_lookup
self._queried_metrics = queried_metrics
self._valid_group_by_item_specs_for_querying = valid_group_by_item_specs_for_querying

if self._query_part is QueryPartForSuggestions.GROUP_BY and valid_group_by_item_specs_for_querying is None:
raise ValueError(
"QueryItemSuggestionGenerator requires valid_group_by_item_specs_for_querying param when used on group by items."
)

@property
def candidate_filters(self) -> Tuple[SpecPattern, ...]:
    """Filters to apply to candidate specs before determining suggestions.

    These ensure we don't get multiple suggestions that are similar, but with different grains or date_parts.

    Returns:
        The patterns to apply, depending on the query part this generator was created for:
        - WHERE_FILTER: a `DefaultTimeGranularityPattern` and a `NoneDatePartPattern`.
        - GROUP_BY: the two patterns above, plus a `NoGroupByMetricPattern` and a `MatchListSpecPattern`
          built from the valid group-by item specs.
        - METRIC: an empty tuple.
    """
    default_filters = (
        DefaultTimeGranularityPattern(
            metric_lookup=self._metric_lookup,
            only_apply_for_metric_time=False,
            queried_metrics=self._queried_metrics,
        ),
        NoneDatePartPattern(),
    )
    if self._query_part is QueryPartForSuggestions.WHERE_FILTER:
        return default_filters
    elif self._query_part is QueryPartForSuggestions.GROUP_BY:
        # Compare against None rather than relying on truthiness: __init__ only rejects None, so an empty
        # sequence of valid specs is an accepted value and must not trip this assertion.
        assert self._valid_group_by_item_specs_for_querying is not None, (
            "Group by suggestions require valid_group_by_item_specs_for_querying param. "
            "This should have been validated on init."
        )
        return default_filters + (
            NoGroupByMetricPattern(),
            MatchListSpecPattern(
                listed_specs=self._valid_group_by_item_specs_for_querying,
            ),
        )
    elif self._query_part is QueryPartForSuggestions.METRIC:
        return ()
    else:
        assert_values_exhausted(self._query_part)

def input_suggestions(
self,
candidate_specs: Sequence[InstanceSpec],
max_suggestions: int = 6,
) -> Sequence[str]:
"""Return the best specs that match the given pattern from candidate_specs and match the candidate_filer."""
for candidate_filter in self._candidate_filters:
"""Return the best specs that match the given pattern from candidate_specs and match the candidate_filter."""
for candidate_filter in self.candidate_filters:
candidate_specs = candidate_filter.match(candidate_specs)

# Use edit distance to figure out the closest matches, so convert the specs to strings.
Expand Down
Loading

0 comments on commit aac89e0

Please sign in to comment.