perf: improve nexmark q8(EOWC) (w/ and w/o scaling up) performance #14986

Open · Tracked by #14448
lmatz opened this issue Feb 4, 2024 · 2 comments
Labels: help wanted (Issues that need help from contributors), no-issue-activity, type/perf

lmatz (Contributor) commented Feb 4, 2024

RisingWave version: nightly-20240127

4X:

  1. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2962
  2. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly

1X:

  1. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2961
  2. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz

RW 4X: 1659K events/s (avg source throughput)
RW 1X: 592K events/s (slower)
4X/1X ratio: 2.8

Flink: http://metabase.risingwave-cloud.xyz/question/9549-nexmark-rw-vs-flink-avg-source-throughput-all-testbeds?rw_tag=nightly-20240127&flink_tag=v1.16.0&flink_label=flink-medium-1tm-test-20230104,flink-4x-medium-1tm-test-20240104&flink_metrics=avg-job-throughput-per-second

Flink 4X/1X ratio: 3.16 (RW's scaling ratio of 2.8 is slightly worse than Flink's 3.16)

RW:

CREATE SINK nexmark_q8
    AS
    SELECT P.id,
           P.name,
           P.starttime
    FROM (SELECT id,
                 name,
                 window_start AS starttime,
                 window_end   AS endtime
          FROM
              TUMBLE(person, date_time, INTERVAL '10' SECOND)
          GROUP BY id,
                   name,
                   window_start,
                   window_end) P
             JOIN (SELECT seller,
                          window_start AS starttime,
                          window_end   AS endtime
                   FROM
                       TUMBLE(auction, date_time, INTERVAL '10' SECOND)
                   GROUP BY seller,
                            window_start,
                            window_end) A ON P.id = A.seller
        AND P.starttime = A.starttime
        AND P.endtime = A.endtime
    EMIT ON WINDOW CLOSE
    WITH ( connector = 'blackhole', type = 'append-only' );
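
For context, EMIT ON WINDOW CLOSE requires a watermark on the event-time column; in the plan below this shows up as a StreamWatermarkFilter with a 4-second delay applied to a CASE expression over the shared event stream. A minimal sketch of the mechanism on a hypothetical standalone source follows (the actual benchmark DDL is not shown in this issue; the real setup derives person and auction from one combined source, and the column list, topic name, and broker address here are assumptions):

-- Hypothetical, simplified source definition, not the benchmark's actual DDL.
CREATE SOURCE person (
    id        BIGINT,
    name      VARCHAR,
    date_time TIMESTAMP,
    -- The watermark clause that makes EMIT ON WINDOW CLOSE possible,
    -- mirroring the 4-second delay visible in the plan's WatermarkFilter.
    WATERMARK FOR date_time AS date_time - INTERVAL '4' SECOND
) WITH (
    connector = 'kafka',
    topic = 'person',                            -- assumed topic name
    properties.bootstrap.servers = 'broker:9092' -- placeholder address
) FORMAT PLAIN ENCODE JSON;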

Query Plan:

 StreamSink { type: append-only, columns: [id, name, starttime, $expr10217(hidden), $expr10224(hidden), $expr10223(hidden), $expr10225(hidden)] }
 └─StreamEowcSort { sort_column: $expr2 }
   └─StreamHashJoin [window, append_only] { type: Inner, predicate: $expr2 = $expr6 AND $expr5 = $expr8 AND $expr3 = $expr7, output_watermarks: [$expr2, $expr5, $expr6, $expr8] }
     ├─StreamExchange { dist: HashShard($expr3, $expr2, $expr5) }
     │ └─StreamAppendOnlyDedup { dedup_cols: [$expr3, $expr4, $expr2, $expr5] }
     │   └─StreamExchange { dist: HashShard($expr3, $expr4, $expr2, $expr5) }
     │     └─StreamProject { exprs: [Field(person, 0:Int32) as $expr3, Field(person, 1:Int32) as $expr4, $expr2, ($expr2 + '00:00:10':Interval) as $expr5], output_watermarks: [$expr2, $expr5] }
     │       └─StreamProject { exprs: [event_type, person, auction, $expr1, TumbleStart($expr1, '00:00:10':Interval) as $expr2, _row_id], output_watermarks: [$expr1, $expr2] }
     │         └─StreamFilter { predicate: (event_type = 0:Int32) }
     │           └─StreamShare { id: 6 }
     │             └─StreamProject { exprs: [event_type, person, auction, $expr1, _row_id], output_watermarks: [$expr1] }
     │               └─StreamFilter { predicate: ((event_type = 0:Int32) OR (event_type = 1:Int32)) }
     │                 └─StreamRowIdGen { row_id_index: 6 }
     │                   └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
     │                     └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
     │                       └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
     └─StreamAppendOnlyDedup { dedup_cols: [$expr7, $expr6, $expr8] }
       └─StreamExchange { dist: HashShard($expr7, $expr6, $expr8) }
         └─StreamProject { exprs: [Field(auction, 7:Int32) as $expr7, $expr6, ($expr6 + '00:00:10':Interval) as $expr8], output_watermarks: [$expr6, $expr8] }
           └─StreamProject { exprs: [event_type, person, auction, $expr1, TumbleStart($expr1, '00:00:10':Interval) as $expr6, _row_id], output_watermarks: [$expr1, $expr6] }
             └─StreamFilter { predicate: (event_type = 1:Int32) }
               └─StreamShare { id: 6 }
                 └─StreamProject { exprs: [event_type, person, auction, $expr1, _row_id], output_watermarks: [$expr1] }
                   └─StreamFilter { predicate: ((event_type = 0:Int32) OR (event_type = 1:Int32)) }
                     └─StreamRowIdGen { row_id_index: 6 }
                       └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                         └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                           └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(28 rows)
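
Note that the two GROUP BY subqueries compute no aggregates, so over the append-only, watermarked input the planner implements them as StreamAppendOnlyDedup rather than a hash aggregation. A sketch of the equivalence this relies on (standard SQL semantics, shown only for illustration):

-- Grouping without aggregates is just per-key deduplication, so over an
-- append-only stream these two queries are equivalent; the plan realizes
-- them with StreamAppendOnlyDedup:
SELECT id, name, window_start, window_end
FROM TUMBLE(person, date_time, INTERVAL '10' SECOND)
GROUP BY id, name, window_start, window_end;

SELECT DISTINCT id, name, window_start, window_end
FROM TUMBLE(person, date_time, INTERVAL '10' SECOND);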

Distributed Query Plan:

 Fragment 0
 StreamSink { type: append-only, columns: [id, name, starttime, $expr10217(hidden), $expr10224(hidden), $expr10223(hidden), $expr10225(hidden)] } { tables: [ Sink: 0 ] }
 └── StreamEowcSort { sort_column: $expr2 } { tables: [ Sort: 1 ] }
     └── StreamHashJoin [window, append_only] { type: Inner, predicate: $expr2 = $expr6 AND $expr5 = $expr8 AND $expr3 = $expr7, output_watermarks: [$expr2, $expr5, $expr6, $expr8] }
         ├── tables: [ HashJoinLeft: 2, HashJoinDegreeLeft: 3, HashJoinRight: 4, HashJoinDegreeRight: 5 ]
         ├── StreamExchange Hash([0, 2, 3]) from 1
         └── StreamAppendOnlyDedup { dedup_cols: [$expr7, $expr6, $expr8] } { tables: [ AppendOnlyDedup: 9 ] }
             └── StreamExchange Hash([0, 1, 2]) from 4
 
 Fragment 1
 StreamAppendOnlyDedup { dedup_cols: [$expr3, $expr4, $expr2, $expr5] } { tables: [ AppendOnlyDedup: 6 ] }
 └── StreamExchange Hash([0, 1, 2, 3]) from 2
 
 Fragment 2
 StreamProject { exprs: [Field(person, 0:Int32) as $expr3, Field(person, 1:Int32) as $expr4, $expr2, ($expr2 + '00:00:10':Interval) as $expr5], output_watermarks: [$expr2, $expr5] }
 └── StreamProject { exprs: [event_type, person, auction, $expr1, TumbleStart($expr1, '00:00:10':Interval) as $expr2, _row_id], output_watermarks: [$expr1, $expr2] }
     └── StreamFilter { predicate: (event_type = 0:Int32) }
         └── StreamExchange NoShuffle from 3
 
 Fragment 3
 StreamProject { exprs: [event_type, person, auction, $expr1, _row_id], output_watermarks: [$expr1] }
 └── StreamFilter { predicate: ((event_type = 0:Int32) OR (event_type = 1:Int32)) }
     └── StreamRowIdGen { row_id_index: 6 }
         └── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 7 ] }
             └── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                 └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 8 ] }
 
 Fragment 4
 StreamProject { exprs: [Field(auction, 7:Int32) as $expr7, $expr6, ($expr6 + '00:00:10':Interval) as $expr8], output_watermarks: [$expr6, $expr8] }
 └── StreamProject { exprs: [event_type, person, auction, $expr1, TumbleStart($expr1, '00:00:10':Interval) as $expr6, _row_id], output_watermarks: [$expr1, $expr6] }
     └── StreamFilter { predicate: (event_type = 1:Int32) }
         └── StreamExchange NoShuffle from 3
 
 Table 0
 ├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_vnode, kv_log_store_row_op, id, name, starttime, $expr10217, $expr10224, $expr10223, $expr10225 ]
 ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
 ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
 ├── distribution key: [ 4, 6, 7 ]
 ├── read pk prefix len hint: 3
 └── vnode column idx: 2
 
 Table 1 { columns: [ $expr3, $expr4, $expr2, $expr5, $expr7, $expr6, $expr8 ], primary key: [ $2 ASC, $0 ASC, $3 ASC, $1 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6 ], distribution key: [ 0, 2, 3 ], read pk prefix len hint: 0 }
 
 Table 2 { columns: [ $expr3, $expr4, $expr2, $expr5 ], primary key: [ $2 ASC, $3 ASC, $0 ASC, $1 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 2, 3 ], read pk prefix len hint: 3 }
 
 Table 3 { columns: [ $expr2, $expr5, $expr3, $expr4, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4 ], distribution key: [ 2, 0, 1 ], read pk prefix len hint: 3 }
 
 Table 4 { columns: [ $expr7, $expr6, $expr8 ], primary key: [ $1 ASC, $2 ASC, $0 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 }
 
 Table 5 { columns: [ $expr6, $expr8, $expr7, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 2, 0, 1 ], read pk prefix len hint: 3 }
 
 Table 6 { columns: [ $expr3, $expr4, $expr2, $expr5 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1, 2, 3 ], read pk prefix len hint: 4 }
 
 Table 7 { columns: [ vnode, offset ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }
 
 Table 8 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
 
 Table 9 { columns: [ $expr7, $expr6, $expr8 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 }
 
(59 rows)

Flink:

INSERT INTO nexmark_q8
    SELECT P.id, P.name, P.starttime
    FROM (
      SELECT P.id, P.name,
             TUMBLE_START(P.dateTime, INTERVAL '10' SECOND) AS starttime,
             TUMBLE_END(P.dateTime, INTERVAL '10' SECOND) AS endtime
      FROM person P
      GROUP BY P.id, P.name, TUMBLE(P.dateTime, INTERVAL '10' SECOND)
    ) P
    JOIN (
      SELECT A.seller,
             TUMBLE_START(A.dateTime, INTERVAL '10' SECOND) AS starttime,
             TUMBLE_END(A.dateTime, INTERVAL '10' SECOND) AS endtime
      FROM auction A
      GROUP BY A.seller, TUMBLE(A.dateTime, INTERVAL '10' SECOND)
    ) A
    ON P.id = A.seller AND P.starttime = A.starttime AND P.endtime = A.endtime;

Query Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q8], fields=[id, name, starttime])
+- Calc(select=[id, name, starttime])
   +- Join(joinType=[InnerJoin], where=[AND(=(id, seller), =(starttime, starttime0), =(endtime, endtime0))], select=[id, name, starttime, endtime, seller, starttime0, endtime0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[hash[id, starttime, endtime]])
      :  +- Calc(select=[$f0 AS id, $f1 AS name, w$start AS starttime, w$end AS endtime])
      :     +- GroupWindowAggregate(groupBy=[$f0, $f1], window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
      :        +- Exchange(distribution=[hash[$f0, $f1]])
      :           +- Calc(select=[person.id AS $f0, person.name AS $f1, dateTime], where=[=(event_type, 0)])
      :              +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
      :                 +- Calc(select=[event_type, person, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
      :                    +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])
      +- Exchange(distribution=[hash[seller, starttime, endtime]])
         +- Calc(select=[$f0 AS seller, w$start AS starttime, w$end AS endtime])
            +- GroupWindowAggregate(groupBy=[$f0], window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
               +- Exchange(distribution=[hash[$f0]])
                  +- Calc(select=[auction.seller AS $f0, dateTime], where=[=(event_type, 1)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, auction, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q8], fields=[id, name, starttime])
+- Calc(select=[id, name, starttime])
   +- Join(joinType=[InnerJoin], where=[((id = seller) AND (starttime = starttime0) AND (endtime = endtime0))], select=[id, name, starttime, endtime, seller, starttime0, endtime0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[hash[id, starttime, endtime]])
      :  +- Calc(select=[$f0 AS id, $f1 AS name, w$start AS starttime, w$end AS endtime])
      :     +- GroupWindowAggregate(groupBy=[$f0, $f1], window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
      :        +- Exchange(distribution=[hash[$f0, $f1]])
      :           +- Calc(select=[person.id AS $f0, person.name AS $f1, dateTime], where=[(event_type = 0)])
      :              +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
      :                 +- Calc(select=[event_type, person, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
      :                    +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])(reuse_id=[1])
      +- Exchange(distribution=[hash[seller, starttime, endtime]])
         +- Calc(select=[$f0 AS seller, w$start AS starttime, w$end AS endtime])
            +- GroupWindowAggregate(groupBy=[$f0], window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
               +- Exchange(distribution=[hash[$f0]])
                  +- Calc(select=[auction.seller AS $f0, dateTime], where=[(event_type = 1)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, auction, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- Reused(reference_id=[1])
github-actions bot added this to the release-1.7 milestone Feb 4, 2024
lmatz (Contributor, Author) commented Feb 4, 2024

Some notable phenomena:

1. Why is there executor cache activity reported for a materialized view when no materialized view has been created?

And why does the executor cache show this pattern?

4X: (screenshot SCR-20240205-dm)

1X: (screenshot SCR-20240205-dp)

2. Why are the join table cache miss rates so high, at 80% and 100%? At 100%, the cache is effectively useless. One plausible explanation: the join key includes the window bounds and both inputs are deduplicated per window, so nearly every lookup is for a key that has never been seen before and can never hit the cache.

4X: (screenshot SCR-20240205-fq)

1X: (screenshot SCR-20240205-fv)

3. Join Actor Input Blocking Time Ratio and Join Actor Match Duration Per Second are both about 4 times worse in the 4X run than in the 1X run.

4X: (screenshot SCR-20240205-ij)

1X: (screenshot SCR-20240205-in)

4. AppendOnlyDedup cannot keep up when scaling up.

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly&editPanel=63
(screenshot SCR-20240205-vr)

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz&editPanel=63
(screenshot SCR-20240205-vn)

5. The HashJoin generally keeps up when scaling up.

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly&editPanel=63
(screenshot SCR-20240205-vr)

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz&editPanel=63
(screenshot SCR-20240205-vn)

6. The Sort operator does not keep up when scaling up (a way to isolate this is sketched after this list).

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly&editPanel=63
(screenshot SCR-20240205-vr)

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?editPanel=63&from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz
(screenshot SCR-20240205-vn)

7. Project cannot keep up when scaling up (hard to believe).

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly&editPanel=63
(screenshot SCR-20240205-1bd)

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?editPanel=63&from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz
(screenshot SCR-20240205-1bf)
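
Since observation 6 points at the EOWC-specific StreamEowcSort, one way to isolate its cost would be to run the same query without EMIT ON WINDOW CLOSE, which should plan without that operator. A hypothetical A/B variant follows (the sink name is invented, and whether the rest of the plan stays identical would need to be confirmed with EXPLAIN):

-- Hypothetical variant for comparison: identical query body, but without
-- EMIT ON WINDOW CLOSE, so the StreamEowcSort at the top of the plan
-- should disappear while the dedup + join pipeline stays the same.
CREATE SINK nexmark_q8_no_eowc AS
SELECT P.id, P.name, P.starttime
FROM (SELECT id, name,
             window_start AS starttime,
             window_end   AS endtime
      FROM TUMBLE(person, date_time, INTERVAL '10' SECOND)
      GROUP BY id, name, window_start, window_end) P
JOIN (SELECT seller,
             window_start AS starttime,
             window_end   AS endtime
      FROM TUMBLE(auction, date_time, INTERVAL '10' SECOND)
      GROUP BY seller, window_start, window_end) A
  ON P.id = A.seller
 AND P.starttime = A.starttime
 AND P.endtime = A.endtime
WITH ( connector = 'blackhole', type = 'append-only' );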

lmatz added the "help wanted" (Issues that need help from contributors) and "type/perf" labels Feb 4, 2024
lmatz changed the title from "perf: improve nexmark q8(EOWC) scaling up performance" to "perf: improve nexmark q8(EOWC) (scaling up) performance" Feb 4, 2024
lmatz changed the title from "perf: improve nexmark q8(EOWC) (scaling up) performance" to "perf: improve nexmark q8(EOWC) (w/ and w/o scaling up) performance" Feb 4, 2024
lmatz removed this from the release-1.7 milestone Mar 6, 2024

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.
