Skip to content

Commit

Permalink
add ut for topn output compaction
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 19, 2024
1 parent 587dba8 commit 578c81c
Showing 1 changed file with 101 additions and 1 deletion.
102 changes: 101 additions & 1 deletion src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ mod tests {

use super::*;
use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
use crate::executor::test_utils::MockSource;
use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};

fn create_schema() -> Schema {
Schema {
Expand Down Expand Up @@ -637,4 +637,104 @@ mod tests {
),
);
}

#[tokio::test]
async fn test_compact_changes() {
let schema = create_schema();
let source = MockSource::with_messages(vec![
Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
Message::Chunk(StreamChunk::from_pretty(
" I I I
+ 0 0 9
+ 0 0 8
+ 0 0 7
+ 0 0 6
+ 0 1 15
+ 0 1 14",
)),
Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
Message::Chunk(StreamChunk::from_pretty(
" I I I
- 0 0 6
- 0 0 8
+ 0 0 4
+ 0 0 3
+ 0 1 12
+ 0 2 26
- 0 1 12
+ 0 1 11",
)),
Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
Message::Chunk(StreamChunk::from_pretty(
" I I I
+ 0 0 11", // this should result in no chunk output
)),
Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
])
.into_executor(schema.clone(), vec![2]);

let state_table = create_in_memory_state_table(
&schema.data_types(),
&[
OrderType::ascending(),
OrderType::ascending(),
OrderType::ascending(),
],
&[0, 1, 2], // table pk = group key (0, 1) + order key (2) + additional pk (empty)
)
.await;

let top_n = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
source,
ActorContext::for_test(0),
schema,
vec![
ColumnOrder::new(0, OrderType::ascending()),
ColumnOrder::new(1, OrderType::ascending()),
ColumnOrder::new(2, OrderType::ascending()),
],
(0, 2), // (offset, limit)
vec![ColumnOrder::new(2, OrderType::ascending())],
vec![0, 1],
state_table,
Arc::new(AtomicU64::new(0)),
)
.unwrap();
let mut top_n = top_n.boxed().execute();

// initial barrier
top_n.expect_barrier().await;

assert_eq!(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
+ 0 0 7
+ 0 0 6
+ 0 1 15
+ 0 1 14",
)
.sort_rows(),
);
top_n.expect_barrier().await;

assert_eq!(
top_n.expect_chunk().await.sort_rows(),
StreamChunk::from_pretty(
" I I I
- 0 0 6
- 0 0 7
+ 0 0 4
+ 0 0 3
- 0 1 15
+ 0 1 11
+ 0 2 26",
)
.sort_rows(),
);
top_n.expect_barrier().await;

// no output chunk for the last input chunk
top_n.expect_barrier().await;
}
}

0 comments on commit 578c81c

Please sign in to comment.