Skip to content

Commit

Permalink
add test and refine
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 19, 2024
1 parent 13e6fc0 commit e75e330
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 13 deletions.
27 changes: 21 additions & 6 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,15 @@ impl LocalBarrierWorker {
}
},
event = self.barrier_event_rx.recv() => {
self.handle_barrier_event(event.expect("should not be none"));
// event should not be None because the LocalBarrierManager holds a copy of tx
let result = self.handle_barrier_event(event.expect("should not be none"));
if let Err((actor_id, err)) = result {
self.notify_actor_failure(actor_id, err, "failed to handle barrier event").await;
}
},
failure = self.actor_failure_rx.recv() => {
let (actor_id, err) = failure.unwrap();
self.notify_actor_failure(actor_id, err).await;
self.notify_actor_failure(actor_id, err, "recv actor failure").await;
},
actor_op = actor_op_rx.recv() => {
if let Some(actor_op) = actor_op {
Expand Down Expand Up @@ -515,7 +519,10 @@ impl LocalBarrierWorker {
}
}

fn handle_barrier_event(&mut self, event: LocalBarrierEvent) {
fn handle_barrier_event(
&mut self,
event: LocalBarrierEvent,
) -> Result<(), (ActorId, StreamError)> {
match event {
LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
self.collect(actor_id, epoch)
Expand All @@ -539,11 +546,14 @@ impl LocalBarrierWorker {
actor_id,
barrier_sender,
} => {
self.state.register_barrier_sender(actor_id, barrier_sender);
self.state
.register_barrier_sender(actor_id, barrier_sender)
.map_err(|e| (actor_id, e))?;
}
#[cfg(test)]
LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(),
}
Ok(())
}

fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
Expand Down Expand Up @@ -768,7 +778,12 @@ impl LocalBarrierWorker {

/// When a actor exit unexpectedly, the error is reported using this function. The control stream
/// will be reset and the meta service will then trigger recovery.
async fn notify_actor_failure(&mut self, actor_id: ActorId, err: StreamError) {
async fn notify_actor_failure(
&mut self,
actor_id: ActorId,
err: StreamError,
err_context: &'static str,
) {
self.add_failure(actor_id, err.clone());
let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one

Expand All @@ -777,7 +792,7 @@ impl LocalBarrierWorker {
{
self.control_stream_handle.reset_stream_with_err(
anyhow!(root_err)
.context("failed to collect barrier")
.context(err_context)
.to_status_unnamed(Code::Internal),
);
}
Expand Down
20 changes: 14 additions & 6 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,23 +509,31 @@ impl InflightActorState {
}
}

pub(super) fn register_barrier_sender(&mut self, tx: mpsc::UnboundedSender<Barrier>) {
pub(super) fn register_barrier_sender(
&mut self,
tx: mpsc::UnboundedSender<Barrier>,
) -> StreamResult<()> {
match &self.status {
InflightActorStatus::NotStarted => {
self.barrier_senders.push(tx);
}
InflightActorStatus::IssuedFirst(pending_barriers) => {
for barrier in pending_barriers {
// ignore the send err and register the tx anyway.
// failed tx will trigger failure when handling the `InjectBarrier`.
let _ = tx.send(barrier.clone());
tx.send(barrier.clone()).map_err(|_| {
StreamError::barrier_send(
barrier.clone(),
self.actor_id,
"failed to send pending barriers to newly registered sender",
)
})?;
}
self.barrier_senders.push(tx);
}
InflightActorStatus::Running(_) => {
unreachable!("should not register barrier sender when entering Running status")
}
}
Ok(())
}
}

Expand All @@ -546,11 +554,11 @@ impl ManagedBarrierState {
&mut self,
actor_id: ActorId,
tx: mpsc::UnboundedSender<Barrier>,
) {
) -> StreamResult<()> {
self.actor_states
.entry(actor_id)
.or_insert_with(|| InflightActorState::not_started(actor_id))
.register_barrier_sender(tx);
.register_barrier_sender(tx)
}

pub(super) fn transform_to_issued(
Expand Down
92 changes: 91 additions & 1 deletion src/stream/src/task/barrier_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::task::Poll;

use futures::future::join_all;
use futures::FutureExt;
use risingwave_common::util::epoch::test_epoch;
use risingwave_common::util::epoch::{test_epoch, EpochExt};

use super::*;
use crate::task::barrier_test_utils::LocalBarrierTestEnv;
Expand Down Expand Up @@ -162,3 +162,93 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> {

Ok(())
}

#[tokio::test]
async fn test_late_register_barrier_sender() -> StreamResult<()> {
let mut test_env = LocalBarrierTestEnv::for_test().await;

let manager = &test_env.shared_context.local_barrier_manager;

let register_sender = |actor_id: u32| {
let (barrier_tx, barrier_rx) = unbounded_channel();
test_env
.shared_context
.local_barrier_manager
.register_sender(actor_id, barrier_tx);
(actor_id, barrier_rx)
};

let actor_ids_to_send = vec![233, 234, 235];
let extra_actor_id = 666;
let actor_ids_to_collect = actor_ids_to_send
.iter()
.cloned()
.chain(once(extra_actor_id))
.collect_vec();

// Register actors
let count = actor_ids_to_send.len();

// Prepare the barrier
let epoch1 = test_epoch(2);
let barrier1 = Barrier::new_test_barrier(epoch1);

let epoch2 = epoch1.next_epoch();
let barrier2 = Barrier::new_test_barrier(epoch2).with_stop();

test_env.inject_barrier(&barrier1, actor_ids_to_collect.clone());
test_env.inject_barrier(&barrier2, actor_ids_to_collect.clone());

// register sender after inject barrier
let mut rxs = actor_ids_to_send
.clone()
.into_iter()
.map(register_sender)
.collect_vec();

// Collect barriers from actors
let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move {
let barrier1 = rx.recv().await.unwrap();
assert_eq!(barrier1.epoch.curr, epoch1);
let barrier2 = rx.recv().await.unwrap();
assert_eq!(barrier2.epoch.curr, epoch2);
manager.collect(*actor_id, &barrier1);
(*actor_id, barrier2)
}))
.await;

// Collect a barrier before sending
manager.collect(extra_actor_id, &barrier1);

let resp = test_env.response_rx.recv().await.unwrap().unwrap();
match resp.response.unwrap() {
streaming_control_stream_response::Response::CompleteBarrier(complete_barrier) => {
assert_eq!(complete_barrier.epoch, barrier1.epoch.prev);
}
_ => unreachable!(),
}

manager.collect(extra_actor_id, &barrier2);

let mut await_epoch_future = pin!(test_env.response_rx.recv().map(|result| {
let resp: StreamingControlStreamResponse = result.unwrap().unwrap();
let resp = resp.response.unwrap();
match resp {
streaming_control_stream_response::Response::CompleteBarrier(complete_barrier) => {
assert_eq!(complete_barrier.epoch, barrier2.epoch.prev);
}
_ => unreachable!(),
}
}));

// Report to local barrier manager
for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() {
manager.collect(actor_id, &barrier);
manager.flush_all_events().await;
let notified =
poll_fn(|cx| Poll::Ready(await_epoch_future.as_mut().poll(cx).is_ready())).await;
assert_eq!(notified, i == count - 1);
}

Ok(())
}

0 comments on commit e75e330

Please sign in to comment.