Skip to content

Commit

Permalink
feat: implement append-only group TopN (#7522)
Browse files Browse the repository at this point in the history
close #7376

Approved-By: BugenZhao
  • Loading branch information
xxchan authored Feb 3, 2023
1 parent dffc2f1 commit 05eb37f
Show file tree
Hide file tree
Showing 14 changed files with 452 additions and 37 deletions.
19 changes: 18 additions & 1 deletion dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 24 additions & 8 deletions e2e_test/streaming/append_only.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ insert into t2 values (1,5), (2,6), (3, 7);
statement ok
create materialized view mv1 as select t1.v1 as id, v2, v3 from t1 join t2 on t1.v1=t2.v1;

query I rowsort
query III rowsort
select * from mv1;
----
1 2 5
Expand All @@ -27,7 +27,7 @@ select * from mv1;
statement ok
insert into t1 values (3,4), (7,7);

query II rowsort
query III rowsort
select * from mv1;
----
1 2 5
Expand All @@ -45,7 +45,7 @@ insert into t4 values (1,1,4), (5,1,4), (1,9,1), (9,8,1), (0,2,3);
statement ok
create materialized view mv3 as select v3, sum(v1) as sum_v1, min(v1) as min_v1, max(v1) as max_v1 from t4 group by v3;

query III
query IIII
select sum_v1, min_v1, max_v1, v3 from mv3 order by sum_v1;
----
0 0 0 3
Expand All @@ -58,18 +58,31 @@ statement ok
create materialized view mv4 as select v1, v3 from t4 order by v1 limit 3 offset 3;

## scan MV with ORDER BY isn't guaranteed to be ordered
query IV rowsort
query II rowsort
select * from mv4;
----
5 4
9 1

## Group TopN
statement ok
create materialized view mv4_1 as
select v1, v3 from (
select *, ROW_NUMBER() OVER (PARTITION BY v3 ORDER BY v1) as rank from t4
)
where rank <= 2 AND rank > 1;

query II rowsort
select * from mv4_1;
----
5 4
9 1

## SimpleAgg
statement ok
create materialized view mv5 as select sum(v1) as sum_v1, max(v2) as max_v2, min(v3) as min_v3 from t4;

query V
query III
select * from mv5;
----
16 9 1
Expand All @@ -84,7 +97,7 @@ insert into t5 values (1,0), (1,1), (1,2), (1,3);
statement ok
create materialized view mv6 as select v1, v2 from t5 order by v1 fetch first 3 rows with ties;

query IV rowsort
query II rowsort
select * from mv6;
----
1 0
Expand All @@ -95,7 +108,7 @@ select * from mv6;
statement ok
insert into t5 values (0,1), (0,2);

query IV rowsort
query II rowsort
select * from mv6;
----
0 1
Expand All @@ -108,7 +121,7 @@ select * from mv6;
statement ok
insert into t5 values (0,3);

query IV rowsort
query II rowsort
select * from mv6;
----
0 1
Expand All @@ -121,6 +134,9 @@ drop materialized view mv6
statement ok
drop materialized view mv5

statement ok
drop materialized view mv4_1

statement ok
drop materialized view mv4

Expand Down
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ message StreamNode {
DmlNode dml = 127;
RowIdGenNode row_id_gen = 128;
NowNode now = 129;
GroupTopNNode append_only_group_top_n = 130;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], pk_columns: [bid._row_id] }
└─StreamExchange { dist: HashShard(bid._row_id) }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└─StreamGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |
Expand All @@ -964,7 +964,7 @@
Fragment 1
StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
StreamGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
state table: 0
StreamExchange Hash([1, 0]) from 2
Expand Down
13 changes: 10 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,21 @@ impl StreamNode for StreamGroupTopN {
table: Some(table.to_internal_table_prost()),
order_by: self.topn_order().to_protobuf(),
};

ProstStreamNode::GroupTopN(group_topn_node)
if self.input().append_only() {
ProstStreamNode::AppendOnlyGroupTopN(group_topn_node)
} else {
ProstStreamNode::GroupTopN(group_topn_node)
}
}
}

impl fmt::Display for StreamGroupTopN {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut builder = f.debug_struct("StreamGroupTopN");
let mut builder = f.debug_struct(if self.input().append_only() {
"StreamAppendOnlyGroupTopN"
} else {
"StreamGroupTopN"
});
let input = self.input();
let input_schema = input.schema();
builder.field(
Expand Down
12 changes: 8 additions & 4 deletions src/frontend/src/utils/stream_graph_formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ impl StreamGraphFormatter {
})
.join(", ")
)),
stream_node::NodeBody::AppendOnlyTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::HashJoin(node) => Some(format!(
"left table: {}, right table {},{}{}",
self.add_table(node.get_left_table().unwrap()),
Expand All @@ -201,6 +197,10 @@ impl StreamGraphFormatter {
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::AppendOnlyTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::Arrange(node) => Some(format!(
"arrange table: {}",
self.add_table(node.get_table().unwrap())
Expand All @@ -214,6 +214,10 @@ impl StreamGraphFormatter {
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::AppendOnlyGroupTopN(node) => Some(format!(
"state table: {}",
self.add_table(node.get_table().unwrap())
)),
stream_node::NodeBody::Now(node) => Some(format!(
"state table: {}",
self.add_table(node.get_state_table().unwrap())
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/stream/stream_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,9 @@ where
NodeBody::TopN(node) => {
always!(node.table, "TopN");
}
NodeBody::AppendOnlyGroupTopN(node) => {
always!(node.table, "AppendOnlyGroupTopN");
}
NodeBody::GroupTopN(node) => {
always!(node.table, "GroupTopN");
}
Expand Down
4 changes: 3 additions & 1 deletion src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ use simple::{SimpleExecutor, SimpleExecutorWrapper};
pub use sink::SinkExecutor;
pub use sort::SortExecutor;
pub use source::*;
pub use top_n::{AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor};
pub use top_n::{
AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use union::UnionExecutor;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;
Expand Down
30 changes: 13 additions & 17 deletions src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -117,7 +118,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutorNew
info: ExecutorInfo {
schema,
pk_indices,
identity: format!("TopNExecutorNew {:X}", executor_id),
identity: format!("GroupTopNExecutor {:X}", executor_id),
},
offset: offset_and_limit.0,
limit: offset_and_limit.1,
Expand All @@ -139,27 +140,22 @@ impl<K: HashKey, const WITH_TIES: bool> GroupTopNCache<K, WITH_TIES> {
let cache = ExecutorCache::new(new_unbounded(lru_manager));
Self { data: cache }
}
}

fn clear(&mut self) {
self.data.clear()
}
impl<K: HashKey, const WITH_TIES: bool> Deref for GroupTopNCache<K, WITH_TIES> {
type Target = ExecutorCache<K, TopNCache<WITH_TIES>>;

fn get_mut(&mut self, key: &K) -> Option<&mut TopNCache<WITH_TIES>> {
self.data.get_mut(key)
}

fn contains(&mut self, key: &K) -> bool {
self.data.contains(key)
}

fn insert(&mut self, key: K, value: TopNCache<WITH_TIES>) {
self.data.push(key, value);
fn deref(&self) -> &Self::Target {
&self.data
}
}

fn evict(&mut self) {
self.data.evict()
impl<K: HashKey, const WITH_TIES: bool> DerefMut for GroupTopNCache<K, WITH_TIES> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}

#[async_trait]
impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
for InnerGroupTopNExecutorNew<K, S, WITH_TIES>
Expand All @@ -186,7 +182,7 @@ where
self.managed_state
.init_topn_cache(Some(group_key), &mut topn_cache)
.await?;
self.caches.insert(group_cache_key.clone(), topn_cache);
self.caches.push(group_cache_key.clone(), topn_cache);
}
let cache = self.caches.get_mut(group_cache_key).unwrap();

Expand Down
Loading

0 comments on commit 05eb37f

Please sign in to comment.