perf: improve tpc-h q17 performance (single-topic) #14799

Open
Tracked by #15036
lmatz opened this issue Jan 25, 2024 · 6 comments

Comments

@lmatz
Contributor

lmatz commented Jan 25, 2024

See performance numbers at https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1

@github-actions github-actions bot added this to the release-1.7 milestone Jan 25, 2024
@lmatz
Contributor Author

lmatz commented Jan 25, 2024

RW:

create sink tpch_q17 as
    select
    	sum(l_extendedprice) / 7.0 as avg_yearly
    from
    	lineitem,
    	part
    where
    	p_partkey = l_partkey
        -- and p_brand = 'Brand#13'
        -- and p_container = 'JUMBO PKG'
    	and l_quantity < (
    		select
    			220.2 * avg(l_quantity)
    		from
    			lineitem
    		where
    			l_partkey = p_partkey
    	)
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Plan:

 StreamSink { type: append-only, columns: [avg_yearly] }
 └─StreamProject { exprs: [(sum(sum($expr3)) / 7.0:Decimal) as $expr6] }
   └─StreamSimpleAgg { aggs: [sum(sum($expr3)), count] }
     └─StreamExchange { dist: Single }
       └─StreamStatelessSimpleAgg { aggs: [sum($expr3)] }
         └─StreamProject { exprs: [$expr3, _row_id, _row_id, $expr1, $expr4, $expr4] }
           └─StreamFilter { predicate: ($expr2 < $expr5) }
             └─StreamHashJoin { type: Inner, predicate: $expr4 IS NOT DISTINCT FROM $expr4 }
               ├─StreamExchange { dist: HashShard($expr4) }
               │ └─StreamShare { id: 13 }
               │   └─StreamHashJoin [append_only] { type: Inner, predicate: $expr1 = $expr4 }
               │     ├─StreamExchange { dist: HashShard($expr1) }
               │     │ └─StreamShare { id: 7 }
               │     │   └─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
               │     │     └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
               │     │       └─StreamShare { id: 4 }
               │     │         └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
               │     │           └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
               │     │             └─StreamRowIdGen { row_id_index: 10 }
               │     │               └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
               │     └─StreamExchange { dist: HashShard($expr4) }
               │       └─StreamProject { exprs: [Field(part, 0:Int32) as $expr4, _row_id] }
               │         └─StreamFilter { predicate: (eventType = 'part':Varchar) }
               │           └─StreamShare { id: 4 }
               │             └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
               │               └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
               │                 └─StreamRowIdGen { row_id_index: 10 }
               │                   └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
               └─StreamProject { exprs: [$expr4, (220.2:Decimal * (sum($expr2) / count($expr2)::Decimal)) as $expr5] }
                 └─StreamHashAgg { group_key: [$expr4], aggs: [sum($expr2), count($expr2), count] }
                   └─StreamHashJoin { type: LeftOuter, predicate: $expr4 IS NOT DISTINCT FROM $expr1 }
                     ├─StreamAppendOnlyDedup { dedup_cols: [$expr4] }
                     │ └─StreamExchange { dist: HashShard($expr4) }
                     │   └─StreamProject { exprs: [$expr4] }
                     │     └─StreamShare { id: 13 }
                     │       └─StreamHashJoin [append_only] { type: Inner, predicate: $expr1 = $expr4 }
                     │         ├─StreamExchange { dist: HashShard($expr1) }
                     │         │ └─StreamShare { id: 7 }
                     │         │   └─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
                     │         │     └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
                     │         │       └─StreamShare { id: 4 }
                     │         │         └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
                     │         │           └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
                     │         │             └─StreamRowIdGen { row_id_index: 10 }
                     │         │               └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
                     │         └─StreamExchange { dist: HashShard($expr4) }
                     │           └─StreamProject { exprs: [Field(part, 0:Int32) as $expr4, _row_id] }
                     │             └─StreamFilter { predicate: (eventType = 'part':Varchar) }
                     │               └─StreamShare { id: 4 }
                     │                 └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
                     │                   └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
                     │                     └─StreamRowIdGen { row_id_index: 10 }
                     │                       └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
                     └─StreamExchange { dist: HashShard($expr1) }
                       └─StreamProject { exprs: [$expr1, $expr2, _row_id] }
                         └─StreamFilter { predicate: IsNotNull($expr1) }
                           └─StreamShare { id: 7 }
                             └─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
                               └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
                                 └─StreamShare { id: 4 }
                                   └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
                                     └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
                                       └─StreamRowIdGen { row_id_index: 10 }
                                         └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(64 rows)

Dist Plan:

 Fragment 0
 StreamSink { type: append-only, columns: [avg_yearly] } { tables: [ Sink: 0 ] }
 └── StreamProject { exprs: [(sum(sum($expr3)) / 7.0:Decimal) as $expr6] }
     └── StreamSimpleAgg { aggs: [sum(sum($expr3)), count] }
         ├── tables: [ SimpleAggState: 1 ]
         └── StreamExchange Single from 1
 
 Fragment 1
 StreamStatelessSimpleAgg { aggs: [sum($expr3)] }
 └── StreamProject { exprs: [$expr3, _row_id, _row_id, $expr1, $expr4, $expr4] }
     └── StreamFilter { predicate: ($expr2 < $expr5) }
         └── StreamHashJoin { type: Inner, predicate: $expr4 IS NOT DISTINCT FROM $expr4 }
             ├── tables: [ HashJoinLeft: 2, HashJoinDegreeLeft: 3, HashJoinRight: 4, HashJoinDegreeRight: 5 ]
             ├── StreamExchange Hash([2]) from 2
             └── StreamProject { exprs: [$expr4, (220.2:Decimal * (sum($expr2) / count($expr2)::Decimal)) as $expr5] }
                 └── StreamHashAgg { group_key: [$expr4], aggs: [sum($expr2), count($expr2), count] }
                     ├── tables: [ HashAggState: 11 ]
                     └── StreamHashJoin { type: LeftOuter, predicate: $expr4 IS NOT DISTINCT FROM $expr1 }
                         ├── tables:
                         │   ┌── HashJoinLeft: 12
                         │   ├── HashJoinDegreeLeft: 13
                         │   ├── HashJoinRight: 14
                         │   └── HashJoinDegreeRight: 15
                         ├── StreamAppendOnlyDedup { dedup_cols: [$expr4] } { tables: [ AppendOnlyDedup: 16 ] }
                         │   └── StreamExchange Hash([0]) from 8
                         └── StreamExchange Hash([0]) from 9
 
 Fragment 2
 StreamNoOp
 └── StreamExchange NoShuffle from 3
 
 Fragment 3
 StreamHashJoin [append_only] { type: Inner, predicate: $expr1 = $expr4 }
 ├── tables: [ HashJoinLeft: 6, HashJoinDegreeLeft: 7, HashJoinRight: 8, HashJoinDegreeRight: 9 ]
 ├── StreamExchange Hash([0]) from 4
 └── StreamExchange Hash([0]) from 7
 
 Fragment 4
 StreamNoOp
 └── StreamExchange NoShuffle from 5
 
 Fragment 5
 StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
 └── StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
     └── StreamExchange NoShuffle from 6
 
 Fragment 6
 StreamProject { exprs: [eventType, lineitem, part, _row_id] }
 └── StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
     └── StreamRowIdGen { row_id_index: 10 }
         └── StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
             └── tables: [ Source: 10 ]
 
 Fragment 7
 StreamProject { exprs: [Field(part, 0:Int32) as $expr4, _row_id] }
 └── StreamFilter { predicate: (eventType = 'part':Varchar) }
     └── StreamExchange NoShuffle from 6
 
 Fragment 8
 StreamProject { exprs: [$expr4] }
 └── StreamExchange NoShuffle from 3
 
 Fragment 9
 StreamProject { exprs: [$expr1, $expr2, _row_id] }
 └── StreamFilter { predicate: IsNotNull($expr1) }
     └── StreamExchange NoShuffle from 5
 
 Table 0
 ├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_row_op, avg_yearly ]
 ├── primary key: [ $0 ASC, $1 ASC ]
 ├── value indices: [ 0, 1, 2, 3 ]
 ├── distribution key: []
 └── read pk prefix len hint: 2
 
 Table 1 { columns: [ sum(sum($expr3)), count ], primary key: [], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 0 }
 
 Table 2
 ├── columns: [ $expr2, $expr3, $expr4, _row_id, $expr1, _row_id_0 ]
 ├── primary key: [ $2 ASC, $3 ASC, $5 ASC, $4 ASC ]
 ├── value indices: [ 0, 1, 2, 3, 4, 5 ]
 ├── distribution key: [ 2 ]
 └── read pk prefix len hint: 1
 
 Table 3
 ├── columns: [ $expr4, _row_id, _row_id_0, $expr1, _degree ]
 ├── primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ]
 ├── value indices: [ 4 ]
 ├── distribution key: [ 0 ]
 └── read pk prefix len hint: 1
 
 Table 4 { columns: [ $expr4, $expr5 ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 5 { columns: [ $expr4, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 6
 ├── columns: [ $expr1, $expr2, $expr3, _row_id ]
 ├── primary key: [ $0 ASC, $3 ASC ]
 ├── value indices: [ 0, 1, 2, 3 ]
 ├── distribution key: [ 0 ]
 └── read pk prefix len hint: 1
 
 Table 7 { columns: [ $expr1, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 8 { columns: [ $expr4, _row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 9 { columns: [ $expr4, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 10 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
 
 Table 11
 ├── columns: [ $expr4, sum($expr2), count($expr2), count ]
 ├── primary key: [ $0 ASC ]
 ├── value indices: [ 1, 2, 3 ]
 ├── distribution key: [ 0 ]
 └── read pk prefix len hint: 1
 
 Table 12 { columns: [ $expr4 ], primary key: [ $0 ASC ], value indices: [ 0 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 13 { columns: [ $expr4, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 14 { columns: [ $expr1, $expr2, _row_id ], primary key: [ $0 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 15 { columns: [ $expr1, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 16 { columns: [ $expr4 ], primary key: [ $0 ASC ], value indices: [ 0 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
(126 rows)

@lmatz
Contributor Author

lmatz commented Jan 25, 2024

Flink:

INSERT INTO tpch_q17
    select
      sum(l_extendedprice) / 7.0 as avg_yearly
    from
      lineitem,
      part
    where
      p_partkey = l_partkey
      -- and p_brand = 'Brand#13'
      -- and p_container = 'JUMBO PKG'
      and l_quantity < (
        select
          20.2 * avg(l_quantity)
        from
          lineitem
        where
          l_partkey = p_partkey
      );

Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q17], fields=[avg_yearly])
+- Calc(select=[CAST(/($f0, 7.0:DECIMAL(2, 1)) AS DECIMAL(10, 0)) AS avg_yearly])
   +- GroupAggregate(select=[SUM_RETRACT(l_extendedprice) AS $f0])
      +- Exchange(distribution=[single])
         +- Calc(select=[l_extendedprice])
            +- Join(joinType=[InnerJoin], where=[AND(=(p_partkey, l_partkey), <(l_quantity, *(20.2:DECIMAL(3, 1), $f1)))], select=[l_quantity, l_extendedprice, p_partkey, l_partkey, $f1], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
               :- Exchange(distribution=[hash[p_partkey]])
               :  +- Calc(select=[l_quantity, l_extendedprice, p_partkey])
               :     +- Join(joinType=[InnerJoin], where=[=(p_partkey, l_partkey)], select=[l_partkey, l_quantity, l_extendedprice, p_partkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
               :        :- Exchange(distribution=[hash[l_partkey]])
               :        :  +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_quantity AS l_quantity, lineitem.l_extendedprice AS l_extendedprice], where=[=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
               :        :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
               :        +- Exchange(distribution=[hash[p_partkey]])
               :           +- Calc(select=[part.p_partkey AS p_partkey], where=[=(eventType, _UTF-16LE'part':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
               :              +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
               +- Exchange(distribution=[hash[l_partkey]])
                  +- GroupAggregate(groupBy=[l_partkey], select=[l_partkey, AVG(l_quantity) AS $f1])
                     +- Exchange(distribution=[hash[l_partkey]])
                        +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_quantity AS l_quantity], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NOT NULL(lineitem.l_partkey))])
                           +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q17], fields=[avg_yearly])
+- Calc(select=[CAST(($f0 / 7.0) AS DECIMAL(10, 0)) AS avg_yearly])
   +- GroupAggregate(select=[SUM_RETRACT(l_extendedprice) AS $f0])
      +- Exchange(distribution=[single])
         +- Calc(select=[l_extendedprice])
            +- Join(joinType=[InnerJoin], where=[((p_partkey = l_partkey) AND (l_quantity < (20.2 * $f1)))], select=[l_quantity, l_extendedprice, p_partkey, l_partkey, $f1], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
               :- Exchange(distribution=[hash[p_partkey]])
               :  +- Calc(select=[l_quantity, l_extendedprice, p_partkey])
               :     +- Join(joinType=[InnerJoin], where=[(p_partkey = l_partkey)], select=[l_partkey, l_quantity, l_extendedprice, p_partkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
               :        :- Exchange(distribution=[hash[l_partkey]])
               :        :  +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_quantity AS l_quantity, lineitem.l_extendedprice AS l_extendedprice], where=[(eventType = 'lineitem')])
               :        :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])(reuse_id=[1])
               :        +- Exchange(distribution=[hash[p_partkey]])
               :           +- Calc(select=[part.p_partkey AS p_partkey], where=[(eventType = 'part')])
               :              +- Reused(reference_id=[1])
               +- Exchange(distribution=[hash[l_partkey]])
                  +- GroupAggregate(groupBy=[l_partkey], select=[l_partkey, AVG(l_quantity) AS $f1])
                     +- Exchange(distribution=[hash[l_partkey]])
                        +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_quantity AS l_quantity], where=[((eventType = 'lineitem') AND lineitem.l_partkey IS NOT NULL)])
                           +- Reused(reference_id=[1])

@lmatz
Contributor Author

lmatz commented Jan 25, 2024

We notice that there are 4 StreamHashJoin operators in RisingWave's query plan.
The StreamHashJoin under StreamShare { id: 13 } is printed twice but is a single shared operator.
Even after discounting that duplicate, we still get 4 - (2 - 1) = 3 StreamHashJoin operators:
2 InnerJoin
1 LeftOuter

while there are only 2 Join in Flink's query plan:
2 InnerJoin
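
The counting above can be sketched in code. This is only a rough sketch and not part of RisingWave: `count_distinct_joins` and `SAMPLE_PLAN` are hypothetical names, the sample is a hand-abbreviated skeleton of the RW plan posted earlier (predicates and most operators dropped), and it assumes that the subtree under a repeated StreamShare id is printed again in full and should be counted once.

```python
import re
from collections import Counter

# Hand-abbreviated skeleton of the RW plan above (assumption: only the
# operators relevant to join counting are kept).
SAMPLE_PLAN = """\
StreamHashJoin { type: Inner, predicate: ... }
├─StreamExchange { dist: HashShard }
│ └─StreamShare { id: 13 }
│   └─StreamHashJoin [append_only] { type: Inner, predicate: ... }
└─StreamHashAgg { group_key: [...] }
  └─StreamHashJoin { type: LeftOuter, predicate: ... }
    ├─StreamAppendOnlyDedup { dedup_cols: [...] }
    │ └─StreamShare { id: 13 }
    │   └─StreamHashJoin [append_only] { type: Inner, predicate: ... }
    └─StreamExchange { dist: HashShard }
"""


def count_distinct_joins(plan_text):
    """Count StreamHashJoin operators by join type.

    The subtree below a StreamShare id that was already seen is skipped,
    so a shared join is counted once no matter how often it is printed.
    """
    seen_shares = set()
    skip_below = None  # column of a repeated StreamShare whose subtree is skipped
    counts = Counter()
    for line in plan_text.splitlines():
        op = re.search(r"Stream\w+", line)
        if not op:
            continue
        indent = op.start()  # column of the operator name encodes tree depth
        if skip_below is not None:
            if indent > skip_below:
                continue  # still inside the repeated shared subtree
            skip_below = None
        share = re.search(r"StreamShare \{ id: (\d+) \}", line)
        if share:
            if share.group(1) in seen_shares:
                skip_below = indent
            else:
                seen_shares.add(share.group(1))
            continue
        join = re.search(r"StreamHashJoin.*?type: (\w+)", line)
        if join:
            counts[join.group(1)] += 1
    return counts


if __name__ == "__main__":
    print(dict(count_distinct_joins(SAMPLE_PLAN)))  # {'Inner': 2, 'LeftOuter': 1}
```

On the abbreviated skeleton this gives 2 Inner and 1 LeftOuter, i.e. the same 3 distinct StreamHashJoin operators as counted by hand.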

@lmatz lmatz changed the title perf: improve tpc-h q17 performance perf: improve tpc-h q17 performance (single-topic) Feb 6, 2024
@lmatz lmatz removed this from the release-1.7 milestone Mar 6, 2024
@lmatz
Contributor Author

lmatz commented Mar 7, 2024

After #15247, and since nightly-20240306, RW's plan becomes:

StreamSink { type: append-only, columns: [avg_yearly] }
 └─StreamProject { exprs: [(sum(sum($expr3)) / 7.0:Decimal) as $expr6] }
   └─StreamSimpleAgg { aggs: [sum(sum($expr3)), count] }
     └─StreamExchange { dist: Single }
       └─StreamStatelessSimpleAgg { aggs: [sum($expr3)] }
         └─StreamProject { exprs: [$expr3, _row_id, _row_id, $expr1, $expr4, $expr1] }
           └─StreamFilter { predicate: ($expr2 < $expr5) }
             └─StreamHashJoin { type: Inner, predicate: $expr4 = $expr1 }
               ├─StreamExchange { dist: HashShard($expr4) }
               │ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr1 = $expr4 }
               │   ├─StreamExchange { dist: HashShard($expr1) }
               │   │ └─StreamShare { id: 7 }
               │   │   └─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
               │   │     └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
               │   │       └─StreamShare { id: 4 }
               │   │         └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
               │   │           └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
               │   │             └─StreamRowIdGen { row_id_index: 10 }
               │   │               └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
               │   └─StreamExchange { dist: HashShard($expr4) }
               │     └─StreamProject { exprs: [Field(part, 0:Int32) as $expr4, _row_id] }
               │       └─StreamFilter { predicate: (eventType = 'part':Varchar) }
               │         └─StreamShare { id: 4 }
               │           └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
               │             └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
               │               └─StreamRowIdGen { row_id_index: 10 }
               │                 └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
               └─StreamProject { exprs: [(220.2:Decimal * (sum($expr2) / count($expr2)::Decimal)) as $expr5, $expr1] }
                 └─StreamHashAgg [append_only] { group_key: [$expr1], aggs: [sum($expr2), count($expr2), count] }
                   └─StreamExchange { dist: HashShard($expr1) }
                     └─StreamShare { id: 7 }
                       └─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
                         └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
                           └─StreamShare { id: 4 }
                             └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
                               └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
                                 └─StreamRowIdGen { row_id_index: 10 }
                                   └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(38 rows)

@lmatz
Contributor Author

lmatz commented Mar 18, 2024

Contributor

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.
