diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index e712d4a0af99c..93b1801b21f12 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -624,7 +624,7 @@ mod tests { insta::assert_snapshot!( executor_snapshot( - Box::new(async || create_executor(calls.clone(), store.clone()).await), + async || create_executor(calls.clone(), store.clone()).await, r###" - barrier - !chunk |- diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 3dfa7ac0dd7ce..9de217c1cf0e9 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -190,9 +190,6 @@ impl ProjectSetExecutor { #[cfg(test)] mod tests { - use futures::StreamExt; - use risingwave_common::array::stream_chunk::StreamChunkTestExt; - use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; use risingwave_expr::expr::build_from_pretty; @@ -201,29 +198,19 @@ mod tests { use super::super::test_utils::MockSource; use super::super::*; use super::*; + use crate::executor::test_utils::snapshot::executor_snapshot; + use crate::executor::test_utils::MessageSender; const CHUNK_SIZE: usize = 1024; - #[tokio::test] - async fn test_project_set() { - let chunk1 = StreamChunk::from_pretty( - " I I - + 1 4 - + 2 5 - + 3 6", - ); - let chunk2 = StreamChunk::from_pretty( - " I I - + 7 8 - - 3 6", - ); + fn create_executor() -> (MessageSender, BoxedMessageStream) { let schema = Schema { fields: vec![ Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64), ], }; - let source = MockSource::with_chunks(schema, PkIndices::new(), vec![chunk1, chunk2]); + let (tx, source) = MockSource::channel(schema, PkIndices::new()); let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)"); let tf1 = repeat(build_from_pretty("1:int4"), 1); @@ -236,33 +223,45 @@ mod tests { 1, CHUNK_SIZE, )); + (tx, project_set.execute()) + } - let expected = vec![ - StreamChunk::from_pretty( - " I I i i - + 0 5 1 2 - + 1 5 . 2 - + 0 7 1 2 - + 1 7 . 2 - + 0 9 1 2 - + 1 9 . 2", - ), - StreamChunk::from_pretty( - " I I i i - + 0 15 1 2 - + 1 15 . 2 - - 0 9 1 2 - - 1 9 . 2", - ), - ]; - - let mut project_set = project_set.execute(); + #[tokio::test] + async fn test_project_set() { + insta::assert_snapshot!( + executor_snapshot( + async || create_executor(), + r###" +- !chunk |- + I I + + 1 4 + + 2 5 + + 3 6 +- !chunk |- + I I + + 7 8 + - 3 6 +"### + ) + .await + ); - for expected in expected { - let msg = project_set.next().await.unwrap().unwrap(); - let chunk = msg.as_chunk().unwrap(); - assert_eq!(*chunk, expected); - } - assert!(project_set.next().await.unwrap().unwrap().is_stop()); + insta::assert_snapshot!( + executor_snapshot( + async || create_executor(), + r###" +- !chunk |- + I I + + 1 4 + + 2 5 + + 3 6 +- !chunk |- + I I + + 7 8 + - 3 6 +"### + ) + .await + ); } } diff --git a/src/stream/src/executor/snapshots/risingwave_stream__executor__project_set__tests__project_set.snap b/src/stream/src/executor/snapshots/risingwave_stream__executor__project_set__tests__project_set.snap new file mode 100644 index 0000000000000..6dcdd458ea304 --- /dev/null +++ b/src/stream/src/executor/snapshots/risingwave_stream__executor__project_set__tests__project_set.snap @@ -0,0 +1,34 @@ +--- +source: src/stream/src/executor/project_set.rs +expression: "executor_snapshot(async || create_executor(),\n r###\"\n- !chunk |-\n I I\n + 1 4\n + 2 5\n + 3 6\n- !chunk |-\n I I\n + 7 8\n - 3 6\n\"###).await" +--- +- input: !chunk |- + +---+---+---+ + | + | 1 | 4 | + | + | 2 | 5 | + | + | 3 | 6 | + +---+---+---+ + output: + - !chunk |- + +---+---+---+---+---+ + | + | 0 | 5 | 1 | 2 | + | + | 1 | 5 | | 2 | + | + | 0 | 7 | 1 | 2 | + | + | 1 | 7 | | 2 | + | + | 0 | 9 | 1 | 2 | + | + | 1 | 9 | | 2 | + +---+---+---+---+---+ +- input: !chunk |- + +---+---+---+ + | + | 7 | 8 | + | - | 3 | 6 | + +---+---+---+ + output: + - !chunk |- + +---+---+----+---+---+ + | + | 0 | 15 | 1 | 2 | + | + | 1 | 15 | | 2 | + | - | 0 | 9 | 1 | 2 | + | - | 1 | 9 | | 2 | + +---+---+----+---+---+ +