From 78ebf98c54609f603df2dc5658eefead7151a337 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 19 Nov 2024 17:39:36 +0800 Subject: [PATCH] fix all topn ut Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/group_top_n.rs | 160 +++----- .../src/executor/top_n/top_n_appendonly.rs | 64 ++- src/stream/src/executor/top_n/top_n_plain.rs | 388 ++++++------------ 3 files changed, 220 insertions(+), 392 deletions(-) diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 1b64a268b860b..9bf1fecc3f3a3 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -266,7 +266,6 @@ where mod tests { use std::sync::atomic::AtomicU64; - use assert_matches::assert_matches; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::Field; use risingwave_common::hash::SerializedKey; @@ -373,7 +372,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = GroupTopNExecutor::::new( + let top_n = GroupTopNExecutor::::new( source, ActorContext::for_test(0), schema, @@ -385,14 +384,13 @@ mod tests { Arc::new(AtomicU64::new(0)), ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 10 9 1 + 8 8 2 @@ -400,58 +398,50 @@ mod tests { + 9 1 1 + 10 1 1 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 10 9 1 - 8 8 2 - 10 1 1 + 8 1 3 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 7 8 2 - 8 1 3 - 9 1 1 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 5 1 1 + 2 1 1 ", - ), + ) + .sort_rows(), ); } @@ -469,7 +459,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = GroupTopNExecutor::::new( + let top_n = GroupTopNExecutor::::new( source, ActorContext::for_test(0), schema, @@ -481,66 +471,57 @@ mod tests { Arc::new(AtomicU64::new(0)), ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 8 8 2 + 10 1 1 + 8 1 3 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 8 8 2 - 10 1 1 ", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 8 1 3", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 5 1 1 + 3 1 2 ", - ), + ) + .sort_rows(), ); } @@ -558,7 +539,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = GroupTopNExecutor::::new( + let top_n = GroupTopNExecutor::::new( source, ActorContext::for_test(0), schema, @@ -570,14 +551,13 @@ mod tests { Arc::new(AtomicU64::new(0)), ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 10 9 1 + 8 8 2 @@ -585,56 +565,48 @@ mod tests { + 9 1 1 + 10 1 1 + 8 1 3", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 10 9 1 - 8 8 2 - 10 1 1", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I - 7 8 2 - 8 1 3 - 9 1 1", - ), + ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - res.as_chunk().unwrap(), - &StreamChunk::from_pretty( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( " I I I + 5 1 1 + 2 1 1 + 3 1 2 + 4 1 3", - ), + ) + .sort_rows(), ); } diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs index ce00d56558ab2..9d4e8127c2a98 100644 --- a/src/stream/src/executor/top_n/top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/top_n_appendonly.rs @@ -160,8 +160,6 @@ where #[cfg(test)] mod tests { - use assert_matches::assert_matches; - use futures::StreamExt; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{Field, Schema}; @@ -171,7 +169,7 @@ mod tests { use super::AppendOnlyTopNExecutor; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table; - use crate::executor::test_utils::MockSource; + use crate::executor::test_utils::{MockSource, StreamExecutorTestExt}; use crate::executor::{ActorContext, Barrier, Execute, Executor, Message, PkIndices}; fn create_stream_chunks() -> Vec { @@ -250,7 +248,7 @@ mod tests { .await; let schema = source.schema().clone(); - let top_n_executor = AppendOnlyTopNExecutor::<_, false>::new( + let top_n = AppendOnlyTopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -260,54 +258,43 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init epoch - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2 - + 10 3 + 9 4 - - 10 3 + 8 5" ) + .sort_rows(), ); // We added (1, 2, 3, 10, 9, 8). // Now (1, 2, 3, 8, 9) // Barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 9 4 - + 7 6 - 8 5 + 3 7 - - 7 6 + 1 8" ) + .sort_rows(), ); // We added (7, 3, 1, 9). // Now (1, 1, 2, 3, 3) // Barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 3 7 @@ -315,6 +302,7 @@ mod tests { - 3 2 + 1 13" ) + .sort_rows(), ); // We added (1, 1, 2, 3). // Now (1, 1, 1, 1, 2) @@ -331,7 +319,7 @@ mod tests { .await; let schema = source.schema().clone(); - let top_n_executor = AppendOnlyTopNExecutor::<_, false>::new( + let top_n = AppendOnlyTopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -341,30 +329,26 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init epoch - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 10 3 + 9 4 + 8 5" ) + .sort_rows(), ); // We added (1, 2, 3, 10, 9, 8). // Now (1, 2, 3) -> (8, 9, 10) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 7 6 @@ -373,17 +357,14 @@ mod tests { - 9 4 + 3 2" ) + .sort_rows(), ); // We added (7, 3, 1, 9). // Now (1, 1, 2) -> (3, 3, 7, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 8 5 @@ -393,6 +374,7 @@ mod tests { - 3 7 + 2 14" ) + .sort_rows(), ); // We added (1, 1, 2, 3). // Now (1, 1, 1) -> (1, 2, 2, 3) diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index b4d8db242e999..73468cd7dbe6a 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -196,7 +196,6 @@ where #[cfg(test)] mod tests { - use assert_matches::assert_matches; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; @@ -212,6 +211,7 @@ mod tests { use risingwave_common::util::epoch::test_epoch; use super::*; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_stream_chunks() -> Vec { let chunk1 = StreamChunk::from_pretty( @@ -299,7 +299,7 @@ mod tests { .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -309,49 +309,38 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 10 3 + 9 4 + 8 5" ) + .sort_rows(), ); // Barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 7 6 - - 7 6 - - 8 5 - + 8 5 - 8 5 + 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); // (8, 9, 10, 11, 12, 13, 14) assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 8 5 @@ -359,29 +348,24 @@ mod tests { + 13 11 + 14 12" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; // (10, 12, 13, 14) - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 8 5 - 9 4 - 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -394,7 +378,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -404,76 +388,58 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2 - + 10 3 - - 10 3 - + 9 4 - - 9 4 + 8 5" ) + .sort_rows(), ); // now () -> (1, 2, 3, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - - 8 5 + 7 6 - 3 2 - + 8 5 - 1 0 - + 9 4 - - 9 4 + 5 7 - 2 1 + 9 4" ) + .sort_rows(), ); // (5, 7, 8, 9) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 9 4 + 6 9" ) + .sort_rows(), ); // (5, 6, 7, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 5 7 @@ -481,13 +447,11 @@ mod tests { - 6 9 + 10 3" ) + .sort_rows(), ); // (7, 8, 9, 10) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } // Should have the same result as above, since there are no duplicate sort keys. @@ -501,7 +465,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, true>::new( + let top_n = TopNExecutor::<_, true>::new( source, ActorContext::for_test(0), schema, @@ -511,76 +475,58 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2 - + 10 3 - - 10 3 - + 9 4 - - 9 4 + 8 5" ) + .sort_rows(), ); // now () -> (1, 2, 3, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - - 8 5 + 7 6 - 3 2 - + 8 5 - 1 0 - + 9 4 - - 9 4 + 5 7 - 2 1 + 9 4" ) + .sort_rows(), ); // (5, 7, 8, 9) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 9 4 + 6 9" ) + .sort_rows(), ); // (5, 6, 7, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 5 7 @@ -588,13 +534,11 @@ mod tests { - 6 9 + 10 3" ) + .sort_rows(), ); // (7, 8, 9, 10) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -607,7 +551,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -617,60 +561,46 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 10 3 + 9 4 + 8 5" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 7 6 - - 7 6 - - 8 5 - + 8 5 - 8 5 + 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 8 5" + + 8 5" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 8 5 @@ -680,12 +610,10 @@ mod tests { - 11 8 + 14 12" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } @@ -696,6 +624,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_source_new() -> Executor { let mut chunks = vec![ StreamChunk::from_pretty( @@ -824,7 +753,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -834,55 +763,42 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - - let res = top_n_executor.next().await.unwrap().unwrap(); - // should be empty - assert_eq!( - *res.as_chunk().unwrap(), - StreamChunk::from_pretty(" I I I I") - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 5 1 4 1002 - " + + 5 1 4 1002" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 1 9 1 1003 - + 9 8 1 1004 - - 9 8 1 1004 - + 1 1 4 1001", - ), + + 1 9 1 1003 + + 1 1 4 1001", + ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - - 5 1 4 1002 - + 1 0 2 1006", + - 5 1 4 1002 + + 1 0 2 1006", ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -902,7 +818,7 @@ mod tests { .await; let source = create_source_new_before_recovery(); let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -912,33 +828,22 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - - let res = top_n_executor.next().await.unwrap().unwrap(); - // should be empty - assert_eq!( - *res.as_chunk().unwrap(), - StreamChunk::from_pretty(" I I I I") - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 5 1 4 1002 - " + + 5 1 4 1002" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; let state_table = create_in_memory_state_table_from_state_store( &[ @@ -956,7 +861,7 @@ mod tests { // recovery let source = create_source_new_after_recovery(); let schema = source.schema().clone(); - let top_n_executor_after_recovery = TopNExecutor::<_, false>::new( + let top_n_after_recovery = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -966,41 +871,33 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor_after_recovery.boxed().execute(); + let mut top_n = top_n_after_recovery.boxed().execute(); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 1 9 1 1003 - + 9 8 1 1004 - - 9 8 1 1004 - + 1 1 4 1001", - ), + + 1 9 1 1003 + + 1 1 4 1001", + ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - - 5 1 4 1002 - + 1 0 2 1006", + - 5 1 4 1002 + + 1 0 2 1006", ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } @@ -1011,6 +908,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_source() -> Executor { let mut chunks = vec![ @@ -1082,7 +980,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::new_with_ties_for_test( + let top_n = TopNExecutor::new_with_ties_for_test( source, ActorContext::for_test(0), schema, @@ -1092,64 +990,56 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 3 6 - + 3 7 - - 3 7 - - 3 6 - 3 2 + 1 8 + 2 9" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 0" ) + .sort_rows(), ); // High cache has only 2 capacity, but we need to trigger 3 inserts here! - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 8 + 3 2 + 3 6 - + 3 7 - " + + 3 7" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } fn create_source_before_recovery() -> Executor { @@ -1161,8 +1051,7 @@ mod tests { + 3 2 + 10 3 + 9 4 - + 8 5 - ", + + 8 5", ), StreamChunk::from_pretty( " I I @@ -1226,7 +1115,7 @@ mod tests { .await; let source = create_source_before_recovery(); let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::new_with_ties_for_test( + let top_n = TopNExecutor::new_with_ties_for_test( source, ActorContext::for_test(0), schema, @@ -1236,41 +1125,34 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 3 6 - + 3 7 - - 3 7 - - 3 6 - 3 2 + 1 8 + 2 9" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; let state_table = create_in_memory_state_table_from_state_store( &[DataType::Int64, DataType::Int64], @@ -1283,7 +1165,7 @@ mod tests { // recovery let source = create_source_after_recovery(); let schema = source.schema().clone(); - let top_n_executor_after_recovery = TopNExecutor::new_with_ties_for_test( + let top_n_after_recovery = TopNExecutor::new_with_ties_for_test( source, ActorContext::for_test(0), schema, @@ -1293,42 +1175,34 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor_after_recovery.boxed().execute(); + let mut top_n = top_n_after_recovery.boxed().execute(); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 0" ) + .sort_rows(), ); // High cache has only 2 capacity, but we need to trigger 3 inserts here! - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 8 + 3 2 + 3 6 - + 3 7 - " + + 3 7" ) + .sort_rows(), ); - println!("hello"); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } }