diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 7d21fc44b4e57..9017f58606f7c 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -44,8 +44,7 @@ pub fn compute(opts: ComputeNodeOpts) -> ! { pub fn meta(opts: MetaNodeOpts) -> ! { init_risingwave_logger(LoggerSettings::from_opts(&opts)); - // TODO(shutdown): pass the shutdown token - main_okk(|_| risingwave_meta_node::start(opts)); + main_okk(|shutdown| risingwave_meta_node::start(opts, shutdown)); } pub fn frontend(opts: FrontendOpts) -> ! { diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 4f8c208c89aa3..325f2f8ff395b 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -194,9 +194,10 @@ pub async fn standalone( is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem)); tracing::info!("starting meta-node thread with cli args: {:?}", opts); + let shutdown = shutdown.clone(); let _meta_handle = tokio::spawn(async move { let dangerous_max_idle_secs = opts.dangerous_max_idle_secs; - risingwave_meta_node::start(opts).await; + risingwave_meta_node::start(opts, shutdown).await; tracing::warn!("meta is stopped, shutdown all nodes"); if let Some(idle_exit_secs) = dangerous_max_idle_secs { eprintln!("{}", diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index a7600ba930a15..5fd658c8a6581 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -26,6 +26,7 @@ use redact::Secret; use risingwave_common::config::OverrideConfig; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util; +use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use risingwave_meta::*; @@ -204,7 +205,10 @@ use risingwave_common::config::{load_config, MetaBackend, RwConfig}; use tracing::info; /// Start meta node -pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { +pub fn start( + opts: MetaNodeOpts, + shutdown: CancellationToken, +) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. Box::pin(async move { @@ -324,7 +328,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { max_timeout_ms / 1000 } + MIN_TIMEOUT_INTERVAL_SEC; - let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve( + rpc_serve( add_info, backend, max_heartbeat_interval, @@ -428,42 +432,10 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { }, config.system.into_init_system_params(), Default::default(), + shutdown, ) .await .unwrap(); - - tracing::info!("Meta server listening at {}", listen_addr); - - match leader_lost_handle { - None => { - tokio::select! { - _ = tokio::signal::ctrl_c() => { - tracing::info!("receive ctrl+c"); - shutdown_send.send(()).unwrap(); - join_handle.await.unwrap() - } - res = &mut join_handle => res.unwrap(), - }; - } - Some(mut handle) => { - tokio::select! { - _ = &mut handle => { - tracing::info!("receive leader lost signal"); - // When we lose leadership, we will exit as soon as possible. 
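The hunk above, together with the removed lines that continue below, replaces the hand-rolled Ctrl-C/select! shutdown inside the meta node with a CancellationToken handed in from the entrypoint (main_okk passes the token into the closure). A minimal sketch of that wiring outside the RisingWave codebase, assuming the standalone tokio (full features), tokio-util and tracing crates rather than the risingwave_common::util::tokio_util re-export used in the patch; run_node is an illustrative stand-in for risingwave_meta_node::start:

use tokio_util::sync::CancellationToken;

// Illustrative stand-in for a long-running node service such as
// `risingwave_meta_node::start`: it serves until the token is cancelled.
async fn run_node(shutdown: CancellationToken) {
    tokio::select! {
        _ = shutdown.cancelled() => tracing::info!("shutdown requested, exiting"),
        _ = std::future::pending::<()>() => unreachable!(), // real serving work would live here
    }
}

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();

    // Translate Ctrl-C into a single cancellation; every task holding a clone
    // of the token observes it.
    tokio::spawn({
        let shutdown = shutdown.clone();
        async move {
            let _ = tokio::signal::ctrl_c().await;
            shutdown.cancel();
        }
    });

    run_node(shutdown.clone()).await;
}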
- } - _ = tokio::signal::ctrl_c() => { - tracing::info!("receive ctrl+c"); - shutdown_send.send(()).unwrap(); - join_handle.await.unwrap(); - handle.abort(); - } - res = &mut join_handle => { - res.unwrap(); - handle.abort(); - }, - }; - } - }; }) } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 9098ee1429c82..55f069e5e0104 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -16,9 +16,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; -use either::Either; use etcd_client::ConnectOptions; -use futures::future::join_all; use otlp_embedded::TraceServiceServer; use regex::Regex; use risingwave_common::monitor::{RouterExt, TcpConfig}; @@ -26,11 +24,13 @@ use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled}; +use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_common_service::{MetricsManager, TracingExtractLayer}; use risingwave_meta::barrier::StreamRpcManager; use risingwave_meta::controller::catalog::CatalogController; use risingwave_meta::controller::cluster::ClusterController; use risingwave_meta::manager::{MetaStoreImpl, MetadataManager, SystemParamsManagerImpl}; +use risingwave_meta::rpc::election::dummy::DummyElectionClient; use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; use risingwave_meta::stream::ScaleController; @@ -76,10 +76,7 @@ use risingwave_pb::user::user_service_server::UserServiceServer; use risingwave_rpc_client::ComputeClientPool; use sea_orm::{ConnectionTrait, DbBackend}; use thiserror_ext::AsReport; -use tokio::sync::oneshot::{channel as OneChannel, Receiver as OneReceiver}; use tokio::sync::watch; -use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; -use tokio::task::JoinHandle; use crate::backup_restore::BackupManager; use crate::barrier::{BarrierScheduler, GlobalBarrierManager}; @@ -124,6 +121,9 @@ pub mod started { } } +/// A wrapper around [`rpc_serve_with_store`] that dispatches different store implementations. +/// +/// For the timing of returning, see [`rpc_serve_with_store`]. 
pub async fn rpc_serve( address_info: AddressInfo, meta_store_backend: MetaStoreBackend, @@ -132,7 +132,8 @@ pub async fn rpc_serve( opts: MetaOpts, init_system_params: SystemParams, init_session_config: SessionConfig, -) -> MetaResult<(JoinHandle<()>, Option>, WatchSender<()>)> { + shutdown: CancellationToken, +) -> MetaResult<()> { match meta_store_backend { MetaStoreBackend::Etcd { endpoints, @@ -168,27 +169,34 @@ pub async fn rpc_serve( rpc_serve_with_store( MetaStoreImpl::Kv(meta_store), - Some(election_client), + election_client, address_info, max_cluster_heartbeat_interval, lease_interval_secs, opts, init_system_params, init_session_config, + shutdown, ) + .await } MetaStoreBackend::Mem => { let meta_store = MemStore::new().into_ref(); + let dummy_election_client = Arc::new(DummyElectionClient::new( + address_info.advertise_addr.clone(), + )); rpc_serve_with_store( MetaStoreImpl::Kv(meta_store), - None, + dummy_election_client, address_info, max_cluster_heartbeat_interval, lease_interval_secs, opts, init_system_params, init_session_config, + shutdown, ) + .await } MetaStoreBackend::Sql { endpoint } => { let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) { @@ -225,130 +233,120 @@ pub async fn rpc_serve( rpc_serve_with_store( MetaStoreImpl::Sql(meta_store_sql), - Some(election_client), + election_client, address_info, max_cluster_heartbeat_interval, lease_interval_secs, opts, init_system_params, init_session_config, + shutdown, ) + .await } } } -#[expect(clippy::type_complexity)] -pub fn rpc_serve_with_store( +/// Bootstraps the follower or leader service based on the election status. +/// +/// Returns when the `shutdown` token is triggered, or when leader status is lost, or if the leader +/// service fails to start. +pub async fn rpc_serve_with_store( meta_store_impl: MetaStoreImpl, - election_client: Option, + election_client: ElectionClientRef, address_info: AddressInfo, max_cluster_heartbeat_interval: Duration, lease_interval_secs: u64, opts: MetaOpts, init_system_params: SystemParams, init_session_config: SessionConfig, -) -> MetaResult<(JoinHandle<()>, Option>, WatchSender<()>)> { - let (svc_shutdown_tx, svc_shutdown_rx) = watch::channel(()); + shutdown: CancellationToken, +) -> MetaResult<()> { + // TODO(shutdown): directly use cancellation token + let (election_shutdown_tx, election_shutdown_rx) = watch::channel(()); - let leader_lost_handle = if let Some(election_client) = election_client.clone() { - let stop_rx = svc_shutdown_tx.subscribe(); + let election_handle = tokio::spawn({ + let shutdown = shutdown.clone(); + let election_client = election_client.clone(); - let handle = tokio::spawn(async move { + async move { while let Err(e) = election_client - .run_once(lease_interval_secs as i64, stop_rx.clone()) + .run_once(lease_interval_secs as i64, election_shutdown_rx.clone()) .await { tracing::error!(error = %e.as_report(), "election error happened"); } - }); + // Leader lost, shutdown the service. + shutdown.cancel(); + } + }); - Some(handle) - } else { - None - }; + // Spawn and run the follower service if not the leader. + // Watch the leader status and switch to the leader service when elected. + // TODO: the branch seems to be always hit since the default value of `is_leader` is false until + // the election is done (unless using `DummyElectionClient`). + if !election_client.is_leader() { + // The follower service can be shutdown separately if we're going to be the leader. 
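child_token() is what makes the "shut down the follower service separately" comment above work: cancelling a child token stops only its own subtree, while cancelling the parent still reaches every child. A small self-contained sketch of both directions, assuming the standalone tokio-util crate:

use tokio_util::sync::CancellationToken;

fn main() {
    let parent = CancellationToken::new();

    // Cancelling a child affects only its own subtree; the parent keeps running.
    let child = parent.child_token();
    child.cancel();
    assert!(child.is_cancelled());
    assert!(!parent.is_cancelled());

    // Cancelling the parent still propagates to every child.
    let another_child = parent.child_token();
    parent.cancel();
    assert!(another_child.is_cancelled());
}

With that in mind, the follower server in the lines that follow is driven by shutdown.child_token(), so winning the election can stop it without touching the outer shutdown token.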
+ let follower_shutdown = shutdown.child_token(); + + let follower_handle = tokio::spawn(start_service_as_election_follower( + follower_shutdown.clone(), + address_info.clone(), + election_client.clone(), + )); - let join_handle = tokio::spawn(async move { - if let Some(election_client) = election_client.clone() { - let mut is_leader_watcher = election_client.subscribe(); - let mut svc_shutdown_rx_clone = svc_shutdown_rx.clone(); - let (follower_shutdown_tx, follower_shutdown_rx) = OneChannel::<()>(); + // Watch and wait until we become the leader. + let mut is_leader_watcher = election_client.subscribe(); + while !*is_leader_watcher.borrow_and_update() { tokio::select! { - _ = svc_shutdown_rx_clone.changed() => return, + // External shutdown signal. Directly return without switching to leader. + _ = shutdown.cancelled() => return Ok(()), + res = is_leader_watcher.changed() => { if res.is_err() { tracing::error!("leader watcher recv failed"); } } } - let svc_shutdown_rx_clone = svc_shutdown_rx.clone(); - - // If not the leader, spawn a follower. - let follower_handle: Option> = if !*is_leader_watcher.borrow() { - let address_info_clone = address_info.clone(); - - let election_client_ = election_client.clone(); - Some(tokio::spawn(async move { - start_service_as_election_follower( - svc_shutdown_rx_clone, - follower_shutdown_rx, - address_info_clone, - Some(election_client_), - ) - .await; - })) - } else { - None - }; + } - let mut svc_shutdown_rx_clone = svc_shutdown_rx.clone(); - while !*is_leader_watcher.borrow_and_update() { - tokio::select! { - _ = svc_shutdown_rx_clone.changed() => { - return; - } - res = is_leader_watcher.changed() => { - if res.is_err() { - tracing::error!("leader watcher recv failed"); - } - } - } - } + tracing::info!("elected as leader, shutting down follower services"); + follower_shutdown.cancel(); + let _ = follower_handle.await; + } - if let Some(handle) = follower_handle { - let _res = follower_shutdown_tx.send(()); - let _ = handle.await; - } - }; + // Run the leader service. + let result = start_service_as_election_leader( + meta_store_impl, + address_info, + max_cluster_heartbeat_interval, + opts, + init_system_params, + init_session_config, + election_client, + shutdown, + ) + .await; - start_service_as_election_leader( - meta_store_impl, - address_info, - max_cluster_heartbeat_interval, - opts, - init_system_params, - init_session_config, - election_client, - svc_shutdown_rx, - ) - .await - .expect("Unable to start leader services"); - }); + // Leader service has stopped, shutdown the election service to gracefully resign. + election_shutdown_tx.send(()).ok(); + let _ = election_handle.await; - Ok((join_handle, leader_lost_handle, svc_shutdown_tx)) + result } -/// Starts all services needed for the meta follower node +/// Starts all services needed for the meta follower node. +/// +/// Returns when the `shutdown` token is triggered. 
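The wait-until-elected loop added above combines a watch::Receiver<bool> from the election client with the cancellation token, so a follower keeps reacting to external shutdown while it waits. Roughly the same pattern in isolation, with a plain watch channel standing in for the election client; wait_for_leadership and the 50 ms delay are illustrative, not from the patch:

use std::time::Duration;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

// Wait until the watched flag becomes `true`, or bail out early once the
// token is cancelled. Returns whether leadership was actually gained.
async fn wait_for_leadership(
    mut is_leader: watch::Receiver<bool>,
    shutdown: CancellationToken,
) -> bool {
    while !*is_leader.borrow_and_update() {
        tokio::select! {
            _ = shutdown.cancelled() => return false,
            res = is_leader.changed() => {
                if res.is_err() {
                    // The sender side is gone; leadership can never be gained.
                    return false;
                }
            }
        }
    }
    true
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(false);
    let shutdown = CancellationToken::new();

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tx.send(true); // pretend the election was won
    });

    assert!(wait_for_leadership(rx, shutdown).await);
}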
pub async fn start_service_as_election_follower( - mut svc_shutdown_rx: WatchReceiver<()>, - follower_shutdown_rx: OneReceiver<()>, + shutdown: CancellationToken, address_info: AddressInfo, - election_client: Option, + election_client: ElectionClientRef, ) { - let meta_member_srv = MetaMemberServiceImpl::new(match election_client { - None => Either::Right(address_info.clone()), - Some(election_client) => Either::Left(election_client), - }); + tracing::info!("starting follower services"); + + let meta_member_srv = MetaMemberServiceImpl::new(election_client); let health_srv = HealthServiceImpl::new(); @@ -366,35 +364,21 @@ pub async fn start_service_as_election_follower( tcp_nodelay: true, keepalive_duration: None, }, - async move { - tokio::select! { - // shutdown service if all services should be shut down - res = svc_shutdown_rx.changed() => { - match res { - Ok(_) => tracing::info!("Shutting down services"), - Err(_) => tracing::error!("Service shutdown sender dropped") - } - }, - // shutdown service if follower becomes leader - res = follower_shutdown_rx => { - match res { - Ok(_) => tracing::info!("Shutting down follower services"), - Err(_) => tracing::error!("Follower service shutdown sender dropped") - } - }, - } - }, + shutdown.clone().cancelled_owned(), ); + let server_handle = tokio::spawn(server); started::set(); - server.await; + + // Wait for the shutdown signal. + shutdown.cancelled().await; + // Wait for the server to shutdown. This is necessary because we may be transitioning from follower + // to leader, and conflicts on the services must be avoided. + let _ = server_handle.await; } -/// Starts all services needed for the meta leader node -/// Only call this function once, since initializing the services multiple times will result in an -/// inconsistent state +/// Starts all services needed for the meta leader node. /// -/// ## Returns -/// Returns an error if the service initialization failed +/// Returns when the `shutdown` token is triggered, or if the service initialization fails. 
pub async fn start_service_as_election_leader( meta_store_impl: MetaStoreImpl, address_info: AddressInfo, @@ -402,10 +386,11 @@ pub async fn start_service_as_election_leader( opts: MetaOpts, init_system_params: SystemParams, init_session_config: SessionConfig, - election_client: Option, - mut svc_shutdown_rx: WatchReceiver<()>, + election_client: ElectionClientRef, + shutdown: CancellationToken, ) -> MetaResult<()> { - tracing::info!("Defining leader services"); + tracing::info!("starting leader services"); + let env = MetaSrvEnv::new( opts.clone(), init_system_params, @@ -479,10 +464,7 @@ pub async fn start_service_as_election_leader( .unwrap(); let object_store_media_type = hummock_manager.object_store_media_type(); - let meta_member_srv = MetaMemberServiceImpl::new(match election_client.clone() { - None => Either::Right(address_info.clone()), - Some(election_client) => Either::Left(election_client), - }); + let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone()); let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| { use std::str::FromStr; @@ -504,7 +486,7 @@ pub async fn start_service_as_election_leader( let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone()); #[cfg(not(madsim))] - let dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr { + let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr { let dashboard_service = crate::dashboard::DashboardService { dashboard_addr: *dashboard_addr, prometheus_client, @@ -537,6 +519,7 @@ pub async fn start_service_as_election_leader( ); let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(); + // TODO(shutdown): remove this as there's no need to gracefully shutdown some of these sub-tasks. let mut sub_tasks = vec![shutdown_handle]; let stream_rpc_manager = StreamRpcManager::new(env.clone()); @@ -721,17 +704,17 @@ pub async fn start_service_as_election_leader( sub_tasks.push(stream_manager.start_auto_parallelism_monitor()); } } - let (idle_send, idle_recv) = tokio::sync::oneshot::channel(); - sub_tasks.push(IdleManager::start_idle_checker( + + let _idle_checker_handle = IdleManager::start_idle_checker( env.idle_manager_ref(), Duration::from_secs(30), - idle_send, - )); + shutdown.clone(), + ); let (abort_sender, abort_recv) = tokio::sync::oneshot::channel(); let notification_mgr = env.notification_manager_ref(); let stream_abort_handler = tokio::spawn(async move { - abort_recv.await.unwrap(); + let _ = abort_recv.await; notification_mgr.abort_all().await; compactor_manager.abort_all_compactors(); }); @@ -762,33 +745,6 @@ pub async fn start_service_as_election_leader( sub_tasks.push(pair); } - let shutdown_all = async move { - let mut handles = Vec::with_capacity(sub_tasks.len()); - - for (join_handle, shutdown_sender) in sub_tasks { - if let Err(_err) = shutdown_sender.send(()) { - continue; - } - - handles.push(join_handle); - } - - // The barrier manager can't be shutdown gracefully if it's under recovering, try to - // abort it using timeout. 
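Further down in this hunk, the join_all/timeout shutdown future being deleted here is replaced by handing shutdown.clone().cancelled_owned() straight to the gRPC server as its shutdown signal, then spawning the server and awaiting the token. A condensed sketch of that shape; serve is a made-up stand-in for the server builder, and the standalone tokio, tokio-util and tracing crates are assumed:

use std::future::Future;
use tokio_util::sync::CancellationToken;

// Made-up stand-in for a server builder that takes a shutdown-signal future.
async fn serve(shutdown_signal: impl Future<Output = ()>) {
    tokio::select! {
        _ = shutdown_signal => tracing::info!("signal received, draining connections"),
        _ = std::future::pending::<()>() => unreachable!(), // accept loop would live here
    }
}

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();

    // `cancelled_owned()` consumes a clone of the token and yields a `'static`
    // future, so it can be moved into a spawned server without borrowing.
    let server = tokio::spawn(serve(shutdown.clone().cancelled_owned()));

    shutdown.cancel(); // e.g. Ctrl-C arrived or leadership was lost
    let _ = server.await; // wait for the server to finish shutting down
}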
- match tokio::time::timeout(Duration::from_secs(1), join_all(handles)).await { - Ok(results) => { - for result in results { - if result.is_err() { - tracing::warn!("Failed to join shutdown"); - } - } - } - Err(_e) => { - tracing::warn!("Join shutdown timeout"); - } - } - }; - tracing::info!("Assigned cluster id {:?}", *env.cluster_id()); tracing::info!("Starting meta services"); @@ -832,28 +788,15 @@ pub async fn start_service_as_election_leader( tcp_nodelay: true, keepalive_duration: None, }, - async move { - tokio::select! { - res = svc_shutdown_rx.changed() => { - match res { - Ok(_) => tracing::info!("Shutting down services"), - Err(_) => tracing::error!("Service shutdown receiver dropped") - } - shutdown_all.await; - }, - _ = idle_recv => { - shutdown_all.await; - }, - } - }, + shutdown.clone().cancelled_owned(), ); started::set(); - server.await; + let _server_handle = tokio::spawn(server); - #[cfg(not(madsim))] - if let Some(dashboard_task) = dashboard_task { - dashboard_task.abort(); - } + // Wait for the shutdown signal. + shutdown.cancelled().await; + // TODO(shutdown): may warn user if there's any other node still running in the cluster. + // TODO(shutdown): do we have any other shutdown tasks? Ok(()) } diff --git a/src/meta/service/src/meta_member_service.rs b/src/meta/service/src/meta_member_service.rs index b8f5d9ebf92c4..946337d248485 100644 --- a/src/meta/service/src/meta_member_service.rs +++ b/src/meta/service/src/meta_member_service.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use either::Either; use risingwave_common::util::addr::HostAddr; use risingwave_meta::rpc::ElectionClientRef; use risingwave_pb::common::HostAddress; @@ -20,17 +19,14 @@ use risingwave_pb::meta::meta_member_service_server::MetaMemberService; use risingwave_pb::meta::{MembersRequest, MembersResponse, MetaMember}; use tonic::{Request, Response, Status}; -use crate::AddressInfo; #[derive(Clone)] pub struct MetaMemberServiceImpl { - election_client_or_self: Either, + election_client: ElectionClientRef, } impl MetaMemberServiceImpl { - pub fn new(election_client_or_self: Either) -> Self { - MetaMemberServiceImpl { - election_client_or_self, - } + pub fn new(election_client: ElectionClientRef) -> Self { + MetaMemberServiceImpl { election_client } } } @@ -41,39 +37,20 @@ impl MetaMemberService for MetaMemberServiceImpl { &self, _request: Request, ) -> Result, Status> { - let members = match &self.election_client_or_self { - Either::Left(election_client) => { - let mut members = vec![]; - for member in election_client.get_members().await? { - let host_addr = member - .id - .parse::() - .map_err(|err| Status::from_error(err.into()))?; - members.push(MetaMember { - address: Some(HostAddress { - host: host_addr.host, - port: host_addr.port.into(), - }), - is_leader: member.is_leader, - }) - } - - members - } - Either::Right(self_as_leader) => { - let host_addr = self_as_leader - .advertise_addr - .parse::() - .map_err(|err| Status::from_error(err.into()))?; - vec![MetaMember { - address: Some(HostAddress { - host: host_addr.host, - port: host_addr.port.into(), - }), - is_leader: true, - }] - } - }; + let mut members = vec![]; + for member in self.election_client.get_members().await? 
{ + let host_addr = member + .id + .parse::() + .map_err(|err| Status::from_error(err.into()))?; + members.push(MetaMember { + address: Some(HostAddress { + host: host_addr.host, + port: host_addr.port.into(), + }), + is_leader: member.is_leader, + }) + } Ok(Response::new(MembersResponse { members })) } diff --git a/src/meta/src/manager/idle.rs b/src/meta/src/manager/idle.rs index afa9fbf860932..431575e665b88 100644 --- a/src/meta/src/manager/idle.rs +++ b/src/meta/src/manager/idle.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::oneshot::Sender; +use risingwave_common::util::tokio_util::sync::CancellationToken; use tokio::task::JoinHandle; /// `IdleManager` keeps track of latest activity and report whether the meta service has been @@ -77,24 +77,17 @@ impl IdleManager { pub fn start_idle_checker( idle_manager: IdleManagerRef, check_interval: Duration, - idle_send: tokio::sync::oneshot::Sender<()>, - ) -> (JoinHandle<()>, Sender<()>) { + shutdown: CancellationToken, + ) -> JoinHandle<()> { let dur = idle_manager.get_config_max_idle(); if !dur.is_zero() { tracing::warn!("--dangerous-max-idle-secs is set. The meta server will be automatically stopped after idle for {:?}.", dur) } - let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); - let join_handle = tokio::spawn(async move { + tokio::spawn(async move { let mut min_interval = tokio::time::interval(check_interval); loop { - tokio::select! { - _ = min_interval.tick() => {}, - _ = &mut shutdown_rx => { - tracing::info!("Idle checker is stopped"); - return; - } - } + min_interval.tick().await; if idle_manager.is_exceeding_max_idle() { break; } @@ -104,9 +97,9 @@ impl IdleManager { idle_manager.get_config_max_idle() ); tracing::warn!("Idle checker is shutting down the server"); - let _ = idle_send.send(()); - }); - (join_handle, shutdown_tx) + + shutdown.cancel(); + }) } } diff --git a/src/meta/src/rpc/election/dummy.rs b/src/meta/src/rpc/election/dummy.rs new file mode 100644 index 0000000000000..567958dd08600 --- /dev/null +++ b/src/meta/src/rpc/election/dummy.rs @@ -0,0 +1,73 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tokio::sync::watch::{self, Receiver, Sender}; + +use crate::{ElectionClient, ElectionMember, MetaResult}; + +/// A dummy implementation of [`ElectionClient`] for scenarios where only one meta node is running, +/// typically for testing purposes such as an in-memory meta store. +/// +/// This can be used to unify the code paths no matter there's HA or not. +pub struct DummyElectionClient { + id: String, + + /// A dummy watcher that never changes, indicating we are always the leader. 
+ dummy_watcher: Sender, +} + +impl DummyElectionClient { + pub fn new(id: String) -> Self { + Self { + id, + dummy_watcher: watch::channel(true).0, + } + } + + fn self_member(&self) -> ElectionMember { + ElectionMember { + id: self.id.clone(), + is_leader: true, + } + } +} + +#[async_trait::async_trait] +impl ElectionClient for DummyElectionClient { + fn id(&self) -> MetaResult { + Ok(self.id.clone()) + } + + async fn run_once(&self, _ttl: i64, mut stop: Receiver<()>) -> MetaResult<()> { + // Only exit when the stop signal is received. + let _ = stop.changed().await; + Ok(()) + } + + fn subscribe(&self) -> Receiver { + self.dummy_watcher.subscribe() + } + + async fn leader(&self) -> MetaResult> { + Ok(Some(self.self_member())) + } + + async fn get_members(&self) -> MetaResult> { + Ok(vec![self.self_member()]) + } + + fn is_leader(&self) -> bool { + true + } +} diff --git a/src/meta/src/rpc/election/etcd.rs b/src/meta/src/rpc/election/etcd.rs index 96b16f537356e..6834591764360 100644 --- a/src/meta/src/rpc/election/etcd.rs +++ b/src/meta/src/rpc/election/etcd.rs @@ -36,7 +36,7 @@ pub struct EtcdElectionClient { #[async_trait::async_trait] impl ElectionClient for EtcdElectionClient { - async fn is_leader(&self) -> bool { + fn is_leader(&self) -> bool { *self.is_leader_sender.borrow() } @@ -404,7 +404,7 @@ mod tests { let leader = new_followers.into_iter().next().unwrap(); - assert!(leader.1.is_leader().await); + assert!(leader.1.is_leader()); } #[tokio::test] @@ -434,7 +434,7 @@ mod tests { let mut leaders = vec![]; let mut followers = vec![]; for (sender, client) in clients { - if client.is_leader().await { + if client.is_leader() { leaders.push((sender, client)); } else { followers.push((sender, client)); @@ -476,7 +476,7 @@ mod tests { } for client in &clients { - assert!(!client.1.is_leader().await); + assert!(!client.1.is_leader()); } for (stop_sender, client) in &clients { diff --git a/src/meta/src/rpc/election/mod.rs b/src/meta/src/rpc/election/mod.rs index 0c65d497b677e..9b34d19ce2244 100644 --- a/src/meta/src/rpc/election/mod.rs +++ b/src/meta/src/rpc/election/mod.rs @@ -11,6 +11,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +pub mod dummy; pub mod etcd; pub mod sql; @@ -34,9 +36,12 @@ pub trait ElectionClient: Send + Sync + 'static { } fn id(&self) -> MetaResult; + /// Run the long-running election process. + /// + /// Returns when the leader status is lost, or the stop signal is received. 
async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>; fn subscribe(&self) -> Receiver; async fn leader(&self) -> MetaResult>; async fn get_members(&self) -> MetaResult>; - async fn is_leader(&self) -> bool; + fn is_leader(&self) -> bool; } diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs index 9ec5bd199cf76..62694aaa3ded3 100644 --- a/src/meta/src/rpc/election/sql.rs +++ b/src/meta/src/rpc/election/sql.rs @@ -781,7 +781,7 @@ where .collect()) } - async fn is_leader(&self) -> bool { + fn is_leader(&self) -> bool { *self.is_leader_sender.borrow() } } @@ -842,7 +842,7 @@ mod tests { loop { receiver.changed().await.unwrap(); if *receiver.borrow() { - assert!(sql_election_client.is_leader().await); + assert!(sql_election_client.is_leader()); break; } } @@ -874,7 +874,7 @@ mod tests { let mut is_leaders = vec![]; for client in clients { - is_leaders.push(client.is_leader().await); + is_leaders.push(client.is_leader()); } assert!(is_leaders.iter().filter(|&x| *x).count() <= 1); diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index 28520720e98fe..618ac9c26436c 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -717,7 +717,7 @@ impl Default for MetaMetrics { pub fn start_worker_info_monitor( metadata_manager: MetadataManager, - election_client: Option, + election_client: ElectionClientRef, interval: Duration, meta_metrics: Arc, ) -> (JoinHandle<()>, Sender<()>) { @@ -754,9 +754,7 @@ pub fn start_worker_info_monitor( .with_label_values(&[(worker_type.as_str_name())]) .set(worker_num as i64); } - if let Some(client) = &election_client - && let Ok(meta_members) = client.get_members().await - { + if let Ok(meta_members) = election_client.get_members().await { meta_metrics .worker_num .with_label_values(&[WorkerType::Meta.as_str_name()]) diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 195715e130877..de5e747624098 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -154,7 +154,7 @@ pub async fn start_meta_node(listen_addr: String, state_store: String, config_pa "enable_compaction_deterministic should be set" ); - risingwave_meta_node::start(meta_opts).await + risingwave_meta_node::start(meta_opts, CancellationToken::new() /* dummy */).await } async fn start_compactor_node( diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index c733288757ba2..8e2ffece2fed9 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -455,7 +455,12 @@ impl Cluster { .create_node() .name(format!("meta-{i}")) .ip([192, 168, 1, i as u8].into()) - .init(move || risingwave_meta_node::start(opts.clone())) + .init(move || { + risingwave_meta_node::start( + opts.clone(), + CancellationToken::new(), // dummy + ) + }) .build(); }
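The DummyElectionClient introduced in this patch boils down to a watch channel created as true and never written again, so single-node deployments (the in-memory meta store) can share the leader/follower code path; the test call sites likewise pass a fresh CancellationToken::new() as a dummy shutdown token. A distilled sketch of that idea with a made-up minimal trait; LeaderStatus and AlwaysLeader are illustrative names, not RisingWave's ElectionClient:

use tokio::sync::watch;

// Made-up, minimal slice of an election interface; the real `ElectionClient`
// trait has more methods (`run_once`, `leader`, `get_members`, ...).
trait LeaderStatus: Send + Sync {
    fn subscribe(&self) -> watch::Receiver<bool>;
    fn is_leader(&self) -> bool;
}

// Single-node stand-in: a watch channel created as `true` and never written
// to again, so every subscriber immediately sees this node as the leader.
struct AlwaysLeader {
    watcher: watch::Sender<bool>,
}

impl AlwaysLeader {
    fn new() -> Self {
        Self {
            watcher: watch::channel(true).0,
        }
    }
}

impl LeaderStatus for AlwaysLeader {
    fn subscribe(&self) -> watch::Receiver<bool> {
        self.watcher.subscribe()
    }

    fn is_leader(&self) -> bool {
        *self.watcher.borrow()
    }
}

fn main() {
    let client = AlwaysLeader::new();
    assert!(client.is_leader());
    assert!(*client.subscribe().borrow());
}

Keeping the Sender inside the struct keeps the channel open, so subscribers never observe it as closed; the dummy client in the patch relies on the same property for its constant leader status.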