
Distributed performance: NexMark Q15 #11964

Closed
Tracked by #11932
fuyufjh opened this issue Aug 30, 2023 · 7 comments

@fuyufjh
Member

fuyufjh commented Aug 30, 2023

The CPU is neither fully utilized nor balanced.

@github-actions github-actions bot added this to the release-1.2 milestone Aug 30, 2023
@fuyufjh
Member Author

fuyufjh commented Aug 31, 2023

I tried expand-based distinct aggregation, but the performance was still unsatisfactory:

https://buildkite.com/risingwave-test/nexmark-benchmark/builds/1860#018a4517-24cf-493b-91e3-6cbccf5fcc7f

@fuyufjh fuyufjh self-assigned this Aug 31, 2023
@fuyufjh
Member Author

fuyufjh commented Sep 11, 2023

I haven't been working on this recently, but @st1page has. Reassigned.

@lmatz
Contributor

lmatz commented Sep 14, 2023

This is the Flink plan with table.optimizer.distinct-agg.split.enabled set to true:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
   +- Exchange(distribution=[hash[day]])
      +- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
         +- Exchange(distribution=[hash[day, $f6, $f7]])
            +- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, =($e, 3) AS $g_3, AND(=($e, 3), $f1) AS $g_30, AND(=($e, 3), $f2) AS $g_31, AND(=($e, 3), $f3) AS $g_32, =($e, 1) AS $g_1, AND(=($e, 1), $f1) AS $g_10, AND(=($e, 1), $f2) AS $g_11, AND(=($e, 1), $f3) AS $g_12, =($e, 2) AS $g_2, AND(=($e, 2), $f1) AS $g_20, AND(=($e, 2), $f2) AS $g_21, AND(=($e, 2), $f3) AS $g_22])
               +- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
                  +- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'yyyy-MM-dd') AS day, IS TRUE(<(bid.price, 10000)) AS $f1, IS TRUE(SEARCH(bid.price, Sarg[[10000..1000000)])) AS $f2, IS TRUE(>=(bid.price, 1000000)) AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[=(event_type, 2)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
   +- Exchange(distribution=[hash[day]])
      +- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
         +- Exchange(distribution=[hash[day, $f6, $f7]])
            +- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, ($e = 3) AS $g_3, (($e = 3) AND $f1) AS $g_30, (($e = 3) AND $f2) AS $g_31, (($e = 3) AND $f3) AS $g_32, ($e = 1) AS $g_1, (($e = 1) AND $f1) AS $g_10, (($e = 1) AND $f2) AS $g_11, (($e = 1) AND $f3) AS $g_12, ($e = 2) AS $g_2, (($e = 2) AND $f1) AS $g_20, (($e = 2) AND $f2) AS $g_21, (($e = 2) AND $f3) AS $g_22])
               +- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
                  +- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'yyyy-MM-dd') AS day, (bid.price < 10000) IS TRUE AS $f1, SEARCH(bid.price, Sarg[[10000..1000000)]) IS TRUE AS $f2, (bid.price >= 1000000) IS TRUE AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[(event_type = 2)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
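Flink's split rewrite works because the bucket is a function of the value being counted: equal bidders always hash to the same bucket, so the per-bucket distinct sets are disjoint and their sizes add up to the global distinct count. Meanwhile, the 1024 buckets (which appear to correspond to Flink's table.optimizer.distinct-agg.split.bucket-num, default 1024) spread a single hot day across many groups. Below is a minimal Python sketch of that idea, not Flink's implementation: zlib.crc32 stands in for HASH_CODE, and the function names are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 1024  # plays the role of the 1024 in MOD(HASH_CODE(x), 1024)

def bucket_of(value) -> int:
    # Stand-in for Flink's HASH_CODE: any deterministic hash works, as long as
    # equal values always land in the same bucket.
    return zlib.crc32(repr(value).encode()) % NUM_BUCKETS

def split_count_distinct(bids):
    """bids: iterable of (day, bidder); returns {day: COUNT(DISTINCT bidder)}."""
    # PARTIAL phase: group by (day, bucket), one distinct set per group.
    partial = defaultdict(set)
    for day, bidder in bids:
        partial[(day, bucket_of(bidder))].add(bidder)
    # FINAL phase: group by day and sum the per-bucket distinct counts.
    final = defaultdict(int)
    for (day, _bucket), bidders in partial.items():
        final[day] += len(bidders)
    return dict(final)

bids = [("2023-08-30", 1), ("2023-08-30", 1), ("2023-08-30", 2), ("2023-08-31", 1)]
print(split_count_distinct(bids))  # {'2023-08-30': 2, '2023-08-31': 1}
```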

RisingWave plan with both split and two-phase aggregation enabled:
https://github.com/risingwavelabs/risingwave/blob/main/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml#L1171-L1182

    StreamMaterialize { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], stream_key: [day], pk_columns: [day], pk_conflict: NoCheck }
    └─StreamProject { exprs: [$expr1, sum0(sum0(count) filter((flag = 0:Int64))), sum0(sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64))), sum0(count(bidder) filter((flag = 1:Int64))), sum0(count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(auction) filter((flag = 2:Int64))), sum0(count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)))] }
      └─StreamHashAgg { group_key: [$expr1], aggs: [sum0(sum0(count) filter((flag = 0:Int64))), sum0(sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64))), sum0(count(bidder) filter((flag = 1:Int64))), sum0(count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(auction) filter((flag = 2:Int64))), sum0(count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), count] }
        └─StreamExchange { dist: HashShard($expr1) }
          └─StreamHashAgg { group_key: [$expr1, $expr2], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] }
            └─StreamProject { exprs: [$expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), Vnode($expr1, bidder, auction, flag) as $expr2] }
              └─StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] }
                └─StreamExchange { dist: HashShard($expr1, bidder, auction, flag) }
                  └─StreamExpand { column_subsets: [[$expr1], [$expr1, bidder], [$expr1, auction]] }
                    └─StreamProject { exprs: [ToChar(date_time, 'yyyy-MM-dd':Varchar) as $expr1, price, bidder, auction, _row_id] }
                      └─StreamRowIdGen { row_id_index: 7 }
                        └─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
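A rough Python model of the RisingWave plan above, reduced to four of its twelve output columns, may help show what flag does. It is an illustration under simplifying assumptions, not RisingWave's executor code: the function names are hypothetical, rows are plain dicts, and the Vnode($expr1, bidder, auction, flag) split used for two-phase aggregation is left out. Because flag is part of the first aggregation's group key, that aggregation deduplicates (day, bidder) and (day, auction), and the final aggregation turns the distinct counts into filtered plain counts over those groups.

```python
from collections import defaultdict

# Column subsets of StreamExpand; a row's flag is the index of its subset.
SUBSETS = (("day",), ("day", "bidder"), ("day", "auction"))

def expand(row):
    """Emit one copy of `row` per subset, nulling key columns outside the subset."""
    for flag, keep in enumerate(SUBSETS):
        yield (
            row["day"],
            row["bidder"] if "bidder" in keep else None,
            row["auction"] if "auction" in keep else None,
            flag,
            row["price"],
        )

def q15_sketch(rows):
    # First agg: group_key [$expr1, bidder, auction, flag];
    # aggs: count and count filter (price < 10000).
    first = defaultdict(lambda: [0, 0])
    for row in rows:
        for day, bidder, auction, flag, price in expand(row):
            state = first[(day, bidder, auction, flag)]
            state[0] += 1
            if price < 10_000:
                state[1] += 1
    # Final agg: group_key [$expr1]. Each flag = 1 (or flag = 2) group is one
    # distinct bidder (or auction), so a plain count over those groups is a
    # distinct count.
    out = defaultdict(lambda: {"total_bids": 0, "total_bidders": 0,
                               "rank1_bidders": 0, "total_auctions": 0})
    for (day, _bidder, _auction, flag), (cnt, cnt_rank1) in first.items():
        if flag == 0:    # sum0(count) filter (flag = 0)
            out[day]["total_bids"] += cnt
        elif flag == 1:  # count(bidder) filter (flag = 1)
            out[day]["total_bidders"] += 1
            if cnt_rank1 > 0:  # count(bidder) filter (count filter (price < 10000) > 0 AND flag = 1)
                out[day]["rank1_bidders"] += 1
        else:            # count(auction) filter (flag = 2)
            out[day]["total_auctions"] += 1
    return dict(out)

rows = [
    {"day": "d1", "bidder": 1, "auction": 10, "price": 500},
    {"day": "d1", "bidder": 1, "auction": 11, "price": 20_000},
    {"day": "d1", "bidder": 2, "auction": 10, "price": 50_000},
]
print(q15_sketch(rows))
```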

The group keys of the aggregations differ. RisingWave groups by

$expr1, bidder, auction, flag

whereas Flink groups by

day, $f6, $f7

with MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7.
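To make the difference concrete, here is a rough Python sketch (not from the thread) that counts how many distinct group keys each first-level aggregation would hold for a synthetic batch of bids. Python's built-in hash stands in for Flink's HASH_CODE, the function names are hypothetical, and the data is made up. Per day, Flink's key is bounded by the buckets (at most 2 × 1024 + 1 groups), while RisingWave's grows with the number of distinct bidders and auctions.

```python
def rw_group_keys(bids):
    """Group keys of RisingWave's first agg: ($expr1, bidder, auction, flag)."""
    keys = set()
    for day, bidder, auction in bids:
        # StreamExpand subsets: [day] -> flag 0, [day, bidder] -> 1, [day, auction] -> 2
        keys.add((day, None, None, 0))
        keys.add((day, bidder, None, 1))
        keys.add((day, None, auction, 2))
    return keys

def flink_group_keys(bids, buckets=1024):
    """Group keys of Flink's PARTIAL agg: (day, $f6, $f7)."""
    keys = set()
    for day, bidder, auction in bids:
        f6, f7 = hash(bidder) % buckets, hash(auction) % buckets
        keys.add((day, f6, None))    # $e = 1: keep $f6, null $f7
        keys.add((day, None, f7))    # $e = 2: null $f6, keep $f7
        keys.add((day, None, None))  # $e = 3: null both
    return keys

# Synthetic data: 100,000 distinct bidders and 1,000 distinct auctions on one day.
bids = [("2023-08-30", b, 100_000 + b % 1_000) for b in range(100_000)]
print(len(rw_group_keys(bids)))     # 101001
print(len(flink_group_keys(bids)))  # 2025
```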

What is flag in RW's group key?

Does EOWC (emit-on-window-close) make a huge difference? I guess not.

@st1page
Contributor

st1page commented Sep 14, 2023

On "What is flag in RW's group key?": it identifies the group (column subset) generated by the Expand operator.

On "Does EOWC make a huge difference?": no, it doesn't.

@fuyufjh
Member Author

fuyufjh commented Sep 14, 2023

Flink's approach is not that different from ours. It also has an Expand and then filters the rows by $e, which is called flag in RisingWave.

However, Flink doesn't put $e in the group key. Instead, it applies $e = 1/2/3 in each aggregate call's filter condition, e.g. COUNT(*) FILTER $g_30 AS $f4.
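The following Python sketch mimics the Flink plan for three of the outputs, as an illustration rather than Flink's operator code. The PARTIAL aggregation is keyed only by (day, $f6, $f7), and $e decides which aggregate call's state a row updates, like the $g_* FILTER flags; the FINAL aggregation then sums the partial results per day. Python's hash stands in for HASH_CODE, and the function name is hypothetical.

```python
from collections import defaultdict

BUCKETS = 1024  # plays the role of MOD(HASH_CODE(x), 1024)

def flink_style_q15(bids):
    """bids: iterable of (day, bidder, auction); returns per-day totals."""
    # PARTIAL agg: group key is (day, $f6, $f7); $e is NOT part of the key.
    # Each aggregate call only sees rows whose $e matches its FILTER.
    partial = defaultdict(lambda: {"bids": 0, "bidders": set(), "auctions": set()})
    for day, bidder, auction in bids:
        f6, f7 = hash(bidder) % BUCKETS, hash(auction) % BUCKETS
        # Expand: one copy per $e, nulling the bucket column it does not use.
        for e, key in ((1, (day, f6, None)), (2, (day, None, f7)), (3, (day, None, None))):
            state = partial[key]
            if e == 3:    # COUNT(*) FILTER $g_3, i.e. ($e = 3)
                state["bids"] += 1
            elif e == 1:  # COUNT(DISTINCT bidder) FILTER $g_1, i.e. ($e = 1)
                state["bidders"].add(bidder)
            else:         # COUNT(DISTINCT auction) FILTER $g_2, i.e. ($e = 2)
                state["auctions"].add(auction)
    # FINAL agg: group by day and $SUM0 the partial results.
    final = defaultdict(lambda: {"bids": 0, "bidders": 0, "auctions": 0})
    for (day, _f6, _f7), state in partial.items():
        final[day]["bids"] += state["bids"]
        final[day]["bidders"] += len(state["bidders"])
        final[day]["auctions"] += len(state["auctions"])
    return dict(final)

bids = [("d1", 1, 10), ("d1", 1, 11), ("d1", 2, 10), ("d2", 3, 12)]
print(flink_style_q15(bids))
```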

@lmatz
Contributor

lmatz commented Sep 14, 2023

Related: #12305

Contributor

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

@fuyufjh fuyufjh closed this as not planned Jun 20, 2024