diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index beaf46bafc498..90ba35b99d626 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -204,7 +204,14 @@ pub async fn standalone( } }); // wait for the service to be ready - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let mut tries = 0; + while !risingwave_meta_node::is_server_started() { + if tries % 50 == 0 { + tracing::info!("waiting for meta service to be ready..."); + } + tries += 1; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } } if let Some(opts) = compute_opts { tracing::info!("starting compute-node thread with cli args: {:?}", opts); diff --git a/src/common/metrics/src/monitor/connection.rs b/src/common/metrics/src/monitor/connection.rs index 44b0164a6f62c..f5570ff97f60f 100644 --- a/src/common/metrics/src/monitor/connection.rs +++ b/src/common/metrics/src/monitor/connection.rs @@ -521,13 +521,19 @@ impl Endpoint { #[cfg(not(madsim))] #[easy_ext::ext(RouterExt)] impl tonic::transport::server::Router { - pub async fn monitored_serve_with_shutdown( + /// Serve the given service while monitoring the connection. + /// + /// Calling the function will first bind the given service to the given address. Awaiting the + /// returned future will then start the server and keep it running until the given signal + /// future resolves. + pub fn monitored_serve_with_shutdown( self, listen_addr: std::net::SocketAddr, connection_type: impl Into, config: TcpConfig, signal: impl Future, - ) where + ) -> impl Future + where L: tower_layer::Layer, L::Service: Service< http::request::Request, @@ -544,21 +550,26 @@ impl tonic::transport::server::Router { ResBody: http_body::Body + Send + 'static, ResBody::Error: Into>, { + let connection_type = connection_type.into(); let incoming = tonic::transport::server::TcpIncoming::new( listen_addr, config.tcp_nodelay, config.keepalive_duration, ) - .unwrap_or_else(|err| panic!("failed to connect to {listen_addr}: {}", err.as_report())); - let incoming = MonitoredConnection::new( - incoming, - MonitorNewConnectionImpl { - connection_type: connection_type.into(), - }, - ); - self.serve_with_incoming_shutdown(incoming, signal) - .await - .unwrap() + .unwrap_or_else(|err| { + panic!( + "failed to bind `{connection_type}` to `{listen_addr}`: {}", + err.as_report() + ) + }); + let incoming = + MonitoredConnection::new(incoming, MonitorNewConnectionImpl { connection_type }); + + async move { + self.serve_with_incoming_shutdown(incoming, signal) + .await + .unwrap() + } } } diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 134d3843b4cc7..0330f2af1de3d 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -32,6 +32,7 @@ use risingwave_meta::*; use risingwave_meta_service::*; pub use rpc::{ElectionClient, ElectionMember, EtcdElectionClient}; use server::rpc_serve; +pub use server::started::get as is_server_started; use crate::manager::MetaOpts; diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index e53759349568e..bf5bb72ed731b 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -107,6 +107,25 @@ use crate::stream::{GlobalStreamManager, SourceManager}; use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher}; use crate::{hummock, serving, MetaError, MetaResult}; +/// Used for standalone mode checking the status of the meta service. +/// This can be easier and more accurate than checking the TCP connection. +pub mod started { + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering::Relaxed; + + static STARTED: AtomicBool = AtomicBool::new(false); + + /// Mark the meta service as started. + pub(crate) fn set() { + STARTED.store(true, Relaxed); + } + + /// Check if the meta service has started. + pub fn get() -> bool { + STARTED.load(Relaxed) + } +} + pub async fn rpc_serve( address_info: AddressInfo, meta_store_backend: MetaStoreBackend, @@ -334,7 +353,8 @@ pub async fn start_service_as_election_follower( }); let health_srv = HealthServiceImpl::new(); - tonic::transport::Server::builder() + + let server = tonic::transport::Server::builder() .layer(MetricsMiddlewareLayer::new(Arc::new( GLOBAL_META_METRICS.clone(), ))) @@ -366,8 +386,9 @@ pub async fn start_service_as_election_follower( }, } }, - ) - .await; + ); + started::set(); + server.await; } /// Starts all services needed for the meta leader node @@ -777,7 +798,7 @@ pub async fn start_service_as_election_leader( risingwave_pb::meta::event_log::Event::MetaNodeStart(event), ]); - tonic::transport::Server::builder() + let server = tonic::transport::Server::builder() .layer(MetricsMiddlewareLayer::new(meta_metrics)) .layer(TracingExtractLayer::new()) .add_service(HeartbeatServiceServer::new(heartbeat_srv)) @@ -822,8 +843,9 @@ pub async fn start_service_as_election_leader( }, } }, - ) - .await; + ); + started::set(); + server.await; #[cfg(not(madsim))] if let Some(dashboard_task) = dashboard_task {