diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index fa6fe1eca0a5..663b7aa5bae5 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -108,9 +108,7 @@ message StreamingControlStreamRequest {
 
 message StreamingControlStreamResponse {
   message InitResponse {}
-  message ShutdownResponse {
-    bool has_running_actor = 1;
-  }
+  message ShutdownResponse {}
 
   oneof response {
     InitResponse init = 1;
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 2593fb264db3..f3f56db1a9a8 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -210,25 +210,13 @@ impl ControlStreamManager {
                             .expect("should exist when get collect resp");
                         break Ok((worker_id, command.prev_epoch.value().0, resp));
                     }
-                    Response::Shutdown(resp) => {
-                        let node = self
+                    Response::Shutdown(_) => {
+                        let _ = self
                             .nodes
                             .remove(&worker_id)
                             .expect("should exist when get shutdown resp");
-                        let has_inflight_barrier = !node.inflight_barriers.is_empty();
-
-                        // Trigger recovery only if there are actors running.
-                        //
-                        // We need "or" here because...
-                        // - even if there's no running actor, it's possible that new actors will be created
-                        //   soon because of upcoming inflight barriers;
-                        // - even if there's no inflight barrier, it's possible that the next periodic barrier
-                        //   has not been produced yet.
-                        if resp.has_running_actor || has_inflight_barrier {
-                            break Err(anyhow!("worker node {worker_id} is shutting down").into());
-                        } else {
-                            info!(node = ?node.worker, "idle worker node is shutting down, no need to recover");
-                        }
+                        // TODO: if there's no actor running on the node, we can ignore and not trigger recovery.
+                        break Err(anyhow!("worker node {worker_id} is shutting down").into());
                     }
                     Response::Init(_) => {
                         // This arm should be unreachable.
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index ae7be0d2f5f9..37cf7c697354 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -127,38 +127,31 @@ impl ControlStreamHandle {
     }
 
     /// Send `Shutdown` message to the control stream and wait for the stream to be closed
-    /// by the meta service. Notify the caller through `shutdown_sender` once it's done.
-    fn shutdown_stream(&mut self, has_running_actor: bool, shutdown_sender: oneshot::Sender<()>) {
-        if has_running_actor {
-            tracing::warn!("shutdown with running actors, scaling or migration will be triggered");
-        }
-
+    /// by the meta service.
+    async fn shutdown_stream(&mut self) {
         if let Some((sender, _)) = self.pair.take() {
             if sender
                 .send(Ok(StreamingControlStreamResponse {
                     response: Some(streaming_control_stream_response::Response::Shutdown(
-                        ShutdownResponse { has_running_actor },
+                        ShutdownResponse::default(),
                     )),
                 }))
                 .is_err()
             {
                 warn!("failed to notify shutdown of control stream");
             } else {
-                tokio::spawn(async move {
-                    tracing::info!("waiting for meta service to close control stream...");
-
-                    // Wait for the stream to be closed, to ensure that the `Shutdown` message has
-                    // been acknowledged by the meta service for more precise error report.
-                    //
-                    // This is because the meta service will reset the control stream manager and
-                    // drop the connection to us upon recovery. As a result, the receiver part of
-                    // this sender will also be dropped, causing the stream to close.
-                    sender.closed().await;
-
-                    // Notify the caller that the shutdown is done.
-                    let _ = shutdown_sender.send(());
-                });
+                tracing::info!("waiting for meta service to close control stream...");
+
+                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
+                // been acknowledged by the meta service for more precise error report.
+                //
+                // This is because the meta service will reset the control stream manager and
+                // drop the connection to us upon recovery. As a result, the receiver part of
+                // this sender will also be dropped, causing the stream to close.
+                sender.closed().await;
             }
+        } else {
+            debug!("control stream has been reset, ignore shutdown");
         }
     }
 
@@ -245,6 +238,7 @@ pub(super) enum LocalBarrierEvent {
     Flush(oneshot::Sender<()>),
 }
 
+#[derive(strum_macros::Display)]
 pub(super) enum LocalActorOperation {
     NewControlStream {
         handle: ControlStreamHandle,
@@ -492,6 +486,15 @@ impl LocalBarrierWorker {
                                     response: Some(streaming_control_stream_response::Response::Init(InitResponse {}))
                                 });
                             }
+                            LocalActorOperation::Shutdown { result_sender } => {
+                                if !self.actor_manager_state.handles.is_empty() {
+                                    tracing::warn!(
+                                        "shutdown with running actors, scaling or migration will be triggered"
+                                    );
+                                }
+                                self.control_stream_handle.shutdown_stream().await;
+                                let _ = result_sender.send(());
+                            }
                             actor_op => {
                                 self.handle_actor_op(actor_op);
                             }
@@ -577,8 +580,8 @@ impl LocalBarrierWorker {
 
     fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
         match actor_op {
-            LocalActorOperation::NewControlStream { .. } => {
-                unreachable!("NewControlStream event should be handled separately in async context")
+            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
+                unreachable!("event {actor_op} should be handled separately in async context")
             }
             LocalActorOperation::DropActors {
                 actors,
@@ -624,10 +627,6 @@ impl LocalBarrierWorker {
                 let debug_info = self.to_debug_info();
                 let _ = result_sender.send(debug_info.to_string());
             }
-            LocalActorOperation::Shutdown { result_sender } => {
-                self.control_stream_handle
-                    .shutdown_stream(!self.actor_manager_state.handles.is_empty(), result_sender);
-            }
         }
     }
 }