diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index bf7e457be8b1c..da61118f1ad49 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -157,7 +157,7 @@ where match self.rx.message().await { Ok(resp) => { if resp.is_none() { - tracing::error!("Stream of notification terminated."); + tracing::warn!("Stream of notification terminated."); self.re_subscribe().await; continue; } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index d7dcbd5146c31..c1fb8d53ee6dc 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -456,8 +456,8 @@ pub async fn compute_node_serve( // Wait for the shutdown signal. shutdown.cancelled().await; - // TODO(shutdown): gracefully unregister from the meta service (need to cautious since it may - // trigger auto-scaling) + // TODO(shutdown): how does this interact with auto-scaling? + meta_client.try_unregister().await; // NOTE(shutdown): We can't simply join the tonic server here because it only returns when all // existing connections are closed, while we have long-running streaming calls that never diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 74310c75374e5..feff3a27fe217 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -706,10 +706,11 @@ pub async fn start_service_as_election_leader( } } + let idle_shutdown = CancellationToken::new(); let _idle_checker_handle = IdleManager::start_idle_checker( env.idle_manager_ref(), Duration::from_secs(30), - shutdown.clone(), + idle_shutdown.clone(), ); let (abort_sender, abort_recv) = tokio::sync::oneshot::channel(); @@ -758,6 +759,7 @@ pub async fn start_service_as_election_leader( risingwave_pb::meta::event_log::Event::MetaNodeStart(event), ]); + let server_shutdown = CancellationToken::new(); let server = tonic::transport::Server::builder() .layer(MetricsMiddlewareLayer::new(meta_metrics)) .layer(TracingExtractLayer::new()) @@ -789,14 +791,31 @@ pub async fn start_service_as_election_leader( tcp_nodelay: true, keepalive_duration: None, }, - shutdown.clone().cancelled_owned(), + server_shutdown.clone().cancelled_owned(), ); started::set(); let _server_handle = tokio::spawn(server); // Wait for the shutdown signal. - shutdown.cancelled().await; - // TODO(shutdown): may warn user if there's any other node still running in the cluster. + tokio::select! { + // Idle manager informs to shutdown. Do nothing else but directly return. + _ = idle_shutdown.cancelled() => {} + + // External shutdown signal. + _ = shutdown.cancelled() => { + // Wait for all other workers to shutdown for gracefulness. + if election_client.is_leader() { + let res = metadata_manager.wait_till_all_worker_nodes_exit().await; + if let Err(e) = res { + tracing::error!( + error = %e.as_report(), + "error occurs while waiting for all worker nodes to exit, directly shutdown", + ); + } + } + server_shutdown.cancel(); + } + } // TODO(shutdown): do we have any other shutdown tasks? Ok(()) } diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index f6617b9ceef47..1468f27b5a6f8 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use risingwave_common::catalog::TableId; use risingwave_pb::common::PbWorkerNode; @@ -134,11 +134,26 @@ impl InflightActorInfo { .into_iter() .map(|node| (node.id, node)) .collect::>(); - for (actor_id, location) in &self.actor_location_map { - if !new_node_map.contains_key(location) { - warn!(actor_id, location, node = ?self.node_map.get(location), "node with running actors is deleted"); + + let mut deleted_actors = BTreeMap::new(); + for (&actor_id, &location) in &self.actor_location_map { + if !new_node_map.contains_key(&location) { + deleted_actors + .entry(location) + .or_insert_with(BTreeSet::new) + .insert(actor_id); } } + for (node_id, actors) in deleted_actors { + let node = self.node_map.get(&node_id); + warn!( + node_id, + ?node, + ?actors, + "node with running actors is deleted" + ); + } + self.node_map = new_node_map; } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 241a47941755b..cdb9efff793ef 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -317,6 +317,33 @@ impl MetadataManager { } } + /// Wait until all worker nodes (except meta nodes) exit. Used for graceful shutdown. + pub async fn wait_till_all_worker_nodes_exit(&self) -> MetaResult<()> { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut last_remaining = 0; + + loop { + interval.tick().await; + + let remaining = self + .list_worker_node(None, Some(State::Running)) + .await? + .into_iter() + .filter(|w| !matches!(w.r#type(), WorkerType::Meta)) // filter out meta node + .count(); + + if remaining == 0 { + tracing::info!("all worker nodes exited"); + return Ok(()); + } + + if remaining != last_remaining { + last_remaining = remaining; + warn!(remaining, "waiting for all worker nodes to exit..."); + } + } + } + pub async fn subscribe_active_streaming_compute_nodes( &self, ) -> MetaResult<(Vec, UnboundedReceiver)> { diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 0dabc9b19022d..518d6e7b3eafb 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -94,7 +94,7 @@ impl DdlController { { Ok(version) => Ok(version), Err(err) => { - tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job"); + tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job"); let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail { id: streaming_job.id(), name: streaming_job.name(), diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 996d5d6a6df72..4fdd656d14488 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -208,7 +208,7 @@ impl Drop for LocalInstanceGuard { instance_id: self.instance_id, }) .unwrap_or_else(|err| { - tracing::error!( + tracing::debug!( error = %err.as_report(), table_id = %self.table_id, instance_id = self.instance_id, diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index f7794604e5a8b..4127aebd2eea9 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -35,7 +35,6 @@ use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; -use tracing::error; use super::local_hummock_storage::LocalHummockStorage; use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader}; @@ -72,7 +71,7 @@ impl Drop for HummockStorageShutdownGuard { let _ = self .shutdown_sender .send(HummockEvent::Shutdown) - .inspect_err(|e| error!(event = ?e.0, "unable to send shutdown")); + .inspect_err(|e| tracing::warn!(event = ?e.0, "unable to send shutdown")); } }