Skip to content

Commit

Permalink
WIP wut.
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Oct 23, 2024
1 parent 4bdd2fa commit 8bb2430
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ def __init__(
self._measure_non_additive_dimension_specs: Dict[MeasureReference, NonAdditiveDimensionSpec] = {}
self._dimension_index: Dict[DimensionReference, List[SemanticModel]] = {}
self._entity_index: Dict[EntityReference, List[SemanticModel]] = {}
self._primary_entity_index: Dict[EntityReference, List[SemanticModel]] = {}

# TODO: remove this dict
self._dimension_ref_to_spec: Dict[DimensionReference, DimensionSpec] = {}

self._entity_ref_to_spec: Dict[EntityReference, EntitySpec] = {}

self._semantic_model_to_aggregation_time_dimensions: Dict[
Expand Down Expand Up @@ -89,29 +92,6 @@ def get_dimension_from_semantic_model(
f"No dimension with name '{dimension_reference.element_name}' in semantic_model '{semantic_model.name}'."
)

# TODO: remove this method
def get_dimension(self, dimension_reference: DimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
# If the reference passed is a TimeDimensionReference, convert to DimensionReference.
dimension_reference = DimensionReference(dimension_reference.element_name)

semantic_models = self._dimension_index.get(dimension_reference)
if not semantic_models:
raise ValueError(
f"Could not find dimension with name '{dimension_reference.element_name}' in configured semantic models"
)

return SemanticModelLookup.get_dimension_from_semantic_model(
# Dimension object should match across semantic models, so just use the first semantic model.
semantic_model=semantic_models[0],
dimension_reference=dimension_reference,
)

# TODO: remove this method
def get_time_dimension(self, time_dimension_reference: TimeDimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
return self.get_dimension(dimension_reference=time_dimension_reference.dimension_reference)

@property
def measure_references(self) -> Sequence[MeasureReference]:
"""Return all measure references from the collection of semantic models."""
Expand Down Expand Up @@ -277,19 +257,18 @@ def _add_semantic_model(self, semantic_model: SemanticModel) -> None:
)
)

# TODO: Construct these specs correctly. All of the time dimension specs have the default granularity
self._dimension_ref_to_spec[dim.time_dimension_reference or dim.reference] = (
TimeDimensionSpec(element_name=dim.name, entity_links=())
if dim.type is DimensionType.TIME
else DimensionSpec(element_name=dim.name, entity_links=())
)

for entity in semantic_model.entities:
semantic_models_for_entity = self._entity_index.get(entity.reference, []) + [semantic_model]
self._entity_index[entity.reference] = semantic_models_for_entity

self._entity_ref_to_spec[entity.reference] = EntitySpec(element_name=entity.name, entity_links=())

primary_entity = self.resolved_primary_entity(semantic_model)
if primary_entity:
self._primary_entity_index[primary_entity] = self._primary_entity_index.get(primary_entity, []) + [
semantic_model
]

self._semantic_model_reference_to_semantic_model[semantic_model.reference] = semantic_model

def get_primary_entity_else_error(self, semantic_model: SemanticModel) -> EntityReference:
Expand Down Expand Up @@ -375,6 +354,7 @@ def entity_links_for_local_elements(semantic_model: SemanticModel) -> Sequence[E

return sorted(possible_entity_links, key=lambda entity_reference: entity_reference.element_name)

# TODO: remove this method
def get_element_spec_for_name(self, element_name: str) -> LinkableInstanceSpec:
"""Returns the spec for the given name of a linkable element (dimension or entity)."""
if TimeDimensionReference(element_name=element_name) in self._dimension_ref_to_spec:
Expand Down Expand Up @@ -449,3 +429,38 @@ def _get_defined_time_granularity(
defined_time_granularity = time_dimension.type_params.time_granularity

return defined_time_granularity

def get_semantic_models_for_primary_entity(self, entity_reference: EntityReference) -> List[SemanticModel]:
"""Return all semantic models associated with the primary entity reference."""
return self._primary_entity_index.get(entity_reference, [])

def get_semantic_model_for_dimension(
self, dimension_reference: DimensionReference, entity_links: Sequence[EntityReference]
) -> SemanticModel:
"""Use the entity links proveded to deterine the semantic model where this dimension is defined."""
if not entity_links:
raise ValueError(
f"No entity links received for dimension {dimension_reference}. "
"Entity links are required to determine semantic model for dimension."
)
primary_entity = entity_links[-1]
semantic_models = self.get_semantic_models_for_entity(primary_entity)
for semantic_model in semantic_models:
try:
self.get_dimension_from_semantic_model(
semantic_model=semantic_model, dimension_reference=dimension_reference
)
return semantic_model
except ValueError:
continue
raise ValueError(
f"Could not find dimension {dimension_reference} in any semantic model associated with primary entity {primary_entity}."
)

def dimension_is_partition(self, dimension_spec: DimensionSpec) -> bool: # noqa: D102
semantic_model = self.get_semantic_model_for_dimension(
dimension_reference=dimension_spec.reference, entity_links=dimension_spec.entity_links
)
return self.get_dimension_from_semantic_model(
dimension_reference=dimension_spec.reference, semantic_model=semantic_model
).is_partition
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def without_first_entity_link(self) -> DimensionSpec: # noqa: D102

@property
def without_entity_links(self) -> DimensionSpec: # noqa: D102
# TODO - check if this is acceptable?
return DimensionSpec(element_name=self.element_name, entity_links=())

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Sequence, Tuple

from dbt_semantic_interfaces.dataclass_serialization import SerializableDataclass
from dbt_semantic_interfaces.references import EntityReference
from dbt_semantic_interfaces.type_enums import AggregationType, TimeGranularity

from metricflow_semantics.mf_logging.lazy_formattable import LazyFormat
Expand Down Expand Up @@ -57,7 +58,9 @@ def bucket_hash(self) -> str:
values.extend(sorted(self.window_groupings))
return hash_items(values)

def linkable_specs(self, non_additive_dimension_grain: TimeGranularity) -> Sequence[LinkableInstanceSpec]:
def linkable_specs(
self, non_additive_dimension_grain: TimeGranularity, primary_entity: EntityReference
) -> Sequence[LinkableInstanceSpec]:
"""Return the set of linkable specs referenced by the NonAdditiveDimensionSpec.
In practice, the name will always point to a time dimension. This method requires the time granularity
Expand All @@ -67,7 +70,7 @@ def linkable_specs(self, non_additive_dimension_grain: TimeGranularity) -> Seque
return (
TimeDimensionSpec(
element_name=self.name,
entity_links=(),
entity_links=(primary_entity,),
time_granularity=ExpandedTimeGranularity.from_time_granularity(non_additive_dimension_grain),
),
) + tuple(LinklessEntitySpec.from_element_name(entity_name) for entity_name in self.window_groupings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def without_first_entity_link(self) -> TimeDimensionSpec: # noqa: D102
date_part=self.date_part,
)

# TODO - can we remove this method?
@property
def without_entity_links(self) -> TimeDimensionSpec: # noqa: D102
return TimeDimensionSpec(
Expand Down Expand Up @@ -138,12 +139,6 @@ def element_path_key(self) -> ElementPathKey:
date_part=self.date_part,
)

# TODO: remove this method
@staticmethod
def from_reference(reference: TimeDimensionReference) -> TimeDimensionSpec:
"""Initialize from a time dimension reference instance."""
return TimeDimensionSpec(entity_links=(), element_name=reference.element_name)

def accept(self, visitor: InstanceSpecVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_time_dimension_spec(self)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ def test_get_names( # noqa: D103


def test_get_elements(semantic_model_lookup: SemanticModelLookup) -> None: # noqa: D103
for dimension_reference in semantic_model_lookup.get_dimension_references():
assert (
semantic_model_lookup.get_dimension(dimension_reference=dimension_reference).reference
== dimension_reference
)
for measure_reference in semantic_model_lookup.measure_references:
measure_reference = MeasureReference(element_name=measure_reference.element_name)
assert semantic_model_lookup.get_measure(measure_reference=measure_reference).reference == measure_reference
Expand Down
50 changes: 33 additions & 17 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
MetricTimeWindow,
MetricType,
)
from dbt_semantic_interfaces.references import MetricReference, SemanticModelElementReference, TimeDimensionReference
from dbt_semantic_interfaces.references import (
MetricReference,
SemanticModelElementReference,
SemanticModelReference,
TimeDimensionReference,
)
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
from dbt_semantic_interfaces.validations.unique_valid_name import MetricFlowReservedKeywords
from metricflow_semantics.dag.id_prefix import StaticIdPrefix
Expand Down Expand Up @@ -48,7 +53,7 @@
from metricflow_semantics.specs.order_by_spec import OrderBySpec
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
from metricflow_semantics.specs.spec_set import InstanceSpecSet, group_specs_by_type
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.specs.time_dimension_spec import DEFAULT_TIME_GRANULARITY, TimeDimensionSpec
from metricflow_semantics.specs.where_filter.where_filter_spec import WhereFilterSpec
from metricflow_semantics.specs.where_filter.where_filter_spec_set import WhereFilterSpecSet
from metricflow_semantics.specs.where_filter.where_filter_transform import WhereSpecFactory
Expand Down Expand Up @@ -303,19 +308,23 @@ def _build_aggregated_conversion_node(
)

# Get the agg time dimension for each measure used for matching conversion time windows
base_time_dimension_spec = TimeDimensionSpec.from_reference(
TimeDimensionReference(
self._semantic_model_lookup.get_agg_time_dimension_for_measure(
base_measure_spec.measure_spec.reference
).element_name
)
)
conversion_time_dimension_spec = TimeDimensionSpec.from_reference(
TimeDimensionReference(
self._semantic_model_lookup.get_agg_time_dimension_for_measure(
conversion_measure_spec.measure_spec.reference
).element_name
)
base_time_dimension_spec = TimeDimensionSpec(
element_name=self._semantic_model_lookup.get_agg_time_dimension_for_measure(
base_measure_spec.measure_spec.reference
).element_name,
entity_links=(), # TODO: is this acceptable?
time_granularity=ExpandedTimeGranularity.from_time_granularity(
DEFAULT_TIME_GRANULARITY
), # TODO: need actual granularity here
)
conversion_time_dimension_spec = TimeDimensionSpec(
element_name=self._semantic_model_lookup.get_agg_time_dimension_for_measure(
conversion_measure_spec.measure_spec.reference
).element_name,
entity_links=(), # TODO: is this acceptable?
time_granularity=ExpandedTimeGranularity.from_time_granularity(
DEFAULT_TIME_GRANULARITY
), # TODO: need actual granularity here
)

# Filter the source nodes with only the required specs needed for the calculation
Expand Down Expand Up @@ -1467,16 +1476,23 @@ def __get_required_and_extraneous_linkable_specs(
linkable_spec_sets_to_merge: List[LinkableSpecSet] = []
for filter_spec in filter_specs:
linkable_spec_sets_to_merge.append(LinkableSpecSet.create_from_specs(filter_spec.linkable_specs))

if measure_spec_properties and measure_spec_properties.non_additive_dimension_spec:
non_additive_dimension_grain = self._semantic_model_lookup.get_defined_time_granularity(
SemanticModelElementReference(
element_name=measure_spec_properties.non_additive_dimension_spec.name,
semantic_model_name=measure_spec_properties.agg_time_dimension.semantic_model_name,
)
)
semantic_model = self._semantic_model_lookup.get_semantic_model(
SemanticModelReference(measure_spec_properties.agg_time_dimension.semantic_model_name)
)
primary_entity = self._semantic_model_lookup.get_primary_entity_else_error(semantic_model)
linkable_spec_sets_to_merge.append(
LinkableSpecSet.create_from_specs(
measure_spec_properties.non_additive_dimension_spec.linkable_specs(non_additive_dimension_grain)
measure_spec_properties.non_additive_dimension_spec.linkable_specs(
non_additive_dimension_grain=non_additive_dimension_grain, primary_entity=primary_entity
)
)
)

Expand Down Expand Up @@ -1713,7 +1729,7 @@ def _build_aggregated_measure_from_measure_source_node(
time_dimension_spec = TimeDimensionSpec(
# The NonAdditiveDimensionSpec name property is a plain element name
element_name=non_additive_dimension_spec.name,
entity_links=(),
entity_links=(), # TODO
time_granularity=ExpandedTimeGranularity.from_time_granularity(non_additive_dimension_grain),
)
window_groupings = tuple(
Expand Down
15 changes: 8 additions & 7 deletions metricflow/dataflow/builder/partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ def __init__(self, semantic_model_lookup: SemanticModelLookup) -> None: # noqa:
def _get_partitions(self, spec_set: InstanceSpecSet) -> PartitionSpecSet:
"""Returns the specs from the instance set that correspond to partition specs."""
partition_dimension_specs = tuple(
x
for x in spec_set.dimension_specs
if self._semantic_model_lookup.get_dimension(dimension_reference=x.reference).is_partition
dimension
for dimension in spec_set.dimension_specs
if self._semantic_model_lookup.dimension_is_partition(dimension)
)

partition_time_dimension_specs = tuple(
x
for x in spec_set.time_dimension_specs
if x.reference != DataSet.metric_time_dimension_reference()
and self._semantic_model_lookup.get_time_dimension(time_dimension_reference=x.reference).is_partition
time_dimension
for time_dimension in spec_set.time_dimension_specs
if time_dimension.reference != DataSet.metric_time_dimension_reference()
and self._semantic_model_lookup.dimension_is_partition(time_dimension)
)

return PartitionSpecSet(
Expand Down
5 changes: 4 additions & 1 deletion metricflow/dataflow/builder/source_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ def __init__( # noqa: D107
semantic_manifest_lookup: SemanticManifestLookup,
) -> None:
self._semantic_manifest_lookup = semantic_manifest_lookup
data_set_converter = SemanticModelToDataSetConverter(column_association_resolver)
data_set_converter = SemanticModelToDataSetConverter(
column_association_resolver=column_association_resolver,
semantic_model_lookup=semantic_manifest_lookup.semantic_model_lookup,
)
self.time_spine_sources = TimeSpineSource.build_standard_time_spine_sources(
semantic_manifest_lookup.semantic_manifest
)
Expand Down
Loading

0 comments on commit 8bb2430

Please sign in to comment.