Distributed performance of Nexmark q15: 3CN equals to 1CN #11866

Closed
lmatz opened this issue Aug 24, 2023 · 3 comments

@lmatz
Contributor

lmatz commented Aug 24, 2023

The source throughput of q15 under the 1CN setting is almost the same as under the 3CN setting (in both settings the compute nodes are colocated with compactors).

1 CN: [screenshot SCR-20230824-j0p]
3 CN: [screenshot SCR-20230824-j21]

The two settings use the same total amount of CPU.

    SELECT
        TO_CHAR(date_time, 'yyyy-MM-dd') as day,
        count(*) AS total_bids,
        count(*) filter (where price < 10000) AS rank1_bids,
        count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
        count(*) filter (where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) filter (where price < 10000) AS rank1_bidders,
        count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
        count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) filter (where price < 10000) AS rank1_auctions,
        count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
        count(distinct auction) filter (where price >= 1000000) AS rank3_auctions
    FROM bid
    GROUP BY to_char(date_time, 'yyyy-MM-dd');

We notice that the query groups by to_char(date_time, 'yyyy-MM-dd'). It is likely that, during any given period of time, all the source input data belongs to the same day, so we are observing data skew: almost every row hashes to a single aggregation group, which explains why adding compute nodes does not help.
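One way to sanity-check the skew hypothesis is to look at the cardinality of the group key itself: if a benchmark run only covers one or two calendar days, almost every row hashes to the same aggregation group no matter how many compute nodes there are. A minimal sketch against the same bid relation:

    SELECT
        to_char(date_time, 'yyyy-MM-dd') AS day,
        count(*) AS bids
    FROM bid
    GROUP BY to_char(date_time, 'yyyy-MM-dd');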

The plan:

 StreamMaterialize { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], stream_key: [day], pk_columns: [day], pk_conflict: NoCheck }
 └─StreamHashAgg [append_only] { group_key: [$expr3], aggs: [count, count filter(($expr4 < 10000:Int32)), count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32)), count filter(($expr4 >= 1000000:Int32)), count(distinct $expr5), count(distinct $expr5) filter(($expr4 < 10000:Int32)), count(distinct $expr5) filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32)), count(distinct $expr5) filter(($expr4 >= 1000000:Int32)), count(distinct $expr6), count(distinct $expr6) filter(($expr4 < 10000:Int32)), count(distinct $expr6) filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32)), count(distinct $expr6) filter(($expr4 >= 1000000:Int32))] }
   └─StreamExchange { dist: HashShard($expr3) }
     └─StreamProject { exprs: [ToChar($expr2, 'yyyy-MM-dd':Varchar) as $expr3, Field(bid, 2:Int32) as $expr4, Field(bid, 1:Int32) as $expr5, Field(bid, 0:Int32) as $expr6, _row_id] }
       └─StreamFilter { predicate: (event_type = 2:Int32) }
         └─StreamRowIdGen { row_id_index: 6 }
           └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr2, expr: ($expr2 - '00:00:05':Interval) }], output_watermarks: [$expr1, $expr2] }
             └─StreamProject { exprs: [event_type, person, auction, bid, Proctime as $expr1, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr2, _row_id], output_watermarks: [$expr1] }
               └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] }
(9 rows)

So we wonder whether two-phase aggregation can help in this case.
However, setting rw_force_two_phase_agg to true does not change the plan.
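For reference, a minimal sketch of what was tried (assuming the same bid relation as above; SET and EXPLAIN are shown here with just one distinct aggregate, which is enough to keep the plan single-phase):

    -- rw_force_two_phase_agg is the session variable mentioned above.
    SET rw_force_two_phase_agg = true;

    EXPLAIN SELECT
        to_char(date_time, 'yyyy-MM-dd') AS day,
        count(*) AS total_bids,
        count(DISTINCT bidder) AS total_bidders
    FROM bid
    GROUP BY to_char(date_time, 'yyyy-MM-dd');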

Any other ideas for improvement?

Let's also wait for the numbers of other systems.

github-actions bot added this to the release-1.2 milestone Aug 24, 2023
@BugenZhao
Member

Since there's distinct in the aggregate calls, the current two-phase optimization cannot be applied:

    // Two-phase aggregation is only eligible when every call is either
    // non-distinct or has a result unaffected by DISTINCT.
    let distinct_ok =
        matches!(call.agg_kind, agg_kinds::result_unaffected_by_distinct!())
            || !call.distinct;
    agg_kind_ok && order_ok && distinct_ok

However, we may apply the Split Distinct Aggregation optimization, which is specifically designed for these data-skew cases.
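For intuition, here is a hand-written approximation of that idea for a subset of the measures (a conceptual sketch only; the optimization rewrites the plan internally rather than requiring the query to change, and the sketch assumes bidder is non-null). The inner aggregation distributes on (day, bidder), spreading the hot day across parallel units, while the outer aggregation on day merges only one row per distinct bidder:

    SELECT
        day,
        sum(cnt)      AS total_bids,
        count(bidder) AS total_bidders  -- one row per (day, bidder) below, so this is a distinct count
    FROM (
        SELECT
            to_char(date_time, 'yyyy-MM-dd') AS day,
            bidder,
            count(*) AS cnt
        FROM bid
        GROUP BY to_char(date_time, 'yyyy-MM-dd'), bidder
    ) t
    GROUP BY day;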

@lmatz
Contributor Author

lmatz commented Aug 28, 2023

True, I forgot this option had already been added. Let me try it.

lmatz changed the title Distributed performance of Nexmark q15 → Distributed performance of Nexmark q15: 3CN equals to 1CN Aug 29, 2023
lmatz removed this from the release-1.2 milestone Sep 11, 2023
@lmatz
Contributor Author

lmatz commented Sep 14, 2023

Duplicate of #11964, closing.

lmatz closed this as not planned (duplicate) Sep 14, 2023