Skip to content

Commit

Permalink
Enable DataflowPlanOptimizers for query rendering tests (#1263)
Browse files Browse the repository at this point in the history
Enable DataflowPlanOptimizers for query rendering tests

The metricflow query rendering tests do snapshot generation and comparison
for standard rendering and optimized rendering. However, these plans only
run the SqlQueryPlanOptimizers - they do not use the DataflowPlanOptimizers.

This means our optimized plans were only partially optimized. Now with
predicate pushdown it would be helpful to see the complete optimization
effect on query plan rendering to SQL.

This change makes that possible by including DataflowPlanOptimizers in
the comparison helper function. For the time being, and to minimize
thrash in snapshot plans, we only include the no-op PredicatePushdownOptimizer.
This will allow us to track the impact of enabling predicate pushdown via
that optimizer through query plan snapshot changes.

A later change will add the branch combiner and update snapshot rendering
accordingly.

Note the distinct values tests needed a quick hack to keep working, which proved less
silly than a local refactor of the helper method.

Snapshot changes should be limited to ID number updates.
  • Loading branch information
tlento authored Jun 25, 2024
1 parent 7758362 commit d18efed
Show file tree
Hide file tree
Showing 234 changed files with 3,219 additions and 3,268 deletions.
32 changes: 21 additions & 11 deletions tests_metricflow/query_rendering/compare_rendered_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,36 @@

from _pytest.fixtures import FixtureRequest
from metricflow_semantics.dag.mf_dag import DagId
from metricflow_semantics.specs.query_spec import MetricFlowQuerySpec
from metricflow_semantics.test_helpers.config_helpers import MetricFlowTestConfiguration

from metricflow.dataflow.dataflow_plan import DataflowPlanNode
from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder
from metricflow.dataflow.optimizer.predicate_pushdown_optimizer import PredicatePushdownOptimizer
from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlQueryPlanConverter
from metricflow.protocols.sql_client import SqlClient
from metricflow.sql.optimizer.optimization_levels import SqlQueryOptimizationLevel
from tests_metricflow.dataflow_plan_to_svg import display_graph_if_requested
from tests_metricflow.sql.compare_sql_plan import assert_rendered_sql_from_plan_equal


def convert_and_check(
def render_and_check(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
node: DataflowPlanNode,
query_spec: MetricFlowQuerySpec,
is_distinct_values_plan: bool = False,
) -> None:
"""Renders an engine-specific query output from a DataflowPlanNode DataFlowPlan node.
TODO: refine interface once file move operations are complete.
"""
# Run dataflow -> sql conversion without optimizers
"""Renders an engine-specific query output from a given query, in both basic and optimized forms."""
# Build and convert dataflow plan without optimizers
if is_distinct_values_plan:
base_plan = dataflow_plan_builder.build_plan_for_distinct_values(query_spec=query_spec)
else:
base_plan = dataflow_plan_builder.build_plan(query_spec)
conversion_result = dataflow_to_sql_converter.convert_to_sql_query_plan(
sql_engine_type=sql_client.sql_engine_type,
dataflow_plan_node=node,
dataflow_plan_node=base_plan.sink_node,
optimization_level=SqlQueryOptimizationLevel.O0,
sql_query_plan_id=DagId.from_str("plan0"),
)
Expand All @@ -46,10 +51,15 @@ def convert_and_check(
sql_client=sql_client,
)

# Run dataflow -> sql conversion with optimizers
# Run dataflow -> sql conversion with all optimizers
if is_distinct_values_plan:
# TODO: Make optimization available for distinct values plans
optimized_plan = base_plan
else:
optimized_plan = dataflow_plan_builder.build_plan(query_spec, optimizers=(PredicatePushdownOptimizer(),))
conversion_result = dataflow_to_sql_converter.convert_to_sql_query_plan(
sql_engine_type=sql_client.sql_engine_type,
dataflow_plan_node=node,
dataflow_plan_node=optimized_plan.sink_node,
optimization_level=SqlQueryOptimizationLevel.O4,
sql_query_plan_id=DagId.from_str("plan0_optimized"),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder
from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlQueryPlanConverter
from metricflow.protocols.sql_client import SqlClient
from tests_metricflow.query_rendering.compare_rendered_query import convert_and_check
from tests_metricflow.query_rendering.compare_rendered_query import render_and_check


@pytest.mark.sql_engine_snapshot
Expand All @@ -32,14 +32,14 @@ def test_conversion_metric(
where_sql_template=("{{ TimeDimension('metric_time', 'day') }} = '2020-01-01'")
),
)
dataflow_plan = dataflow_plan_builder.build_plan(parsed_query.query_spec)

convert_and_check(
render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)


Expand All @@ -61,14 +61,14 @@ def test_conversion_metric_with_window(
where_sql_template=("{{ TimeDimension('metric_time', 'day') }} = '2020-01-01'")
),
)
dataflow_plan = dataflow_plan_builder.build_plan(parsed_query.query_spec)

convert_and_check(
render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)


Expand All @@ -90,14 +90,14 @@ def test_conversion_metric_with_categorical_filter(
where_sql_template=("{{ Dimension('visit__referrer_id') }} = 'ref_id_01'")
),
)
dataflow_plan = dataflow_plan_builder.build_plan(parsed_query.query_spec)

convert_and_check(
render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)


Expand All @@ -121,14 +121,14 @@ def test_conversion_metric_with_time_constraint(
time_constraint_start=datetime.datetime(2020, 1, 1),
time_constraint_end=datetime.datetime(2020, 1, 2),
)
dataflow_plan = dataflow_plan_builder.build_plan(parsed_query.query_spec)

convert_and_check(
render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)


Expand All @@ -155,12 +155,12 @@ def test_conversion_metric_with_window_and_time_constraint(
time_constraint_start=datetime.datetime(2020, 1, 1),
time_constraint_end=datetime.datetime(2020, 1, 2),
)
dataflow_plan = dataflow_plan_builder.build_plan(parsed_query.query_spec)

convert_and_check(
render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_node,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)
Loading

0 comments on commit d18efed

Please sign in to comment.