perf: improve tpc-h q4 performance (single-topic) #14811

Closed · Tracked by #15036
lmatz opened this issue Jan 26, 2024 · 17 comments

@github-actions github-actions bot added this to the release-1.7 milestone Jan 26, 2024

lmatz commented Jan 26, 2024

RW Query:

create sink tpch_q4 as
    select
    	o_orderpriority,
    	count(*) as order_count
    from
    	orders
    where
    	o_orderdate >= date '1997-07-01'
    	and o_orderdate < date '2099-07-01' + interval '3' month
    	and exists (
    		select
    			*
    		from
    			lineitem
    		where
    			l_orderkey = o_orderkey
    			and l_commitdate < l_receiptdate
    	)
    group by
    	o_orderpriority
    order by
    	o_orderpriority
    limit 1
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Plan:

 StreamSink { type: append-only, columns: [o_orderpriority, order_count] }
 └─StreamProject { exprs: [$expr2, count] }
   └─StreamTopN { order: [$expr2 ASC], limit: 1, offset: 0 }
     └─StreamExchange { dist: Single }
       └─StreamGroupTopN { order: [$expr2 ASC], limit: 1, offset: 0, group_key: [$expr4] }
         └─StreamProject { exprs: [$expr2, count, Vnode($expr2) as $expr4] }
           └─StreamHashAgg { group_key: [$expr2], aggs: [count] }
             └─StreamExchange { dist: HashShard($expr2) }
               └─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr3 }
                 ├─StreamExchange { dist: HashShard($expr1) }
                 │ └─StreamProject { exprs: [Field(orders, 0:Int32) as $expr1, Field(orders, 5:Int32) as $expr2, _row_id] }
                 │   └─StreamFilter { predicate: (Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp) AND (eventType = 'orders':Varchar) }
                 │     └─StreamShare { id: 4 }
                 │       └─StreamProject { exprs: [eventType, lineitem, orders, _row_id] }
                 │         └─StreamFilter { predicate: ((((Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp)) AND (eventType = 'orders':Varchar)) OR ((Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar))) }
                 │           └─StreamRowIdGen { row_id_index: 10 }
                 │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
                 └─StreamExchange { dist: HashShard($expr3) }
                   └─StreamProject { exprs: [Field(lineitem, 0:Int32) as $expr3, _row_id] }
                     └─StreamFilter { predicate: (Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar) }
                       └─StreamShare { id: 4 }
                         └─StreamProject { exprs: [eventType, lineitem, orders, _row_id] }
                           └─StreamFilter { predicate: ((((Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp)) AND (eventType = 'orders':Varchar)) OR ((Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar))) }
                             └─StreamRowIdGen { row_id_index: 10 }
                               └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(25 rows)

Dist Plan:

 Fragment 0
 StreamSink { type: append-only, columns: [o_orderpriority, order_count] }
 ├── tables: [ Sink: 0 ]
 └── StreamProject { exprs: [$expr2, count] }
     └── StreamTopN { order: [$expr2 ASC], limit: 1, offset: 0 }
         ├── tables: [ TopN: 1 ]
         └── StreamExchange Single from 1
 
 Fragment 1
 StreamGroupTopN { order: [$expr2 ASC], limit: 1, offset: 0, group_key: [$expr4] }
 ├── tables: [ GroupTopN: 2 ]
 └── StreamProject { exprs: [$expr2, count, Vnode($expr2) as $expr4] }
     └── StreamHashAgg { group_key: [$expr2], aggs: [count] }
         ├── tables: [ HashAggState: 3 ]
         └── StreamExchange Hash([0]) from 2
 
 Fragment 2
 StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr3 }
 ├── tables:
 │   ┌── HashJoinLeft: 4
 │   ├── HashJoinDegreeLeft: 5
 │   ├── HashJoinRight: 6
 │   └── HashJoinDegreeRight: 7
 ├── StreamExchange Hash([0]) from 3
 └── StreamExchange Hash([0]) from 5
 
 Fragment 3
 StreamProject { exprs: [Field(orders, 0:Int32) as $expr1, Field(orders, 5:Int32) as $expr2, _row_id] }
 └── StreamFilter { predicate: (Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp) AND (eventType = 'orders':Varchar) }
     └── StreamExchange NoShuffle from 4
 
 Fragment 4
 StreamProject { exprs: [eventType, lineitem, orders, _row_id] }
 └── StreamFilter { predicate: ((((Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp)) AND (eventType = 'orders':Varchar)) OR ((Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar))) }
     └── StreamRowIdGen { row_id_index: 10 }
         └── StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 8 ] }
 
 Fragment 5
 StreamProject { exprs: [Field(lineitem, 0:Int32) as $expr3, _row_id] }
 └── StreamFilter { predicate: (Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar) }
     └── StreamExchange NoShuffle from 4
 
 Table 0 { columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_row_op, o_orderpriority, order_count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1, 2, 3, 4 ], distribution key: [], read pk prefix len hint: 2 }
 
 Table 1 { columns: [ $expr2, count, $expr4 ], primary key: [ $0 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 }
 
 Table 2 { columns: [ $expr2, count, $expr4 ], primary key: [ $2 ASC, $0 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 2 }
 
 Table 3 { columns: [ $expr2, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 4 { columns: [ $expr1, $expr2, _row_id ], primary key: [ $0 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 5 { columns: [ $expr1, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 6 { columns: [ $expr3, _row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 7 { columns: [ $expr3, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 8 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
 
(60 rows)


lmatz commented Jan 26, 2024

Flink Query:

INSERT INTO tpch_q4
    select
      o_orderpriority,
      count(*) as order_count
    from
      orders
    where
      o_orderdate >= date '1997-07-01'
      and o_orderdate < date '2099-07-01' + interval '3' month
      and exists (
        select
          *
        from
          lineitem
        where
          l_orderkey = o_orderkey
          and l_commitdate < l_receiptdate
      )
    group by
      o_orderpriority
    order by
      o_orderpriority
    LIMIT 1;

Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q4], fields=[o_orderpriority, order_count])
+- SortLimit(orderBy=[o_orderpriority ASC], offset=[0], fetch=[1], strategy=[UpdateFastStrategy[0]])
   +- Exchange(distribution=[single])
      +- GroupAggregate(groupBy=[o_orderpriority], select=[o_orderpriority, COUNT(*) AS order_count])
         +- Exchange(distribution=[hash[o_orderpriority]])
            +- Calc(select=[o_orderpriority])
               +- Join(joinType=[LeftSemiJoin], where=[=(l_orderkey, o_orderkey)], select=[o_orderkey, o_orderpriority], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
                  :- Exchange(distribution=[hash[o_orderkey]])
                  :  +- Calc(select=[orders.o_orderkey AS o_orderkey, orders.o_orderpriority AS o_orderpriority], where=[AND(SEARCH(eventType, Sarg[_UTF-16LE'orders':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH(orders.o_orderdate, Sarg[[1997-07-01..2099-10-01)]))])
                  :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
                  +- Exchange(distribution=[hash[l_orderkey]])
                     +- Calc(select=[lineitem.l_orderkey AS l_orderkey], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), <(lineitem.l_commitdate, lineitem.l_receiptdate))])
                        +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q4], fields=[o_orderpriority, order_count])
+- SortLimit(orderBy=[o_orderpriority ASC], offset=[0], fetch=[1], strategy=[UpdateFastStrategy[0]])
   +- Exchange(distribution=[single])
      +- GroupAggregate(groupBy=[o_orderpriority], select=[o_orderpriority, COUNT(*) AS order_count])
         +- Exchange(distribution=[hash[o_orderpriority]])
            +- Calc(select=[o_orderpriority])
               +- Join(joinType=[LeftSemiJoin], where=[(l_orderkey = o_orderkey)], select=[o_orderkey, o_orderpriority], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
                  :- Exchange(distribution=[hash[o_orderkey]])
                  :  +- Calc(select=[orders.o_orderkey AS o_orderkey, orders.o_orderpriority AS o_orderpriority], where=[(SEARCH(eventType, Sarg[_UTF-16LE'orders']) AND SEARCH(orders.o_orderdate, Sarg[[1997-07-01..2099-10-01)]))])
                  :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])(reuse_id=[1])
                  +- Exchange(distribution=[hash[l_orderkey]])
                     +- Calc(select=[lineitem.l_orderkey AS l_orderkey], where=[((eventType = 'lineitem') AND (lineitem.l_commitdate < lineitem.l_receiptdate))])
                        +- Reused(reference_id=[1])


lmatz commented Jan 26, 2024

Surprisingly, Flink does not have a SortLimit, aka GroupTopN, before the last Exchange with single distribution 😮
Everything else looks the same.

Since the limit is 1, I don't think the TopN in RisingWave should matter much. (Edit: see below, this seems wrong.)
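
For intuition on why RW's plan keeps an extra per-vnode GroupTopN before the single-distribution TopN: the top-k of each shard's local top-k equals the global top-k, so only k rows per shard need to cross the single exchange. A minimal sketch of that idea follows (toy function and data, not RisingWave's actual executor):

    // Illustrative only: local-then-global top-k, here "top" = smallest values.
    fn local_then_global_top_k(shards: &[Vec<i32>], k: usize) -> Vec<i32> {
        let mut candidates: Vec<i32> = shards
            .iter()
            .flat_map(|shard| {
                let mut local = shard.clone();
                local.sort(); // per-shard GroupTopN: keep only the k best rows
                local.truncate(k);
                local
            })
            .collect();
        candidates.sort(); // global TopN over the shard-local winners
        candidates.truncate(k);
        candidates
    }

    fn main() {
        let shards = vec![vec![5, 3, 9], vec![1, 7], vec![4, 2]];
        assert_eq!(local_then_global_top_k(&shards, 1), vec![1]);
    }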

The other important operator is LeftSemiJoin.


lmatz commented Feb 6, 2024

Actually, limit 1 is kind of special:
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L48-L53
when:

i.e., 1) the upsert key of the input stream contains the partition key; 2) the sort field is updated monotonically under the upsert key.

I wonder if RW also does such an optimization?

Edit: After briefly going through the code, it seems RW does not differentiate between limit 1 and limit N in its top-n/group-top-n processing logic. Do you know if it is doable? @xxchan
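
For reference, a minimal sketch of the top-1 fast-path idea described above: keep only the current best row per group instead of a full ordered top-N state. This is illustrative only, assuming append-only input and made-up types; it is neither Flink's FastTop1Function nor RisingWave's Top-N executor:

    use std::collections::HashMap;

    // Illustrative only: one (sort key, row) entry per group instead of an ordered top-N state.
    struct FastTop1<K, R> {
        best: HashMap<K, (i64, R)>,
    }

    enum Change<R> {
        Insert(R),
        Update { old: R, new: R },
    }

    impl<K: std::hash::Hash + Eq, R: Clone> FastTop1<K, R> {
        fn new() -> Self {
            Self { best: HashMap::new() }
        }

        // Emit a downstream change only when the group's top-1 actually changes.
        fn on_row(&mut self, group: K, sort_key: i64, row: R) -> Option<Change<R>> {
            match self.best.get(&group) {
                Some((cur_key, _)) if sort_key >= *cur_key => None, // not better: no output, no state write
                Some((_, cur_row)) => {
                    let old = cur_row.clone();
                    self.best.insert(group, (sort_key, row.clone()));
                    Some(Change::Update { old, new: row })
                }
                None => {
                    self.best.insert(group, (sort_key, row.clone()));
                    Some(Change::Insert(row))
                }
            }
        }
    }

    fn main() {
        let mut top1 = FastTop1::new();
        assert!(matches!(top1.on_row("1-URGENT", 10, "o1"), Some(Change::Insert("o1"))));
        assert!(top1.on_row("1-URGENT", 20, "o2").is_none()); // worse than current top-1: ignored
        assert!(matches!(top1.on_row("1-URGENT", 5, "o3"), Some(Change::Update { .. })));
    }

The append-only assumption is what makes this cheap: with updates or deletions on the input, evicting the current top-1 would require re-reading other candidates, which is why the general top-N state is larger.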

@lmatz lmatz changed the title from "perf: improve tpc-h q4 performance" to "perf: improve tpc-h q4 performance (single-topic)" on Feb 6, 2024

xxchan commented Feb 6, 2024

We do have a Dedup operator. Let me think about possible optimizations.


xxchan commented Feb 6, 2024

apache/flink#16434

From the PR description, its motivation is not performance. I guess the impact won't be very large.


xxchan commented Feb 6, 2024

The state is the same; it just changes to use the kv interface.


lmatz commented Feb 7, 2024

I see, thanks for the explanation. Let me remove the order by ... limit 1 in both queries and run them again.

i.e.:

create sink tpch_q4_without_topn as
     select
         o_orderpriority,
         count(*) as order_count
     from
         orders
     where
         o_orderdate >= date '1997-07-01'
         and o_orderdate < date '2099-07-01' + interval '3' month
         and exists (
             select
                 *
             from
                 lineitem
             where
                 l_orderkey = o_orderkey
                 and l_commitdate < l_receiptdate
         )
     group by
         o_orderpriority
     with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

plan:

 StreamSink { type: append-only, columns: [o_orderpriority, order_count] }
 └─StreamHashAgg { group_key: [$expr2], aggs: [count] }
   └─StreamExchange { dist: HashShard($expr2) }
     └─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr3 }
       ├─StreamExchange { dist: HashShard($expr1) }
       │ └─StreamProject { exprs: [Field(orders, 0:Int32) as $expr1, Field(orders, 5:Int32) as $expr2, _row_id] }
       │   └─StreamFilter { predicate: (Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp) AND (eventType = 'orders':Varchar) }
       │     └─StreamShare { id: 4 }
       │       └─StreamProject { exprs: [eventType, lineitem, orders, _row_id] }
       │         └─StreamFilter { predicate: ((((Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp)) AND (eventType = 'orders':Varchar)) OR ((Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar))) }
       │           └─StreamRowIdGen { row_id_index: 10 }
       │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
       └─StreamExchange { dist: HashShard($expr3) }
         └─StreamProject { exprs: [Field(lineitem, 0:Int32) as $expr3, _row_id] }
           └─StreamFilter { predicate: (Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar) }
             └─StreamShare { id: 4 }
               └─StreamProject { exprs: [eventType, lineitem, orders, _row_id] }
                 └─StreamFilter { predicate: ((((Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp)) AND (eventType = 'orders':Varchar)) OR ((Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar))) }
                   └─StreamRowIdGen { row_id_index: 10 }
                     └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(20 rows)

Flink:

INSERT INTO tpch_q4_without_topn                       
     select                                                 
       o_orderpriority,                                     
       count(*) as order_count                              
     from                                                   
       orders                                               
     where                                                  
       o_orderdate >= date '1997-07-01'                     
       and o_orderdate < date '2099-07-01' + interval '3' month           
       and exists (                                         
         select
           *
         from                                               
           lineitem                                         
         where
           l_orderkey = o_orderkey                          
           and l_commitdate < l_receiptdate                 
       )   
     group by                                               
       o_orderpriority; 

Query Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q4_without_topn], fields=[o_orderpriority, order_count])
+- GroupAggregate(groupBy=[o_orderpriority], select=[o_orderpriority, COUNT(*) AS order_count])
   +- Exchange(distribution=[hash[o_orderpriority]])
      +- Calc(select=[o_orderpriority])
         +- Join(joinType=[LeftSemiJoin], where=[=(l_orderkey, o_orderkey)], select=[o_orderkey, o_orderpriority], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[o_orderkey]])
            :  +- Calc(select=[orders.o_orderkey AS o_orderkey, orders.o_orderpriority AS o_orderpriority], where=[AND(SEARCH(eventType, Sarg[_UTF-16LE'orders':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH(orders.o_orderdate, Sarg[[1997-07-01..2099-10-01)]))])
            :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
            +- Exchange(distribution=[hash[l_orderkey]])
               +- Calc(select=[lineitem.l_orderkey AS l_orderkey], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), <(lineitem.l_commitdate, lineitem.l_receiptdate))])
                  +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q4_without_topn], fields=[o_orderpriority, order_count])
+- GroupAggregate(groupBy=[o_orderpriority], select=[o_orderpriority, COUNT(*) AS order_count])
   +- Exchange(distribution=[hash[o_orderpriority]])
      +- Calc(select=[o_orderpriority])
         +- Join(joinType=[LeftSemiJoin], where=[(l_orderkey = o_orderkey)], select=[o_orderkey, o_orderpriority], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[o_orderkey]])
            :  +- Calc(select=[orders.o_orderkey AS o_orderkey, orders.o_orderpriority AS o_orderpriority], where=[(SEARCH(eventType, Sarg[_UTF-16LE'orders']) AND SEARCH(orders.o_orderdate, Sarg[[1997-07-01..2099-10-01)]))])
            :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])(reuse_id=[1])
            +- Exchange(distribution=[hash[l_orderkey]])
               +- Calc(select=[lineitem.l_orderkey AS l_orderkey], where=[((eventType = 'lineitem') AND (lineitem.l_commitdate < lineitem.l_receiptdate))])
                  +- Reused(reference_id=[1])


lmatz commented Feb 21, 2024

The performance of tpch_q4_without_topn and tpch_q4 is generally the same:

  1. tpch_q4_without_topn: http://metabase.risingwave-cloud.xyz/question/11007-tpch-q4-without-topn-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-2983?start_date=2024-01-09
  2. tpch_q4: http://metabase.risingwave-cloud.xyz/question/4834-tpch-q4-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-367?start_date=2023-11-24

At the very least, this cannot explain why RW gets outperformed by such a margin.

Let's remove the count(*) and group by, then test again.

tpch_q4_without_topn_agg:

     create sink tpch_q4_without_topn_agg as
     select
         o_orderpriority
     from
         orders
     where
         o_orderdate >= date '1997-07-01'
         and o_orderdate < date '2099-07-01' + interval '3' month
         and exists (
             select
                 *
             from
                 lineitem
             where
                 l_orderkey = o_orderkey
                 and l_commitdate < l_receiptdate
         )
     with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

plan:

 StreamSink { type: append-only, columns: [o_orderpriority, _row_id(hidden), $expr10203(hidden)] }
 └─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr3 }
   ├─StreamExchange { dist: HashShard($expr1) }
   │ └─StreamProject { exprs: [Field(orders, 0:Int32) as $expr1, Field(orders, 5:Int32) as $expr2, _row_id] }
   │   └─StreamFilter { predicate: (Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp) AND (eventType = 'orders':Varchar) }
   │     └─StreamShare { id: 4 }
   │       └─StreamProject { exprs: [eventType, lineitem, orders, _row_id] }
   │         └─StreamFilter { predicate: ((((Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp)) AND (eventType = 'orders':Varchar)) OR ((Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar))) }
   │           └─StreamRowIdGen { row_id_index: 10 }
   │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
   └─StreamExchange { dist: HashShard($expr3) }
     └─StreamProject { exprs: [Field(lineitem, 0:Int32) as $expr3, _row_id] }
       └─StreamFilter { predicate: (Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar) }
         └─StreamShare { id: 4 }
           └─StreamProject { exprs: [eventType, lineitem, orders, _row_id] }
             └─StreamFilter { predicate: ((((Field(orders, 4:Int32) >= '1997-07-01':Date) AND (Field(orders, 4:Int32) < '2099-10-01 00:00:00':Timestamp)) AND (eventType = 'orders':Varchar)) OR ((Field(lineitem, 11:Int32) < Field(lineitem, 12:Int32)) AND (eventType = 'lineitem':Varchar))) }
               └─StreamRowIdGen { row_id_index: 10 }
                 └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(18 rows)

Flink:

INSERT INTO tpch_q4_without_topn_agg
     select
       o_orderpriority
     from
       orders
     where
       o_orderdate >= date '1997-07-01'
       and o_orderdate < date '2099-07-01' + interval '3' month
       and exists (
         select
           *
         from
           lineitem
         where
           l_orderkey = o_orderkey
           and l_commitdate < l_receiptdate
       );

Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q4_without_topn_agg], fields=[o_orderpriority])
+- Calc(select=[o_orderpriority])
   +- Join(joinType=[LeftSemiJoin], where=[=(l_orderkey, o_orderkey)], select=[o_orderkey, o_orderpriority], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :- Exchange(distribution=[hash[o_orderkey]])
      :  +- Calc(select=[orders.o_orderkey AS o_orderkey, orders.o_orderpriority AS o_orderpriority], where=[AND(SEARCH(eventType, Sarg[_UTF-16LE'orders':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH(orders.o_orderdate, Sarg[[1997-07-01..2099-10-01)]))])
      :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
      +- Exchange(distribution=[hash[l_orderkey]])
         +- Calc(select=[lineitem.l_orderkey AS l_orderkey], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), <(lineitem.l_commitdate, lineitem.l_receiptdate))])
            +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q4_without_topn_agg], fields=[o_orderpriority])
+- Calc(select=[o_orderpriority])
   +- Join(joinType=[LeftSemiJoin], where=[(l_orderkey = o_orderkey)], select=[o_orderkey, o_orderpriority], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :- Exchange(distribution=[hash[o_orderkey]])
      :  +- Calc(select=[orders.o_orderkey AS o_orderkey, orders.o_orderpriority AS o_orderpriority], where=[(SEARCH(eventType, Sarg[_UTF-16LE'orders']) AND SEARCH(orders.o_orderdate, Sarg[[1997-07-01..2099-10-01)]))])
      :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])(reuse_id=[1])
      +- Exchange(distribution=[hash[l_orderkey]])
         +- Calc(select=[lineitem.l_orderkey AS l_orderkey], where=[((eventType = 'lineitem') AND (lineitem.l_commitdate < lineitem.l_receiptdate))])
            +- Reused(reference_id=[1])

which means it becomes a direct comparison of LeftSemiJoin.
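
For context on what the LeftSemiJoin has to do per row, here is a minimal sketch of a streaming left-semi join over append-only inputs (toy types and names; not RisingWave's HashJoin executor, which additionally persists this state to storage — the HashJoinLeft/Right and degree tables in the dist plan above):

    use std::collections::HashMap;

    // Illustrative only: both sides keep keyed state, which is why the join is CPU- and
    // state-intensive even though it only filters the orders side.
    #[derive(Default)]
    struct SemiJoin {
        left: HashMap<i64, Vec<String>>,   // buffered orders rows, keyed by o_orderkey
        right_degree: HashMap<i64, usize>, // number of matching lineitem rows per key
    }

    impl SemiJoin {
        // An orders row is emitted as soon as at least one matching lineitem row exists.
        fn on_left(&mut self, key: i64, row: String) -> Option<String> {
            self.left.entry(key).or_default().push(row.clone());
            (self.right_degree.get(&key).copied().unwrap_or(0) > 0).then_some(row)
        }

        // The first lineitem row for a key releases all buffered orders rows with that key.
        fn on_right(&mut self, key: i64) -> Vec<String> {
            let degree = self.right_degree.entry(key).or_insert(0);
            *degree += 1;
            if *degree == 1 {
                self.left.get(&key).cloned().unwrap_or_default()
            } else {
                Vec::new()
            }
        }
    }

    fn main() {
        let mut join = SemiJoin::default();
        assert_eq!(join.on_left(1, "order-1".into()), None);       // no lineitem match yet: buffered
        assert_eq!(join.on_right(1), vec!["order-1".to_string()]); // first match releases it
        assert!(join.on_right(1).is_empty());                      // later matches emit nothing new
        assert_eq!(join.on_left(1, "order-2".into()), Some("order-2".to_string()));
    }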


st1page commented Feb 21, 2024

It's a CPU-intensive HashJoin and we have not optimized for this case 🤔


lmatz commented Feb 21, 2024

Linking #14797 as it has two LeftSemiJoins and also has BIG room for improvement.

If you want to generate a flame graph, check out: https://github.com/risingwavelabs/risingwave-test/tree/main/benchmarks/tpch#generate-and-upload-flamegraph

An example: https://buildkite.com/risingwave-test/tpch-benchmark/builds/995#018dcb7c-c057-40a7-bcad-8874ef9282df
The SVG will be put under artifacts.

[screenshot: SCR-20240222-k5p]

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708517192000&to=1708517665000&var-namespace=tpch-1cn-affinity-lmatz-test

[screenshot: SCR-20240222-k7c]

Many L0 sub-levels.

The conjecture is that since the barrier interval is 1s by default, many L0 sub-levels may be expected.

Let's try barrier interval = 10s.


lmatz commented Feb 29, 2024

RW_VERSION="nightly-20240224"

So after setting RW to:

RW_CONFIG="{'system':{'data_directory':'hummock_001','barrier_interval_ms':10000},'server':{'telemetry_enabled':false},'meta': {'level0_tier_compact_file_number':6,'level0_overlapping_sub_level_compact_level_count':6}}"

Buildkite: https://buildkite.com/risingwave-test/tpch-benchmark/builds/1010 (ignore the title of this pipeline; it is a mistake)

The CPU flame graph: https://buildkite.com/organizations/risingwave-test/pipelines/tpch-benchmark/builds/1010/jobs/018df08c-e78f-4aae-89c8-0d7cf40531cd/artifacts/018df0b5-15b7-43e0-8238-f1b5e300a3bb

[screenshot: SCR-20240229-ilz]

It looks like iter, i.e., reading from storage, still takes a significant portion of time, although it is down from 43% (barrier interval = 1s) to 29% now.

Grafana: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709140058000&to=1709141079000&var-namespace=tpch-1cn-affinity-10s

LSM Tree Shape

[screenshot: SCR-20240229-it3]

Is this considered proper? cc: @Li0k @Little-Wallace

Compute Node and Compactor Node CPU usage

[screenshot: SCR-20240229-k1y]

When compaction is triggered, the CPU usage of the compute node drops as it yields some CPU to the compactor node.
But sometimes the combined CPU usage of these two nodes is lower than when compaction uses little CPU. I wonder whether this means compaction causes block cache invalidation?

[screenshot: SCR-20240229-k40]
[screenshot: SCR-20240229-k48]

Do you think we can make the claim above based on these two figures? cc: @Li0k @Little-Wallace

Aggressive Cache Eviction

[screenshot: SCR-20240229-iua]
[screenshot: SCR-20240229-iud]
[screenshot: SCR-20240229-iul]

It looks like the same problem as #15305, which also appeared in TPC-H q20 #14797 (comment).
cc: @st1page @fuyufjh


lmatz commented Feb 29, 2024

From @MrCroxx on Slack:

Is cache eviction the reason for the high CPU usage of iter, i.e., storage reads?

Comparing the time when the CPU flame graph was taken with the time when cache eviction becomes aggressive:

[screenshot: SCR-20240229-jtt]
[screenshot: SCR-20240229-jtz]

The perf run happened before the cache eviction, so aggressive cache eviction is not the reason for the high CPU usage of iter.


lmatz commented Mar 4, 2024

create sink tpch_q4 as
    select
    	o_orderpriority,
    	count(*) as order_count
    from
    	orders
    where
    	o_orderdate >= date '1997-07-01'
    	and o_orderdate < date '2099-07-01' + interval '3' month
    	and exists (
    		select
    			*
    		from
    			lineitem
    		where
    			l_orderkey = o_orderkey
    			and l_commitdate < l_receiptdate
    	)
    group by
    	o_orderpriority
    order by
    	o_orderpriority
    limit 1
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

o_orderpriority here has essentially just 5 distinct values: https://github.com/risingwavelabs/tpch-bench/blob/0bf3e0ed8cfdd64412ea5550c79e124218bb9d8e/assets/data/dists.dss#L231-L235
Therefore, both the group by aggregation and the order by ... limit 1 Top-N operator should have very small state and should not affect performance or storage much.


lmatz commented Mar 6, 2024

Comparing RW and Flink, with the barrier/checkpoint interval set to 10s for both:

Flink:

Grafana: https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708699635000&to=1708700658000&var-namespace=tpch-flink-1tm-ckpt-10s

Metabase: http://metabase.risingwave-cloud.xyz/question/5478-flink-tpch-q4-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-291?start_date=2023-09-07

RW:

Grafana: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1709640007000&orgId=1&to=1709641102000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=tpch-1cn-affinity-10s

Metabase: http://metabase.risingwave-cloud.xyz/question/13146-tpch-q4-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-367?start_date=2024-01-25

One interesting phenomenon is that both Flink and RW have three stages of throughput:

  1. a peak at the beginning
  2. relatively low and stable in the middle
  3. a peak again at the end

Based on the network usage (bandwidth from Kafka),
RW and Flink perform very similarly in the first and second stages.
However, Flink is better than RW in the third stage.

Flink:
[screenshot: SCR-20240306-l6n]

RW:
[screenshot: SCR-20240306-ktb]

The current gap seems to come from the difference in the last stage. And we have two interesting observations:

  1. Flink achieves 256 MiB/s from Kafka. This is the maximum that a stateless query doing nothing can achieve. We once evaluated such a query: http://metabase.risingwave-cloud.xyz/question/10770-flink-tpch-q0-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-2968?start_date=2024-01-08.
  2. RisingWave cannot achieve 256 MiB/s from Kafka. At the same time, its CPU usage drops and there are no cache misses. (In general, both Flink and RW can cache almost the entire state used in this query.)

Since Flink behaves as if it were executing a stateless query, we can look at RW's per-fragment metrics to validate this.
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1709640007000&orgId=1&to=1709641102000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=tpch-1cn-affinity-10s&editPanel=66

[screenshot: SCR-20240306-lbt]

We notice that in the last stage, only RowIdGen is sending output. The filter operator, which immediately follows RowIdGen, outputs nothing. As we can see from the plan, if the filter outputs nothing, there is no computation after it.

Then the question is why the data is being filtered away.

We remark that the tpch data is ingested in the following order:
[screenshot: SCR-20240306-l1h]

The last table is lineitem, and q4 is only interested in lineitem rows satisfying the condition l_commitdate < l_receiptdate.

Check out the data generator: https://github.com/risingwavelabs/tpch-bench/blob/master/pkg/data/lineitem.go#L126-L127
and its range for generating random data: https://github.com/risingwavelabs/tpch-bench/blob/master/pkg/data/lineitem.go#L43-L46

It turns out that the generator always generates data whose l_commitdate is > l_receiptdate, which unintentionally defeats the purpose of the evaluation.
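
A toy check of the implication, with made-up day offsets rather than the real ranges in lineitem.go: if l_commitdate always lands after l_receiptdate, the predicate l_commitdate < l_receiptdate never holds, so the StreamFilter on the lineitem side drops every row in this stage:

    fn main() {
        // (receipt_offset, commit_offset) in days relative to l_shipdate; hypothetical values
        let lineitems = [(5, 35), (12, 48), (30, 31)];
        let passing = lineitems
            .iter()
            .filter(|&&(receipt, commit)| commit < receipt)
            .count();
        assert_eq!(passing, 0); // nothing survives the l_commitdate < l_receiptdate filter
    }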

However, the data is the same for RW and Flink (we use a fixed random seed in the data generator),
so RW shouldn't achieve lower peak throughput in the third stage (as is obvious when comparing network usage).

I have no idea whether this can be explained by #14815. The argument for is that the third/last stage is a stateless stage with only filter computation; the argument against is that RW can achieve close to 250 MiB/s in the first stage.

@lmatz lmatz removed this from the release-1.7 milestone Mar 6, 2024

lmatz commented Mar 18, 2024

It is slightly better after #15478; closing it for now. We can reopen if needed.

@lmatz lmatz closed this as completed Mar 18, 2024