Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

暴露接口可以直接增加 executor/node 的黑名单 #72

Open
Aaaaaaron opened this issue Nov 1, 2019 · 0 comments
Open

暴露接口可以直接增加 executor/node 的黑名单 #72

Aaaaaaron opened this issue Nov 1, 2019 · 0 comments

Comments

@Aaaaaaron
Copy link

Aaaaaaron commented Nov 1, 2019

https://github.com/Kyligence/KAP/issues/15855

lxian pushed a commit to lxian/spark that referenced this issue Mar 24, 2021
…join can be planned as broadcast join

### What changes were proposed in this pull request?

Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases.

```scala
spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1")
spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
```

Before this pr:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#72]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
                  :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=Kyligence#65]
                  :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=Kyligence#66]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#61]
                              +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                                 +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```

After this pr:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#74]
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
            :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=Kyligence#67]
            :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#61]
            :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
            +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=Kyligence#68]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=Kyligence#63]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```

### Why are the changes needed?

1. Pushdown LeftSemi/LeftAnti over Aggregate will affect performance.
2. It will remove user added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test and benchmark test.

SQL | Before this PR(Seconds) | After this PR(Seconds)
-- | -- | --
q14a | 660 | 594
q14b | 660 | 600
q38 | 55 | 29
q87 | 66 | 35

Before this pr:
![image](https://user-images.githubusercontent.com/5399861/104452849-8789fc80-55de-11eb-88da-44059899f9a9.png)

After this pr:
![image](https://user-images.githubusercontent.com/5399861/104452899-9a043600-55de-11eb-9286-d8f3a23ca3b8.png)

Closes apache#31145 from wangyum/SPARK-34081.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant