Skip to content

Commit

Permalink
Fix pushdown operations on joined in dimensions
Browse files Browse the repository at this point in the history
Note for reviewers, as this will be squashed:

Pushdown on joined in dimensions was not working as expected. In
the original predicate pushdown rendering tests, most joined in
dimensions were being skipped for pushdown operations.

The root cause of the issue was the semantic model source value
we were accessing, which actually included the complete history
of all semantic model inputs for the joined in dimension.

Fixing that problem uncovered a separate issue, which is we were
inappropriately pushing filters down past outer join expressions.

This commit fixes both issues at once - we now only push down
on the "safe" side of an outer join (the left side for LEFT OUTER
and not at all for FULL OUTER joins), and we evaluate pushdown
based on the singular semantic model source where each element is
defined.
  • Loading branch information
tlento committed May 23, 2024
1 parent 353c5d9 commit b3b2958
Show file tree
Hide file tree
Showing 35 changed files with 5,068 additions and 5,269 deletions.
18 changes: 8 additions & 10 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,23 +896,21 @@ def _find_dataflow_recipe(
node_data_set_resolver=self._node_data_set_resolver,
)

if predicate_pushdown_state.has_pushdown_potential:
if predicate_pushdown_state.has_pushdown_potential and default_join_type is not SqlJoinType.FULL_OUTER:
# TODO: encapsulate join type and distinct values state and eventually move this to a DataflowPlanOptimizer
# This works today because all of our subsequent join configuration operations preserve the join type
# as-is, or else switch it to a CROSS JOIN or INNER JOIN type, both of which are safe for predicate
# pushdown. However, there is currently no way to enforce that invariant, so we will need to move
# to a model where we evaluate the join nodes themselves and decide on whether or not to push down
# the predicate. This will be much more straightforward once we finish encapsulating our existing
# time range constraint pushdown controls into this mechanism.
candidate_nodes_for_left_side_of_join = list(
node_processor.apply_matching_filter_predicates(
source_nodes=candidate_nodes_for_left_side_of_join,
predicate_pushdown_state=predicate_pushdown_state,
metric_time_dimension_reference=self._metric_time_dimension_reference,
)
)
candidate_nodes_for_right_side_of_join = list(
node_processor.apply_matching_filter_predicates(
source_nodes=candidate_nodes_for_right_side_of_join,
predicate_pushdown_state=PredicatePushdownState.without_time_range_constraint(
original_pushdown_state=predicate_pushdown_state
),
metric_time_dimension_reference=self._metric_time_dimension_reference,
)
)

candidate_nodes_for_right_side_of_join = node_processor.remove_unnecessary_nodes(
desired_linkable_specs=linkable_specs,
Expand Down
7 changes: 1 addition & 6 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import dataclasses
import itertools
import logging
from enum import Enum
from typing import Dict, FrozenSet, List, Optional, Sequence, Set
Expand Down Expand Up @@ -355,11 +354,7 @@ def _add_where_constraint(
"""Processes where filter specs and evaluates their fitness for pushdown against the provided node set."""
eligible_filter_specs_by_model: Dict[SemanticModelReference, Sequence[WhereFilterSpec]] = {}
for spec in where_filter_specs:
semantic_models = set(
itertools.chain.from_iterable(
[element.derived_from_semantic_models for element in spec.linkable_elements]
)
)
semantic_models = set(element.semantic_model_origin for element in spec.linkable_elements)
invalid_element_types = [
element for element in spec.linkable_elements if element.element_type not in enabled_element_types
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def test_multiple_categorical_dimension_pushdown(
) -> None:
"""Tests rendering a query where we expect predicate pushdown for more than one categorical dimension."""
parsed_query = query_parser.parse_and_validate_query(
metric_names=("bookings",),
group_by_names=("booking__is_instant",),
metric_names=("listings",),
group_by_names=("user__home_state_latest",),
where_constraint=PydanticWhereFilter(
where_sql_template="{{ Dimension('listing__is_lux_latest') }} OR {{ Dimension('listing__capacity_latest') }} > 4",
),
Expand All @@ -69,37 +69,7 @@ def test_multiple_categorical_dimension_pushdown(


@pytest.mark.sql_engine_snapshot
def test_multiple_different_filters_on_same_joined_categorical_dimension(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
query_parser: MetricFlowQueryParser,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
) -> None:
"""Tests rendering a query where multiple filters against the same joined dimension need to be an effective OR.
This can be an issue where a derived metric takes in two filters that refer to the same joined-in categorical
dimension. If these filters are disjoint the predicate pushdown needs to ensure that all matching rows are
returned, so we cannot simply push one filter or the other down, nor can we push them down as an AND - they
must be an OR, since all relevant rows need to be returned to the requesting metrics.
"""
parsed_query = query_parser.parse_and_validate_query(
metric_names=("regional_starting_balance_ratios",),
)
dataflow_plan = dataflow_plan_builder.build_plan(parsed_query.query_spec)

convert_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
)


@pytest.mark.sql_engine_snapshot
def test_multiple_different_filters_on_same_measure_source_categorical_dimension(
def test_different_filters_on_same_measure_source_categorical_dimension(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
Expand All @@ -113,6 +83,9 @@ def test_multiple_different_filters_on_same_measure_source_categorical_dimension
measure source. If these filters are disjoint the predicate pushdown needs to ensure that all matching rows are
returned, so we cannot simply push one filter or the other down, nor can we push them down as an AND - they
must be an OR, since all relevant rows need to be returned to the requesting metrics.
The metric listed here has one input that filters on bookings__is_instant and another that does not, which means
the source input for the latter input must NOT have the filter applied to it.
"""
parsed_query = query_parser.parse_and_validate_query(
metric_names=("instant_booking_fraction_of_max_value",),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<!-- distinct = True -->
<ConstrainTimeRangeNode>
<!-- description = 'Constrain Time Range to [2020-01-01T00:00:00, 2020-01-03T00:00:00]' -->
<!-- node_id = NodeId(id_str='ctr_1') -->
<!-- node_id = NodeId(id_str='ctr_0') -->
<!-- time_range_start = '2020-01-01T00:00:00' -->
<!-- time_range_end = '2020-01-03T00:00:00' -->
<JoinOnEntitiesNode>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<!-- distinct = True -->
<WhereConstraintNode>
<!-- description = 'Constrain Output with WHERE' -->
<!-- node_id = NodeId(id_str='wcc_2') -->
<!-- node_id = NodeId(id_str='wcc_0') -->
<!-- where_condition = -->
<!-- WhereFilterSpec( -->
<!-- where_sql="listing__country_latest = 'us'", -->
Expand Down Expand Up @@ -53,42 +53,11 @@
<!-- ), -->
<!-- ), -->
<!-- ) -->
<WhereConstraintNode>
<!-- description = 'Constrain Output with WHERE' -->
<!-- node_id = NodeId(id_str='wcc_0') -->
<!-- where_condition = -->
<!-- WhereFilterSpec( -->
<!-- where_sql="listing__country_latest = 'us'", -->
<!-- bind_parameters=SqlBindParameters(), -->
<!-- linkable_specs=( -->
<!-- DimensionSpec( -->
<!-- element_name='country_latest', -->
<!-- entity_links=(EntityReference(element_name='listing'),), -->
<!-- ), -->
<!-- ), -->
<!-- linkable_elements=( -->
<!-- LinkableDimension( -->
<!-- semantic_model_origin=SemanticModelReference( -->
<!-- semantic_model_name='listings_latest', -->
<!-- ), -->
<!-- element_name='country_latest', -->
<!-- dimension_type=CATEGORICAL, -->
<!-- entity_links=(EntityReference(element_name='listing'),), -->
<!-- join_path=SemanticModelJoinPath( -->
<!-- left_semantic_model_reference=SemanticModelReference( -->
<!-- semantic_model_name='listings_latest', -->
<!-- ), -->
<!-- ), -->
<!-- properties=frozenset('LOCAL',), -->
<!-- ), -->
<!-- ), -->
<!-- ) -->
<ReadSqlSourceNode>
<!-- description = "Read From SemanticModelDataSet('listings_latest')" -->
<!-- node_id = NodeId(id_str='rss_28018') -->
<!-- data_set = SemanticModelDataSet('listings_latest') -->
</ReadSqlSourceNode>
</WhereConstraintNode>
<ReadSqlSourceNode>
<!-- description = "Read From SemanticModelDataSet('listings_latest')" -->
<!-- node_id = NodeId(id_str='rss_28018') -->
<!-- data_set = SemanticModelDataSet('listings_latest') -->
</ReadSqlSourceNode>
</WhereConstraintNode>
</FilterElementsNode>
</OrderByLimitNode>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<!-- distinct = True -->
<WhereConstraintNode>
<!-- description = 'Constrain Output with WHERE' -->
<!-- node_id = NodeId(id_str='wcc_2') -->
<!-- node_id = NodeId(id_str='wcc_0') -->
<!-- where_condition = -->
<!-- WhereFilterSpec( -->
<!-- where_sql="listing__country_latest = 'us'", -->
Expand Down Expand Up @@ -67,42 +67,11 @@
<!-- join_on_entity=LinklessEntitySpec(element_name='user'), -->
<!-- join_type=FULL_OUTER, -->
<!-- ) -->
<WhereConstraintNode>
<!-- description = 'Constrain Output with WHERE' -->
<!-- node_id = NodeId(id_str='wcc_0') -->
<!-- where_condition = -->
<!-- WhereFilterSpec( -->
<!-- where_sql="listing__country_latest = 'us'", -->
<!-- bind_parameters=SqlBindParameters(), -->
<!-- linkable_specs=( -->
<!-- DimensionSpec( -->
<!-- element_name='country_latest', -->
<!-- entity_links=(EntityReference(element_name='listing'),), -->
<!-- ), -->
<!-- ), -->
<!-- linkable_elements=( -->
<!-- LinkableDimension( -->
<!-- semantic_model_origin=SemanticModelReference( -->
<!-- semantic_model_name='listings_latest', -->
<!-- ), -->
<!-- element_name='country_latest', -->
<!-- dimension_type=CATEGORICAL, -->
<!-- entity_links=(EntityReference(element_name='listing'),), -->
<!-- join_path=SemanticModelJoinPath( -->
<!-- left_semantic_model_reference=SemanticModelReference( -->
<!-- semantic_model_name='listings_latest', -->
<!-- ), -->
<!-- ), -->
<!-- properties=frozenset('LOCAL',), -->
<!-- ), -->
<!-- ), -->
<!-- ) -->
<ReadSqlSourceNode>
<!-- description = "Read From SemanticModelDataSet('listings_latest')" -->
<!-- node_id = NodeId(id_str='rss_28018') -->
<!-- data_set = SemanticModelDataSet('listings_latest') -->
</ReadSqlSourceNode>
</WhereConstraintNode>
<ReadSqlSourceNode>
<!-- description = "Read From SemanticModelDataSet('listings_latest')" -->
<!-- node_id = NodeId(id_str='rss_28018') -->
<!-- data_set = SemanticModelDataSet('listings_latest') -->
</ReadSqlSourceNode>
<FilterElementsNode>
<!-- description = "Pass Only Elements: ['home_state_latest', 'user']" -->
<!-- node_id = NodeId(id_str='pfe_1') -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<!-- distinct = False -->
<WhereConstraintNode>
<!-- description = 'Constrain Output with WHERE' -->
<!-- node_id = NodeId(id_str='wcc_3') -->
<!-- node_id = NodeId(id_str='wcc_1') -->
<!-- where_condition = -->
<!-- WhereFilterSpec( -->
<!-- where_sql='booking__is_instant', -->
Expand Down Expand Up @@ -117,7 +117,7 @@
<!-- ), -->
<!-- linkable_elements=( -->
<!-- LinkableDimension( -->
<!-- semantic_model_origin=SemanticModelReference( -->
<!-- defined_in_semantic_model=SemanticModelReference( -->
<!-- semantic_model_name='bookings_source', -->
<!-- ), -->
<!-- element_name='is_instant', -->
Expand Down
Loading

0 comments on commit b3b2958

Please sign in to comment.