Skip to content

Commit

Permalink
project set
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed May 15, 2023
1 parent 1fe29e6 commit 4b304ef
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/executor/over_window/eowc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ mod tests {

insta::assert_snapshot!(
executor_snapshot(
Box::new(async || create_executor(calls.clone(), store.clone()).await),
async || create_executor(calls.clone(), store.clone()).await,
r###"
- barrier
- !chunk |-
Expand Down
85 changes: 42 additions & 43 deletions src/stream/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,6 @@ impl ProjectSetExecutor {

#[cfg(test)]
mod tests {
use futures::StreamExt;
use risingwave_common::array::stream_chunk::StreamChunkTestExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_expr::expr::build_from_pretty;
Expand All @@ -201,29 +198,19 @@ mod tests {
use super::super::test_utils::MockSource;
use super::super::*;
use super::*;
use crate::executor::test_utils::snapshot::executor_snapshot;
use crate::executor::test_utils::MessageSender;

const CHUNK_SIZE: usize = 1024;

#[tokio::test]
async fn test_project_set() {
let chunk1 = StreamChunk::from_pretty(
" I I
+ 1 4
+ 2 5
+ 3 6",
);
let chunk2 = StreamChunk::from_pretty(
" I I
+ 7 8
- 3 6",
);
fn create_executor() -> (MessageSender, BoxedMessageStream) {
let schema = Schema {
fields: vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
],
};
let source = MockSource::with_chunks(schema, PkIndices::new(), vec![chunk1, chunk2]);
let (tx, source) = MockSource::channel(schema, PkIndices::new());

let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");
let tf1 = repeat(build_from_pretty("1:int4"), 1);
Expand All @@ -236,33 +223,45 @@ mod tests {
1,
CHUNK_SIZE,
));
(tx, project_set.execute())
}

let expected = vec![
StreamChunk::from_pretty(
" I I i i
+ 0 5 1 2
+ 1 5 . 2
+ 0 7 1 2
+ 1 7 . 2
+ 0 9 1 2
+ 1 9 . 2",
),
StreamChunk::from_pretty(
" I I i i
+ 0 15 1 2
+ 1 15 . 2
- 0 9 1 2
- 1 9 . 2",
),
];

let mut project_set = project_set.execute();
#[tokio::test]
async fn test_project_set() {
insta::assert_snapshot!(
executor_snapshot(
async || create_executor(),
r###"
- !chunk |-
I I
+ 1 4
+ 2 5
+ 3 6
- !chunk |-
I I
+ 7 8
- 3 6
"###
)
.await
);

for expected in expected {
let msg = project_set.next().await.unwrap().unwrap();
let chunk = msg.as_chunk().unwrap();
assert_eq!(*chunk, expected);
}
assert!(project_set.next().await.unwrap().unwrap().is_stop());
insta::assert_snapshot!(
executor_snapshot(
async || create_executor(),
r###"
- !chunk |-
I I
+ 1 4
+ 2 5
+ 3 6
- !chunk |-
I I
+ 7 8
- 3 6
"###
)
.await
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
source: src/stream/src/executor/project_set.rs
expression: "executor_snapshot(async || create_executor(),\n r###\"\n- !chunk |-\n I I\n + 1 4\n + 2 5\n + 3 6\n- !chunk |-\n I I\n + 7 8\n - 3 6\n\"###).await"
---
- input: !chunk |-
+---+---+---+
| + | 1 | 4 |
| + | 2 | 5 |
| + | 3 | 6 |
+---+---+---+
output:
- !chunk |-
+---+---+---+---+---+
| + | 0 | 5 | 1 | 2 |
| + | 1 | 5 | | 2 |
| + | 0 | 7 | 1 | 2 |
| + | 1 | 7 | | 2 |
| + | 0 | 9 | 1 | 2 |
| + | 1 | 9 | | 2 |
+---+---+---+---+---+
- input: !chunk |-
+---+---+---+
| + | 7 | 8 |
| - | 3 | 6 |
+---+---+---+
output:
- !chunk |-
+---+---+----+---+---+
| + | 0 | 15 | 1 | 2 |
| + | 1 | 15 | | 2 |
| - | 0 | 9 | 1 | 2 |
| - | 1 | 9 | | 2 |
+---+---+----+---+---+

0 comments on commit 4b304ef

Please sign in to comment.