Skip to content

Commit

Permalink
Resolve some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Sep 23, 2023
1 parent 2074663 commit e700c96
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 17 deletions.
87 changes: 75 additions & 12 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.pretty_print import pformat_big_objects
from dbt_semantic_interfaces.protocols.metric import MetricTimeWindow, MetricType
from dbt_semantic_interfaces.references import TimeDimensionReference
from dbt_semantic_interfaces.protocols.semantic_model import SemanticModel
from dbt_semantic_interfaces.references import (
DimensionReference,
EntityReference,
TimeDimensionReference,
)
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity

from metricflow.dag.id_generation import DATAFLOW_PLAN_PREFIX, IdGeneratorRegistry
Expand Down Expand Up @@ -272,24 +277,82 @@ def _build_metrics_output_node(
join_type=combine_metrics_join_type,
)

def __get_semantic_models_for_linkable_specs(
self, linkable_specs: LinkableSpecSet
) -> Dict[SemanticModel, LinkableSpecSet]:
"""Build dict of semantic models to associated linkable specs."""
semantic_models_to_linkable_specs: Dict[SemanticModel, LinkableSpecSet] = {}

# Dimensions
for dimension_spec in linkable_specs.dimension_specs:
semantic_models = self._semantic_model_lookup.get_semantic_models_for_linkable_element(
linkable_element=DimensionReference(element_name=dimension_spec.element_name)
)
for semantic_model in semantic_models:
new_linkable_spec_set = LinkableSpecSet(dimension_specs=(dimension_spec,))
linkable_specs_for_semantic_model = semantic_models_to_linkable_specs.get(semantic_model)
semantic_models_to_linkable_specs[semantic_model] = (
LinkableSpecSet.merge([linkable_specs_for_semantic_model, new_linkable_spec_set])
if linkable_specs_for_semantic_model
else new_linkable_spec_set
)
# Time dimensions
for time_dimension_spec in linkable_specs.time_dimension_specs:
semantic_models = self._semantic_model_lookup.get_semantic_models_for_linkable_element(
linkable_element=TimeDimensionReference(element_name=time_dimension_spec.element_name)
)
for semantic_model in semantic_models:
new_linkable_spec_set = LinkableSpecSet(time_dimension_specs=(time_dimension_spec,))
semantic_models_to_linkable_specs[semantic_model] = (
LinkableSpecSet.merge([linkable_specs_for_semantic_model, new_linkable_spec_set])
if linkable_specs_for_semantic_model
else new_linkable_spec_set
)
# Entities
for entity_spec in linkable_specs.entity_specs:
semantic_models = self._semantic_model_lookup.get_semantic_models_for_linkable_element(
linkable_element=EntityReference(element_name=entity_spec.element_name)
)
for semantic_model in semantic_models:
new_linkable_spec_set = LinkableSpecSet(entity_specs=(entity_spec,))
semantic_models_to_linkable_specs[semantic_model] = (
LinkableSpecSet.merge([linkable_specs_for_semantic_model, new_linkable_spec_set])
if linkable_specs_for_semantic_model
else new_linkable_spec_set
)

return semantic_models_to_linkable_specs

def build_plan_for_distinct_values(self, query_spec: MetricFlowQuerySpec) -> DataflowPlan:
"""Generate a plan that would get the distinct values of a linkable instance.
e.g. distinct listing__country_latest for bookings by listing__country_latest
"""
assert not query_spec.metric_specs, "Can't build distinct values plan with metrics."
output_nodes = []
for linkable_specs in self.__get_semantic_models_for_linkable_specs(
linkable_specs=query_spec.linkable_specs
).values():
dataflow_recipe = self._find_dataflow_recipe(linkable_spec_set=linkable_specs)
if not dataflow_recipe:
raise UnableToSatisfyQueryError(f"Recipe not found for linkable specs: {linkable_specs}.")
output_nodes.append(dataflow_recipe.source_node)

if not output_nodes:
raise UnableToSatisfyQueryError(f"Recipe not found for linkable specs: {query_spec.linkable_specs}")

linkable_specs = query_spec.linkable_specs
dataflow_recipe = self._find_dataflow_recipe(linkable_spec_set=linkable_specs)
if not dataflow_recipe:
raise UnableToSatisfyQueryError(f"Recipe not found for linkable specs: {linkable_specs}.")

source_node = dataflow_recipe.source_node
distinct_values_node = FilterElementsNode(
parent_node=source_node,
include_specs=InstanceSpecSet.create_from_linkable_specs(linkable_specs.as_tuple),
distinct_values=True,
)
if len(output_nodes) == 1:
distinct_values_node = FilterElementsNode(
parent_node=output_nodes[0],
include_specs=InstanceSpecSet.create_from_linkable_specs(query_spec.linkable_specs.as_tuple),
distinct_values=True,
)
else:
distinct_values_node = FilterElementsNode(
parent_node=JoinAggregatedMeasuresByGroupByColumnsNode(parent_nodes=output_nodes),
include_specs=query_spec.linkable_specs.as_spec_set,
distinct_values=True,
)

where_constraint_node: Optional[WhereConstraintNode] = None
if query_spec.where_constraint:
Expand Down
6 changes: 6 additions & 0 deletions metricflow/model/semantics/semantic_model_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ def get_semantic_models_for_entity(self, entity_reference: EntityReference) -> S
entity = self._entity_ref_to_entity[entity_reference]
return set(self._entity_index[entity])

def get_semantic_models_for_linkable_element(
self, linkable_element: LinkableElementReference
) -> Set[SemanticModel]:
"""Return all semantic models associated with a linkable element reference."""
return set(self._linkable_reference_index[linkable_element])

@staticmethod
def get_entity_from_semantic_model(
semantic_model: SemanticModel, entity_reference: LinkableElementReference
Expand Down
7 changes: 7 additions & 0 deletions metricflow/protocols/semantics.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dbt_semantic_interfaces.references import (
DimensionReference,
EntityReference,
LinkableElementReference,
MeasureReference,
MetricReference,
SemanticModelElementReference,
Expand Down Expand Up @@ -101,6 +102,12 @@ def get_entity_in_semantic_model(self, ref: SemanticModelElementReference) -> Op
"""Retrieve the entity matching the element -> semantic model mapping, if any."""
raise NotImplementedError

def get_semantic_models_for_linkable_element(
self, linkable_element: LinkableElementReference
) -> Set[SemanticModel]:
"""Return all semantic models associated with a linkable element reference."""
raise NotImplementedError

@abstractmethod
def get_by_reference(self, semantic_model_reference: SemanticModelReference) -> Optional[SemanticModel]:
"""Retrieve the semantic model object matching the input semantic model reference, if any."""
Expand Down
11 changes: 6 additions & 5 deletions metricflow/test/integration/test_cases/itest_dimensions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,18 @@ integration_test:
name: query_dimensions_from_different_tables
description: Query multiple dimensions without metrics, requiring a join
model: SIMPLE_MODEL
group_bys: ["user__home_state", "verification__ds__day"]
group_bys: ["user__home_state", "listing__is_lux_latest"]
check_query: |
SELECT
u.home_state AS user__home_state
, v.ds AS verification__ds__day
FROM {{ source_schema }}.fct_id_verifications v
LEFT OUTER JOIN {{ source_schema }}.dim_users u
, l.is_lux AS listing__is_lux_latest
FROM {{ source_schema }}.dim_listings_latest l
ON l.listing_id = v.listing_id
LEFT OUTER JOIN {{ source_schema }}.dim_users_latest u
ON u.user_id = v.user_id
GROUP BY
u.home_state
, v.ds
, l.is_lux
---
integration_test:
name: query_time_dimension_without_granularity
Expand Down
1 change: 1 addition & 0 deletions metricflow/test/integration/test_configured_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def filter_not_supported_features(
@pytest.mark.parametrize(
"name",
CONFIGURED_INTEGRATION_TESTS_REPOSITORY.all_test_case_names,
# ["itest_dimensions.yaml/query_dimensions_from_different_tables"],
ids=lambda name: f"name={name}",
)
def test_case(
Expand Down

0 comments on commit e700c96

Please sign in to comment.