From e75e330c2071f252c19abf7080232716b6a99c85 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 20 Aug 2024 01:25:31 +0800 Subject: [PATCH] add test and refine --- src/stream/src/task/barrier_manager.rs | 27 ++++-- .../src/task/barrier_manager/managed_state.rs | 20 ++-- src/stream/src/task/barrier_manager/tests.rs | 92 ++++++++++++++++++- 3 files changed, 126 insertions(+), 13 deletions(-) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index ca37707c77287..3e50a81bf25df 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -425,11 +425,15 @@ impl LocalBarrierWorker { } }, event = self.barrier_event_rx.recv() => { - self.handle_barrier_event(event.expect("should not be none")); + // event should not be None because the LocalBarrierManager holds a copy of tx + let result = self.handle_barrier_event(event.expect("should not be none")); + if let Err((actor_id, err)) = result { + self.notify_actor_failure(actor_id, err, "failed to handle barrier event").await; + } }, failure = self.actor_failure_rx.recv() => { let (actor_id, err) = failure.unwrap(); - self.notify_actor_failure(actor_id, err).await; + self.notify_actor_failure(actor_id, err, "recv actor failure").await; }, actor_op = actor_op_rx.recv() => { if let Some(actor_op) = actor_op { @@ -515,7 +519,10 @@ impl LocalBarrierWorker { } } - fn handle_barrier_event(&mut self, event: LocalBarrierEvent) { + fn handle_barrier_event( + &mut self, + event: LocalBarrierEvent, + ) -> Result<(), (ActorId, StreamError)> { match event { LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => { self.collect(actor_id, epoch) @@ -539,11 +546,14 @@ impl LocalBarrierWorker { actor_id, barrier_sender, } => { - self.state.register_barrier_sender(actor_id, barrier_sender); + self.state + .register_barrier_sender(actor_id, barrier_sender) + .map_err(|e| (actor_id, e))?; } #[cfg(test)] LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(), } + Ok(()) } fn handle_actor_op(&mut self, actor_op: LocalActorOperation) { @@ -768,7 +778,12 @@ impl LocalBarrierWorker { /// When a actor exit unexpectedly, the error is reported using this function. The control stream /// will be reset and the meta service will then trigger recovery. - async fn notify_actor_failure(&mut self, actor_id: ActorId, err: StreamError) { + async fn notify_actor_failure( + &mut self, + actor_id: ActorId, + err: StreamError, + err_context: &'static str, + ) { self.add_failure(actor_id, err.clone()); let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one @@ -777,7 +792,7 @@ impl LocalBarrierWorker { { self.control_stream_handle.reset_stream_with_err( anyhow!(root_err) - .context("failed to collect barrier") + .context(err_context) .to_status_unnamed(Code::Internal), ); } diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index de3ca4c54dd28..2bc7fbf88566d 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -509,16 +509,23 @@ impl InflightActorState { } } - pub(super) fn register_barrier_sender(&mut self, tx: mpsc::UnboundedSender) { + pub(super) fn register_barrier_sender( + &mut self, + tx: mpsc::UnboundedSender, + ) -> StreamResult<()> { match &self.status { InflightActorStatus::NotStarted => { self.barrier_senders.push(tx); } InflightActorStatus::IssuedFirst(pending_barriers) => { for barrier in pending_barriers { - // ignore the send err and register the tx anyway. - // failed tx will trigger failure when handling the `InjectBarrier`. - let _ = tx.send(barrier.clone()); + tx.send(barrier.clone()).map_err(|_| { + StreamError::barrier_send( + barrier.clone(), + self.actor_id, + "failed to send pending barriers to newly registered sender", + ) + })?; } self.barrier_senders.push(tx); } @@ -526,6 +533,7 @@ impl InflightActorState { unreachable!("should not register barrier sender when entering Running status") } } + Ok(()) } } @@ -546,11 +554,11 @@ impl ManagedBarrierState { &mut self, actor_id: ActorId, tx: mpsc::UnboundedSender, - ) { + ) -> StreamResult<()> { self.actor_states .entry(actor_id) .or_insert_with(|| InflightActorState::not_started(actor_id)) - .register_barrier_sender(tx); + .register_barrier_sender(tx) } pub(super) fn transform_to_issued( diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index 6d5a5cc0e666f..0fb71e5240431 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -19,7 +19,7 @@ use std::task::Poll; use futures::future::join_all; use futures::FutureExt; -use risingwave_common::util::epoch::test_epoch; +use risingwave_common::util::epoch::{test_epoch, EpochExt}; use super::*; use crate::task::barrier_test_utils::LocalBarrierTestEnv; @@ -162,3 +162,93 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { Ok(()) } + +#[tokio::test] +async fn test_late_register_barrier_sender() -> StreamResult<()> { + let mut test_env = LocalBarrierTestEnv::for_test().await; + + let manager = &test_env.shared_context.local_barrier_manager; + + let register_sender = |actor_id: u32| { + let (barrier_tx, barrier_rx) = unbounded_channel(); + test_env + .shared_context + .local_barrier_manager + .register_sender(actor_id, barrier_tx); + (actor_id, barrier_rx) + }; + + let actor_ids_to_send = vec![233, 234, 235]; + let extra_actor_id = 666; + let actor_ids_to_collect = actor_ids_to_send + .iter() + .cloned() + .chain(once(extra_actor_id)) + .collect_vec(); + + // Register actors + let count = actor_ids_to_send.len(); + + // Prepare the barrier + let epoch1 = test_epoch(2); + let barrier1 = Barrier::new_test_barrier(epoch1); + + let epoch2 = epoch1.next_epoch(); + let barrier2 = Barrier::new_test_barrier(epoch2).with_stop(); + + test_env.inject_barrier(&barrier1, actor_ids_to_collect.clone()); + test_env.inject_barrier(&barrier2, actor_ids_to_collect.clone()); + + // register sender after inject barrier + let mut rxs = actor_ids_to_send + .clone() + .into_iter() + .map(register_sender) + .collect_vec(); + + // Collect barriers from actors + let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move { + let barrier1 = rx.recv().await.unwrap(); + assert_eq!(barrier1.epoch.curr, epoch1); + let barrier2 = rx.recv().await.unwrap(); + assert_eq!(barrier2.epoch.curr, epoch2); + manager.collect(*actor_id, &barrier1); + (*actor_id, barrier2) + })) + .await; + + // Collect a barrier before sending + manager.collect(extra_actor_id, &barrier1); + + let resp = test_env.response_rx.recv().await.unwrap().unwrap(); + match resp.response.unwrap() { + streaming_control_stream_response::Response::CompleteBarrier(complete_barrier) => { + assert_eq!(complete_barrier.epoch, barrier1.epoch.prev); + } + _ => unreachable!(), + } + + manager.collect(extra_actor_id, &barrier2); + + let mut await_epoch_future = pin!(test_env.response_rx.recv().map(|result| { + let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); + let resp = resp.response.unwrap(); + match resp { + streaming_control_stream_response::Response::CompleteBarrier(complete_barrier) => { + assert_eq!(complete_barrier.epoch, barrier2.epoch.prev); + } + _ => unreachable!(), + } + })); + + // Report to local barrier manager + for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() { + manager.collect(actor_id, &barrier); + manager.flush_all_events().await; + let notified = + poll_fn(|cx| Poll::Ready(await_epoch_future.as_mut().poll(cx).is_ready())).await; + assert_eq!(notified, i == count - 1); + } + + Ok(()) +}