Skip to content

Commit

Permalink
feat(stream): new boxed group_top_n executor from proto (#4682)
Browse files Browse the repository at this point in the history
* feat(stream): new boxed group top n executor from proto

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): infer group topn state table catalog

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): new boxed group top n executor from proto

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): assigned table id on meta node

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): assigned table id on meta node

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): infer state table catalog in front end

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): infer state table catalog in front end

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): infer state table catalog in front end

Signed-off-by: Shmiwy <[email protected]>

* refactor(stream): reduce code duplication

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): infer state table catalog in front end

Signed-off-by: Shmiwy <[email protected]>

* refactor(stream): reduce code duplication

Signed-off-by: Shmiwy <[email protected]>

* refactor(stream): reduce code duplication

Signed-off-by: Shmiwy <[email protected]>

* feat(stream): new boxed group top n executor from proto

Signed-off-by: Shmiwy <[email protected]>

* refactor(stream): remove allow dead code

Signed-off-by: Shmiwy <[email protected]>

Signed-off-by: Shmiwy <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
Shmiwy and mergify[bot] authored Aug 20, 2022
1 parent d86dae2 commit a9b6043
Show file tree
Hide file tree
Showing 24 changed files with 539 additions and 326 deletions.
17 changes: 16 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ message HashAggNode {
}

message TopNNode {
// 0 means no limit as limit of 0 means this node should be optimized away
uint64 limit = 1;
uint64 offset = 2;
catalog.Table table = 3;
}
message AppendOnlyTopNNode {
repeated plan_common.ColumnOrder column_orders = 1;
// 0 means no limit as limit of 0 means this node should be optimized away
uint64 limit = 2;
Expand All @@ -178,6 +184,14 @@ message TopNNode {
uint32 table_id_h = 6;
}

message GroupTopNNode {
// 0 means no limit as limit of 0 means this node should be optimized away
uint64 limit = 1;
uint64 offset = 2;
repeated uint32 group_key = 3;
catalog.Table table = 4;
}

message HashJoinNode {
plan_common.JoinType join_type = 1;
repeated int32 left_key = 2;
Expand Down Expand Up @@ -350,7 +364,7 @@ message StreamNode {
SimpleAggNode local_simple_agg = 104;
SimpleAggNode global_simple_agg = 105;
HashAggNode hash_agg = 106;
TopNNode append_only_top_n = 107;
AppendOnlyTopNNode append_only_top_n = 107;
HashJoinNode hash_join = 108;
TopNNode top_n = 109;
HopWindowNode hop_window = 110;
Expand All @@ -367,6 +381,7 @@ message StreamNode {
ExpandNode expand = 121;
DynamicFilterNode dynamic_filter = 122;
ProjectSetNode project_set = 123;
GroupTopNNode group_top_n = 124;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ impl TableCatalog {
self.distribution_key.as_ref()
}

pub fn to_state_table_prost(&self) -> ProstTable {
use risingwave_common::catalog::{DatabaseId, SchemaId};
self.to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> ProstTable {
ProstTable {
id: self.id.table_id as u32,
Expand Down
32 changes: 32 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::fmt;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::sort_util::OrderType;

use super::utils::TableCatalogBuilder;
use super::{
gen_filter_and_pushdown, ColPrunable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown,
ToBatch, ToStream,
Expand All @@ -27,6 +30,7 @@ use crate::optimizer::plan_node::{BatchTopN, LogicalProject, StreamTopN};
use crate::optimizer::property::{FieldOrder, Order, OrderDisplay, RequiredDist};
use crate::planner::LIMIT_ALL_COUNT;
use crate::utils::{ColIndexMapping, Condition};
use crate::TableCatalog;

/// `LogicalTopN` sorts the input data and fetches up to `limit` rows from `offset`
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -96,6 +100,34 @@ impl LogicalTopN {
.field("offset", &format_args!("{}", self.offset()))
.finish()
}

pub fn infer_internal_table_catalog(&self) -> TableCatalog {
let schema = &self.base.schema;
let dist_keys = self.base.dist.dist_column_indices().to_vec();
let pk_indices = &self.base.logical_pk;
let columns_fields = schema.fields().to_vec();
let field_order = &self.order.field_order;
let mut internal_table_catalog_builder = TableCatalogBuilder::new();

columns_fields.iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
});
let mut order_cols = HashSet::new();

field_order.iter().for_each(|field_order| {
internal_table_catalog_builder
.add_order_column(field_order.index, OrderType::from(field_order.direct));
order_cols.insert(field_order.index);
});

pk_indices.iter().for_each(|idx| {
if !order_cols.contains(idx) {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending);
order_cols.insert(*idx);
}
});
internal_table_catalog_builder.build(dist_keys, self.base.append_only)
}
}

impl PlanTreeNodeUnary for LogicalTopN {
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ pub use stream_exchange::StreamExchange;
pub use stream_expand::StreamExpand;
pub use stream_filter::StreamFilter;
pub use stream_global_simple_agg::StreamGlobalSimpleAgg;
pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
Expand Down Expand Up @@ -392,6 +393,7 @@ macro_rules! for_all_plan_nodes {
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
, { Stream, GroupTopN }
}
};
}
Expand Down Expand Up @@ -480,6 +482,7 @@ macro_rules! for_stream_plan_nodes {
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
, { Stream, GroupTopN }
}
};
}
Expand Down
13 changes: 4 additions & 9 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::collections::HashMap;
use std::fmt;

use risingwave_common::catalog::{DatabaseId, Schema, SchemaId};
use risingwave_common::catalog::Schema;
use risingwave_common::config::constant::hummock::PROPERTIES_RETAINTION_SECOND_KEY;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
Expand Down Expand Up @@ -101,16 +101,11 @@ impl ToStreamProst for StreamDynamicFilter {
.as_expr_unless_true()
.map(|x| x.to_expr_proto()),
left_table: Some(
infer_left_internal_table_catalog(self.clone().into(), self.left_index).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
),
infer_left_internal_table_catalog(self.clone().into(), self.left_index)
.to_state_table_prost(),
),
right_table: Some(
infer_right_internal_table_catalog(self.right.clone()).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
),
infer_right_internal_table_catalog(self.right.clone()).to_state_table_prost(),
),
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::logical_agg::PlanAggCall;
Expand Down Expand Up @@ -102,12 +101,7 @@ impl ToStreamProst for StreamGlobalSimpleAgg {
.collect_vec(),
internal_tables: internal_tables
.into_iter()
.map(|table_catalog| {
table_catalog.to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)
})
.map(|table_catalog| table_catalog.to_state_table_prost())
.collect_vec(),
column_mappings: column_mappings
.into_iter()
Expand Down
75 changes: 67 additions & 8 deletions src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,26 @@
// limitations under the License.

use std::collections::HashSet;
use std::fmt;

use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::utils::TableCatalogBuilder;
use super::PlanBase;
use crate::optimizer::property::{Distribution, Order};
use super::{PlanBase, PlanTreeNodeUnary, ToStreamProst};
use crate::optimizer::property::{Distribution, Order, OrderDisplay};
use crate::{PlanRef, TableCatalog};

#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct StreamGroupTopN {
pub base: PlanBase,
input: PlanRef,
group_key: Vec<usize>,
limit: usize,
offset: usize,
order: Order,
}

#[allow(dead_code)]
impl StreamGroupTopN {
pub fn new(
input: PlanRef,
Expand All @@ -45,10 +46,7 @@ impl StreamGroupTopN {
Distribution::UpstreamHashShard(_) => {
Distribution::UpstreamHashShard(group_key.clone())
}

Distribution::Broadcast => Distribution::Broadcast,
Distribution::Single => Distribution::Single,
Distribution::SomeShard => Distribution::SomeShard,
_ => input.distribution().clone(),
};
let base = PlanBase::new_stream(
input.ctx(),
Expand All @@ -60,6 +58,7 @@ impl StreamGroupTopN {
);
StreamGroupTopN {
base,
input,
group_key,
limit,
offset,
Expand Down Expand Up @@ -111,3 +110,63 @@ impl StreamGroupTopN {
internal_table_catalog_builder.build(dist_keys, self.base.append_only)
}
}

impl ToStreamProst for StreamGroupTopN {
fn to_stream_prost_body(&self) -> ProstStreamNode {
use risingwave_pb::stream_plan::*;
let group_key = self.group_key.iter().map(|idx| *idx as u32).collect();

if self.limit == 0 {
panic!("topN's limit shouldn't be 0.");
}
let group_topn_node = GroupTopNNode {
limit: self.limit as u64,
offset: self.offset as u64,
group_key,
table: Some(self.infer_internal_table_catalog().to_state_table_prost()),
};

ProstStreamNode::GroupTopN(group_topn_node)
}
}

impl fmt::Display for StreamGroupTopN {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut builder = f.debug_struct("StreamGroupTopN");
let input = self.input();
let input_schema = input.schema();
builder.field(
"order",
&format!(
"{}",
OrderDisplay {
order: self.order(),
input_schema
}
),
);
builder
.field("limit", &format_args!("{}", self.limit))
.field("offset", &format_args!("{}", self.offset))
.field("group_key", &format_args!("{:?}", self.group_key))
.finish()
}
}

impl_plan_tree_node_for_unary! { StreamGroupTopN }

impl PlanTreeNodeUnary for StreamGroupTopN {
fn input(&self) -> PlanRef {
self.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(
input,
self.group_key.clone(),
self.limit,
self.offset,
self.order.clone(),
)
}
}
8 changes: 1 addition & 7 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

use super::logical_agg::PlanAggCall;
Expand Down Expand Up @@ -95,12 +94,7 @@ impl ToStreamProst for StreamHashAgg {
.collect_vec(),
internal_tables: internal_tables
.into_iter()
.map(|table_catalog| {
table_catalog.to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)
})
.map(|table_catalog| table_catalog.to_state_table_prost())
.collect_vec(),
column_mappings: column_mappings
.into_iter()
Expand Down
13 changes: 4 additions & 9 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, Field, Schema, SchemaId};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::config::constant::hummock::PROPERTIES_RETAINTION_SECOND_KEY;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
Expand Down Expand Up @@ -211,16 +211,11 @@ impl ToStreamProst for StreamHashJoin {
.as_expr_unless_true()
.map(|x| x.to_expr_proto()),
left_table: Some(
infer_internal_table_catalog(self.left(), left_key_indices).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
),
infer_internal_table_catalog(self.left(), left_key_indices).to_state_table_prost(),
),
right_table: Some(
infer_internal_table_catalog(self.right(), right_key_indices).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
),
infer_internal_table_catalog(self.right(), right_key_indices)
.to_state_table_prost(),
),
output_indices: self
.logical
Expand Down
7 changes: 2 additions & 5 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, DatabaseId, SchemaId, TableId};
use risingwave_common::catalog::{ColumnDesc, TableId};
use risingwave_common::config::constant::hummock::PROPERTIES_RETAINTION_SECOND_KEY;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::Result;
Expand Down Expand Up @@ -257,10 +257,7 @@ impl ToStreamProst for StreamMaterialize {
.iter()
.map(FieldOrder::to_protobuf)
.collect(),
table: Some(self.table().to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)),
table: Some(self.table().to_state_table_prost()),
})
}
}
Loading

0 comments on commit a9b6043

Please sign in to comment.