diff --git a/.config/hakari.toml b/.config/hakari.toml index fb85775958847..9cdfbb3b0c07f 100644 --- a/.config/hakari.toml +++ b/.config/hakari.toml @@ -29,4 +29,10 @@ workspace-members = [ "risingwave_object_store", "risingwave_bench", ] -third-party = [{ name = "opendal" }, { name = "criterion" }] +third-party = [ + { name = "opendal" }, + # These are solely dev-dependencies. Unifying them may slow down build. + { name = "criterion" }, + { name = "console" }, + { name = "similar" }, +] diff --git a/.config/nextest.toml b/.config/nextest.toml index 3ab35f0afa8ed..898b335fb8f86 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,5 +1,5 @@ [profile.default] -retries = 5 +retries = 0 slow-timeout = { period = "5s" } status-level = "all" final-status-level = "slow" diff --git a/Cargo.lock b/Cargo.lock index 37f10089c8623..59b583ebb258e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2145,6 +2145,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "dissimilar" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "210ec60ae7d710bed8683e333e9d2855a8a56a3e9892b38bad3bb0d4d29b0d5e" + [[package]] name = "dlv-list" version = "0.5.0" @@ -2338,6 +2344,16 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "expect-test" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d9eafeadd538e68fb28016364c9732d78e420b9ff8853fa5e4058861e9f8d3" +dependencies = [ + "dissimilar", + "once_cell", +] + [[package]] name = "fail" version = "0.5.1" @@ -6645,6 +6661,7 @@ dependencies = [ "educe", "either", "enum-as-inner", + "expect-test", "fixedbitset", "futures", "futures-async-stream", @@ -6676,7 +6693,9 @@ dependencies = [ "risingwave_rpc_client", "risingwave_source", "risingwave_storage", + "serde", "serde_json", + "serde_yaml", "smallvec", 
"static_assertions", "task_stats_alloc", diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index 6a78020ef66b0..59498579a562c 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -226,6 +226,10 @@ impl StreamChunk { pub fn to_pretty_string(&self) -> String { use comfy_table::{Cell, CellAlignment, Table}; + if self.cardinality() == 0 { + return "(empty)".to_owned(); + } + let mut table = Table::new(); table.load_preset("||--+-++| ++++++"); for (op, row_ref) in self.rows() { diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index dd29bdb917b07..eba9d8898306a 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -80,7 +80,12 @@ workspace-hack = { path = "../workspace-hack" } [dev-dependencies] assert_matches = "1" criterion = { version = "0.4", features = ["async_tokio", "async"] } -risingwave_hummock_test = { path = "../storage/hummock_test", features = ["test"] } +expect-test = "1" +risingwave_hummock_test = { path = "../storage/hummock_test", features = [ + "test", +] } +serde = { version = "1.0", features = ["derive"] } +serde_yaml = "0.9" tracing-test = "0.2" [[bench]] diff --git a/src/stream/README.md b/src/stream/README.md new file mode 100644 index 0000000000000..129dd1b6651ac --- /dev/null +++ b/src/stream/README.md @@ -0,0 +1,11 @@ +# RisingWave Stream Engine + +Read [Stream Engine][stream-engine] for more details about architecture. + +[stream-engine]: https://github.com/risingwavelabs/risingwave/blob/main/docs/streaming-overview.md + +## Writing executor tests + +It's recommended to use [expect_test](https://github.com/rust-analyzer/expect-test) to write new tests, which can automatically update the expected output for you. See `check_until_pending` 's doc and its usage for more details. + +It's recommended to write new tests as *integration tests* (i.e. in `tests/` directory) instead of *unit tests* (i.e. in `src/` directory). 
See [#9878](https://github.com/risingwavelabs/risingwave/issues/9878) for more details. diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 2920134a59482..fdf5642ac6ef9 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -576,9 +576,9 @@ impl Barrier { #[derive(Debug, PartialEq, Eq, Clone)] pub struct Watermark { - col_idx: usize, - data_type: DataType, - val: ScalarImpl, + pub col_idx: usize, + pub data_type: DataType, + pub val: ScalarImpl, } impl PartialOrd for Watermark { diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index b8b00c27f6c25..c7bb2a1615a05 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -494,199 +494,3 @@ impl EowcOverWindowExecutor { } } } - -#[cfg(test)] -mod tests { - use std::sync::atomic::AtomicU64; - use std::sync::Arc; - - use risingwave_common::array::StreamChunk; - use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; - use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_common::types::DataType; - use risingwave_common::util::sort_util::OrderType; - use risingwave_expr::agg::{AggArgs, AggKind}; - use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncCall, WindowFuncKind}; - use risingwave_storage::memory::MemoryStateStore; - use risingwave_storage::StateStore; - - use super::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs}; - use crate::common::table::state_table::StateTable; - use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt}; - use crate::executor::{ActorContext, BoxedMessageStream, Executor}; - - async fn create_executor( - calls: Vec, - store: S, - ) -> (MessageSender, BoxedMessageStream) { - let input_schema = Schema::new(vec![ - Field::unnamed(DataType::Int64), // order key - Field::unnamed(DataType::Varchar), // partition key - 
Field::unnamed(DataType::Int64), // pk - Field::unnamed(DataType::Int32), // x - ]); - let input_pk_indices = vec![2]; - let partition_key_indices = vec![1]; - let order_key_index = 0; - - let table_columns = vec![ - ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64), // order key - ColumnDesc::unnamed(ColumnId::new(1), DataType::Varchar), // partition key - ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64), // pk - ColumnDesc::unnamed(ColumnId::new(3), DataType::Int32), // x - ]; - let table_pk_indices = vec![1, 0, 2]; - let table_order_types = vec![ - OrderType::ascending(), - OrderType::ascending(), - OrderType::ascending(), - ]; - - let output_pk_indices = vec![2]; - - let state_table = StateTable::new_without_distribution_inconsistent_op( - store, - TableId::new(1), - table_columns, - table_order_types, - table_pk_indices, - ) - .await; - - let (tx, source) = MockSource::channel(input_schema, input_pk_indices.clone()); - let executor = EowcOverWindowExecutor::new(EowcOverWindowExecutorArgs { - input: source.boxed(), - actor_ctx: ActorContext::create(123), - pk_indices: output_pk_indices, - executor_id: 1, - calls, - partition_key_indices, - order_key_index, - state_table, - watermark_epoch: Arc::new(AtomicU64::new(0)), - }); - (tx, executor.boxed().execute()) - } - - #[tokio::test] - async fn test_over_window() { - let store = MemoryStateStore::new(); - let calls = vec![ - WindowFuncCall { - kind: WindowFuncKind::Lag, - args: AggArgs::Unary(DataType::Int32, 3), - return_type: DataType::Int32, - frame: Frame::rows(FrameBound::Preceding(1), FrameBound::CurrentRow), - }, - WindowFuncCall { - kind: WindowFuncKind::Lead, - args: AggArgs::Unary(DataType::Int32, 3), - return_type: DataType::Int32, - frame: Frame::rows(FrameBound::CurrentRow, FrameBound::Following(1)), - }, - ]; - - { - // test basic - let (mut tx, mut over_window) = create_executor(calls.clone(), store.clone()).await; - - tx.push_barrier(1, false); - over_window.expect_barrier().await; - - 
tx.push_chunk(StreamChunk::from_pretty( - " I T I i - + 1 p1 100 10 - + 1 p1 101 16 - + 4 p2 200 20", - )); - assert_eq!(1, over_window.expect_watermark().await.val.into_int64()); - assert_eq!( - over_window.expect_chunk().await, - StreamChunk::from_pretty( - " I T I i i i - + 1 p1 100 10 . 16" - ) - ); - - tx.push_chunk(StreamChunk::from_pretty( - " I T I i - + 5 p1 102 18 - + 7 p2 201 22 - + 8 p3 300 33", - )); - // NOTE: no watermark message here, since watermark(1) was already received - assert_eq!( - over_window.expect_chunk().await, - StreamChunk::from_pretty( - " I T I i i i - + 1 p1 101 16 10 18 - + 4 p2 200 20 . 22" - ) - ); - - tx.push_barrier(2, false); - over_window.expect_barrier().await; - } - - { - // test recovery - let (mut tx, mut over_window) = create_executor(calls.clone(), store.clone()).await; - - tx.push_barrier(3, false); - over_window.expect_barrier().await; - - tx.push_chunk(StreamChunk::from_pretty( - " I T I i - + 10 p1 103 13 - + 12 p2 202 28 - + 13 p3 301 39", - )); - assert_eq!(5, over_window.expect_watermark().await.val.into_int64()); - assert_eq!( - over_window.expect_chunk().await, - StreamChunk::from_pretty( - " I T I i i i - + 5 p1 102 18 16 13 - + 7 p2 201 22 20 28 - + 8 p3 300 33 . 
39" - ) - ); - - tx.push_barrier(4, false); - over_window.expect_barrier().await; - } - } - - #[tokio::test] - async fn test_over_window_aggregate() { - let store = MemoryStateStore::new(); - let calls = vec![WindowFuncCall { - kind: WindowFuncKind::Aggregate(AggKind::Sum), - args: AggArgs::Unary(DataType::Int32, 3), - return_type: DataType::Int64, - frame: Frame::rows(FrameBound::Preceding(1), FrameBound::Following(1)), - }]; - - let (mut tx, mut over_window) = create_executor(calls.clone(), store.clone()).await; - - tx.push_barrier(1, false); - over_window.expect_barrier().await; - - tx.push_chunk(StreamChunk::from_pretty( - " I T I i - + 1 p1 100 10 - + 1 p1 101 16 - + 4 p1 102 20", - )); - assert_eq!(1, over_window.expect_watermark().await.val.into_int64()); - let chunk = over_window.expect_chunk().await; - assert_eq!( - chunk, - StreamChunk::from_pretty( - " I T I i I - + 1 p1 100 10 26 - + 1 p1 101 16 46" - ) - ); - } -} diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 3dfa7ac0dd7ce..7c3b41d5235db 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -187,82 +187,3 @@ impl ProjectSetExecutor { } } } - -#[cfg(test)] -mod tests { - use futures::StreamExt; - use risingwave_common::array::stream_chunk::StreamChunkTestExt; - use risingwave_common::array::StreamChunk; - use risingwave_common::catalog::{Field, Schema}; - use risingwave_common::types::DataType; - use risingwave_expr::expr::build_from_pretty; - use risingwave_expr::table_function::repeat; - - use super::super::test_utils::MockSource; - use super::super::*; - use super::*; - - const CHUNK_SIZE: usize = 1024; - - #[tokio::test] - async fn test_project_set() { - let chunk1 = StreamChunk::from_pretty( - " I I - + 1 4 - + 2 5 - + 3 6", - ); - let chunk2 = StreamChunk::from_pretty( - " I I - + 7 8 - - 3 6", - ); - let schema = Schema { - fields: vec![ - Field::unnamed(DataType::Int64), - 
Field::unnamed(DataType::Int64), - ], - }; - let source = MockSource::with_chunks(schema, PkIndices::new(), vec![chunk1, chunk2]); - - let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)"); - let tf1 = repeat(build_from_pretty("1:int4"), 1); - let tf2 = repeat(build_from_pretty("2:int4"), 2); - - let project_set = Box::new(ProjectSetExecutor::new( - Box::new(source), - vec![], - vec![test_expr.into(), tf1.into(), tf2.into()], - 1, - CHUNK_SIZE, - )); - - let expected = vec![ - StreamChunk::from_pretty( - " I I i i - + 0 5 1 2 - + 1 5 . 2 - + 0 7 1 2 - + 1 7 . 2 - + 0 9 1 2 - + 1 9 . 2", - ), - StreamChunk::from_pretty( - " I I i i - + 0 15 1 2 - + 1 15 . 2 - - 0 9 1 2 - - 1 9 . 2", - ), - ]; - - let mut project_set = project_set.execute(); - - for expected in expected { - let msg = project_set.next().await.unwrap().unwrap(); - let chunk = msg.as_chunk().unwrap(); - assert_eq!(*chunk, expected); - } - assert!(project_set.next().await.unwrap().unwrap().is_stop()); - } -} diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 919bceb1d8529..7f1edb3fa089c 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -25,6 +25,24 @@ use super::{ StreamExecutorResult, Watermark, }; +pub mod prelude { + pub use std::sync::atomic::AtomicU64; + pub use std::sync::Arc; + + pub use risingwave_common::array::StreamChunk; + pub use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; + pub use risingwave_common::test_prelude::StreamChunkTestExt; + pub use risingwave_common::types::DataType; + pub use risingwave_common::util::sort_util::OrderType; + pub use risingwave_expr::expr::build_from_pretty; + pub use risingwave_storage::memory::MemoryStateStore; + pub use risingwave_storage::StateStore; + + pub use crate::common::table::state_table::StateTable; + pub use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt}; + pub use 
crate::executor::{ActorContext, BoxedMessageStream, Executor, PkIndices}; +} + pub struct MockSource { schema: Schema, pk_indices: PkIndices, diff --git a/src/stream/tests/README.md b/src/stream/tests/README.md new file mode 120000 index 0000000000000..9b94e2d61ad05 --- /dev/null +++ b/src/stream/tests/README.md @@ -0,0 +1 @@ +src/stream/README.md \ No newline at end of file diff --git a/src/stream/tests/integration_tests/eowc.rs b/src/stream/tests/integration_tests/eowc.rs new file mode 100644 index 0000000000000..47a72ae69000e --- /dev/null +++ b/src/stream/tests/integration_tests/eowc.rs @@ -0,0 +1,221 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_expr::agg::{AggArgs, AggKind}; +use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncCall, WindowFuncKind}; +use risingwave_stream::executor::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs}; + +use crate::prelude::*; + +async fn create_executor( + calls: Vec, + store: S, +) -> (MessageSender, BoxedMessageStream) { + let input_schema = Schema::new(vec![ + Field::unnamed(DataType::Int64), // order key + Field::unnamed(DataType::Varchar), // partition key + Field::unnamed(DataType::Int64), // pk + Field::unnamed(DataType::Int32), // x + ]); + let input_pk_indices = vec![2]; + let partition_key_indices = vec![1]; + let order_key_index = 0; + + let table_columns = vec![ + ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64), // order key + ColumnDesc::unnamed(ColumnId::new(1), DataType::Varchar), // partition key + ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64), // pk + ColumnDesc::unnamed(ColumnId::new(3), DataType::Int32), // x + ]; + let table_pk_indices = vec![1, 0, 2]; + let table_order_types = vec![ + OrderType::ascending(), + OrderType::ascending(), + OrderType::ascending(), + ]; + + let output_pk_indices = vec![2]; + + let state_table = StateTable::new_without_distribution_inconsistent_op( + store, + TableId::new(1), + table_columns, + table_order_types, + table_pk_indices, + ) + .await; + + let (tx, source) = MockSource::channel(input_schema, input_pk_indices.clone()); + let executor = EowcOverWindowExecutor::new(EowcOverWindowExecutorArgs { + input: source.boxed(), + actor_ctx: ActorContext::create(123), + pk_indices: output_pk_indices, + executor_id: 1, + calls, + partition_key_indices, + order_key_index, + state_table, + watermark_epoch: Arc::new(AtomicU64::new(0)), + }); + (tx, executor.boxed().execute()) +} + +#[tokio::test] +async fn test_over_window() { + let store = MemoryStateStore::new(); + let calls = vec![ + WindowFuncCall { + kind: WindowFuncKind::Lag, + args: AggArgs::Unary(DataType::Int32, 3), + 
return_type: DataType::Int32, + frame: Frame::rows(FrameBound::Preceding(1), FrameBound::CurrentRow), + }, + WindowFuncCall { + kind: WindowFuncKind::Lead, + args: AggArgs::Unary(DataType::Int32, 3), + return_type: DataType::Int32, + frame: Frame::rows(FrameBound::CurrentRow, FrameBound::Following(1)), + }, + ]; + + check_with_script( + || create_executor(calls.clone(), store.clone()), + r###" +- !barrier 1 +- !chunk |2 + I T I i + + 1 p1 100 10 + + 1 p1 101 16 + + 4 p2 200 20 +- !chunk |2 + I T I i + + 5 p1 102 18 + + 7 p2 201 22 + + 8 p3 300 33 +# NOTE: no watermark message here, since watermark(1) was already received +- !barrier 2 +- recovery +- !barrier 3 +- !chunk |2 + I T I i + + 10 p1 103 13 + + 12 p2 202 28 + + 13 p3 301 39 +- !barrier 4 +"###, + expect![[r#" + - input: !barrier 1 + output: + - !barrier 1 + - input: !chunk |- + +---+---+----+-----+----+ + | + | 1 | p1 | 100 | 10 | + | + | 1 | p1 | 101 | 16 | + | + | 4 | p2 | 200 | 20 | + +---+---+----+-----+----+ + output: + - !watermark + col_idx: 0 + val: 1 + - !chunk |- + +---+---+----+-----+----+---+----+ + | + | 1 | p1 | 100 | 10 | | 16 | + +---+---+----+-----+----+---+----+ + - input: !chunk |- + +---+---+----+-----+----+ + | + | 5 | p1 | 102 | 18 | + | + | 7 | p2 | 201 | 22 | + | + | 8 | p3 | 300 | 33 | + +---+---+----+-----+----+ + output: + - !chunk |- + +---+---+----+-----+----+----+----+ + | + | 1 | p1 | 101 | 16 | 10 | 18 | + | + | 4 | p2 | 200 | 20 | | 22 | + +---+---+----+-----+----+----+----+ + - input: !barrier 2 + output: + - !barrier 2 + - input: recovery + output: [] + - input: !barrier 3 + output: + - !barrier 3 + - input: !chunk |- + +---+----+----+-----+----+ + | + | 10 | p1 | 103 | 13 | + | + | 12 | p2 | 202 | 28 | + | + | 13 | p3 | 301 | 39 | + +---+----+----+-----+----+ + output: + - !watermark + col_idx: 0 + val: 5 + - !chunk |- + +---+---+----+-----+----+----+----+ + | + | 5 | p1 | 102 | 18 | 16 | 13 | + | + | 7 | p2 | 201 | 22 | 20 | 28 | + | + | 8 | p3 | 300 | 33 | | 39 | + 
+---+---+----+-----+----+----+----+ + - input: !barrier 4 + output: + - !barrier 4 + "#]], + ) + .await; +} + +#[tokio::test] +async fn test_over_window_aggregate() { + let store = MemoryStateStore::new(); + let calls = vec![WindowFuncCall { + kind: WindowFuncKind::Aggregate(AggKind::Sum), + args: AggArgs::Unary(DataType::Int32, 3), + return_type: DataType::Int64, + frame: Frame::rows(FrameBound::Preceding(1), FrameBound::Following(1)), + }]; + + check_with_script( + || create_executor(calls.clone(), store.clone()), + r###" +- !barrier 1 +- !chunk |2 + I T I i + + 1 p1 100 10 + + 1 p1 101 16 + + 4 p1 102 20 +"###, + expect![[r#" + - input: !barrier 1 + output: + - !barrier 1 + - input: !chunk |- + +---+---+----+-----+----+ + | + | 1 | p1 | 100 | 10 | + | + | 1 | p1 | 101 | 16 | + | + | 4 | p1 | 102 | 20 | + +---+---+----+-----+----+ + output: + - !watermark + col_idx: 0 + val: 1 + - !chunk |- + +---+---+----+-----+----+----+ + | + | 1 | p1 | 100 | 10 | 26 | + | + | 1 | p1 | 101 | 16 | 46 | + +---+---+----+-----+----+----+ + "#]], + ) + .await; +} diff --git a/src/stream/tests/integration_tests/main.rs b/src/stream/tests/integration_tests/main.rs new file mode 100644 index 0000000000000..95f9415d588cf --- /dev/null +++ b/src/stream/tests/integration_tests/main.rs @@ -0,0 +1,26 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// tests +mod eowc; +mod project_set; + +// utils +mod prelude { + pub use expect_test::{expect, expect_file}; + pub use risingwave_stream::executor::test_utils::prelude::*; + + pub use crate::snapshot::*; +} +mod snapshot; diff --git a/src/stream/tests/integration_tests/project_set.rs b/src/stream/tests/integration_tests/project_set.rs new file mode 100644 index 0000000000000..1fc7b50dac658 --- /dev/null +++ b/src/stream/tests/integration_tests/project_set.rs @@ -0,0 +1,82 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_expr::table_function::repeat; +use risingwave_stream::executor::ProjectSetExecutor; + +use crate::prelude::*; + +const CHUNK_SIZE: usize = 1024; + +fn create_executor() -> (MessageSender, BoxedMessageStream) { + let schema = Schema { + fields: vec![ + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Int64), + ], + }; + let (tx, source) = MockSource::channel(schema, PkIndices::new()); + + let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)"); + let tf1 = repeat(build_from_pretty("1:int4"), 1); + let tf2 = repeat(build_from_pretty("2:int4"), 2); + + let project_set = Box::new(ProjectSetExecutor::new( + Box::new(source), + vec![], + vec![test_expr.into(), tf1.into(), tf2.into()], + 1, + CHUNK_SIZE, + )); + (tx, project_set.execute()) +} + +#[tokio::test] +async fn test_project_set() { + let (mut tx, mut project_set) = create_executor(); + + tx.push_chunk(StreamChunk::from_pretty( + " I I + + 1 4 + + 2 5 + + 3 6", + )); + tx.push_chunk(StreamChunk::from_pretty( + " I I + + 7 8 + - 3 6", + )); + + check_until_pending( + &mut project_set, + expect_test::expect![[r#" + - !chunk |- + +---+---+---+---+---+ + | + | 0 | 5 | 1 | 2 | + | + | 1 | 5 | | 2 | + | + | 0 | 7 | 1 | 2 | + | + | 1 | 7 | | 2 | + | + | 0 | 9 | 1 | 2 | + | + | 1 | 9 | | 2 | + +---+---+---+---+---+ + - !chunk |- + +---+---+----+---+---+ + | + | 0 | 15 | 1 | 2 | + | + | 1 | 15 | | 2 | + | - | 0 | 9 | 1 | 2 | + | - | 1 | 9 | | 2 | + +---+---+----+---+---+ + "#]], + ); +} diff --git a/src/stream/tests/integration_tests/snapshot.rs b/src/stream/tests/integration_tests/snapshot.rs new file mode 100644 index 0000000000000..2a7d1aae685ef --- /dev/null +++ b/src/stream/tests/integration_tests/snapshot.rs @@ -0,0 +1,156 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::{Future, FutureExt, TryStreamExt}; +use risingwave_common::array::StreamChunk; +use risingwave_common::test_prelude::StreamChunkTestExt; +use risingwave_common::types::DataType; +use risingwave_stream::executor::test_utils::MessageSender; +use risingwave_stream::executor::{BoxedMessageStream, Message}; + +/// Drives the executor until it is pending, and then asserts that the output matches +/// `expect`. +/// +/// `expect` can be automatically updated by running the test suite with `UPDATE_EXPECT` +/// environment variable set. +/// +/// TODO: Do we want something like `check_n_steps` instead, where we do want to wait for a +/// `.await` to complete? +/// +/// # Suggested workflow to add a new test +/// +/// Just drop this one-liner after creating the executor and sending input messages. +/// +/// ```ignore +/// check_until_pending(&mut executor, expect![[""]]); +/// ``` +/// +/// Alternatively, you can use `expect_file!` if the inline result doesn't look good. +/// +/// Then just run the tests with env var `UPDATE_EXPECT=1`. +/// +/// ```sh +/// UPDATE_EXPECT=1 cargo nextest run -p risingwave_stream +/// # or +/// UPDATE_EXPECT=1 risedev test -p risingwave_stream +/// ``` +pub fn check_until_pending(executor: &mut BoxedMessageStream, expect: expect_test::Expect) { + let output = run_until_pending(executor); + let output = serde_yaml::to_string(&output).unwrap(); + expect.assert_eq(&output); +} + +/// Similar to [`check_until_pending`], but use a DSL test script as input. 
+/// +/// For each input event, it drives the executor until it is pending. +pub async fn check_with_script( + create_executor: F, + test_script: &str, + expect: expect_test::Expect, +) where + F: Fn() -> Fut, + Fut: Future, +{ + let output = executor_snapshot(create_executor, test_script).await; + expect.assert_eq(&output); +} + +/// This is a DSL for the input and output of executor snapshot tests. +/// +/// It imitates [`Message`], but is more ser/de friendly. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "lowercase")] +enum SnapshotEvent { + Barrier(u64), + Noop, + Recovery, + Chunk(String), + Watermark { col_idx: usize, val: i64 }, +} + +impl SnapshotEvent { + #[track_caller] + fn parse(s: &str) -> Vec { + serde_yaml::from_str(s).unwrap() + } +} + +/// One input [event](`SnapshotEvent`) and its corresponding output events. +/// +/// A `Vec` can represent a whole test scenario. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct Snapshot { + input: SnapshotEvent, + output: Vec, +} + +async fn executor_snapshot(create_executor: F, inputs: &str) -> String +where + F: Fn() -> Fut, + Fut: Future, +{ + let inputs = SnapshotEvent::parse(inputs); + + let (mut tx, mut executor) = create_executor().await; + + let mut snapshot = Vec::with_capacity(inputs.len()); + for mut event in inputs { + match &mut event { + SnapshotEvent::Barrier(epoch) => { + tx.push_barrier(*epoch, false); + } + SnapshotEvent::Noop => unreachable!(), + SnapshotEvent::Recovery => { + (tx, executor) = create_executor().await; + } + SnapshotEvent::Chunk(chunk_str) => { + let chunk = StreamChunk::from_pretty(chunk_str); + *chunk_str = chunk.to_pretty_string(); + tx.push_chunk(chunk); + } + SnapshotEvent::Watermark { col_idx, val } => { + tx.push_watermark(*col_idx, DataType::Int64, (*val).into()) + } + } + + snapshot.push(Snapshot { + input: event, + output: run_until_pending(&mut executor), + }); + } + + serde_yaml::to_string(&snapshot).unwrap() +} + +fn 
run_until_pending(executor: &mut BoxedMessageStream) -> Vec { + let mut output = vec![]; + + while let Some(msg) = executor.try_next().now_or_never() { + let msg = msg.unwrap(); + let msg = match msg { + Some(msg) => msg, + None => return output, + }; + output.push(match msg { + Message::Chunk(chunk) => SnapshotEvent::Chunk(chunk.to_pretty_string()), + Message::Barrier(barrier) => SnapshotEvent::Barrier(barrier.epoch.curr), + Message::Watermark(watermark) => SnapshotEvent::Watermark { + col_idx: watermark.col_idx, + val: watermark.val.into_int64(), + }, + }); + } + + output +} diff --git a/src/tests/simulation/tests/it/main.rs b/src/tests/simulation/tests/integration_tests/main.rs similarity index 100% rename from src/tests/simulation/tests/it/main.rs rename to src/tests/simulation/tests/integration_tests/main.rs diff --git a/src/tests/simulation/tests/it/recovery/mod.rs b/src/tests/simulation/tests/integration_tests/recovery/mod.rs similarity index 100% rename from src/tests/simulation/tests/it/recovery/mod.rs rename to src/tests/simulation/tests/integration_tests/recovery/mod.rs diff --git a/src/tests/simulation/tests/it/recovery/nexmark_recovery.rs b/src/tests/simulation/tests/integration_tests/recovery/nexmark_recovery.rs similarity index 100% rename from src/tests/simulation/tests/it/recovery/nexmark_recovery.rs rename to src/tests/simulation/tests/integration_tests/recovery/nexmark_recovery.rs diff --git a/src/tests/simulation/tests/it/scale/cascade_materialized_view.rs b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/cascade_materialized_view.rs rename to src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs diff --git a/src/tests/simulation/tests/it/scale/dynamic_filter.rs b/src/tests/simulation/tests/integration_tests/scale/dynamic_filter.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/dynamic_filter.rs 
rename to src/tests/simulation/tests/integration_tests/scale/dynamic_filter.rs diff --git a/src/tests/simulation/tests/it/scale/mod.rs b/src/tests/simulation/tests/integration_tests/scale/mod.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/mod.rs rename to src/tests/simulation/tests/integration_tests/scale/mod.rs diff --git a/src/tests/simulation/tests/it/scale/nexmark_chaos.rs b/src/tests/simulation/tests/integration_tests/scale/nexmark_chaos.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/nexmark_chaos.rs rename to src/tests/simulation/tests/integration_tests/scale/nexmark_chaos.rs diff --git a/src/tests/simulation/tests/it/scale/nexmark_q4.rs b/src/tests/simulation/tests/integration_tests/scale/nexmark_q4.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/nexmark_q4.rs rename to src/tests/simulation/tests/integration_tests/scale/nexmark_q4.rs diff --git a/src/tests/simulation/tests/it/scale/nexmark_source.rs b/src/tests/simulation/tests/integration_tests/scale/nexmark_source.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/nexmark_source.rs rename to src/tests/simulation/tests/integration_tests/scale/nexmark_source.rs diff --git a/src/tests/simulation/tests/it/scale/no_shuffle.rs b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/no_shuffle.rs rename to src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs diff --git a/src/tests/simulation/tests/it/scale/singleton_migration.rs b/src/tests/simulation/tests/integration_tests/scale/singleton_migration.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/singleton_migration.rs rename to src/tests/simulation/tests/integration_tests/scale/singleton_migration.rs diff --git a/src/tests/simulation/tests/it/scale/sink.rs b/src/tests/simulation/tests/integration_tests/scale/sink.rs similarity index 100% rename 
from src/tests/simulation/tests/it/scale/sink.rs rename to src/tests/simulation/tests/integration_tests/scale/sink.rs diff --git a/src/tests/simulation/tests/it/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/streaming_parallelism.rs rename to src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs diff --git a/src/tests/simulation/tests/it/scale/table.rs b/src/tests/simulation/tests/integration_tests/scale/table.rs similarity index 100% rename from src/tests/simulation/tests/it/scale/table.rs rename to src/tests/simulation/tests/integration_tests/scale/table.rs