Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add DimensionLookup to break-up SemanticModelLookup #1487

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Sequence

from dbt_semantic_interfaces.protocols import SemanticModel
from dbt_semantic_interfaces.references import DimensionReference
from dbt_semantic_interfaces.type_enums import DimensionType

from metricflow_semantics.mf_logging.lazy_formattable import LazyFormat


@dataclass(frozen=True)
class DimensionInvariant:
    """Properties that all dimensions sharing a name are expected to agree on within one manifest.

    Instances are immutable and compare by value, so two definitions of the same dimension can be
    checked for agreement with `==`.
    """

    # The type declared for the dimension.
    dimension_type: DimensionType
    # Whether the dimension is declared as a partition.
    is_partition: bool


class DimensionLookup:
    """Looks up properties related to dimensions."""

    def __init__(self, semantic_models: Sequence[SemanticModel]) -> None:
        """Index the invariant properties of every dimension defined in the given semantic models.

        Args:
            semantic_models: The semantic models whose dimensions should be indexed.

        Raises:
            ValueError: If two dimensions with the same reference are defined with different invariants.
        """
        self._dimension_reference_to_invariant: Dict[DimensionReference, DimensionInvariant] = {}
        for semantic_model in semantic_models:
            for dimension in semantic_model.dimensions:
                invariant = DimensionInvariant(
                    dimension_type=dimension.type,
                    is_partition=dimension.is_partition,
                )
                dimension_reference = dimension.reference
                existing_invariant = self._dimension_reference_to_invariant.get(dimension_reference)
                # A previously-seen dimension with the same reference must agree on every invariant field.
                if existing_invariant is not None and existing_invariant != invariant:
                    raise ValueError(
                        str(
                            LazyFormat(
                                "Dimensions with the same name have been defined with conflicting values that "
                                "should have been the same in a given semantic manifest. This should have been caught "
                                "during validation.",
                                dimension_reference=dimension_reference,
                                existing_invariant=existing_invariant,
                                conflicting_invariant=invariant,
                                semantic_model_reference=semantic_model.reference,
                            )
                        )
                    )

                self._dimension_reference_to_invariant[dimension_reference] = invariant

    def get_invariant(self, dimension_reference: DimensionReference) -> DimensionInvariant:
        """Get invariants for the given dimension in the semantic manifest.

        Args:
            dimension_reference: Reference to the dimension to look up. Only its `element_name` is used.

        Returns:
            The invariant recorded for the dimension with that element name.

        Raises:
            ValueError: If no dimension with that element name was indexed.
        """
        # dimension_reference might be a TimeDimensionReference, so change types.
        dimension_reference = DimensionReference(element_name=dimension_reference.element_name)
        # Use `.get()` instead of indexing: indexing raises a bare `KeyError` on a missing key, which
        # would make the descriptive error below unreachable.
        invariant = self._dimension_reference_to_invariant.get(dimension_reference)
        if invariant is None:
            raise ValueError(
                str(
                    LazyFormat(
                        "Unknown dimension reference",
                        dimension_reference=dimension_reference,
                        known_dimension_references=list(self._dimension_reference_to_invariant),
                    )
                )
            )

        return invariant
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from metricflow_semantics.errors.error_classes import InvalidSemanticModelError
from metricflow_semantics.mf_logging.lazy_formattable import LazyFormat
from metricflow_semantics.model.semantics.dimension_lookup import DimensionLookup
from metricflow_semantics.model.semantics.element_group import ElementGrouper
from metricflow_semantics.model.semantics.measure_lookup import MeasureLookup
from metricflow_semantics.model.semantics.semantic_model_helper import SemanticModelHelper
Expand Down Expand Up @@ -72,6 +73,7 @@ def __init__(self, model: SemanticManifest, custom_granularities: Dict[str, Expa
self._measure_reference_to_agg_time_dimension_specs: Dict[MeasureReference, Sequence[TimeDimensionSpec]] = {}

self._measure_lookup = MeasureLookup(sorted_semantic_models, custom_granularities)
self._dimension_lookup = DimensionLookup(sorted_semantic_models)

def get_dimension_references(self) -> Sequence[DimensionReference]:
"""Retrieve all dimension references from the collection of semantic models."""
Expand Down Expand Up @@ -323,3 +325,7 @@ def _get_defined_time_granularity(self, time_dimension_reference: TimeDimensionR
@property
def measure_lookup(self) -> MeasureLookup:  # noqa: D102
    # Read-only access to the `MeasureLookup` held in `self._measure_lookup`.
    return self._measure_lookup

@property
def dimension_lookup(self) -> DimensionLookup:  # noqa: D102
    # Read-only access to the `DimensionLookup` held in `self._dimension_lookup`.
    return self._dimension_lookup
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

import pytest
from _pytest.fixtures import FixtureRequest
from dbt_semantic_interfaces.implementations.semantic_manifest import PydanticSemanticManifest
from metricflow_semantics.model.semantics.dimension_lookup import DimensionLookup
from metricflow_semantics.test_helpers.config_helpers import MetricFlowTestConfiguration
from metricflow_semantics.test_helpers.snapshot_helpers import assert_object_snapshot_equal


@pytest.fixture(scope="module")
def dimension_lookup(  # noqa: D103
    partitioned_multi_hop_join_semantic_manifest: PydanticSemanticManifest,
) -> DimensionLookup:
    # Module-scoped so the lookup is built once and shared by the tests in this module.
    semantic_models = partitioned_multi_hop_join_semantic_manifest.semantic_models
    return DimensionLookup(semantic_models)


def test_get_invariant(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    partitioned_multi_hop_join_semantic_manifest: PydanticSemanticManifest,
    dimension_lookup: DimensionLookup,
) -> None:
    """Snapshot the invariant of every dimension defined in the manifest.

    `partitioned_multi_hop_join_semantic_manifest` is used because it shows an example of different
    `is_partition` values.
    """
    # Gather the reference of every dimension across all semantic models.
    all_references = [
        dimension.reference
        for model in partitioned_multi_hop_join_semantic_manifest.semantic_models
        for dimension in model.dimensions
    ]

    # Walk the references in sorted order and key each invariant by the dimension's element name.
    invariant_by_element_name = {}
    for reference in sorted(all_references):
        invariant_by_element_name[reference.element_name] = dimension_lookup.get_invariant(reference)

    assert_object_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        obj_id="obj_0",
        obj=invariant_by_element_name,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
'account_month': DimensionInvariant(dimension_type=CATEGORICAL, is_partition=False),
'acquired_ds': DimensionInvariant(dimension_type=TIME, is_partition=False),
'country': DimensionInvariant(dimension_type=CATEGORICAL, is_partition=False),
'customer_atomic_weight': DimensionInvariant(dimension_type=CATEGORICAL, is_partition=False),
'customer_name': DimensionInvariant(dimension_type=CATEGORICAL, is_partition=False),
'ds': DimensionInvariant(dimension_type=TIME, is_partition=False),
'ds_partitioned': DimensionInvariant(dimension_type=TIME, is_partition=True),
'extra_dim': DimensionInvariant(dimension_type=CATEGORICAL, is_partition=False),
'third_hop_ds': DimensionInvariant(dimension_type=TIME, is_partition=False),
'value': DimensionInvariant(dimension_type=CATEGORICAL, is_partition=False),
}
14 changes: 7 additions & 7 deletions metricflow/dataflow/builder/partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ def __init__(self, semantic_model_lookup: SemanticModelLookup) -> None: # noqa:
def _get_partitions(self, spec_set: InstanceSpecSet) -> PartitionSpecSet:
"""Returns the specs from the instance set that correspond to partition specs."""
partition_dimension_specs = tuple(
x
for x in spec_set.dimension_specs
if self._semantic_model_lookup.get_dimension(dimension_reference=x.reference).is_partition
dimension_spec
for dimension_spec in spec_set.dimension_specs
if self._semantic_model_lookup.dimension_lookup.get_invariant(dimension_spec.reference).is_partition
)
partition_time_dimension_specs = tuple(
x
for x in spec_set.time_dimension_specs
if x.reference != DataSet.metric_time_dimension_reference()
and self._semantic_model_lookup.get_time_dimension(time_dimension_reference=x.reference).is_partition
time_dimension_spec
for time_dimension_spec in spec_set.time_dimension_specs
if time_dimension_spec.reference != DataSet.metric_time_dimension_reference()
and self._semantic_model_lookup.dimension_lookup.get_invariant(time_dimension_spec.reference).is_partition
)

return PartitionSpecSet(
Expand Down
Loading