diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index ae248facc0e5f..c3624b93440f7 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -76,7 +76,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { } #[tokio::test] -async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> { +async fn test_managed_barrier_collection_separately() -> StreamResult<()> { let (actor_op_tx, manager) = LocalBarrierManager::spawn_for_test().await; let register_sender = |actor_id: u32| { @@ -106,8 +106,8 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<( let barrier = Barrier::new_test_barrier(curr_epoch); let epoch = barrier.epoch.prev; - // Collect a barrier before sending - manager.collect(extra_actor_id, &barrier); + // Read the mutation after receiving the barrier from remote input. + let mutation_reader = pin!(manager.read_barrier_mutation(&barrier)); // Send the barrier to all actors actor_op_tx @@ -115,6 +115,12 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<( .await .unwrap(); + let mutation = mutation_reader.await.unwrap(); + assert_eq!(mutation, barrier.mutation); + + // Collect a barrier before sending + manager.collect(extra_actor_id, &barrier); + // Collect barriers from actors let collected_barriers = rxs .iter_mut()