From 281478e81951ce804ba890042002080dfbd201de Mon Sep 17 00:00:00 2001 From: Shaurya Chaturvedi Date: Tue, 22 Oct 2024 16:56:28 -0700 Subject: [PATCH] Enabling LogicalProject pushdown optimizations to eliminate exchange of unused columns (#14198) --- .../calcite/rel/rules/PinotQueryRuleSets.java | 8 ++ .../apache/pinot/query/QueryEnvironment.java | 6 ++ .../src/test/resources/queries/JoinPlans.json | 98 +++++++++++++++---- .../queries/WindowFunctionPlans.json | 4 +- 4 files changed, 94 insertions(+), 22 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java index 45f867c11b5..5bc55835e79 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java @@ -101,6 +101,14 @@ private PinotQueryRuleSets() { CoreRules.FILTER_PROJECT_TRANSPOSE ); + // Project pushdown rules run using a RuleCollection since we want to push down a project as much as possible in a + // single HepInstruction. + public static final List PROJECT_PUSHDOWN_RULES = List.of( + CoreRules.PROJECT_FILTER_TRANSPOSE, + CoreRules.PROJECT_JOIN_TRANSPOSE, + CoreRules.PROJECT_MERGE + ); + // The pruner rules run top-down to ensure Calcite restarts from root node after applying a transformation. public static final List PRUNE_RULES = List.of( CoreRules.AGGREGATE_PROJECT_MERGE, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 1681d41c947..629c7ae2c56 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -427,6 +427,12 @@ private static HepProgram getOptProgram() { // Pushdown filters using a single HepInstruction. hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES); + // Pushdown projects after first filter pushdown to minimize projected columns. + hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES); + + // Pushdown filters again since filter should be pushed down at the lowest level, after project pushdown. + hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES); + // ---- // Prune duplicate/unnecessary nodes using a single HepInstruction. // TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help. diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json index 168c3ceaa36..45fcf62251d 100644 --- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json +++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json @@ -272,14 +272,14 @@ "output": [ "Execution Plan", "\nLogicalProject(col1=[$0], col2=[$1])", - "\n LogicalFilter(condition=[IS NOT TRUE($8)])", - "\n LogicalJoin(condition=[=($6, $7)], joinType=[left])", - "\n PinotLogicalExchange(distribution=[hash[6]])", - "\n LogicalProject(col1=[$0], col2=[$1], col30=[$3], $f1=[$4], col32=[$5], $f10=[$7], col34=[$2])", - "\n LogicalFilter(condition=[IS NOT TRUE($7)])", - "\n LogicalJoin(condition=[=($5, $6)], joinType=[left])", - "\n PinotLogicalExchange(distribution=[hash[5]])", - "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2], col30=[$3], $f1=[$5], col32=[$2])", + "\n LogicalFilter(condition=[IS NOT TRUE($4)])", + "\n LogicalJoin(condition=[=($2, $3)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[2]])", + "\n LogicalProject(col1=[$0], col2=[$1], col34=[$2])", + "\n LogicalFilter(condition=[IS NOT TRUE($5)])", + "\n LogicalJoin(condition=[=($3, $4)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[3]])", + "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2], col32=[$2])", "\n LogicalFilter(condition=[IS NOT TRUE($5)])", "\n LogicalJoin(condition=[=($3, $4)], joinType=[left])", "\n PinotLogicalExchange(distribution=[hash[3]])", @@ -294,19 +294,21 @@ "\n LogicalFilter(condition=[=($0, _UTF-8'foo')])", "\n LogicalTableScan(table=[[default, b]])", "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n LogicalProject(col3=[$2], $f1=[true])", - "\n LogicalFilter(condition=[=($0, _UTF-8'bar')])", - "\n LogicalTableScan(table=[[default, b]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'bar')])", + "\n LogicalTableScan(table=[[default, b]])", "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n LogicalProject(col3=[$2], $f1=[true])", - "\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])", - "\n LogicalTableScan(table=[[default, b]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -517,6 +519,62 @@ "\n LogicalTableScan(table=[[default, a]])", "\n" ] + }, + { + "description": "Multiple IN and NOT IN joins while selecting count at top", + "sql": "EXPLAIN PLAN FOR SELECT count(*) FROM a WHERE a.col1 = 'foo' AND col2 = 'xylo' AND a.col4 = 12 AND a.col5 = false AND col3 NOT IN (SELECT col3 FROM b WHERE col1='foo') AND col3 NOT IN (SELECT col3 FROM b WHERE col1='bar') AND col3 NOT IN (SELECT col3 FROM b WHERE col1='foobar') AND col3 IN (SELECT col3 FROM b WHERE col1 = 'fork')", + "output": [ + "Execution Plan", + "\nPinotLogicalAggregate(group=[{}], agg#0=[COUNT($0)])", + "\n PinotLogicalExchange(distribution=[hash])", + "\n PinotLogicalAggregate(group=[{}], agg#0=[COUNT()])", + "\n LogicalJoin(condition=[=($0, $1)], joinType=[semi])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$0])", + "\n LogicalFilter(condition=[IS NOT TRUE($3)])", + "\n LogicalJoin(condition=[=($1, $2)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[1]])", + "\n LogicalProject(col3=[$0], col34=[$0])", + "\n LogicalFilter(condition=[IS NOT TRUE($3)])", + "\n LogicalJoin(condition=[=($1, $2)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[1]])", + "\n LogicalProject(col3=[$0], col32=[$0])", + "\n LogicalFilter(condition=[IS NOT TRUE($3)])", + "\n LogicalJoin(condition=[=($1, $2)], joinType=[left])", + "\n PinotLogicalExchange(distribution=[hash[1]])", + "\n LogicalProject(col3=[$2], col30=[$2])", + "\n LogicalFilter(condition=[AND(=($0, _UTF-8'foo'), =($1, _UTF-8'xylo'), =($3, 12), NOT($4))])", + "\n LogicalTableScan(table=[[default, a]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'bar')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$0], $f1=[$1])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n LogicalProject(col3=[$2], $f1=[true])", + "\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col3=[$2])", + "\n LogicalFilter(condition=[=($0, _UTF-8'fork')])", + "\n LogicalTableScan(table=[[default, b]])", + "\n" + ] } ] }, diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json index ac8ef927843..191dea2fdf5 100644 --- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json +++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json @@ -3404,7 +3404,7 @@ "sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rownum from a) SELECT a.col1, a.rownum FROM windowfunc AS a where a.rownum < 5", "output": [ "Execution Plan", - "\nLogicalProject(col1=[$0], $1=[$3])", + "\nLogicalProject(col1=[$0], w0$o0=[$3])", "\n LogicalFilter(condition=[<($3, 5)])", "\n LogicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])", "\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])", @@ -3418,7 +3418,7 @@ "sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, RANK() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rank, DENSE_RANK() OVER(PARTITION BY a.col2 ORDER BY a.col3) as dense_rank from a) SELECT a.col1, a.rank, a.dense_rank FROM windowfunc AS a where a.dense_rank < 5", "output": [ "Execution Plan", - "\nLogicalProject(col1=[$0], $1=[$3], $2=[$4])", + "\nLogicalProject(col1=[$0], w0$o0=[$3], w0$o1=[$4])", "\n LogicalFilter(condition=[<($4, 5)])", "\n LogicalWindow(window#0=[window(partition {1} order by [2] aggs [RANK(), DENSE_RANK()])])", "\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",