Skip to content

Commit

Permalink
feat(optimizer): add kafka scan operator (#16371)
Browse files Browse the repository at this point in the history
chenzl25 authored Apr 18, 2024
1 parent aa9d4ba commit 9d9d205
Showing 23 changed files with 839 additions and 490 deletions.
6 changes: 3 additions & 3 deletions e2e_test/streaming/aggregate/count_star.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (v int);

@@ -7,9 +10,6 @@ insert into t values (114), (514);
statement ok
create materialized view mv as select * from t;

statement ok
flush;

query I
select count(*) from t;
----
Original file line number Diff line number Diff line change
@@ -3,11 +3,11 @@
select * from s
logical_plan: |-
LogicalProject { exprs: [id, value] }
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) }
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, value] }
└─BatchSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) }
create_source:
format: plain
encode: protobuf
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] }
└─BatchSource { source: s1, columns: [v2, _row_id], filter: (None, None) }
└─BatchSource { source: s1, columns: [v2, _row_id] }
- name: select proctime()
sql: |
select proctime();
98 changes: 49 additions & 49 deletions src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml

Large diffs are not rendered by default.

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions src/frontend/planner_test/tests/testdata/output/share.yaml
Original file line number Diff line number Diff line change
@@ -27,10 +27,10 @@
└─BatchHashJoin { type: Inner, predicate: id = id, output: [] }
├─BatchExchange { order: [], dist: HashShard(id) }
│ └─BatchFilter { predicate: (initial_bid = 1:Int32) }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
└─BatchExchange { order: [], dist: HashShard(id) }
└─BatchFilter { predicate: (initial_bid = 2:Int32) }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
stream_plan: |-
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum0(count)] }
@@ -92,7 +92,7 @@
│ └─LogicalProject { exprs: [window_start, auction] }
│ └─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
│ └─LogicalFilter { predicate: IsNotNull(date_time) }
│ └─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], time_range: (Unbounded, Unbounded) }
│ └─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─LogicalProject { exprs: [max(count), window_start] }
└─LogicalAgg { group_key: [window_start], aggs: [max(count)] }
└─LogicalProject { exprs: [window_start, count] }
@@ -101,7 +101,7 @@
└─LogicalProject { exprs: [auction, window_start] }
└─LogicalHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: all }
└─LogicalFilter { predicate: IsNotNull(date_time) }
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], time_range: (Unbounded, Unbounded) }
└─LogicalSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: window_start = window_start AND (count >= max(count)), output: [auction, count] }
@@ -111,15 +111,15 @@
│ └─BatchExchange { order: [], dist: HashShard(auction) }
│ └─BatchProject { exprs: [auction, date_time] }
│ └─BatchFilter { predicate: IsNotNull(date_time) }
│ └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
│ └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─BatchHashAgg { group_key: [window_start], aggs: [max(count)] }
└─BatchExchange { order: [], dist: HashShard(window_start) }
└─BatchHashAgg { group_key: [auction, window_start], aggs: [count] }
└─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
└─BatchExchange { order: [], dist: HashShard(auction) }
└─BatchProject { exprs: [auction, date_time] }
└─BatchFilter { predicate: IsNotNull(date_time) }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
stream_plan: |-
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], stream_key: [auction, window_start], pk_columns: [auction, window_start], pk_conflict: NoCheck }
└─StreamProject { exprs: [auction, count, window_start, window_start] }
@@ -298,11 +298,11 @@
├─BatchExchange { order: [], dist: HashShard(id) }
│ └─BatchFilter { predicate: (initial_bid = 1:Int32) AND (AtTimeZone(date_time, 'UTC':Varchar) > '2021-03-31 23:59:59+00:00':Timestamptz) }
│ └─BatchProject { exprs: [id, initial_bid, date_time] }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
└─BatchExchange { order: [], dist: HashShard(id) }
└─BatchFilter { predicate: (initial_bid = 2:Int32) }
└─BatchProject { exprs: [id, initial_bid, date_time] }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] }
stream_plan: |-
StreamMaterialize { columns: [a_id, b_id, a_ts, b_ts, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, a_id], pk_columns: [_row_id, _row_id#1, a_id], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(id, _row_id, _row_id) }
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [x, y] }
└─BatchSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
@@ -52,7 +52,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [x, y] }
└─BatchSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
@@ -68,7 +68,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [x, y] }
└─BatchSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
@@ -84,7 +84,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [x, y] }
└─BatchSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
10 changes: 5 additions & 5 deletions src/frontend/planner_test/tests/testdata/output/subquery.yaml
Original file line number Diff line number Diff line change
@@ -486,21 +486,21 @@
└─LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(a1, a1), output: [] }
├─LogicalShare { id: 2 }
│ └─LogicalProject { exprs: [a1] }
│ └─LogicalSource { source: a, columns: [a1, a2, _row_id], time_range: (Unbounded, Unbounded) }
│ └─LogicalSource { source: a, columns: [a1, a2, _row_id] }
└─LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(a1, a1), output: all }
├─LogicalJoin { type: Inner, on: true, output: all }
│ ├─LogicalAgg { group_key: [a1], aggs: [] }
│ │ └─LogicalShare { id: 2 }
│ │ └─LogicalProject { exprs: [a1] }
│ │ └─LogicalSource { source: a, columns: [a1, a2, _row_id], time_range: (Unbounded, Unbounded) }
│ │ └─LogicalSource { source: a, columns: [a1, a2, _row_id] }
│ └─LogicalAgg { aggs: [] }
│ └─LogicalSource { source: b, columns: [b1, b2, _row_id], time_range: (Unbounded, Unbounded) }
│ └─LogicalSource { source: b, columns: [b1, b2, _row_id] }
└─LogicalJoin { type: Inner, on: true, output: [a1] }
├─LogicalAgg { group_key: [a1], aggs: [] }
│ └─LogicalShare { id: 2 }
│ └─LogicalProject { exprs: [a1] }
│ └─LogicalSource { source: a, columns: [a1, a2, _row_id], time_range: (Unbounded, Unbounded) }
└─LogicalSource { source: c, columns: [c1, c2, _row_id], time_range: (Unbounded, Unbounded) }
│ └─LogicalSource { source: a, columns: [a1, a2, _row_id] }
└─LogicalSource { source: c, columns: [c1, c2, _row_id] }
- name: test subquery in table function
sql: |
create table t(x int[], y int[], k int primary key);
Loading

0 comments on commit 9d9d205

Please sign in to comment.