Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Bug with get_defined_time_granularity() #1488

Merged
merged 3 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,23 +270,15 @@ def get_min_queryable_time_granularity(self, metric_reference: MetricReference)
return result

def _get_min_queryable_time_granularity(self, metric_reference: MetricReference) -> TimeGranularity:
agg_time_dimension_specs = self._get_agg_time_dimension_specs_for_metric(metric_reference)
assert (
agg_time_dimension_specs
), f"No agg_time_dimension found for metric {metric_reference}. Something has been misconfigured."

minimum_queryable_granularity = self._semantic_model_lookup.get_defined_time_granularity(
agg_time_dimension_specs[0].reference
)
if len(agg_time_dimension_specs) > 1:
for agg_time_dimension_spec in agg_time_dimension_specs[1:]:
defined_time_granularity = self._semantic_model_lookup.get_defined_time_granularity(
agg_time_dimension_spec.reference
)
if defined_time_granularity.to_int() > minimum_queryable_granularity.to_int():
minimum_queryable_granularity = defined_time_granularity
metric = self.get_metric(metric_reference)
agg_time_dimension_grains = set()
for input_measure in metric.input_measures:
measure_properties = self._semantic_model_lookup.measure_lookup.get_properties(
input_measure.measure_reference
)
agg_time_dimension_grains.add(measure_properties.agg_time_granularity)

return minimum_queryable_granularity
return max(agg_time_dimension_grains, key=lambda time_granularity: time_granularity.to_int())

def get_joinable_scd_specs_for_metric(self, metric_reference: MetricReference) -> Sequence[LinkableInstanceSpec]:
"""Get the SCDs that can be joined to a metric."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

import logging

import pytest
from dbt_semantic_interfaces.references import MetricReference
from dbt_semantic_interfaces.type_enums import TimeGranularity
from metricflow_semantics.model.semantic_manifest_lookup import SemanticManifestLookup

logger = logging.getLogger(__name__)


@pytest.mark.skip("get_min_queryable_time_granularity has a bug with agg. time dimensions at different grains.")
def test_min_queryable_time_granularity_for_different_agg_time_grains( # noqa: D103
extended_date_semantic_manifest_lookup: SemanticManifestLookup,
) -> None:
Expand Down
37 changes: 20 additions & 17 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,9 +980,11 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
)
semantic_model_name = semantic_model_names.pop()

agg_time_dimension = self._semantic_model_lookup.measure_lookup.get_properties(
measure_specs[0].reference
).agg_time_dimension_reference
measure_reference = measure_specs[0].reference
measure_properties = self._semantic_model_lookup.measure_lookup.get_properties(measure_reference)
agg_time_dimension = measure_properties.agg_time_dimension_reference
agg_time_dimension_grain = measure_properties.agg_time_granularity

non_additive_dimension_spec = measure_specs[0].non_additive_dimension_spec
for measure_spec in measure_specs:
if non_additive_dimension_spec != measure_spec.non_additive_dimension_spec:
Expand All @@ -996,6 +998,7 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
measure_specs=tuple(measure_specs),
semantic_model_name=semantic_model_name,
agg_time_dimension=agg_time_dimension,
agg_time_dimension_grain=agg_time_dimension_grain,
non_additive_dimension_spec=non_additive_dimension_spec,
)

Expand Down Expand Up @@ -1464,7 +1467,7 @@ def __get_required_and_extraneous_linkable_specs(
self,
queried_linkable_specs: LinkableSpecSet,
filter_specs: Sequence[WhereFilterSpec],
non_additive_dimension_spec: Optional[NonAdditiveDimensionSpec] = None,
measure_spec_properties: Optional[MeasureSpecProperties] = None,
) -> Tuple[LinkableSpecSet, LinkableSpecSet]:
"""Get the required and extraneous linkable specs for this query.

Expand All @@ -1474,15 +1477,18 @@ def __get_required_and_extraneous_linkable_specs(
linkable_spec_sets_to_merge: List[LinkableSpecSet] = []
for filter_spec in filter_specs:
linkable_spec_sets_to_merge.append(LinkableSpecSet.create_from_specs(filter_spec.linkable_specs))
if non_additive_dimension_spec:
non_additive_dimension_grain = self._semantic_model_lookup.get_defined_time_granularity(
TimeDimensionReference(non_additive_dimension_spec.name)

if measure_spec_properties is not None:
non_additive_dimension_spec = (
measure_spec_properties.non_additive_dimension_spec if measure_spec_properties else None
)
linkable_spec_sets_to_merge.append(
LinkableSpecSet.create_from_specs(
non_additive_dimension_spec.linkable_specs(non_additive_dimension_grain)
if non_additive_dimension_spec is not None:
agg_time_dimension_grain = measure_spec_properties.agg_time_dimension_grain
linkable_spec_sets_to_merge.append(
LinkableSpecSet.create_from_specs(
non_additive_dimension_spec.linkable_specs(agg_time_dimension_grain)
)
)
)

extraneous_linkable_specs = LinkableSpecSet.merge_iterable(linkable_spec_sets_to_merge).dedupe()
required_linkable_specs = queried_linkable_specs.merge(extraneous_linkable_specs).dedupe()
Expand Down Expand Up @@ -1515,8 +1521,6 @@ def _build_aggregated_measure_from_measure_source_node(
else None
)
measure_properties = self._build_measure_spec_properties([measure_spec])
non_additive_dimension_spec = measure_properties.non_additive_dimension_spec

cumulative_metric_adjusted_time_constraint: Optional[TimeRangeConstraint] = None
if cumulative and predicate_pushdown_state.time_range_constraint is not None:
logger.debug(
Expand Down Expand Up @@ -1545,7 +1549,7 @@ def _build_aggregated_measure_from_measure_source_node(
required_linkable_specs, extraneous_linkable_specs = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs,
filter_specs=metric_input_measure_spec.filter_spec_set.all_filter_specs,
non_additive_dimension_spec=non_additive_dimension_spec,
measure_spec_properties=measure_properties,
)

before_aggregation_time_spine_join_description = (
Expand Down Expand Up @@ -1700,12 +1704,11 @@ def _build_aggregated_measure_from_measure_source_node(
where_specs=metric_input_measure_spec.filter_spec_set.all_filter_specs,
)

non_additive_dimension_spec = measure_properties.non_additive_dimension_spec
if non_additive_dimension_spec is not None:
# Apply semi additive join on the node
agg_time_dimension = measure_properties.agg_time_dimension
non_additive_dimension_grain = self._semantic_model_lookup.get_defined_time_granularity(
TimeDimensionReference(non_additive_dimension_spec.name)
)
non_additive_dimension_grain = measure_properties.agg_time_dimension_grain
queried_time_dimension_spec: Optional[
TimeDimensionSpec
] = self._find_non_additive_dimension_in_linkable_specs(
Expand Down
2 changes: 2 additions & 0 deletions metricflow/dataflow/builder/measure_spec_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Optional, Sequence

from dbt_semantic_interfaces.references import TimeDimensionReference
from dbt_semantic_interfaces.type_enums import TimeGranularity
from metricflow_semantics.specs.measure_spec import MeasureSpec
from metricflow_semantics.specs.non_additive_dimension_spec import NonAdditiveDimensionSpec

Expand All @@ -15,4 +16,5 @@ class MeasureSpecProperties:
measure_specs: Sequence[MeasureSpec]
semantic_model_name: str
agg_time_dimension: TimeDimensionReference
agg_time_dimension_grain: TimeGranularity
non_additive_dimension_spec: Optional[NonAdditiveDimensionSpec] = None
23 changes: 11 additions & 12 deletions tests_metricflow/integration/test_cases/itest_dimensions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,17 @@ integration_test:
GROUP BY
v.ds
---
# TODO: Test is currently broken.
#integration_test:
# name: query_non_default_time_dimension_without_granularity
# description: Query just a time dimension, no granularity specified. Should assume default granularity for dimension.
# model: EXTENDED_DATE_MODEL
# group_bys: [ "booking__monthly_ds"]
# check_query: |
# SELECT
# ds AS booking__monthly_ds__month
# FROM {{ source_schema }}.fct_bookings_extended_monthly
# GROUP BY
# ds
integration_test:
name: query_non_default_time_dimension_without_granularity
description: Query just a time dimension, no granularity specified. Should assume default granularity for dimension.
model: EXTENDED_DATE_MODEL
group_bys: [ "booking_monthly__ds"]
check_query: |
SELECT
ds AS booking_monthly__ds__month
FROM {{ source_schema }}.fct_bookings_extended_monthly
GROUP BY
ds
---
integration_test:
name: query_dimension_with_constraint_from_diff_source
Expand Down
Loading