Skip to content

Commit

Permalink
feat(batch): prepare for union operator (#4761)
Browse files Browse the repository at this point in the history
* support batch union operator

* add test

* Update src/frontend/src/optimizer/plan_node/logical_union.rs

Co-authored-by: xxchan <[email protected]>

Co-authored-by: xxchan <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 20, 2022
1 parent a9b6043 commit bce7c81
Show file tree
Hide file tree
Showing 6 changed files with 483 additions and 0 deletions.
3 changes: 3 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ message LookupJoinNode {
repeated common.WorkerNode worker_nodes = 8;
}

// Plan node body selecting the batch union executor. It carries no fields of
// its own; NOTE(review): the union inputs are presumably the `children` of the
// enclosing `PlanNode` -- confirm against the executor builder.
message UnionNode {}

message PlanNode {
repeated PlanNode children = 1;
oneof node_body {
Expand All @@ -236,6 +238,7 @@ message PlanNode {
ExpandNode expand = 28;
LookupJoinNode lookup_join = 29;
ProjectSetNode project_set = 30;
UnionNode union = 31;
}
string identity = 24;
}
Expand Down
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod table_function;
pub mod test_utils;
mod top_n;
mod trace;
mod union;
mod update;
mod values;

Expand Down Expand Up @@ -62,6 +63,7 @@ pub use sort_agg::*;
pub use table_function::*;
pub use top_n::*;
pub use trace::*;
pub use union::*;
pub use update::*;
pub use values::*;

Expand Down Expand Up @@ -199,6 +201,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::Expand => ExpandExecutor,
NodeBody::LookupJoin => LookupJoinExecutorBuilder,
NodeBody::ProjectSet => ProjectSetExecutor,
NodeBody::Union => UnionExecutor,
}
.await?;
let input_desc = real_executor.identity().to_string();
Expand Down
178 changes: 178 additions & 0 deletions src/batch/src/executor/union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::select_all;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// Batch executor for the union operator: it executes every input executor and
/// forwards the data chunks they produce as a single output stream.
pub struct UnionExecutor {
/// Child executors whose output is merged. The first one also supplies the
/// schema reported by this executor.
inputs: Vec<BoxedExecutor>,
/// Name returned by `Executor::identity`.
identity: String,
}

impl Executor for UnionExecutor {
    /// Returns the schema of the first input.
    ///
    /// # Panics
    ///
    /// Panics if the executor was built without any input.
    fn schema(&self) -> &Schema {
        let first_input = &self.inputs[0];
        first_input.schema()
    }

    /// Returns the identity string this executor was constructed with.
    fn identity(&self) -> &str {
        self.identity.as_str()
    }

    /// Consumes the executor and returns the merged stream of its inputs.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        Self::do_execute(self)
    }
}

impl UnionExecutor {
    /// Starts every input executor, merges their streams with `select_all`
    /// and re-yields each chunk. The first error produced by any input ends
    /// the output stream with that error.
    #[try_stream(boxed, ok = DataChunk, error = RwError)]
    async fn do_execute(self: Box<Self>) {
        // Turn each child executor into its chunk stream up front.
        let input_streams = self
            .inputs
            .into_iter()
            .map(|input| input.execute())
            .collect_vec();

        // NOTE(review): output ordering across inputs is whatever `select_all`
        // provides; its implementation is not visible from this file.
        let mut merged = select_all(input_streams).boxed();

        while let Some(chunk) = merged.next().await {
            yield chunk?;
        }
    }
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for UnionExecutor {
    /// Builds a boxed `UnionExecutor` over `inputs`.
    ///
    /// Returns an error when the plan node body of `source` is not
    /// `NodeBody::Union`. The `unwrap` panics if `get_node_body` fails.
    async fn new_boxed_executor<C: BatchTaskContext>(
        source: &ExecutorBuilder<C>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        // Only the variant check matters: the union node itself has no fields.
        try_match_expand!(source.plan_node().get_node_body().unwrap(), NodeBody::Union)?;

        let executor = Self::new(inputs, "UnionExecutor".into());
        Ok(Box::new(executor))
    }
}

impl UnionExecutor {
pub fn new(inputs: Vec<BoxedExecutor>, identity: String) -> Self {
Self { inputs, identity }
}
}

#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures::stream::StreamExt;
    use risingwave_common::array::{Array, DataChunk};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_common::types::DataType;

    use crate::executor::test_utils::MockExecutor;
    use crate::executor::{Executor, UnionExecutor};

    /// Asserts that column `idx` of `chunk` is an int32 column holding exactly
    /// the values in `expected`, in order.
    fn assert_int32_column(chunk: &DataChunk, idx: usize, expected: &[i32]) {
        let column = chunk.column_at(idx);
        let array = column.array();
        let values = array.as_int32();
        assert_eq!(values.len(), expected.len());
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(values.value_at(row), Some(*want));
        }
    }

    /// Two mock inputs with one chunk each: the union must expose the shared
    /// schema and emit both chunks, then end.
    #[tokio::test]
    async fn test_union_executor() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };

        let mut left = MockExecutor::new(schema.clone());
        left.add(DataChunk::from_pretty(
            "i i
1 10
2 20
3 30
4 40",
        ));

        let mut right = MockExecutor::new(schema);
        right.add(DataChunk::from_pretty(
            "i i
5 50
6 60
7 70
8 80",
        ));

        let union_executor = Box::new(UnionExecutor {
            inputs: vec![Box::new(left), Box::new(right)],
            identity: "UnionExecutor".to_string(),
        });

        let fields = &union_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int32);
        assert_eq!(fields[1].data_type, DataType::Int32);

        let mut stream = union_executor.execute();

        // The test expects the chunk of the first input to come out first.
        let first = stream.next().await.unwrap();
        assert_matches!(first, Ok(_));
        if let Ok(chunk) = first {
            assert_int32_column(&chunk, 0, &[1, 2, 3, 4]);
            assert_int32_column(&chunk, 1, &[10, 20, 30, 40]);
        }

        let second = stream.next().await.unwrap();
        assert_matches!(second, Ok(_));
        if let Ok(chunk) = second {
            assert_int32_column(&chunk, 0, &[5, 6, 7, 8]);
            assert_int32_column(&chunk, 1, &[50, 60, 70, 80]);
        }

        // Both inputs are exhausted, so the merged stream must end.
        let end = stream.next().await;
        assert_matches!(end, None);
    }
}
101 changes: 101 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::UnionNode;

use super::{PlanRef, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::{LogicalUnion, PlanBase, PlanTreeNode, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchUnion` implements [`super::LogicalUnion`]
///
/// It is the batch plan node for union: it wraps the logical node and adds the
/// batch properties (distribution and order) computed in [`BatchUnion::new`].
#[derive(Debug, Clone)]
pub struct BatchUnion {
/// Batch plan base built from the logical schema, the derived distribution
/// and `Order::any()`.
pub base: PlanBase,
/// The wrapped logical union; supplies the inputs, the schema and the `all` flag.
logical: LogicalUnion,
}

impl BatchUnion {
    /// Wraps `logical` into a batch union node.
    ///
    /// The node is `Distribution::Single` only when every input is `Single`
    /// (this includes the case of no inputs); otherwise it is
    /// `Distribution::SomeShard`. No output order is provided.
    pub fn new(logical: LogicalUnion) -> Self {
        let ctx = logical.base.ctx.clone();

        let has_non_single_input = logical
            .inputs()
            .iter()
            .any(|input| *input.distribution() != Distribution::Single);
        let dist = if has_non_single_input {
            Distribution::SomeShard
        } else {
            Distribution::Single
        };

        let schema = logical.schema().clone();
        let base = PlanBase::new_batch(ctx, schema, dist, Order::any());
        Self { base, logical }
    }
}

impl fmt::Display for BatchUnion {
    /// Formats the node through the logical union's formatter, under the name
    /// `BatchUnion`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { logical, .. } = self;
        logical.fmt_with_name(f, "BatchUnion")
    }
}

impl PlanTreeNode for BatchUnion {
    /// Returns the inputs of the wrapped logical union.
    fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
        self.logical.inputs().into_iter().collect()
    }

    /// Rebuilds this node over `inputs`, keeping the `all` flag of the
    /// current logical union.
    fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
        let logical = LogicalUnion::new(self.logical.all(), inputs.to_vec());
        Self::new(logical).into()
    }
}

impl ToDistributedBatch for BatchUnion {
    /// Converts every input to its distributed form, enforces a single
    /// distribution on each one where it is not already satisfied, and
    /// rebuilds the union over the results. Stops at the first error.
    fn to_distributed(&self) -> Result<PlanRef> {
        // TODO: use round robin distribution to improve it
        let children = self.inputs();
        let mut new_inputs = Vec::with_capacity(children.len());
        for child in children.iter() {
            new_inputs.push(
                RequiredDist::single()
                    .enforce_if_not_satisfies(child.to_distributed()?, &Order::any())?,
            );
        }
        Ok(self.clone_with_inputs(&new_inputs))
    }
}

impl ToBatchProst for BatchUnion {
    /// Serializes this node as an empty `UnionNode` body.
    fn to_batch_prost_body(&self) -> NodeBody {
        let union_node = UnionNode {};
        NodeBody::Union(union_node)
    }
}

impl ToLocalBatch for BatchUnion {
    /// Converts every input to its local form, enforces a single distribution
    /// on each one where it is not already satisfied, and rebuilds the union
    /// over the results. Stops at the first error.
    fn to_local(&self) -> Result<PlanRef> {
        let children = self.inputs();
        let mut new_inputs = Vec::with_capacity(children.len());
        for child in children.iter() {
            new_inputs.push(
                RequiredDist::single()
                    .enforce_if_not_satisfies(child.to_local()?, &Order::any())?,
            );
        }
        Ok(self.clone_with_inputs(&new_inputs))
    }
}
Loading

0 comments on commit bce7c81

Please sign in to comment.