Skip to content

Commit

Permalink
Make linkable elements available in FilterSpecResolutionLookup
Browse files Browse the repository at this point in the history
We need to access LinkableElements - or, more specifically, their
source SemanticManifestReferences - in the WhereSpecFactory in order
to evaluate fitness for predicate pushdown.

This makes these elements available to the FilterSpecResolutionLookup,
which we use to get the resolved filter elements when building the
WhereFilterSpec. Subsequent changes will plumb this through to the
WhereFilterSpec for predicate pushdown evaluation.
  • Loading branch information
tlento committed May 3, 2024
1 parent 3ae634f commit 4a62cec
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from metricflow_semantics.model.semantics.linkable_element import (
ElementPathKey,
LinkableDimension,
LinkableElement,
LinkableElementType,
LinkableEntity,
LinkableMetric,
Expand Down Expand Up @@ -200,6 +201,20 @@ def intersection_by_path_key(linkable_element_sets: Sequence[LinkableElementSet]
},
)

def linkable_elements_for_path_key(self, path_key: ElementPathKey) -> Sequence[LinkableElement]:
"""Returns the linkable elements associated with the given path key in this set.
If the path key does not exist in the set, this silently returns an empty Sequence.
"""
if path_key in self.path_key_to_linkable_dimensions:
return self.path_key_to_linkable_dimensions[path_key]
elif path_key in self.path_key_to_linkable_entities:
return self.path_key_to_linkable_entities[path_key]
elif path_key in self.path_key_to_linkable_metrics:
return self.path_key_to_linkable_metrics[path_key]
else:
return tuple()

def filter(
self,
with_any_of: FrozenSet[LinkableElementProperty],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from metricflow_semantics.collection_helpers.merger import Mergeable
from metricflow_semantics.mf_logging.formatting import indent
from metricflow_semantics.mf_logging.pretty_print import mf_pformat
from metricflow_semantics.model.semantics.linkable_element import LinkableElement
from metricflow_semantics.query.group_by_item.filter_spec_resolution.filter_location import WhereFilterLocation
from metricflow_semantics.query.group_by_item.path_prefixable import PathPrefixable
from metricflow_semantics.query.group_by_item.resolution_path import MetricFlowQueryResolutionPath
Expand Down Expand Up @@ -63,11 +64,8 @@ def spec_resolution_exists(self, resolved_spec_lookup_key: ResolvedSpecLookUpKey
"""Returns true if a resolution exists for the given key."""
return len(self.get_spec_resolutions(resolved_spec_lookup_key)) > 0

def checked_resolved_spec(self, resolved_spec_lookup_key: ResolvedSpecLookUpKey) -> LinkableInstanceSpec:
"""Returns the resolved spec for the given key.
If a resolution does not exist, or there is no spec associated with the resolution, this raises a RuntimeError.
"""
def _checked_resolution(self, resolved_spec_lookup_key: ResolvedSpecLookUpKey) -> FilterSpecResolution:
"""Helper to get just the resolution so we can access different properties on it."""
resolutions = self.get_spec_resolutions(resolved_spec_lookup_key)
if len(resolutions) == 0:
raise RuntimeError(
Expand All @@ -91,7 +89,27 @@ def checked_resolved_spec(self, resolved_spec_lookup_key: ResolvedSpecLookUpKey)
f"{mf_pformat(self.spec_resolutions)}"
)

return resolution.resolved_spec
return resolution

def checked_resolved_spec(self, resolved_spec_lookup_key: ResolvedSpecLookUpKey) -> LinkableInstanceSpec:
"""Returns the resolved spec for the given key.
If a resolution does not exist, or there is no spec associated with the resolution, this raises a RuntimeError.
"""
resolved_spec = self._checked_resolution(resolved_spec_lookup_key=resolved_spec_lookup_key).resolved_spec
assert resolved_spec is not None, "Typechecker hint, this should have been verified in _checked_resolution"
return resolved_spec

def checked_resolved_linkable_elements(
self, resolved_spec_lookup_key: ResolvedSpecLookUpKey
) -> Sequence[LinkableElement]:
"""Returns the sequence of LinkableElements for the given spec lookup key.
These are the LinkableElements bound to the singular spec/path_key for a given resolved filter item. They are
useful for propagating metadata about the origin semantic model across the boundary between the filter resolver
and the DataflowPlanBuilder.
"""
return self._checked_resolution(resolved_spec_lookup_key=resolved_spec_lookup_key).resolved_linkable_elements

@override
def merge(self, other: FilterSpecResolutionLookUp) -> FilterSpecResolutionLookUp:
Expand Down Expand Up @@ -206,6 +224,15 @@ def resolved_spec(self) -> Optional[LinkableInstanceSpec]:
f"Found {len(specs)} in {self.resolved_linkable_element_set}, this should not be possible!"
)

@property
def resolved_linkable_elements(self) -> Sequence[LinkableElement]:
"""Returns the resolved linkable elements, if any, for this resolution result."""
resolved_spec = self.resolved_spec
if resolved_spec is None:
return tuple()

return self.resolved_linkable_element_set.linkable_elements_for_path_key(resolved_spec.element_path_key)


CallParameterSet = Union[
DimensionCallParameterSet, TimeDimensionCallParameterSet, EntityCallParameterSet, MetricCallParameterSet
Expand Down
46 changes: 44 additions & 2 deletions metricflow-semantics/metricflow_semantics/specs/spec_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from metricflow_semantics.aggregation_properties import AggregationState
from metricflow_semantics.collection_helpers.dedupe import ordered_dedupe
from metricflow_semantics.collection_helpers.merger import Mergeable
from metricflow_semantics.model.semantics.linkable_element import ElementPathKey, LinkableElementType
from metricflow_semantics.naming.linkable_spec_name import StructuredLinkableSpecName
from metricflow_semantics.sql.sql_bind_parameters import SqlBindParameters
from metricflow_semantics.sql.sql_column_type import SqlColumnType
Expand Down Expand Up @@ -153,11 +154,13 @@ class LinkableInstanceSpec(InstanceSpec, ABC):
entity_links: Tuple[EntityReference, ...]

@property
@abstractmethod
def without_first_entity_link(self: SelfTypeT) -> SelfTypeT:
"""e.g. user_id__device_id__platform -> device_id__platform."""
raise NotImplementedError()

@property
@abstractmethod
def without_entity_links(self: SelfTypeT) -> SelfTypeT:
"""e.g. user_id__device_id__platform -> platform."""
raise NotImplementedError()
Expand All @@ -179,8 +182,15 @@ def qualified_name(self) -> str:

@property
@abstractmethod
def reference(self) -> LinkableElementReference: # noqa: D102
pass
def reference(self) -> LinkableElementReference:
"""Return the LinkableElementReference associated with the spec instance."""
raise NotImplementedError()

@property
@abstractmethod
def element_path_key(self) -> ElementPathKey:
"""Return the ElementPathKey representation of the LinkableInstanceSpec subtype."""
raise NotImplementedError()


@dataclass(frozen=True)
Expand Down Expand Up @@ -225,6 +235,13 @@ def reference(self) -> EntityReference: # noqa: D102
def accept(self, visitor: InstanceSpecVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_entity_spec(self)

@property
@override
def element_path_key(self) -> ElementPathKey:
return ElementPathKey(
element_name=self.element_name, element_type=LinkableElementType.ENTITY, entity_links=self.entity_links
)


@dataclass(frozen=True)
class LinklessEntitySpec(EntitySpec, SerializableDataclass):
Expand Down Expand Up @@ -285,6 +302,13 @@ def reference(self) -> DimensionReference: # noqa: D102
def accept(self, visitor: InstanceSpecVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_dimension_spec(self)

@property
@override
def element_path_key(self) -> ElementPathKey:
return ElementPathKey(
element_name=self.element_name, element_type=LinkableElementType.DIMENSION, entity_links=self.entity_links
)


class TimeDimensionSpecField(Enum):
"""Fields of the time dimension spec.
Expand Down Expand Up @@ -398,6 +422,17 @@ def qualified_name(self) -> str: # noqa: D102
date_part=self.date_part,
).qualified_name

@property
@override
def element_path_key(self) -> ElementPathKey:
return ElementPathKey(
element_name=self.element_name,
element_type=LinkableElementType.TIME_DIMENSION,
entity_links=self.entity_links,
time_granularity=self.time_granularity,
date_part=self.date_part,
)

@staticmethod
def from_reference(reference: TimeDimensionReference) -> TimeDimensionSpec:
"""Initialize from a time dimension reference instance."""
Expand Down Expand Up @@ -786,3 +821,10 @@ def reference(self) -> GroupByMetricReference: # noqa: D102

def accept(self, visitor: InstanceSpecVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_group_by_metric_spec(self)

@property
@override
def element_path_key(self) -> ElementPathKey:
return ElementPathKey(
element_name=self.element_name, element_type=LinkableElementType.METRIC, entity_links=self.entity_links
)
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,46 @@ def _linkable_set_with_uniques_and_duplicates() -> LinkableElementSet:
)


def test_linkable_elements_for_path_key() -> None:
"""Tests accessing the linkable element tuples for a given path key.
The keys all share the same name and links but should return different results. Note the metric keys have
additional entity link annotations due to the way we differentiate between link paths within the query and link
paths outside the query (from outer query to inner metric query).
"""
linkable_set = _linkable_set_with_uniques_and_duplicates()
entity_key = ElementPathKey(
element_name=AMBIGUOUS_NAME, element_type=LinkableElementType.ENTITY, entity_links=(_base_entity_reference,)
)
dimension_key = ElementPathKey(
element_name=AMBIGUOUS_NAME, element_type=LinkableElementType.DIMENSION, entity_links=(_base_entity_reference,)
)
ambiguous_metric_key = ElementPathKey(
element_name=AMBIGUOUS_NAME,
element_type=LinkableElementType.METRIC,
entity_links=(_base_entity_reference,),
metric_subquery_entity_links=(_base_entity_reference, _base_entity_reference),
)
doubled_ambiguous_metric_key = ElementPathKey(
element_name=AMBIGUOUS_NAME,
element_type=LinkableElementType.METRIC,
entity_links=(_base_entity_reference, _base_entity_reference),
metric_subquery_entity_links=(_base_entity_reference, _base_entity_reference),
)

entity_elements = linkable_set.linkable_elements_for_path_key(path_key=entity_key)
dimension_elements = linkable_set.linkable_elements_for_path_key(path_key=dimension_key)
ambiguous_metric_elements = linkable_set.linkable_elements_for_path_key(path_key=ambiguous_metric_key)
doubled_ambiguous_metric_elments = linkable_set.linkable_elements_for_path_key(
path_key=doubled_ambiguous_metric_key
)

assert entity_elements == (_ambiguous_entity, _ambiguous_entity_with_join_path)
assert dimension_elements == (_ambiguous_categorical_dimension, _ambiguous_categorical_dimension_with_join_path)
assert ambiguous_metric_elements == (_ambiguous_metric,)
assert doubled_ambiguous_metric_elments == (_ambiguous_metric_with_join_path,)


def test_filter_with_any_of() -> None:
"""Tests behavior of filter method with a `with_any_of` specified."""
filter_properties = frozenset([LinkableElementProperty.JOINED, LinkableElementProperty.ENTITY])
Expand Down

0 comments on commit 4a62cec

Please sign in to comment.