Skip to content

Commit

Permalink
fix unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Aug 1, 2024
1 parent a206402 commit 5c53a46
Showing 1 changed file with 47 additions and 69 deletions.
116 changes: 47 additions & 69 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,17 +555,10 @@ mod tests {

async fn create_executor(
comparator: PbExprNodeType,
store: MemoryStateStore,
cleaned_by_watermark: bool,
) -> (MessageSender, MessageSender, BoxedMessageStream) {
let mem_state = MemoryStateStore::new();
create_executor_inner(comparator, mem_state, false).await
}

async fn create_executor_inner(
comparator: PbExprNodeType,
mem_state: MemoryStateStore,
always_relax: bool,
) -> (MessageSender, MessageSender, BoxedMessageStream) {
let (mem_state_l, mem_state_r) = create_in_memory_state_table(mem_state).await;
let (mem_state_l, mem_state_r) = create_in_memory_state_table(store).await;
let schema = Schema {
fields: vec![Field::unnamed(DataType::Int64)],
};
Expand All @@ -591,7 +584,7 @@ mod tests {
mem_state_r,
Arc::new(StreamingMetrics::unused()),
1024,
false,
cleaned_by_watermark,
);
(tx_l, tx_r, executor.boxed().execute())
}
Expand Down Expand Up @@ -626,9 +619,9 @@ mod tests {
" I
+ 4",
);
let mem_state = MemoryStateStore::new();
let mem_store = MemoryStateStore::new();
let (mut tx_l, mut tx_r, mut dynamic_filter) =
create_executor_inner(PbExprNodeType::GreaterThan, mem_state.clone(), false).await;
create_executor(PbExprNodeType::GreaterThan, mem_store.clone(), false).await;

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(1), false);
Expand All @@ -651,7 +644,7 @@ mod tests {

// Recover executor from state store
let (mut tx_l, mut tx_r, mut dynamic_filter) =
create_executor_inner(PbExprNodeType::GreaterThan, mem_state.clone(), false).await;
create_executor(PbExprNodeType::GreaterThan, mem_store.clone(), false).await;

// push the recovery barrier for left and right
tx_l.push_barrier(test_epoch(2), false);
Expand Down Expand Up @@ -698,7 +691,7 @@ mod tests {

// Recover executor from state store
let (mut tx_l, mut tx_r, mut dynamic_filter) =
create_executor_inner(PbExprNodeType::GreaterThan, mem_state.clone(), false).await;
create_executor(PbExprNodeType::GreaterThan, mem_store.clone(), false).await;

// push recovery barrier
tx_l.push_barrier(test_epoch(3), false);
Expand Down Expand Up @@ -783,8 +776,9 @@ mod tests {
" I
+ 4",
);
let mem_store = MemoryStateStore::new();
let (mut tx_l, mut tx_r, mut dynamic_filter) =
create_executor(PbExprNodeType::GreaterThan).await;
create_executor(PbExprNodeType::GreaterThan, mem_store, false).await;

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(1), false);
Expand Down Expand Up @@ -889,8 +883,9 @@ mod tests {
" I
+ 5",
);
let mem_store = MemoryStateStore::new();
let (mut tx_l, mut tx_r, mut dynamic_filter) =
create_executor(PbExprNodeType::GreaterThanOrEqual).await;
create_executor(PbExprNodeType::GreaterThanOrEqual, mem_store, false).await;

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(1), false);
Expand Down Expand Up @@ -995,8 +990,9 @@ mod tests {
" I
+ 1",
);
let mem_store = MemoryStateStore::new();
let (mut tx_l, mut tx_r, mut dynamic_filter) =
create_executor(PbExprNodeType::LessThan).await;
create_executor(PbExprNodeType::LessThan, mem_store, false).await;

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(1), false);
Expand Down Expand Up @@ -1101,8 +1097,9 @@ mod tests {
" I
+ 0",
);
let mem_store = MemoryStateStore::new();
let (mut tx_l, mut tx_r, mut dynamic_filter) =
create_executor(PbExprNodeType::LessThanOrEqual).await;
create_executor(PbExprNodeType::LessThanOrEqual, mem_store, false).await;

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(1), false);
Expand Down Expand Up @@ -1194,9 +1191,10 @@ mod tests {
}

#[tokio::test]
async fn test_dynamic_filter_always_relax() -> StreamExecutorResult<()> {
async fn test_dynamic_filter_state_cleaning() -> StreamExecutorResult<()> {
let chunk_l1 = StreamChunk::from_pretty(
" I
+ 1
+ 2
+ 3
+ 4
Expand All @@ -1212,17 +1210,19 @@ mod tests {
" I
+ 2",
);
let watermark_r1 = 2;
let chunk_r2 = StreamChunk::from_pretty(
" I
+ 5",
);
let watermark_r2 = 5;

let mem_state = MemoryStateStore::new();
let mem_store = MemoryStateStore::new();
let (mut tx_l, mut tx_r, mut dynamic_filter) =
create_executor_inner(PbExprNodeType::LessThanOrEqual, mem_state.clone(), true).await;
create_executor(PbExprNodeType::LessThanOrEqual, mem_store.clone(), true).await;
let column_descs = ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64);
let table = StorageTable::for_test(
mem_state.clone(),
mem_store.clone(),
TableId::new(0),
vec![column_descs],
vec![OrderType::ascending()],
Expand All @@ -1235,69 +1235,48 @@ mod tests {
tx_r.push_barrier(test_epoch(1), false);
dynamic_filter.next_unwrap_ready_barrier()?;

// push the 1st right chunk
// push the 1st set of messages
tx_l.push_chunk(chunk_l1);
tx_r.push_chunk(chunk_r1);

// push the init barrier for left and right
tx_r.push_int64_watermark(0, watermark_r1);
tx_l.push_barrier(test_epoch(2), false);
tx_r.push_barrier(test_epoch(2), false);

// Get the barrier
dynamic_filter.next_unwrap_ready_barrier()?;

// push the 1st left chunk
tx_l.push_chunk(chunk_l1);

let chunk = dynamic_filter.next_unwrap_ready_chunk()?.compact();
let chunk = dynamic_filter.expect_chunk().await;
assert_eq!(
chunk,
chunk.compact(),
StreamChunk::from_pretty(
" I
+ 1
+ 2"
)
);

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(3), false);
tx_r.push_barrier(test_epoch(3), false);

// Get the barrier
dynamic_filter.next_unwrap_ready_barrier()?;
dynamic_filter.expect_barrier().await;

assert!(!in_table(&table, 2).await);
assert!(!in_table(&table, 1).await); // `1` should be cleaned because it's less than watermark
assert!(in_table(&table, 2).await);
assert!(in_table(&table, 3).await);
assert!(in_table(&table, 4).await);

// push the 2nd left chunk
// push the 2nd set of messages
tx_l.push_chunk(chunk_l2);
let chunk = dynamic_filter.next_unwrap_ready_chunk()?.compact();
tx_r.push_chunk(chunk_r2);
tx_r.push_int64_watermark(0, watermark_r2);
tx_l.push_barrier(test_epoch(3), false);
tx_r.push_barrier(test_epoch(3), false);

let chunk = dynamic_filter.expect_chunk().await;
assert_eq!(
chunk,
chunk.compact(),
StreamChunk::from_pretty(
// the two rows are directly sent to the output cuz they satisfy the condition of previously committed rhs
" I
+ 1
- 2"
)
);
// push the init barrier for left and right
tx_l.push_barrier(test_epoch(4), false);
tx_r.push_barrier(test_epoch(4), false);
// Get the barrier
dynamic_filter.next_unwrap_ready_barrier()?;

assert!(!in_table(&table, 2).await);
assert!(!in_table(&table, 2).await);
assert!(!in_table(&table, 3).await);
assert!(in_table(&table, 4).await);

// push the 2nd right chunk
tx_r.push_chunk(chunk_r2);

// push the init barrier for left and right
tx_l.push_barrier(test_epoch(5), false);
tx_r.push_barrier(test_epoch(5), false);

let chunk = dynamic_filter.next_unwrap_ready_chunk()?.compact();
let chunk = dynamic_filter.expect_chunk().await;
assert_eq!(
chunk,
StreamChunk::from_pretty(
Expand All @@ -1307,14 +1286,13 @@ mod tests {
)
);

// Get the barrier
dynamic_filter.next_unwrap_ready_barrier()?;
tx_l.push_barrier(test_epoch(6), false);
tx_r.push_barrier(test_epoch(6), false);
// Get the barrier
dynamic_filter.next_unwrap_ready_barrier()?;
dynamic_filter.expect_barrier().await;

assert!(!in_table(&table, 2).await);
assert!(!in_table(&table, 3).await);
assert!(!in_table(&table, 4).await);
assert!(in_table(&table, 5).await);

Ok(())
}
}

0 comments on commit 5c53a46

Please sign in to comment.