Skip to content

Commit

Permalink
test: snapshot testing for unary stream executors (#9787)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored May 19, 2023
1 parent 539b061 commit 09d1849
Show file tree
Hide file tree
Showing 29 changed files with 555 additions and 281 deletions.
8 changes: 7 additions & 1 deletion .config/hakari.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ workspace-members = [
"risingwave_object_store",
"risingwave_bench",
]
third-party = [{ name = "opendal" }, { name = "criterion" }]
third-party = [
{ name = "opendal" },
# These are solely dev-dependencies. Unifying them may slow down build.
{ name = "criterion" },
{ name = "console" },
{ name = "similar" },
]
2 changes: 1 addition & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[profile.default]
retries = 5
retries = 0
slow-timeout = { period = "5s" }
status-level = "all"
final-status-level = "slow"
Expand Down
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ impl StreamChunk {
pub fn to_pretty_string(&self) -> String {
use comfy_table::{Cell, CellAlignment, Table};

if self.cardinality() == 0 {
return "(empty)".to_owned();
}

let mut table = Table::new();
table.load_preset("||--+-++| ++++++");
for (op, row_ref) in self.rows() {
Expand Down
7 changes: 6 additions & 1 deletion src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1"
criterion = { version = "0.4", features = ["async_tokio", "async"] }
risingwave_hummock_test = { path = "../storage/hummock_test", features = ["test"] }
expect-test = "1"
risingwave_hummock_test = { path = "../storage/hummock_test", features = [
"test",
] }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
tracing-test = "0.2"

[[bench]]
Expand Down
11 changes: 11 additions & 0 deletions src/stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# RisingWave Stream Engine

Read [Stream Engine][stream-engine] for more details about the architecture.

[stream-engine]: https://github.com/risingwavelabs/risingwave/blob/main/docs/streaming-overview.md

## Writing executor tests

It's recommended to use [expect_test](https://github.com/rust-analyzer/expect-test) to write new tests, which can automatically update the expected output for you. See the documentation of `check_until_pending` and its usages for more details.

It's recommended to write new tests as *integration tests* (i.e. in `tests/` directory) instead of *unit tests* (i.e. in `src/` directory). See [#9878](https://github.com/risingwavelabs/risingwave/issues/9878) for more details.
6 changes: 3 additions & 3 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,9 @@ impl Barrier {

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
col_idx: usize,
data_type: DataType,
val: ScalarImpl,
pub col_idx: usize,
pub data_type: DataType,
pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
Expand Down
196 changes: 0 additions & 196 deletions src/stream/src/executor/over_window/eowc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,199 +494,3 @@ impl<S: StateStore> EowcOverWindowExecutor<S> {
}
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::agg::{AggArgs, AggKind};
use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncCall, WindowFuncKind};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::StateStore;

use super::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs};
use crate::common::table::state_table::StateTable;
use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
use crate::executor::{ActorContext, BoxedMessageStream, Executor};

/// Builds an `EowcOverWindowExecutor` on top of a channel-backed `MockSource` and returns the
/// sender used to feed the source together with the executor's output stream.
///
/// The input schema and the state table share the same four columns:
/// order key (`Int64`), partition key (`Varchar`), pk (`Int64`) and `x` (`Int32`).
/// The state table is created in `store` with table id 1 and is keyed by
/// (partition key, order key, pk), all ascending.
async fn create_executor<S: StateStore>(
    calls: Vec<WindowFuncCall>,
    store: S,
) -> (MessageSender, BoxedMessageStream) {
    // State table layout: same columns as the input, in the same order.
    let state_columns = vec![
        ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64), // order key
        ColumnDesc::unnamed(ColumnId::new(1), DataType::Varchar), // partition key
        ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64), // pk
        ColumnDesc::unnamed(ColumnId::new(3), DataType::Int32), // x
    ];
    let state_order_types = vec![
        OrderType::ascending(),
        OrderType::ascending(),
        OrderType::ascending(),
    ];
    // Table pk = (partition key, order key, input pk).
    let state_pk = vec![1, 0, 2];

    let state_table = StateTable::new_without_distribution_inconsistent_op(
        store,
        TableId::new(1),
        state_columns,
        state_order_types,
        state_pk,
    )
    .await;

    // Upstream: a mock source whose pk is column 2 of the input schema.
    let upstream_schema = Schema::new(vec![
        Field::unnamed(DataType::Int64), // order key
        Field::unnamed(DataType::Varchar), // partition key
        Field::unnamed(DataType::Int64), // pk
        Field::unnamed(DataType::Int32), // x
    ]);
    let (sender, upstream) = MockSource::channel(upstream_schema, vec![2]);

    let args = EowcOverWindowExecutorArgs {
        input: upstream.boxed(),
        actor_ctx: ActorContext::create(123),
        pk_indices: vec![2],
        executor_id: 1,
        calls,
        partition_key_indices: vec![1],
        order_key_index: 0,
        state_table,
        watermark_epoch: Arc::new(AtomicU64::new(0)),
    };
    let output = EowcOverWindowExecutor::new(args).boxed().execute();

    (sender, output)
}

/// Exercises the executor built by `create_executor` with one `Lag` and one `Lead` call, in
/// two phases: a first executor, then a second executor created from a clone of the same store.
///
/// Both calls take argument index 3 (`AggArgs::Unary(DataType::Int32, 3)`) and return `Int32`.
/// `Lag` uses the frame "1 preceding .. current row"; `Lead` uses "current row .. 1 following".
/// Each phase pushes barriers and chunks through the `MessageSender` and asserts on the
/// watermarks and chunks read back from the executor's output stream.
#[tokio::test]
async fn test_over_window() {
let store = MemoryStateStore::new();
let calls = vec![
WindowFuncCall {
kind: WindowFuncKind::Lag,
args: AggArgs::Unary(DataType::Int32, 3),
return_type: DataType::Int32,
frame: Frame::rows(FrameBound::Preceding(1), FrameBound::CurrentRow),
},
WindowFuncCall {
kind: WindowFuncKind::Lead,
args: AggArgs::Unary(DataType::Int32, 3),
return_type: DataType::Int32,
frame: Frame::rows(FrameBound::CurrentRow, FrameBound::Following(1)),
},
];

{
// test basic
// `calls` and `store` are cloned because both are reused by the second phase below.
let (mut tx, mut over_window) = create_executor(calls.clone(), store.clone()).await;

// The first barrier (epoch 1) must pass through before any data is pushed.
tx.push_barrier(1, false);
over_window.expect_barrier().await;

tx.push_chunk(StreamChunk::from_pretty(
" I T I i
+ 1 p1 100 10
+ 1 p1 101 16
+ 4 p2 200 20",
));
// Expected output: a watermark with value 1, then a chunk containing only the row with
// pk 100, whose two appended columns are `.` (presumably NULL — TODO confirm) and 16.
assert_eq!(1, over_window.expect_watermark().await.val.into_int64());
assert_eq!(
over_window.expect_chunk().await,
StreamChunk::from_pretty(
" I T I i i i
+ 1 p1 100 10 . 16"
)
);

tx.push_chunk(StreamChunk::from_pretty(
" I T I i
+ 5 p1 102 18
+ 7 p2 201 22
+ 8 p3 300 33",
));
// NOTE: no watermark message here, since watermark(1) was already received
// Only the rows with pk 101 and 200 (pushed in the previous chunk) are expected now.
assert_eq!(
over_window.expect_chunk().await,
StreamChunk::from_pretty(
" I T I i i i
+ 1 p1 101 16 10 18
+ 4 p2 200 20 . 22"
)
);

// Second barrier (epoch 2) ends the first phase.
tx.push_barrier(2, false);
over_window.expect_barrier().await;
}

{
// test recovery
// A new executor is created from a clone of the same store used in the first phase.
let (mut tx, mut over_window) = create_executor(calls.clone(), store.clone()).await;

tx.push_barrier(3, false);
over_window.expect_barrier().await;

tx.push_chunk(StreamChunk::from_pretty(
" I T I i
+ 10 p1 103 13
+ 12 p2 202 28
+ 13 p3 301 39",
));
// Expected output: a watermark with value 5, then the rows with pk 102, 201 and 300 that
// were pushed during the first phase. Their first appended column (16, 20, `.`) matches
// values pushed in the first phase; the second (13, 28, 39) matches the chunk just pushed.
assert_eq!(5, over_window.expect_watermark().await.val.into_int64());
assert_eq!(
over_window.expect_chunk().await,
StreamChunk::from_pretty(
" I T I i i i
+ 5 p1 102 18 16 13
+ 7 p2 201 22 20 28
+ 8 p3 300 33 . 39"
)
);

tx.push_barrier(4, false);
over_window.expect_barrier().await;
}
}

/// Checks a single `Sum` aggregate window call over the frame "1 preceding .. 1 following".
///
/// The call takes argument index 3 (`AggArgs::Unary(DataType::Int32, 3)`) and returns `Int64`.
/// Three rows of partition `p1` are pushed after the first barrier; the test then expects a
/// watermark with value 1 followed by a chunk with the first two rows and their sums.
#[tokio::test]
async fn test_over_window_aggregate() {
    let store = MemoryStateStore::new();
    let calls = vec![WindowFuncCall {
        kind: WindowFuncKind::Aggregate(AggKind::Sum),
        args: AggArgs::Unary(DataType::Int32, 3),
        return_type: DataType::Int64,
        frame: Frame::rows(FrameBound::Preceding(1), FrameBound::Following(1)),
    }];

    // `calls` and `store` are not used again in this test, so they are moved, not cloned.
    let (mut tx, mut over_window) = create_executor(calls, store).await;

    tx.push_barrier(1, false);
    over_window.expect_barrier().await;

    tx.push_chunk(StreamChunk::from_pretty(
        " I T I i
+ 1 p1 100 10
+ 1 p1 101 16
+ 4 p1 102 20",
    ));
    assert_eq!(1, over_window.expect_watermark().await.val.into_int64());

    // Expected sums: 10 + 16 = 26 for pk 100 and 10 + 16 + 20 = 46 for pk 101.
    // The asserted chunk contains no output row for pk 102.
    let chunk = over_window.expect_chunk().await;
    assert_eq!(
        chunk,
        StreamChunk::from_pretty(
            " I T I i I
+ 1 p1 100 10 26
+ 1 p1 101 16 46"
        )
    );
}
}
79 changes: 0 additions & 79 deletions src/stream/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,82 +187,3 @@ impl ProjectSetExecutor {
}
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;
use risingwave_common::array::stream_chunk::StreamChunkTestExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_expr::expr::build_from_pretty;
use risingwave_expr::table_function::repeat;

use super::super::test_utils::MockSource;
use super::super::*;
use super::*;

const CHUNK_SIZE: usize = 1024;

/// Feeds two chunks through a `ProjectSetExecutor` whose select list is an `add` expression
/// over the two input columns plus two `repeat` table functions, and compares every emitted
/// chunk with the expected one. The message after the expected chunks must satisfy `is_stop()`.
#[tokio::test]
async fn test_project_set() {
    // Upstream: two Int64 columns, delivered as two chunks by a mock source.
    let input_schema = Schema {
        fields: vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ],
    };
    let first_input = StreamChunk::from_pretty(
        " I I
+ 1 4
+ 2 5
+ 3 6",
    );
    let second_input = StreamChunk::from_pretty(
        " I I
+ 7 8
- 3 6",
    );
    let upstream = MockSource::with_chunks(
        input_schema,
        PkIndices::new(),
        vec![first_input, second_input],
    );

    // Select list: one scalar expression and two table functions.
    let sum_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");
    let repeat_once = repeat(build_from_pretty("1:int4"), 1);
    let repeat_twice = repeat(build_from_pretty("2:int4"), 2);

    let executor = Box::new(ProjectSetExecutor::new(
        Box::new(upstream),
        vec![],
        vec![sum_expr.into(), repeat_once.into(), repeat_twice.into()],
        1,
        CHUNK_SIZE,
    ));

    // One expected output chunk per input chunk.
    let expected_chunks = vec![
        StreamChunk::from_pretty(
            " I I i i
+ 0 5 1 2
+ 1 5 . 2
+ 0 7 1 2
+ 1 7 . 2
+ 0 9 1 2
+ 1 9 . 2",
        ),
        StreamChunk::from_pretty(
            " I I i i
+ 0 15 1 2
+ 1 15 . 2
- 0 9 1 2
- 1 9 . 2",
        ),
    ];

    let mut output = executor.execute();

    for want in expected_chunks {
        let message = output.next().await.unwrap().unwrap();
        assert_eq!(*message.as_chunk().unwrap(), want);
    }

    // The message following the last expected chunk must report `is_stop()`.
    let last = output.next().await.unwrap().unwrap();
    assert!(last.is_stop());
}
}
Loading

0 comments on commit 09d1849

Please sign in to comment.