diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 0c4874c97a418..ca65a3c45dc55 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -311,23 +311,6 @@ impl LocalBarrierWorker { loop { select! { biased; - (partial_graph_id, completed_epoch) = self.state.next_completed_epoch() => { - let result = self.on_epoch_completed(partial_graph_id, completed_epoch); - if let Err(err) = result { - self.notify_other_failure(err, "failed to complete epoch").await; - } - }, - event = self.barrier_event_rx.recv() => { - // event should not be None because the LocalBarrierManager holds a copy of tx - let result = self.handle_barrier_event(event.expect("should not be none")); - if let Err((actor_id, err)) = result { - self.notify_actor_failure(actor_id, err, "failed to handle barrier event").await; - } - }, - failure = self.actor_failure_rx.recv() => { - let (actor_id, err) = failure.unwrap(); - self.notify_actor_failure(actor_id, err, "recv actor failure").await; - }, actor_op = actor_op_rx.recv() => { if let Some(actor_op) = actor_op { match actor_op { @@ -358,6 +341,23 @@ impl LocalBarrierWorker { break; } }, + (partial_graph_id, completed_epoch) = self.state.next_completed_epoch() => { + let result = self.on_epoch_completed(partial_graph_id, completed_epoch); + if let Err(err) = result { + self.notify_other_failure(err, "failed to complete epoch").await; + } + }, + event = self.barrier_event_rx.recv() => { + // event should not be None because the LocalBarrierManager holds a copy of tx + let result = self.handle_barrier_event(event.expect("should not be none")); + if let Err((actor_id, err)) = result { + self.notify_actor_failure(actor_id, err, "failed to handle barrier event").await; + } + }, + failure = self.actor_failure_rx.recv() => { + let (actor_id, err) = failure.unwrap(); + self.notify_actor_failure(actor_id, err, "recv actor failure").await; + }, request = self.control_stream_handle.next_request() => { let result = self.handle_streaming_control_request(request); if let Err(err) = result {