Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: snapshot testing for stream executors #9787

Merged
merged 25 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .config/hakari.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ workspace-members = [
"risingwave_object_store",
"risingwave_bench",
]
third-party = [{ name = "opendal" }, { name = "criterion" }]
third-party = [
{ name = "opendal" },
# These are solely dev-dependencies. Unifying them may slow down build.
{ name = "criterion" },
{ name = "console" },
{ name = "similar" },
]
2 changes: 1 addition & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[profile.default]
retries = 5
retries = 0
slow-timeout = { period = "5s" }
status-level = "all"
final-status-level = "slow"
Expand Down
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1"
criterion = { version = "0.4", features = ["async_tokio", "async"] }
insta = "1.29"
risingwave_hummock_test = { path = "../storage/hummock_test", features = ["test"] }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
tracing-test = "0.2"

[[bench]]
Expand Down
136 changes: 45 additions & 91 deletions src/stream/src/executor/over_window/eowc.rs
xxchan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -494,9 +494,7 @@ mod tests {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::agg::{AggArgs, AggKind};
Expand All @@ -506,7 +504,8 @@ mod tests {

use super::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs};
use crate::common::table::state_table::StateTable;
use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
use crate::executor::test_utils::snapshot::executor_snapshot;
use crate::executor::test_utils::{MessageSender, MockSource};
use crate::executor::{ActorContext, BoxedMessageStream, Executor};

async fn create_executor<S: StateStore>(
Expand Down Expand Up @@ -580,75 +579,37 @@ mod tests {
},
];

{
// test basic
let (mut tx, mut over_window) = create_executor(calls.clone(), store.clone()).await;

tx.push_barrier(1, false);
over_window.expect_barrier().await;

tx.push_chunk(StreamChunk::from_pretty(
" I T I i
+ 1 p1 100 10
+ 1 p1 101 16
+ 4 p2 200 20",
));
assert_eq!(1, over_window.expect_watermark().await.val.into_int64());
assert_eq!(
over_window.expect_chunk().await,
StreamChunk::from_pretty(
" I T I i i i
+ 1 p1 100 10 . 16"
)
);

tx.push_chunk(StreamChunk::from_pretty(
" I T I i
+ 5 p1 102 18
+ 7 p2 201 22
+ 8 p3 300 33",
));
// NOTE: no watermark message here, since watermark(1) was already received
assert_eq!(
over_window.expect_chunk().await,
StreamChunk::from_pretty(
" I T I i i i
+ 1 p1 101 16 10 18
+ 4 p2 200 20 . 22"
)
);

tx.push_barrier(2, false);
over_window.expect_barrier().await;
}

{
// test recovery
let (mut tx, mut over_window) = create_executor(calls.clone(), store.clone()).await;

tx.push_barrier(3, false);
over_window.expect_barrier().await;

tx.push_chunk(StreamChunk::from_pretty(
" I T I i
+ 10 p1 103 13
+ 12 p2 202 28
+ 13 p3 301 39",
));
assert_eq!(5, over_window.expect_watermark().await.val.into_int64());
assert_eq!(
over_window.expect_chunk().await,
StreamChunk::from_pretty(
" I T I i i i
+ 5 p1 102 18 16 13
+ 7 p2 201 22 20 28
+ 8 p3 300 33 . 39"
)
);

tx.push_barrier(4, false);
over_window.expect_barrier().await;
}
insta::assert_snapshot!(
executor_snapshot(
Box::new(async || create_executor(calls.clone(), store.clone()).await),
r###"
- barrier
# FIXME: the header cannot be aligned. What's the correct YAML format?
- !chunk |-
I T I i
+ 1 p1 100 10
+ 1 p1 101 16
+ 4 p2 200 20
- !chunk |-
I T I i
+ 5 p1 102 18
+ 7 p2 201 22
+ 8 p3 300 33
# TODO: preserve comment in snapshot?
# NOTE: no watermark message here, since watermark(1) was already received
- barrier
- recovery
- barrier
- !chunk |-
I T I i
+ 10 p1 103 13
+ 12 p2 202 28
+ 13 p3 301 39
- barrier
"###,
)
.await
);
}

#[tokio::test]
Expand All @@ -661,26 +622,19 @@ mod tests {
frame: Frame::rows(FrameBound::Preceding(1), FrameBound::Following(1)),
}];

let (mut tx, mut over_window) = create_executor(calls.clone(), store.clone()).await;

tx.push_barrier(1, false);
over_window.expect_barrier().await;

tx.push_chunk(StreamChunk::from_pretty(
" I T I i
+ 1 p1 100 10
+ 1 p1 101 16
+ 4 p1 102 20",
));
assert_eq!(1, over_window.expect_watermark().await.val.into_int64());
let chunk = over_window.expect_chunk().await;
assert_eq!(
chunk,
StreamChunk::from_pretty(
" I T I i I
+ 1 p1 100 10 26
+ 1 p1 101 16 46"
insta::assert_snapshot!(
executor_snapshot(
Box::new(async || create_executor(calls.clone(), store.clone()).await),
xxchan marked this conversation as resolved.
Show resolved Hide resolved
r###"
- barrier
- !chunk |-
I T I i
+ 1 p1 100 10
+ 1 p1 101 16
+ 4 p1 102 20
"###
)
.await
);
}
}
xxchan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
---
source: src/stream/src/executor/over_window/eowc.rs
expression: "executor_snapshot(Box::new(async ||\n create_executor(calls.clone(), store.clone()).await),\n SnapshotEvent::parse(inputs)).await"
xxchan marked this conversation as resolved.
Show resolved Hide resolved
---
- input: barrier
output:
- barrier
- input: !chunk |-
+---+---+----+-----+----+
| + | 1 | p1 | 100 | 10 |
| + | 1 | p1 | 101 | 16 |
| + | 4 | p2 | 200 | 20 |
+---+---+----+-----+----+
output:
- !watermark
col_idx: 0
val: 1
- !chunk |-
+---+---+----+-----+----+---+----+
| + | 1 | p1 | 100 | 10 | | 16 |
+---+---+----+-----+----+---+----+
- input: !chunk |-
+---+---+----+-----+----+
| + | 5 | p1 | 102 | 18 |
| + | 7 | p2 | 201 | 22 |
| + | 8 | p3 | 300 | 33 |
+---+---+----+-----+----+
output:
- !chunk |-
+---+---+----+-----+----+----+----+
| + | 1 | p1 | 101 | 16 | 10 | 18 |
| + | 4 | p2 | 200 | 20 | | 22 |
+---+---+----+-----+----+----+----+
- input: barrier
output:
- barrier
- input: recovery
output: []
- input: barrier
output:
- barrier
- input: !chunk |-
+---+----+----+-----+----+
| + | 10 | p1 | 103 | 13 |
| + | 12 | p2 | 202 | 28 |
| + | 13 | p3 | 301 | 39 |
+---+----+----+-----+----+
output:
- !watermark
col_idx: 0
val: 5
- !chunk |-
+---+---+----+-----+----+----+----+
| + | 5 | p1 | 102 | 18 | 16 | 13 |
| + | 7 | p2 | 201 | 22 | 20 | 28 |
| + | 8 | p3 | 300 | 33 | | 39 |
+---+---+----+-----+----+----+----+
- input: barrier
output:
- barrier

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
source: src/stream/src/executor/over_window/eowc.rs
expression: "executor_snapshot(Box::new(async ||\n create_executor(calls.clone(), store.clone()).await),\n r###\"\n- barrier\n- !chunk |-\n I T I i\n + 1 p1 100 10\n + 1 p1 101 16\n + 4 p1 102 20\n\"###).await"
xxchan marked this conversation as resolved.
Show resolved Hide resolved
---
- input: barrier
output:
- barrier
- input: !chunk |-
+---+---+----+-----+----+
| + | 1 | p1 | 100 | 10 |
| + | 1 | p1 | 101 | 16 |
| + | 4 | p1 | 102 | 20 |
+---+---+----+-----+----+
output:
- !watermark
col_idx: 0
val: 1
- !chunk |-
+---+---+----+-----+----+----+
| + | 1 | p1 | 100 | 10 | 26 |
| + | 1 | p1 | 101 | 16 | 46 |
+---+---+----+-----+----+----+

Loading