forked from apache/spark
Expose how many jobs and tasks are currently pending #55
Comments
7mming7 pushed a commit that referenced this issue Nov 4, 2020
…or its output partitioning

### What changes were proposed in this pull request?

Currently, the `BroadcastHashJoinExec`'s `outputPartitioning` only uses the streamed side's `outputPartitioning`. However, if the join type of `BroadcastHashJoinExec` is an inner-like join, the build side's info (the join keys) can be added to `BroadcastHashJoinExec`'s `outputPartitioning`.

For example,

```Scala
// In spark-shell, `spark` and its implicits are already in scope.
// Elsewhere, add `import spark.implicits._` so that `toDF` is available.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")

val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")

// join1 is a sort merge join.
val join1 = t1.join(t2, t1("i1") === t2("i2"))

// join2 is a broadcast join where t3 is broadcasted.
val join2 = join1.join(t3, join1("i1") === t3("i3"))

// Join on the column from the broadcasted side (i3).
val join3 = join2.join(t4, join2("i3") === t4("i4"))

join3.explain
```

You see that `Exchange hashpartitioning(i3#29, 200)` is introduced because there is no output partitioning info from the build side.
```
== Physical Plan ==
*(6) SortMergeJoin [i3#29], [i4#40], Inner
:- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i3#29, 200), true, [id=#55]
:     +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
:        :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
:        :  :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
:        :  :  +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
:        :  :     +- LocalTableScan [i1#7, j1#8]
:        :  +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
:        :     +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
:        :        +- LocalTableScan [i2#18, j2#19]
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
:           +- LocalTableScan [i3#29, j3#30]
+- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#40, 200), true, [id=#39]
      +- LocalTableScan [i4#40, j4#41]
```

This PR proposes to introduce output partitioning for the build side for `BroadcastHashJoinExec` if the streamed side has a `HashPartitioning` or a collection of `HashPartitioning`s.

There is a new internal config `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit`, which limits the number of partitionings a `HashPartitioning` can expand to. It can be set to "0" to disable this feature.

### Why are the changes needed?

To remove an unnecessary shuffle.

### Does this PR introduce _any_ user-facing change?

Yes, now the shuffle in the above example can be eliminated:

```
== Physical Plan ==
*(5) SortMergeJoin [i3#108], [i4#119], Inner
:- *(3) Sort [i3#108 ASC NULLS FIRST], false, 0
:  +- *(3) BroadcastHashJoin [i1#86], [i3#108], Inner, BuildRight
:     :- *(3) SortMergeJoin [i1#86], [i2#97], Inner
:     :  :- *(1) Sort [i1#86 ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(i1#86, 200), true, [id=#120]
:     :  :     +- LocalTableScan [i1#86, j1#87]
:     :  +- *(2) Sort [i2#97 ASC NULLS FIRST], false, 0
:     :     +- Exchange hashpartitioning(i2#97, 200), true, [id=#121]
:     :        +- LocalTableScan [i2#97, j2#98]
:     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#126]
:        +- LocalTableScan [i3#108, j3#109]
+- *(4) Sort [i4#119 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#119, 200), true, [id=#130]
      +- LocalTableScan [i4#119, j4#120]
```

### How was this patch tested?

Added new tests.

Closes apache#28676 from imback82/broadcast_join_output.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
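
To illustrate the partitioning expansion that the commit message describes, here is a minimal, self-contained sketch. It is not Spark's implementation: `HashPartitioning` below is a plain case class over column names rather than Catalyst expressions, and `ExpandSketch.expand`, its parameters, and the limit value 8 are hypothetical, chosen only for illustration. The `limit` argument plays the role of `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit`, with 0 meaning the streamed side's partitioning is returned unchanged.

```Scala
// Simplified stand-in for Spark's HashPartitioning (assumption: column names
// replace Catalyst expressions so the sketch runs without Spark).
final case class HashPartitioning(keys: Seq[String], numPartitions: Int)

object ExpandSketch {
  /** Lists the partitionings equivalent to `streamed` under the equi-join
    * conditions streamedKeys(i) == buildKeys(i), keeping at most `limit`.
    * The streamed side's own partitioning always comes first.
    */
  def expand(
      streamed: HashPartitioning,
      streamedKeys: Seq[String],
      buildKeys: Seq[String],
      limit: Int): Seq[HashPartitioning] = {
    if (limit <= 0) {
      // Feature disabled: only the streamed side's partitioning is reported.
      Seq(streamed)
    } else {
      // Map each streamed join key to the build-side keys it is equal to.
      val toBuild: Map[String, Seq[String]] =
        streamedKeys.zip(buildKeys).groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }

      // For every partitioning key, either keep it or swap in an equal build key.
      val combos = streamed.keys.foldLeft(Seq(Seq.empty[String])) { (acc, key) =>
        val choices = key +: toBuild.getOrElse(key, Nil)
        for (prefix <- acc; choice <- choices) yield prefix :+ choice
      }

      combos.distinct.take(limit).map(ks => HashPartitioning(ks, streamed.numPartitions))
    }
  }

  def main(args: Array[String]): Unit = {
    val streamed = HashPartitioning(Seq("i1"), 200)
    // Join condition i1 = i3, as in join2 of the example above.
    println(expand(streamed, Seq("i1"), Seq("i3"), limit = 8))
    println(expand(streamed, Seq("i1"), Seq("i3"), limit = 0))
  }
}
```

With the streamed side hash-partitioned on `i1` and the join condition `i1 = i3`, the sketch reports that the output is also hash-partitioned on `i3`, which is the property that lets the later join on `i3` skip an `Exchange`. With `limit = 0` only the `i1` partitioning is reported, mirroring the behaviour before this change.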
No description provided.