Commit

Merge main
courtneyholcomb committed Dec 6, 2023
2 parents 5bfe36c + 87d7061 commit 2aaee50
Showing 30 changed files with 4,200 additions and 15 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20231205-110320.yaml
@@ -0,0 +1,7 @@
kind: Fixes
body: Fixes a bug in dimension-only queries where the filter column is removed before
the filter has been applied.
time: 2023-12-05T11:03:20.124953-08:00
custom:
Author: courtneyholcomb
Issue: "923"
45 changes: 32 additions & 13 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -350,9 +350,12 @@ def build_plan_for_distinct_values(self, query_spec: MetricFlowQuerySpec) -> Dat
"""
assert not query_spec.metric_specs, "Can't build distinct values plan with metrics."

dataflow_recipe = self._find_dataflow_recipe(linkable_spec_set=query_spec.linkable_specs)
required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=query_spec.linkable_specs, where_constraint=query_spec.where_constraint
)
dataflow_recipe = self._find_dataflow_recipe(linkable_spec_set=required_linkable_specs)
if not dataflow_recipe:
raise UnableToSatisfyQueryError(f"Recipe not found for linkable specs: {query_spec.linkable_specs}")
raise UnableToSatisfyQueryError(f"Recipe not found for linkable specs: {required_linkable_specs}")

joined_node: Optional[JoinToBaseOutputNode] = None
if dataflow_recipe.join_targets:
@@ -677,7 +680,6 @@ def _find_dataflow_recipe(
for x in evaluation.join_recipes
for y in x.join_on_partition_time_dimensions
)

return DataflowRecipe(
source_node=node_with_lowest_cost_plan,
required_local_linkable_specs=(
@@ -799,6 +801,28 @@ def build_aggregated_measure(
time_range_constraint=time_range_constraint,
)

def __get_required_and_extraneous_linkable_specs(
self,
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
non_additive_dimension_spec: Optional[NonAdditiveDimensionSpec] = None,
) -> Tuple[LinkableSpecSet, LinkableSpecSet]:
"""Get the required and extraneous linkable specs for this query.
Extraneous linkable specs are specs used in this phase that should not show up in the final result
unless they were also requested in the query (e.g., a linkable spec used only in the where constraint).
"""
linkable_spec_sets_to_merge: List[LinkableSpecSet] = []
if where_constraint:
linkable_spec_sets_to_merge.append(where_constraint.linkable_spec_set)
if non_additive_dimension_spec:
linkable_spec_sets_to_merge.append(non_additive_dimension_spec.linkable_specs)

extraneous_linkable_specs = LinkableSpecSet.merge_iterable(linkable_spec_sets_to_merge).dedupe()
required_linkable_specs = queried_linkable_specs.merge(extraneous_linkable_specs).dedupe()

return required_linkable_specs, extraneous_linkable_specs

def _build_aggregated_measure_from_measure_source_node(
self,
metric_input_measure_spec: MetricInputMeasureSpec,
@@ -838,16 +862,11 @@ def _build_aggregated_measure_from_measure_source_node(
)
logger.info(f"Adjusted time range constraint {cumulative_metric_adjusted_time_constraint}")

# Extraneous linkable specs are specs that are used in this phase that should not show up in the final result
# unless it was already a requested spec in the query
linkable_spec_sets_to_merge: List[LinkableSpecSet] = []
if where_constraint:
linkable_spec_sets_to_merge.append(where_constraint.linkable_spec_set)
if non_additive_dimension_spec:
linkable_spec_sets_to_merge.append(non_additive_dimension_spec.linkable_specs)

extraneous_linkable_specs = LinkableSpecSet.merge_iterable(linkable_spec_sets_to_merge).dedupe()
required_linkable_specs = queried_linkable_specs.merge(extraneous_linkable_specs).dedupe()
required_linkable_specs, extraneous_linkable_specs = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs,
where_constraint=where_constraint,
non_additive_dimension_spec=non_additive_dimension_spec,
)

logger.info(
f"Looking for a recipe to get:\n"
23 changes: 21 additions & 2 deletions metricflow/test/integration/test_cases/itest_dimensions.yaml
@@ -171,7 +171,7 @@ integration_test:
---
integration_test:
name: query_dimension_only_with_constraint
description: Query dimension only
description: Query dimension only with where constraint
model: SIMPLE_MODEL
group_bys: ["user__home_state"]
where_filter: "{{ render_dimension_template('user__home_state') }} = 'CA'"
@@ -183,6 +183,25 @@ integration_test:
GROUP BY
u.home_state
---
integration_test:
name: query_dimension_with_constraint_from_diff_source
description: Query dimension only with a where constraint that requires a join
model: SIMPLE_MODEL
group_bys: ["user__home_state_latest"]
where_filter: "{{ render_dimension_template('listing__country_latest') }} = 'us'"
check_query: |
SELECT
user__home_state_latest
FROM (
SELECT
l.country AS listing__country_latest
, u.home_state_latest AS user__home_state_latest
FROM {{ source_schema }}.dim_listings_latest l
FULL OUTER JOIN {{ source_schema }}.dim_users_latest u ON l.user_id = u.user_id
)
WHERE listing__country_latest = 'us'
GROUP BY user__home_state_latest
---
integration_test:
name: metric_time_only
description: Query metric_time alone
@@ -230,6 +249,6 @@ integration_test:
, l.is_lux
, u.home_state_latest
# case where where filter not in selections, for derived metric plan that doesn't use normal one
# TODO: add test with date part
# test with where filter
# where_filter: "{{ render_dimension_template('user__home_state') }} = 'CA'"
60 changes: 60 additions & 0 deletions metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py
@@ -1070,6 +1070,66 @@ def test_combine_output_node( # noqa: D
)


@pytest.mark.sql_engine_snapshot
def test_dimensions_requiring_join(
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
) -> None:
"""Tests querying 2 dimensions that require a join."""
dimension_specs = (
DimensionSpec(element_name="home_state_latest", entity_links=(EntityReference(element_name="user"),)),
DimensionSpec(element_name="is_lux_latest", entity_links=(EntityReference(element_name="listing"),)),
)
dataflow_plan = dataflow_plan_builder.build_plan_for_distinct_values(
query_spec=MetricFlowQuerySpec(dimension_specs=dimension_specs)
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


@pytest.mark.sql_engine_snapshot
def test_dimension_with_joined_where_constraint(
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
column_association_resolver: ColumnAssociationResolver,
) -> None:
"""Tests querying 2 dimensions that require a join."""
dataflow_plan = dataflow_plan_builder.build_plan_for_distinct_values(
query_spec=MetricFlowQuerySpec(
dimension_specs=(
DimensionSpec(element_name="home_state_latest", entity_links=(EntityReference(element_name="user"),)),
),
where_constraint=WhereSpecFactory(
column_association_resolver=column_association_resolver,
).create_from_where_filter(
PydanticWhereFilter(
where_sql_template="{{ Dimension('listing__country_latest') }} = 'us'",
)
),
),
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


@pytest.mark.sql_engine_snapshot
def test_metric_time_only(
request: FixtureRequest,