Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Mar 13, 2024
1 parent eb5dbb2 commit 0d9e35f
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/stream/src/task/barrier_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
}

#[tokio::test]
async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> {
async fn test_managed_barrier_collection_separately() -> StreamResult<()> {
let (actor_op_tx, manager) = LocalBarrierManager::spawn_for_test().await;

let register_sender = |actor_id: u32| {
Expand Down Expand Up @@ -106,15 +106,21 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<(
let barrier = Barrier::new_test_barrier(curr_epoch);
let epoch = barrier.epoch.prev;

// Collect a barrier before sending
manager.collect(extra_actor_id, &barrier);
// Read the mutation after receiving the barrier from remote input.
let mutation_reader = pin!(manager.read_barrier_mutation(&barrier));

// Send the barrier to all actors
actor_op_tx
.send_barrier(barrier.clone(), actor_ids_to_send, actor_ids_to_collect)
.await
.unwrap();

let mutation = mutation_reader.await.unwrap();
assert_eq!(mutation, barrier.mutation);

// Collect a barrier before sending
manager.collect(extra_actor_id, &barrier);

// Collect barriers from actors
let collected_barriers = rxs
.iter_mut()
Expand Down

0 comments on commit 0d9e35f

Please sign in to comment.