diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 1d051e87248f..1b64a268b860 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -276,7 +276,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table; - use crate::executor::test_utils::MockSource; + use crate::executor::test_utils::{MockSource, StreamExecutorTestExt}; fn create_schema() -> Schema { Schema { @@ -637,4 +637,104 @@ mod tests { ), ); } + + #[tokio::test] + async fn test_compact_changes() { + let schema = create_schema(); + let source = MockSource::with_messages(vec![ + Message::Barrier(Barrier::new_test_barrier(test_epoch(1))), + Message::Chunk(StreamChunk::from_pretty( + " I I I + + 0 0 9 + + 0 0 8 + + 0 0 7 + + 0 0 6 + + 0 1 15 + + 0 1 14", + )), + Message::Barrier(Barrier::new_test_barrier(test_epoch(2))), + Message::Chunk(StreamChunk::from_pretty( + " I I I + - 0 0 6 + - 0 0 8 + + 0 0 4 + + 0 0 3 + + 0 1 12 + + 0 2 26 + - 0 1 12 + + 0 1 11", + )), + Message::Barrier(Barrier::new_test_barrier(test_epoch(3))), + Message::Chunk(StreamChunk::from_pretty( + " I I I + + 0 0 11", // this should result in no chunk output + )), + Message::Barrier(Barrier::new_test_barrier(test_epoch(4))), + ]) + .into_executor(schema.clone(), vec![2]); + + let state_table = create_in_memory_state_table( + &schema.data_types(), + &[ + OrderType::ascending(), + OrderType::ascending(), + OrderType::ascending(), + ], + &[0, 1, 2], // table pk = group key (0, 1) + order key (2) + additional pk (empty) + ) + .await; + + let top_n = GroupTopNExecutor::::new( + source, + ActorContext::for_test(0), + schema, + vec![ + ColumnOrder::new(0, OrderType::ascending()), + ColumnOrder::new(1, OrderType::ascending()), + ColumnOrder::new(2, OrderType::ascending()), + ], + (0, 2), // (offset, limit) + vec![ColumnOrder::new(2, OrderType::ascending())], + vec![0, 1], + state_table, + Arc::new(AtomicU64::new(0)), + ) + .unwrap(); + let mut top_n = top_n.boxed().execute(); + + // initial barrier + top_n.expect_barrier().await; + + assert_eq!( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( + " I I I + + 0 0 7 + + 0 0 6 + + 0 1 15 + + 0 1 14", + ) + .sort_rows(), + ); + top_n.expect_barrier().await; + + assert_eq!( + top_n.expect_chunk().await.sort_rows(), + StreamChunk::from_pretty( + " I I I + - 0 0 6 + - 0 0 7 + + 0 0 4 + + 0 0 3 + - 0 1 15 + + 0 1 11 + + 0 2 26", + ) + .sort_rows(), + ); + top_n.expect_barrier().await; + + // no output chunk for the last input chunk + top_n.expect_barrier().await; + } }