From 3a5f445521b94fe98620d292602710ae385bb034 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 11 Jul 2024 17:06:42 +0800 Subject: [PATCH 01/11] Revert "revert changes not relavent to standalone" This reverts commit 3142219c0411713cf37ff1cdacf2c8ee802674da. --- .../common_service/src/observer_manager.rs | 2 +- src/compute/src/server.rs | 4 +-- src/meta/node/src/server.rs | 27 ++++++++++++++++--- src/meta/src/barrier/info.rs | 23 +++++++++++++--- src/meta/src/manager/metadata.rs | 27 +++++++++++++++++++ src/storage/src/hummock/event_handler/mod.rs | 2 +- .../src/hummock/store/hummock_storage.rs | 3 +-- 7 files changed, 74 insertions(+), 14 deletions(-) diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index bf7e457be8b1c..da61118f1ad49 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -157,7 +157,7 @@ where match self.rx.message().await { Ok(resp) => { if resp.is_none() { - tracing::error!("Stream of notification terminated."); + tracing::warn!("Stream of notification terminated."); self.re_subscribe().await; continue; } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 530810ef6a69a..c7ecc9e0c908e 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -469,8 +469,8 @@ pub async fn compute_node_serve( // Wait for the shutdown signal. shutdown.cancelled().await; - // TODO(shutdown): gracefully unregister from the meta service (need to cautious since it may - // trigger auto-scaling) + // TODO(shutdown): how does this interact with auto-scaling? + meta_client.try_unregister().await; // NOTE(shutdown): We can't simply join the tonic server here because it only returns when all // existing connections are closed, while we have long-running streaming calls that never diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index cb59e14479ac0..d78652422fc8f 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -719,10 +719,11 @@ pub async fn start_service_as_election_leader( } } + let idle_shutdown = CancellationToken::new(); let _idle_checker_handle = IdleManager::start_idle_checker( env.idle_manager_ref(), Duration::from_secs(30), - shutdown.clone(), + idle_shutdown.clone(), ); let (abort_sender, abort_recv) = tokio::sync::oneshot::channel(); @@ -771,6 +772,7 @@ pub async fn start_service_as_election_leader( risingwave_pb::meta::event_log::Event::MetaNodeStart(event), ]); + let server_shutdown = CancellationToken::new(); let server = tonic::transport::Server::builder() .layer(MetricsMiddlewareLayer::new(meta_metrics)) .layer(TracingExtractLayer::new()) @@ -802,14 +804,31 @@ pub async fn start_service_as_election_leader( tcp_nodelay: true, keepalive_duration: None, }, - shutdown.clone().cancelled_owned(), + server_shutdown.clone().cancelled_owned(), ); started::set(); let _server_handle = tokio::spawn(server); // Wait for the shutdown signal. - shutdown.cancelled().await; - // TODO(shutdown): may warn user if there's any other node still running in the cluster. + tokio::select! { + // Idle manager informs to shutdown. Do nothing else but directly return. + _ = idle_shutdown.cancelled() => {} + + // External shutdown signal. + _ = shutdown.cancelled() => { + // Wait for all other workers to shutdown for gracefulness. + if election_client.is_leader() { + let res = metadata_manager.wait_till_all_worker_nodes_exit().await; + if let Err(e) = res { + tracing::error!( + error = %e.as_report(), + "error occurs while waiting for all worker nodes to exit, directly shutdown", + ); + } + } + server_shutdown.cancel(); + } + } // TODO(shutdown): do we have any other shutdown tasks? Ok(()) } diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index a6bdcdace6e54..645d15e83a7e0 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use risingwave_common::catalog::TableId; use risingwave_pb::common::PbWorkerNode; @@ -137,11 +137,26 @@ impl InflightActorInfo { .into_iter() .map(|node| (node.id, node)) .collect::>(); - for (actor_id, location) in &self.actor_location_map { - if !new_node_map.contains_key(location) { - warn!(actor_id, location, node = ?self.node_map.get(location), "node with running actors is deleted"); + + let mut deleted_actors = BTreeMap::new(); + for (&actor_id, &location) in &self.actor_location_map { + if !new_node_map.contains_key(&location) { + deleted_actors + .entry(location) + .or_insert_with(BTreeSet::new) + .insert(actor_id); } } + for (node_id, actors) in deleted_actors { + let node = self.node_map.get(&node_id); + warn!( + node_id, + ?node, + ?actors, + "node with running actors is deleted" + ); + } + self.node_map = new_node_map; } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index cb90f2326d20d..82650cd8f6c35 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -328,6 +328,33 @@ impl MetadataManager { } } + /// Wait until all worker nodes (except meta nodes) exit. Used for graceful shutdown. + pub async fn wait_till_all_worker_nodes_exit(&self) -> MetaResult<()> { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut last_remaining = 0; + + loop { + interval.tick().await; + + let remaining = self + .list_worker_node(None, Some(State::Running)) + .await? + .into_iter() + .filter(|w| !matches!(w.r#type(), WorkerType::Meta)) // filter out meta node + .count(); + + if remaining == 0 { + tracing::info!("all worker nodes exited"); + return Ok(()); + } + + if remaining != last_remaining { + last_remaining = remaining; + warn!(remaining, "waiting for all worker nodes to exit..."); + } + } + } + pub async fn subscribe_active_streaming_compute_nodes( &self, ) -> MetaResult<(Vec, UnboundedReceiver)> { diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 74a69bfa71947..39e9d3ecc920a 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -217,7 +217,7 @@ impl Drop for LocalInstanceGuard { instance_id: self.instance_id, }) .unwrap_or_else(|err| { - tracing::error!( + tracing::debug!( error = %err.as_report(), table_id = %self.table_id, instance_id = self.instance_id, diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 511d9dd33814d..2ce0fed8984a6 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -37,7 +37,6 @@ use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; -use tracing::error; use super::local_hummock_storage::LocalHummockStorage; use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader}; @@ -75,7 +74,7 @@ impl Drop for HummockStorageShutdownGuard { let _ = self .shutdown_sender .send(HummockEvent::Shutdown) - .inspect_err(|e| error!(event = ?e.0, "unable to send shutdown")); + .inspect_err(|e| tracing::warn!(event = ?e.0, "unable to send shutdown")); } } From b6ba7cc19048e546278a4f5207bcf884b8164b77 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 23 Jul 2024 15:58:18 +0800 Subject: [PATCH 02/11] only maintain worker cache for streaming compute nodes in scale manager --- src/meta/src/stream/scale.rs | 24 ++++++++++++++----- .../src/hummock/store/hummock_storage.rs | 2 +- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 7d8f8049d5c4d..8d0452d48b158 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2618,15 +2618,24 @@ impl GlobalStreamManager { notification = local_notification_rx.recv() => { let notification = notification.expect("local notification channel closed in loop of stream manager"); + // Only maintain the cache for streaming compute nodes. + let worker_is_streaming_compute = |worker: &WorkerNode| { + worker.get_type() == Ok(WorkerType::ComputeNode) + && worker + .property + .as_ref() + .map(|p| p.is_streaming) + .unwrap_or(false) + }; + match notification { LocalNotification::WorkerNodeActivated(worker) => { - match (worker.get_type(), worker.property.as_ref()) { - (Ok(WorkerType::ComputeNode), Some(prop)) if prop.is_streaming => { - tracing::info!("worker {} activated notification received", worker.id); - } - _ => continue + if !worker_is_streaming_compute(&worker) { + continue; } + tracing::info!("worker {} activated notification received", worker.id); + let prev_worker = worker_cache.insert(worker.id, worker.clone()); match prev_worker { @@ -2645,11 +2654,14 @@ impl GlobalStreamManager { // Since our logic for handling passive scale-in is within the barrier manager, // there’s not much we can do here. All we can do is proactively remove the entries from our cache. LocalNotification::WorkerNodeDeleted(worker) => { + if !worker_is_streaming_compute(&worker) { + continue; + } + match worker_cache.remove(&worker.id) { Some(prev_worker) => { tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache"); } - None => { tracing::warn!(worker = worker.id, "worker not found in stream manager cache, but it was removed"); } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 2ce0fed8984a6..13d166adc8530 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -74,7 +74,7 @@ impl Drop for HummockStorageShutdownGuard { let _ = self .shutdown_sender .send(HummockEvent::Shutdown) - .inspect_err(|e| tracing::warn!(event = ?e.0, "unable to send shutdown")); + .inspect_err(|e| tracing::debug!(event = ?e.0, "unable to send shutdown")); } } From b55b5d262c3ca0379de551e1d57f8da0d03b0f54 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 23 Jul 2024 17:28:44 +0800 Subject: [PATCH 03/11] shutdown stream manager --- src/compute/src/server.rs | 4 ++- src/meta/src/barrier/mod.rs | 30 ++++++++------------ src/meta/src/barrier/rpc.rs | 27 +++++++++++------- src/stream/src/error.rs | 5 ++++ src/stream/src/task/barrier_manager.rs | 39 ++++++++++++++++++++++++-- src/stream/src/task/stream_manager.rs | 6 ++++ 6 files changed, 79 insertions(+), 32 deletions(-) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c7ecc9e0c908e..e8aeef52649e7 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -405,7 +405,7 @@ pub async fn compute_node_serve( meta_cache, block_cache, ); - let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr); + let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone()); let health_srv = HealthServiceImpl::new(); let telemetry_manager = TelemetryManager::new( @@ -472,6 +472,8 @@ pub async fn compute_node_serve( // TODO(shutdown): how does this interact with auto-scaling? meta_client.try_unregister().await; + let _ = stream_mgr.shutdown().await; + // NOTE(shutdown): We can't simply join the tonic server here because it only returns when all // existing connections are closed, while we have long-running streaming calls that never // close. From the other side, there's also no need to gracefully shutdown them if we have diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 50f8ab52b5fca..a1769a2bcd64d 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -985,24 +985,18 @@ impl GlobalBarrierManagerContext { } fn report_complete_event(&self, duration_sec: f64, command_ctx: &CommandContext) { - { - { - { - // Record barrier latency in event log. - use risingwave_pb::meta::event_log; - let event = event_log::EventBarrierComplete { - prev_epoch: command_ctx.prev_epoch.value().0, - cur_epoch: command_ctx.curr_epoch.value().0, - duration_sec, - command: command_ctx.command.to_string(), - barrier_kind: command_ctx.kind.as_str_name().to_string(), - }; - self.env - .event_log_manager_ref() - .add_event_logs(vec![event_log::Event::BarrierComplete(event)]); - } - } - } + // Record barrier latency in event log. + use risingwave_pb::meta::event_log; + let event = event_log::EventBarrierComplete { + prev_epoch: command_ctx.prev_epoch.value().0, + cur_epoch: command_ctx.curr_epoch.value().0, + duration_sec, + command: command_ctx.command.to_string(), + barrier_kind: command_ctx.kind.as_str_name().to_string(), + }; + self.env + .event_log_manager_ref() + .add_event_logs(vec![event_log::Event::BarrierComplete(event)]); } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index dd30a3fe9e00d..1d67b236eee26 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -207,16 +207,22 @@ impl ControlStreamManager { .expect("should exist when get collect resp"); // Note: No need to use `?` as the backtrace is from meta and not useful. warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); + + // TODO: this future can be cancelled during collection, so may not work as expected. + let errors = self.collect_errors(node.worker.id, err).await; + let err = merge_node_rpc_errors("get error from control stream", errors); + if let Some(command) = node.inflight_barriers.pop_front() { - let errors = self.collect_errors(node.worker.id, err).await; - let err = merge_node_rpc_errors("get error from control stream", errors); - self.context.report_collect_failure(&command, &err); - break Err(err); + self.context.report_collect_failure( + command.prev_epoch.value().0, + command.curr_epoch.value().0, + &err, + ); } else { - // for node with no inflight barrier, simply ignore the error - info!(node = ?node.worker, "no inflight barrier no node. Ignore error"); - continue; + self.context.report_collect_failure(0, 0, &err); } + + break Err(err); } } } @@ -239,6 +245,7 @@ impl ControlStreamManager { }) .await; } + tracing::debug!(?errors, "collected stream errors"); errors } } @@ -366,12 +373,12 @@ impl GlobalBarrierManagerContext { } /// Send barrier-complete-rpc and wait for responses from all CNs - fn report_collect_failure(&self, command_context: &CommandContext, error: &MetaError) { + fn report_collect_failure(&self, prev_epoch: u64, curr_epoch: u64, error: &MetaError) { // Record failure in event log. use risingwave_pb::meta::event_log; let event = event_log::EventCollectBarrierFail { - prev_epoch: command_context.prev_epoch.value().0, - cur_epoch: command_context.curr_epoch.value().0, + prev_epoch, + cur_epoch: curr_epoch, error: error.to_report_string(), }; self.env diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index 42b3e92e4e043..23adf38bb9830 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -86,18 +86,23 @@ pub enum ErrorKind { actor_id: ActorId, reason: &'static str, }, + #[error("Secret error: {0}")] Secret( #[from] #[backtrace] SecretError, ), + #[error(transparent)] Uncategorized( #[from] #[backtrace] anyhow::Error, ), + + #[error("the compute node is asked to shutdown")] + Shutdown, } impl From for StreamError { diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 167997fae3400..a9bc9030e2f84 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -119,7 +119,27 @@ impl ControlStreamHandle { let err = err.into_inner(); if sender.send(Err(err)).is_err() { - warn!("failed to notify finish of control stream"); + warn!("failed to notify reset of control stream"); + } + } + } + + async fn shutdown_stream(&mut self) { + let err = + ScoredStreamError::new(StreamError::shutdown()).to_status_unnamed(Code::Cancelled); + + if let Some((sender, _)) = self.pair.take() { + if sender.send(Err(err)).is_err() { + warn!("failed to notify shutdown of control stream"); + } else { + // We should always unregister the current node from the meta service before + // calling this method. So upon next recovery, the meta service will not involve + // us anymore and drop the connection to us. + // + // We detect this by waiting the stream to be closed. This is to ensure that the + // `Shutdown` error has been acknowledged by the meta service, for more precise + // error report. + sender.closed().await; } } } @@ -243,6 +263,9 @@ pub(super) enum LocalActorOperation { InspectState { result_sender: oneshot::Sender, }, + Shutdown { + result_sender: oneshot::Sender<()>, + }, } pub(super) struct CreateActorOutput { @@ -452,7 +475,7 @@ impl LocalBarrierWorker { }); } actor_op => { - self.handle_actor_op(actor_op); + self.handle_actor_op(actor_op).await; } } } @@ -534,7 +557,7 @@ impl LocalBarrierWorker { } } - fn handle_actor_op(&mut self, actor_op: LocalActorOperation) { + async fn handle_actor_op(&mut self, actor_op: LocalActorOperation) { match actor_op { LocalActorOperation::NewControlStream { .. } => { unreachable!("NewControlStream event should be handled separately in async context") @@ -583,6 +606,13 @@ impl LocalBarrierWorker { let debug_info = self.to_debug_info(); let _ = result_sender.send(debug_info.to_string()); } + LocalActorOperation::Shutdown { result_sender } => { + if !self.actor_manager_state.handles.is_empty() { + tracing::warn!("shutdown with running actors"); + } + self.control_stream_handle.shutdown_stream().await; + let _ = result_sender.send(()); + } } } } @@ -1010,6 +1040,9 @@ impl ScoredStreamError { // `BarrierSend` is likely to be caused by actor exit and not the root cause. ErrorKind::BarrierSend { .. } => 1, + // Asked to shutdown by user, likely to be the root cause. + ErrorKind::Shutdown => 3000, + // Executor errors first. ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee), diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index e7105347c9356..a8c1c625a5c37 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -277,6 +277,12 @@ impl LocalStreamManager { }) .await } + + pub async fn shutdown(&self) -> StreamResult<()> { + self.actor_op_tx + .send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender }) + .await + } } impl LocalBarrierWorker { From f10f303884b53adc58469cd490df71a75f1438ec Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 23 Jul 2024 17:56:10 +0800 Subject: [PATCH 04/11] add todos --- src/meta/src/barrier/rpc.rs | 1 + src/stream/src/task/barrier_manager.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 1d67b236eee26..344cfd2c1406b 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -196,6 +196,7 @@ impl ControlStreamManager { .expect("should exist when get collect resp"); break Ok((worker_id, command.prev_epoch.value().0, resp)); } + // TODO: then directly handle shutdown message here resp => { break Err(anyhow!("get unexpected resp: {:?}", resp).into()); } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index a9bc9030e2f84..5c0da00dc59b2 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -129,6 +129,7 @@ impl ControlStreamHandle { ScoredStreamError::new(StreamError::shutdown()).to_status_unnamed(Code::Cancelled); if let Some((sender, _)) = self.pair.take() { + // TODO: Shall we send a specific message of shutdown? if sender.send(Err(err)).is_err() { warn!("failed to notify shutdown of control stream"); } else { From 69a752bcedf5217c0c34a3dd8df9055341991d62 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 24 Jul 2024 14:12:36 +0800 Subject: [PATCH 05/11] use shutdown message Signed-off-by: Bugen Zhao --- proto/stream_service.proto | 2 ++ src/meta/src/barrier/rpc.rs | 42 ++++++++++++----------- src/stream/src/error.rs | 3 -- src/stream/src/task/barrier_manager.rs | 47 +++++++++++++++----------- 4 files changed, 51 insertions(+), 43 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index fd97bde853487..663b7aa5bae56 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -108,10 +108,12 @@ message StreamingControlStreamRequest { message StreamingControlStreamResponse { message InitResponse {} + message ShutdownResponse {} oneof response { InitResponse init = 1; BarrierCompleteResponse complete_barrier = 2; + ShutdownResponse shutdown = 3; } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 344cfd2c1406b..d5e512f72073b 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -181,11 +181,13 @@ impl ControlStreamManager { pub(super) async fn next_complete_barrier_response( &mut self, ) -> MetaResult<(WorkerId, u64, BarrierCompleteResponse)> { + use streaming_control_stream_response::Response; + loop { let (worker_id, result) = pending_on_none(self.next_response()).await; match result { - Ok(resp) => match resp.response { - Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => { + Ok(resp) => match resp.response.unwrap() { + Response::CompleteBarrier(resp) => { let node = self .nodes .get_mut(&worker_id) @@ -196,9 +198,12 @@ impl ControlStreamManager { .expect("should exist when get collect resp"); break Ok((worker_id, command.prev_epoch.value().0, resp)); } - // TODO: then directly handle shutdown message here - resp => { - break Err(anyhow!("get unexpected resp: {:?}", resp).into()); + Response::Shutdown(_) => { + break Err(anyhow!("worker node {worker_id} is shutting down").into()); + } + Response::Init(_) => { + // unreachable + break Err(anyhow!("get unexpected init response").into()); } }, Err(err) => { @@ -209,21 +214,18 @@ impl ControlStreamManager { // Note: No need to use `?` as the backtrace is from meta and not useful. warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); - // TODO: this future can be cancelled during collection, so may not work as expected. - let errors = self.collect_errors(node.worker.id, err).await; - let err = merge_node_rpc_errors("get error from control stream", errors); - if let Some(command) = node.inflight_barriers.pop_front() { - self.context.report_collect_failure( - command.prev_epoch.value().0, - command.curr_epoch.value().0, - &err, - ); + // FIXME: this future can be cancelled during collection, so the error collection + // might not work as expected. + let errors = self.collect_errors(node.worker.id, err).await; + let err = merge_node_rpc_errors("get error from control stream", errors); + self.context.report_collect_failure(&command, &err); + break Err(err); } else { - self.context.report_collect_failure(0, 0, &err); + // for node with no inflight barrier, simply ignore the error + info!(node = ?node.worker, "no inflight barrier in the node, ignore error"); + continue; } - - break Err(err); } } } @@ -374,12 +376,12 @@ impl GlobalBarrierManagerContext { } /// Send barrier-complete-rpc and wait for responses from all CNs - fn report_collect_failure(&self, prev_epoch: u64, curr_epoch: u64, error: &MetaError) { + fn report_collect_failure(&self, command_context: &CommandContext, error: &MetaError) { // Record failure in event log. use risingwave_pb::meta::event_log; let event = event_log::EventCollectBarrierFail { - prev_epoch, - cur_epoch: curr_epoch, + prev_epoch: command_context.prev_epoch.value().0, + cur_epoch: command_context.curr_epoch.value().0, error: error.to_report_string(), }; self.env diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index 23adf38bb9830..98424f324965b 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -100,9 +100,6 @@ pub enum ErrorKind { #[backtrace] anyhow::Error, ), - - #[error("the compute node is asked to shutdown")] - Shutdown, } impl From for StreamError { diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 5c0da00dc59b2..97066d5912f47 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -124,23 +124,34 @@ impl ControlStreamHandle { } } - async fn shutdown_stream(&mut self) { - let err = - ScoredStreamError::new(StreamError::shutdown()).to_status_unnamed(Code::Cancelled); - + /// Send `Shutdown` message to the control stream and wait for the stream to be closed + /// by the meta service. Notify the caller through `shutdown_sender` once it's done. + fn shutdown_stream(&mut self, shutdown_sender: oneshot::Sender<()>) { if let Some((sender, _)) = self.pair.take() { // TODO: Shall we send a specific message of shutdown? - if sender.send(Err(err)).is_err() { + if sender + .send(Ok(StreamingControlStreamResponse { + response: Some(streaming_control_stream_response::Response::Shutdown( + Default::default(), + )), + })) + .is_err() + { warn!("failed to notify shutdown of control stream"); } else { - // We should always unregister the current node from the meta service before - // calling this method. So upon next recovery, the meta service will not involve - // us anymore and drop the connection to us. - // - // We detect this by waiting the stream to be closed. This is to ensure that the - // `Shutdown` error has been acknowledged by the meta service, for more precise - // error report. - sender.closed().await; + tokio::spawn(async move { + // We should always unregister the current node from the meta service before + // calling this method. So upon next recovery, the meta service will not involve + // us anymore and drop the connection to us. + // + // We detect this by waiting the stream to be closed. This is to ensure that the + // `Shutdown` error has been acknowledged by the meta service, for more precise + // error report. + sender.closed().await; + + // Notify the caller that the shutdown is done. + let _ = shutdown_sender.send(()); + }); } } } @@ -476,7 +487,7 @@ impl LocalBarrierWorker { }); } actor_op => { - self.handle_actor_op(actor_op).await; + self.handle_actor_op(actor_op); } } } @@ -558,7 +569,7 @@ impl LocalBarrierWorker { } } - async fn handle_actor_op(&mut self, actor_op: LocalActorOperation) { + fn handle_actor_op(&mut self, actor_op: LocalActorOperation) { match actor_op { LocalActorOperation::NewControlStream { .. } => { unreachable!("NewControlStream event should be handled separately in async context") @@ -611,8 +622,7 @@ impl LocalBarrierWorker { if !self.actor_manager_state.handles.is_empty() { tracing::warn!("shutdown with running actors"); } - self.control_stream_handle.shutdown_stream().await; - let _ = result_sender.send(()); + self.control_stream_handle.shutdown_stream(result_sender); } } } @@ -1041,9 +1051,6 @@ impl ScoredStreamError { // `BarrierSend` is likely to be caused by actor exit and not the root cause. ErrorKind::BarrierSend { .. } => 1, - // Asked to shutdown by user, likely to be the root cause. - ErrorKind::Shutdown => 3000, - // Executor errors first. ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee), From a46cba26fb4a5dfd3bf4a989d6dfda2bed92bbcb Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 24 Jul 2024 15:03:56 +0800 Subject: [PATCH 06/11] release connection eagerly Signed-off-by: Bugen Zhao --- src/meta/src/barrier/recovery.rs | 4 ++++ src/meta/src/barrier/rpc.rs | 5 +++++ src/stream/src/task/barrier_manager.rs | 2 ++ 3 files changed, 11 insertions(+) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 159e7b71387d6..7d38346a6beee 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -234,9 +234,12 @@ impl GlobalBarrierManager { .committed_epoch .into(), ); + // Mark blocked and abort buffered schedules, they might be dirty already. self.scheduled_barriers .abort_and_mark_blocked("cluster is under recovering"); + // Clear all control streams to release resources (connections to compute nodes) first. + self.control_stream_manager.clear(); tracing::info!("recovery start!"); let retry_strategy = Self::get_retry_strategy(); @@ -288,6 +291,7 @@ impl GlobalBarrierManager { // Resolve actor info for recovery. If there's no actor to recover, most of the // following steps will be no-op, while the compute nodes will still be reset. // FIXME: Transactions should be used. + // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere. let mut info = if !self.env.opts.disable_automatic_parallelism_control && background_streaming_jobs.is_empty() { diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index d5e512f72073b..e8a576d16b17d 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -167,6 +167,11 @@ impl ControlStreamManager { Ok(()) } + /// Clear all nodes and response streams in the manager. + pub(super) fn clear(&mut self) { + *self = Self::new(self.context.clone()); + } + async fn next_response( &mut self, ) -> Option<(WorkerId, MetaResult)> { diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 97066d5912f47..0d3f4ddf5ef0d 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -140,6 +140,8 @@ impl ControlStreamHandle { warn!("failed to notify shutdown of control stream"); } else { tokio::spawn(async move { + tracing::info!("waiting for meta service to close control stream..."); + // We should always unregister the current node from the meta service before // calling this method. So upon next recovery, the meta service will not involve // us anymore and drop the connection to us. From 29b2992ec8256b1af2da58eecbaa2f8bc3d61d5f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 24 Jul 2024 15:04:11 +0800 Subject: [PATCH 07/11] do not exit on heartbeat error Signed-off-by: Bugen Zhao --- src/rpc_client/src/meta_client.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 3701e8482f650..f2f86c1dbed0a 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -14,6 +14,8 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; use std::thread; use std::time::{Duration, SystemTime}; @@ -115,6 +117,7 @@ pub struct MetaClient { inner: GrpcMetaClient, meta_config: MetaConfig, cluster_id: String, + shutting_down: Arc, } impl MetaClient { @@ -276,6 +279,7 @@ impl MetaClient { inner: grpc_meta_client, meta_config: meta_config.to_owned(), cluster_id: add_worker_resp.cluster_id, + shutting_down: Arc::new(false.into()), }; static REPORT_PANIC: std::sync::Once = std::sync::Once::new(); @@ -322,8 +326,12 @@ impl MetaClient { let resp = self.inner.heartbeat(request).await?; if let Some(status) = resp.status { if status.code() == risingwave_pb::common::status::Code::UnknownWorker { - tracing::error!("worker expired: {}", status.message); - std::process::exit(1); + // Ignore the error if we're shutting down. + // Otherwise, exit the process. + if !self.shutting_down.load(Relaxed) { + tracing::error!(message = status.message, "worker expired"); + std::process::exit(1); + } } } Ok(()) @@ -745,6 +753,7 @@ impl MetaClient { host: Some(self.host_addr.to_protobuf()), }; self.inner.delete_worker_node(request).await?; + self.shutting_down.store(true, Relaxed); Ok(()) } From a0a2c48c1c9ca6a08a7e21c7399a4fbd649481b4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 24 Jul 2024 15:42:51 +0800 Subject: [PATCH 08/11] refine docs Signed-off-by: Bugen Zhao --- src/compute/src/server.rs | 6 ++++-- src/meta/src/barrier/rpc.rs | 5 +++-- src/meta/src/stream/scale.rs | 8 ++------ src/rpc_client/src/meta_client.rs | 2 +- src/stream/src/task/barrier_manager.rs | 16 ++++++++-------- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index e8aeef52649e7..6d9e2eecd2904 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -469,9 +469,11 @@ pub async fn compute_node_serve( // Wait for the shutdown signal. shutdown.cancelled().await; - // TODO(shutdown): how does this interact with auto-scaling? + // Unregister from the meta service, then... + // - batch queries will not be scheduled to this compute node, + // - streaming actors will not be scheduled to this compute node after next recovery. meta_client.try_unregister().await; - + // Shutdown the streaming manager. let _ = stream_mgr.shutdown().await; // NOTE(shutdown): We can't simply join the tonic server here because it only returns when all diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index e8a576d16b17d..bab9dd7235de1 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -204,10 +204,11 @@ impl ControlStreamManager { break Ok((worker_id, command.prev_epoch.value().0, resp)); } Response::Shutdown(_) => { + // Directly return an error to trigger recovery. break Err(anyhow!("worker node {worker_id} is shutting down").into()); } Response::Init(_) => { - // unreachable + // This arm should be unreachable. break Err(anyhow!("get unexpected init response").into()); } }, @@ -228,7 +229,7 @@ impl ControlStreamManager { break Err(err); } else { // for node with no inflight barrier, simply ignore the error - info!(node = ?node.worker, "no inflight barrier in the node, ignore error"); + info!(node = ?node.worker, error = %err.as_report(), "no inflight barrier in the node, ignore error"); continue; } } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 8d0452d48b158..e91a785e9da93 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2621,11 +2621,7 @@ impl GlobalStreamManager { // Only maintain the cache for streaming compute nodes. let worker_is_streaming_compute = |worker: &WorkerNode| { worker.get_type() == Ok(WorkerType::ComputeNode) - && worker - .property - .as_ref() - .map(|p| p.is_streaming) - .unwrap_or(false) + && worker.property.as_ref().unwrap().is_streaming }; match notification { @@ -2634,7 +2630,7 @@ impl GlobalStreamManager { continue; } - tracing::info!("worker {} activated notification received", worker.id); + tracing::info!(worker = worker.id, "worker activated notification received"); let prev_worker = worker_cache.insert(worker.id, worker.clone()); diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index f2f86c1dbed0a..10ebe5ce2344a 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -326,7 +326,7 @@ impl MetaClient { let resp = self.inner.heartbeat(request).await?; if let Some(status) = resp.status { if status.code() == risingwave_pb::common::status::Code::UnknownWorker { - // Ignore the error if we're shutting down. + // Ignore the error if we're already shutting down. // Otherwise, exit the process. if !self.shutting_down.load(Relaxed) { tracing::error!(message = status.message, "worker expired"); diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 0d3f4ddf5ef0d..27344d20e5722 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -128,7 +128,6 @@ impl ControlStreamHandle { /// by the meta service. Notify the caller through `shutdown_sender` once it's done. fn shutdown_stream(&mut self, shutdown_sender: oneshot::Sender<()>) { if let Some((sender, _)) = self.pair.take() { - // TODO: Shall we send a specific message of shutdown? if sender .send(Ok(StreamingControlStreamResponse { response: Some(streaming_control_stream_response::Response::Shutdown( @@ -142,13 +141,12 @@ impl ControlStreamHandle { tokio::spawn(async move { tracing::info!("waiting for meta service to close control stream..."); - // We should always unregister the current node from the meta service before - // calling this method. So upon next recovery, the meta service will not involve - // us anymore and drop the connection to us. + // Wait for the stream to be closed, to ensure that the `Shutdown` message has + // been acknowledged by the meta service for more precise error report. // - // We detect this by waiting the stream to be closed. This is to ensure that the - // `Shutdown` error has been acknowledged by the meta service, for more precise - // error report. + // This is because the meta service will reset the control stream manager and + // drop the connection to us upon recovery. As a result, the receiver part of + // this sender will also be dropped, causing the stream to close. sender.closed().await; // Notify the caller that the shutdown is done. @@ -622,7 +620,9 @@ impl LocalBarrierWorker { } LocalActorOperation::Shutdown { result_sender } => { if !self.actor_manager_state.handles.is_empty() { - tracing::warn!("shutdown with running actors"); + tracing::warn!( + "shutdown with running actors, scaling or migration will be triggered" + ); } self.control_stream_handle.shutdown_stream(result_sender); } From edfc335c184acd91f095cd0f93c95aadaf8dcb22 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 24 Jul 2024 15:50:08 +0800 Subject: [PATCH 09/11] revert changes on meta Signed-off-by: Bugen Zhao --- src/meta/node/src/server.rs | 27 ++++----------------------- src/meta/src/manager/metadata.rs | 27 --------------------------- 2 files changed, 4 insertions(+), 50 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index d78652422fc8f..cb59e14479ac0 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -719,11 +719,10 @@ pub async fn start_service_as_election_leader( } } - let idle_shutdown = CancellationToken::new(); let _idle_checker_handle = IdleManager::start_idle_checker( env.idle_manager_ref(), Duration::from_secs(30), - idle_shutdown.clone(), + shutdown.clone(), ); let (abort_sender, abort_recv) = tokio::sync::oneshot::channel(); @@ -772,7 +771,6 @@ pub async fn start_service_as_election_leader( risingwave_pb::meta::event_log::Event::MetaNodeStart(event), ]); - let server_shutdown = CancellationToken::new(); let server = tonic::transport::Server::builder() .layer(MetricsMiddlewareLayer::new(meta_metrics)) .layer(TracingExtractLayer::new()) @@ -804,31 +802,14 @@ pub async fn start_service_as_election_leader( tcp_nodelay: true, keepalive_duration: None, }, - server_shutdown.clone().cancelled_owned(), + shutdown.clone().cancelled_owned(), ); started::set(); let _server_handle = tokio::spawn(server); // Wait for the shutdown signal. - tokio::select! { - // Idle manager informs to shutdown. Do nothing else but directly return. - _ = idle_shutdown.cancelled() => {} - - // External shutdown signal. - _ = shutdown.cancelled() => { - // Wait for all other workers to shutdown for gracefulness. - if election_client.is_leader() { - let res = metadata_manager.wait_till_all_worker_nodes_exit().await; - if let Err(e) = res { - tracing::error!( - error = %e.as_report(), - "error occurs while waiting for all worker nodes to exit, directly shutdown", - ); - } - } - server_shutdown.cancel(); - } - } + shutdown.cancelled().await; + // TODO(shutdown): may warn user if there's any other node still running in the cluster. // TODO(shutdown): do we have any other shutdown tasks? Ok(()) } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 82650cd8f6c35..cb90f2326d20d 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -328,33 +328,6 @@ impl MetadataManager { } } - /// Wait until all worker nodes (except meta nodes) exit. Used for graceful shutdown. - pub async fn wait_till_all_worker_nodes_exit(&self) -> MetaResult<()> { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - let mut last_remaining = 0; - - loop { - interval.tick().await; - - let remaining = self - .list_worker_node(None, Some(State::Running)) - .await? - .into_iter() - .filter(|w| !matches!(w.r#type(), WorkerType::Meta)) // filter out meta node - .count(); - - if remaining == 0 { - tracing::info!("all worker nodes exited"); - return Ok(()); - } - - if remaining != last_remaining { - last_remaining = remaining; - warn!(remaining, "waiting for all worker nodes to exit..."); - } - } - } - pub async fn subscribe_active_streaming_compute_nodes( &self, ) -> MetaResult<(Vec, UnboundedReceiver)> { From a514fd2fccbb5d05bc187ecd3beec8ca9c2a7df1 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 25 Jul 2024 14:13:18 +0800 Subject: [PATCH 10/11] do not recover if the compute node is idle Signed-off-by: Bugen Zhao --- proto/stream_service.proto | 4 ++- src/meta/src/barrier/rpc.rs | 40 ++++++++++++++++++++------ src/stream/src/task/barrier_manager.rs | 20 +++++++------ 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 663b7aa5bae56..fa6fe1eca0a55 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -108,7 +108,9 @@ message StreamingControlStreamRequest { message StreamingControlStreamResponse { message InitResponse {} - message ShutdownResponse {} + message ShutdownResponse { + bool has_running_actor = 1; + } oneof response { InitResponse init = 1; diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index bab9dd7235de1..2593fb264db38 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -176,10 +176,17 @@ impl ControlStreamManager { &mut self, ) -> Option<(WorkerId, MetaResult)> { let (worker_id, response_stream, result) = self.response_streams.next().await?; - if result.is_ok() { - self.response_streams - .push(into_future(worker_id, response_stream)); + + match result.as_ref().map(|r| r.response.as_ref().unwrap()) { + Ok(streaming_control_stream_response::Response::Shutdown(_)) | Err(_) => { + // Do not add it back to the `response_streams` so that it will not be polled again. + } + _ => { + self.response_streams + .push(into_future(worker_id, response_stream)); + } } + Some((worker_id, result)) } @@ -203,9 +210,25 @@ impl ControlStreamManager { .expect("should exist when get collect resp"); break Ok((worker_id, command.prev_epoch.value().0, resp)); } - Response::Shutdown(_) => { - // Directly return an error to trigger recovery. - break Err(anyhow!("worker node {worker_id} is shutting down").into()); + Response::Shutdown(resp) => { + let node = self + .nodes + .remove(&worker_id) + .expect("should exist when get shutdown resp"); + let has_inflight_barrier = !node.inflight_barriers.is_empty(); + + // Trigger recovery only if there are actors running. + // + // We need "or" here because... + // - even if there's no running actor, it's possible that new actors will be created + // soon because of upcoming inflight barriers; + // - even if there's no inflight barrier, it's possible that the next periodic barrier + // has not been produced yet. + if resp.has_running_actor || has_inflight_barrier { + break Err(anyhow!("worker node {worker_id} is shutting down").into()); + } else { + info!(node = ?node.worker, "idle worker node is shutting down, no need to recover"); + } } Response::Init(_) => { // This arm should be unreachable. @@ -213,14 +236,14 @@ impl ControlStreamManager { } }, Err(err) => { - let mut node = self + let node = self .nodes .remove(&worker_id) .expect("should exist when get collect resp"); // Note: No need to use `?` as the backtrace is from meta and not useful. warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); - if let Some(command) = node.inflight_barriers.pop_front() { + if let Some(command) = node.inflight_barriers.into_iter().next() { // FIXME: this future can be cancelled during collection, so the error collection // might not work as expected. let errors = self.collect_errors(node.worker.id, err).await; @@ -230,7 +253,6 @@ impl ControlStreamManager { } else { // for node with no inflight barrier, simply ignore the error info!(node = ?node.worker, error = %err.as_report(), "no inflight barrier in the node, ignore error"); - continue; } } } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 27344d20e5722..ae7be0d2f5f99 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -56,7 +56,9 @@ use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult}; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request}; -use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse; +use risingwave_pb::stream_service::streaming_control_stream_response::{ + InitResponse, ShutdownResponse, +}; use risingwave_pb::stream_service::{ streaming_control_stream_response, BarrierCompleteResponse, BuildActorInfo, StreamingControlStreamRequest, StreamingControlStreamResponse, @@ -126,12 +128,16 @@ impl ControlStreamHandle { /// Send `Shutdown` message to the control stream and wait for the stream to be closed /// by the meta service. Notify the caller through `shutdown_sender` once it's done. - fn shutdown_stream(&mut self, shutdown_sender: oneshot::Sender<()>) { + fn shutdown_stream(&mut self, has_running_actor: bool, shutdown_sender: oneshot::Sender<()>) { + if has_running_actor { + tracing::warn!("shutdown with running actors, scaling or migration will be triggered"); + } + if let Some((sender, _)) = self.pair.take() { if sender .send(Ok(StreamingControlStreamResponse { response: Some(streaming_control_stream_response::Response::Shutdown( - Default::default(), + ShutdownResponse { has_running_actor }, )), })) .is_err() @@ -619,12 +625,8 @@ impl LocalBarrierWorker { let _ = result_sender.send(debug_info.to_string()); } LocalActorOperation::Shutdown { result_sender } => { - if !self.actor_manager_state.handles.is_empty() { - tracing::warn!( - "shutdown with running actors, scaling or migration will be triggered" - ); - } - self.control_stream_handle.shutdown_stream(result_sender); + self.control_stream_handle + .shutdown_stream(!self.actor_manager_state.handles.is_empty(), result_sender); } } } From 4dcfb9dfdd1a7b73e2c1cbf32d7b598d47e9dfad Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 26 Jul 2024 16:21:09 +0800 Subject: [PATCH 11/11] remove no-actor optimization & move actor op handler to async context Signed-off-by: Bugen Zhao --- proto/stream_service.proto | 4 +- src/meta/src/barrier/rpc.rs | 20 ++-------- src/stream/src/task/barrier_manager.rs | 53 +++++++++++++------------- 3 files changed, 31 insertions(+), 46 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index fa6fe1eca0a55..663b7aa5bae56 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -108,9 +108,7 @@ message StreamingControlStreamRequest { message StreamingControlStreamResponse { message InitResponse {} - message ShutdownResponse { - bool has_running_actor = 1; - } + message ShutdownResponse {} oneof response { InitResponse init = 1; diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 2593fb264db38..f3f56db1a9a89 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -210,25 +210,13 @@ impl ControlStreamManager { .expect("should exist when get collect resp"); break Ok((worker_id, command.prev_epoch.value().0, resp)); } - Response::Shutdown(resp) => { - let node = self + Response::Shutdown(_) => { + let _ = self .nodes .remove(&worker_id) .expect("should exist when get shutdown resp"); - let has_inflight_barrier = !node.inflight_barriers.is_empty(); - - // Trigger recovery only if there are actors running. - // - // We need "or" here because... - // - even if there's no running actor, it's possible that new actors will be created - // soon because of upcoming inflight barriers; - // - even if there's no inflight barrier, it's possible that the next periodic barrier - // has not been produced yet. - if resp.has_running_actor || has_inflight_barrier { - break Err(anyhow!("worker node {worker_id} is shutting down").into()); - } else { - info!(node = ?node.worker, "idle worker node is shutting down, no need to recover"); - } + // TODO: if there's no actor running on the node, we can ignore and not trigger recovery. + break Err(anyhow!("worker node {worker_id} is shutting down").into()); } Response::Init(_) => { // This arm should be unreachable. diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index ae7be0d2f5f99..37cf7c697354a 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -127,38 +127,31 @@ impl ControlStreamHandle { } /// Send `Shutdown` message to the control stream and wait for the stream to be closed - /// by the meta service. Notify the caller through `shutdown_sender` once it's done. - fn shutdown_stream(&mut self, has_running_actor: bool, shutdown_sender: oneshot::Sender<()>) { - if has_running_actor { - tracing::warn!("shutdown with running actors, scaling or migration will be triggered"); - } - + /// by the meta service. + async fn shutdown_stream(&mut self) { if let Some((sender, _)) = self.pair.take() { if sender .send(Ok(StreamingControlStreamResponse { response: Some(streaming_control_stream_response::Response::Shutdown( - ShutdownResponse { has_running_actor }, + ShutdownResponse::default(), )), })) .is_err() { warn!("failed to notify shutdown of control stream"); } else { - tokio::spawn(async move { - tracing::info!("waiting for meta service to close control stream..."); - - // Wait for the stream to be closed, to ensure that the `Shutdown` message has - // been acknowledged by the meta service for more precise error report. - // - // This is because the meta service will reset the control stream manager and - // drop the connection to us upon recovery. As a result, the receiver part of - // this sender will also be dropped, causing the stream to close. - sender.closed().await; - - // Notify the caller that the shutdown is done. - let _ = shutdown_sender.send(()); - }); + tracing::info!("waiting for meta service to close control stream..."); + + // Wait for the stream to be closed, to ensure that the `Shutdown` message has + // been acknowledged by the meta service for more precise error report. + // + // This is because the meta service will reset the control stream manager and + // drop the connection to us upon recovery. As a result, the receiver part of + // this sender will also be dropped, causing the stream to close. + sender.closed().await; } + } else { + debug!("control stream has been reset, ignore shutdown"); } } @@ -245,6 +238,7 @@ pub(super) enum LocalBarrierEvent { Flush(oneshot::Sender<()>), } +#[derive(strum_macros::Display)] pub(super) enum LocalActorOperation { NewControlStream { handle: ControlStreamHandle, @@ -492,6 +486,15 @@ impl LocalBarrierWorker { response: Some(streaming_control_stream_response::Response::Init(InitResponse {})) }); } + LocalActorOperation::Shutdown { result_sender } => { + if !self.actor_manager_state.handles.is_empty() { + tracing::warn!( + "shutdown with running actors, scaling or migration will be triggered" + ); + } + self.control_stream_handle.shutdown_stream().await; + let _ = result_sender.send(()); + } actor_op => { self.handle_actor_op(actor_op); } @@ -577,8 +580,8 @@ impl LocalBarrierWorker { fn handle_actor_op(&mut self, actor_op: LocalActorOperation) { match actor_op { - LocalActorOperation::NewControlStream { .. } => { - unreachable!("NewControlStream event should be handled separately in async context") + LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => { + unreachable!("event {actor_op} should be handled separately in async context") } LocalActorOperation::DropActors { actors, @@ -624,10 +627,6 @@ impl LocalBarrierWorker { let debug_info = self.to_debug_info(); let _ = result_sender.send(debug_info.to_string()); } - LocalActorOperation::Shutdown { result_sender } => { - self.control_stream_handle - .shutdown_stream(!self.actor_manager_state.handles.is_empty(), result_sender); - } } } }