diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index c1baa7eb9217..808cc855d5a1 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -8,6 +8,9 @@ rpc_addr = "127.0.0.1:3001"
 rpc_hostname = "127.0.0.1"
 # The number of gRPC server worker threads, 8 by default.
 rpc_runtime_size = 8
+# Start services after regions have been coordinated.
+# It will block the datanode from starting if it can't receive heartbeats from the metasrv.
+coordination = false
 
 [heartbeat]
 # Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs
index 5f5fe9be1395..aed0abd5bd2a 100644
--- a/src/datanode/src/alive_keeper.rs
+++ b/src/datanode/src/alive_keeper.rs
@@ -34,6 +34,8 @@ use tokio::sync::{mpsc, Mutex};
 use tokio::task::JoinHandle;
 use tokio::time::{Duration, Instant};
 
+use crate::error::{self, Result};
+use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
 use crate::region_server::RegionServer;
 
 const MAX_CLOSE_RETRY_TIMES: usize = 10;
@@ -54,7 +56,7 @@ pub struct RegionAliveKeeper {
     region_server: RegionServer,
     tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
     heartbeat_interval_millis: u64,
-    started: AtomicBool,
+    started: Arc<AtomicBool>,
 
     /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
     /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
@@ -69,7 +71,7 @@ impl RegionAliveKeeper {
             region_server,
             tasks: Arc::new(Mutex::new(HashMap::new())),
             heartbeat_interval_millis,
-            started: AtomicBool::new(false),
+            started: Arc::new(AtomicBool::new(false)),
             epoch: Instant::now(),
         }
     }
@@ -141,17 +143,72 @@ impl RegionAliveKeeper {
         deadline
    }
 
-    pub async fn start(&self) {
+    pub async fn start(
+        self: &Arc<Self>,
+        event_receiver: Option<RegionServerEventReceiver>,
+    ) -> Result<()> {
+        self.started.store(true, Ordering::Relaxed);
+
+        if let Some(mut event_receiver) = event_receiver {
+            let keeper = self.clone();
+            // Initializes the region alive keeper.
+            // It makes sure all opened regions are registered to `RegionAliveKeeper`.
+            loop {
+                match event_receiver.0.try_recv() {
+                    Ok(RegionServerEvent::Registered(region_id)) => {
+                        keeper.register_region(region_id).await;
+                    }
+                    Ok(RegionServerEvent::Deregistered(region_id)) => {
+                        keeper.deregister_region(region_id).await;
+                    }
+                    Err(mpsc::error::TryRecvError::Disconnected) => {
+                        return error::UnexpectedSnafu {
+                            violated: "RegionServerEventSender closed",
+                        }
+                        .fail()
+                    }
+                    Err(mpsc::error::TryRecvError::Empty) => {
+                        break;
+                    }
+                }
+            }
+            let running = self.started.clone();
+
+            // Watches for subsequent region server events.
+            common_runtime::spawn_bg(async move {
+                loop {
+                    if !running.load(Ordering::Relaxed) {
+                        info!("RegionAliveKeeper stopped! Quits the watch loop!");
+                        break;
+                    }
+
+                    match event_receiver.0.recv().await {
+                        Some(RegionServerEvent::Registered(region_id)) => {
+                            keeper.register_region(region_id).await;
+                        }
+                        Some(RegionServerEvent::Deregistered(region_id)) => {
+                            keeper.deregister_region(region_id).await;
+                        }
+                        None => {
+                            info!("RegionServerEventSender closed! Quits the watch loop!");
+                            break;
+                        }
+                    }
+                }
+            });
+        }
+
         let tasks = self.tasks.lock().await;
         for task in tasks.values() {
             task.start(self.heartbeat_interval_millis).await;
         }
 
-        self.started.store(true, Ordering::Relaxed);
         info!(
             "RegionAliveKeeper is started with region {:?}",
             tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
         );
+
+        Ok(())
     }
 
     pub fn epoch(&self) -> Instant {
@@ -383,14 +440,14 @@ mod test {
     #[tokio::test(flavor = "multi_thread")]
     async fn region_alive_keeper() {
         let region_server = mock_region_server();
-        let alive_keeper = RegionAliveKeeper::new(region_server, 300);
+        let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300));
         let region_id = RegionId::new(1, 2);
 
         // register a region before starting
         alive_keeper.register_region(region_id).await;
         assert!(alive_keeper.find_handle(region_id).await.is_some());
 
-        alive_keeper.start().await;
+        alive_keeper.start(None).await.unwrap();
 
         // started alive keeper should assign deadline to this region
         let deadline = alive_keeper.deadline(region_id).await.unwrap();
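The new `RegionAliveKeeper::start` above works in two phases: it first drains, synchronously via `try_recv`, every event already buffered in the channel, so that regions opened before startup are registered before heartbeats begin; only then does it move the receiver into a background task that watches for later events. A minimal, self-contained sketch of that drain-then-watch pattern follows; the `Event` enum, `u64` region ids, and `println!` calls are simplified stand-ins, not the crate's real types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use tokio::sync::mpsc;

#[derive(Debug)]
enum Event {
    Registered(u64),
    Deregistered(u64),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let running = Arc::new(AtomicBool::new(true));

    // Events buffered before start (e.g. regions opened during recovery).
    tx.send(Event::Registered(1)).unwrap();
    tx.send(Event::Registered(2)).unwrap();

    // Phase 1: synchronously drain everything already queued. The real code
    // additionally treats `Disconnected` here as a hard error.
    while let Ok(event) = rx.try_recv() {
        println!("drained: {event:?}");
    }

    // Phase 2: move the receiver into a background task that watches for
    // later events until the shared flag is cleared or the sender drops.
    let watcher_running = running.clone();
    let watcher = tokio::spawn(async move {
        loop {
            if !watcher_running.load(Ordering::Relaxed) {
                break; // keeper stopped
            }
            match rx.recv().await {
                Some(event) => println!("watched: {event:?}"),
                None => break, // all senders dropped
            }
        }
    });

    tx.send(Event::Deregistered(1)).unwrap();
    drop(tx); // closing the channel ends the watch loop
    watcher.await.unwrap();
}
```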
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 0b85eb2e765c..9940224e65d9 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -319,6 +319,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
 pub struct DatanodeOptions {
     pub mode: Mode,
     pub node_id: Option<u64>,
+    pub coordination: bool,
     pub rpc_addr: String,
     pub rpc_hostname: Option<String>,
     pub rpc_runtime_size: usize,
@@ -338,6 +339,7 @@ impl Default for DatanodeOptions {
         Self {
             mode: Mode::Standalone,
             node_id: None,
+            coordination: false,
             rpc_addr: "127.0.0.1:3001".to_string(),
             rpc_hostname: None,
             rpc_runtime_size: 8,
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 2393cd99a196..4bba303ea52d 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -43,6 +43,7 @@ use store_api::region_engine::RegionEngineRef;
 use store_api::region_request::{RegionOpenRequest, RegionRequest};
 use store_api::storage::RegionId;
 use tokio::fs;
+use tokio::sync::Notify;
 
 use crate::config::{DatanodeOptions, RegionEngineConfig};
 use crate::error::{
@@ -70,6 +71,7 @@ pub struct Datanode {
     region_event_receiver: Option<RegionServerEventReceiver>,
     region_server: RegionServer,
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
+    coordinated_notifier: Option<Arc<Notify>>,
 }
 
 impl Datanode {
@@ -77,6 +79,7 @@ impl Datanode {
         info!("Starting datanode instance...");
 
         self.start_heartbeat().await?;
+        self.wait_coordinated().await;
 
         let _ = self.greptimedb_telemetry_task.start();
         self.start_services().await
@@ -86,11 +89,20 @@ impl Datanode {
         if let Some(task) = &self.heartbeat_task {
             // Safety: The event_receiver must exist.
             let receiver = self.region_event_receiver.take().unwrap();
-            task.start(receiver).await?;
+
+            task.start(receiver, self.coordinated_notifier.clone())
+                .await?;
         }
         Ok(())
     }
 
+    /// If `coordinated_notifier` exists, it waits for all regions to be coordinated.
+    pub async fn wait_coordinated(&mut self) {
+        if let Some(notifier) = self.coordinated_notifier.take() {
+            notifier.notified().await;
+        }
+    }
+
     /// Start services of datanode. This method call will block until services are shutdown.
     pub async fn start_services(&mut self) -> Result<()> {
         if let Some(service) = self.services.as_mut() {
@@ -236,6 +248,12 @@ impl DatanodeBuilder {
         )
         .await;
 
+        let coordinated_notifier = if self.opts.coordination && matches!(mode, Mode::Distributed) {
+            Some(Arc::new(Notify::new()))
+        } else {
+            None
+        };
+
         Ok(Datanode {
             opts: self.opts,
             services,
@@ -243,6 +261,7 @@ impl DatanodeBuilder {
             region_server,
             greptimedb_telemetry_task,
             region_event_receiver,
+            coordinated_notifier,
         })
     }
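The `coordinated_notifier` introduced above is a one-shot startup gate: `Datanode::start` parks in `wait_coordinated` until the heartbeat loop handles its first response from the metasrv and fires the notifier. Here is a minimal sketch of that handshake using only `tokio::sync::Notify`; the sleep standing in for the first heartbeat response is illustrative.

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let coordinated = Arc::new(Notify::new());

    // Stand-in for the heartbeat loop: wake the waiter once the first
    // heartbeat response has been handled.
    let notifier = coordinated.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await; // "first response"
        notifier.notify_one();
    });

    // Stand-in for `Datanode::wait_coordinated`: block startup here until
    // the notification arrives.
    coordinated.notified().await;
    println!("coordinated; starting services");
}
```

Because `Notify::notify_one` stores a permit when no task is currently waiting, this gate has no lost-wakeup race: even if the first heartbeat response lands before `wait_coordinated` runs, the later `notified().await` returns immediately.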
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 39e4310f9bd0..2b7cb04c5cbc 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -28,14 +28,14 @@ use common_telemetry::{debug, error, info, trace, warn};
 use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
 use meta_client::MetaClientOptions;
 use snafu::ResultExt;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, Notify};
 use tokio::time::Instant;
 
 use self::handler::RegionHeartbeatResponseHandler;
 use crate::alive_keeper::RegionAliveKeeper;
 use crate::config::DatanodeOptions;
 use crate::error::{self, MetaClientInitSnafu, Result};
-use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
+use crate::event_listener::RegionServerEventReceiver;
 use crate::region_server::RegionServer;
 
 pub(crate) mod handler;
@@ -96,6 +96,7 @@ impl HeartbeatTask {
         running: Arc<AtomicBool>,
         handler_executor: HeartbeatResponseHandlerExecutorRef,
         mailbox: MailboxRef,
+        mut notify: Option<Arc<Notify>>,
     ) -> Result<HeartbeatSender> {
         let client_id = meta_client.id();
 
@@ -111,11 +112,13 @@ impl HeartbeatTask {
                     if let Some(msg) = res.mailbox_message.as_ref() {
                         info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
                     }
-
                     let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
                     if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
                         error!(e; "Error while handling heartbeat response");
                     }
+                    if let Some(notify) = notify.take() {
+                        notify.notify_one();
+                    }
                     if !running.load(Ordering::Acquire) {
                         info!("Heartbeat task shutdown");
                     }
@@ -137,7 +140,11 @@ impl HeartbeatTask {
     }
 
     /// Start heartbeat task, spawn background task.
-    pub async fn start(&self, mut event_receiver: RegionServerEventReceiver) -> Result<()> {
+    pub async fn start(
+        &self,
+        event_receiver: RegionServerEventReceiver,
+        notify: Option<Arc<Notify>>,
+    ) -> Result<()> {
         let running = self.running.clone();
         if running
             .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
@@ -152,8 +159,6 @@ impl HeartbeatTask {
         let addr = resolve_addr(&self.server_addr, &self.server_hostname);
         info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
 
-        self.region_alive_keeper.start().await;
-
         let meta_client = self.meta_client.clone();
         let region_server_clone = self.region_server.clone();
 
@@ -167,6 +172,7 @@ impl HeartbeatTask {
             running.clone(),
             handler_executor.clone(),
             mailbox.clone(),
+            notify,
         )
         .await?;
 
@@ -176,31 +182,7 @@ impl HeartbeatTask {
         });
 
         let epoch = self.region_alive_keeper.epoch();
-        let keeper = self.region_alive_keeper.clone();
-
-        common_runtime::spawn_bg(async move {
-            loop {
-                if !running.load(Ordering::Relaxed) {
-                    info!("shutdown heartbeat task");
-                    break;
-                }
-
-                match event_receiver.0.recv().await {
-                    Some(RegionServerEvent::Registered(region_id)) => {
-                        keeper.register_region(region_id).await;
-                    }
-                    Some(RegionServerEvent::Deregistered(region_id)) => {
-                        keeper.deregister_region(region_id).await;
-                    }
-                    None => {
-                        info!("region server event sender closed!");
-                        break;
-                    }
-                }
-            }
-        });
-
-        let running = self.running.clone();
+        self.region_alive_keeper.start(Some(event_receiver)).await?;
 
         common_runtime::spawn_bg(async move {
             let sleep = tokio::time::sleep(Duration::from_millis(0));
@@ -256,6 +238,7 @@ impl HeartbeatTask {
                     running.clone(),
                     handler_executor.clone(),
                     mailbox.clone(),
+                    None,
                 )
                 .await
                 {
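In the response loop above the notifier is held as `mut notify: Option<Arc<Notify>>` and consumed with `take()`, so only the first handled heartbeat response triggers the notification; every later response skips the branch. A tiny sketch of that once-only pattern (the function name is hypothetical):

```rust
use std::sync::Arc;

use tokio::sync::Notify;

// `Option::take` consumes the handle on the first call, so the waiter is
// notified at most once no matter how many responses arrive.
fn on_heartbeat_response(notify: &mut Option<Arc<Notify>>) {
    if let Some(notify) = notify.take() {
        notify.notify_one();
    }
}

fn main() {
    let mut notify = Some(Arc::new(Notify::new()));
    on_heartbeat_response(&mut notify); // notifies the waiter
    on_heartbeat_response(&mut notify); // no-op: handle already taken
    assert!(notify.is_none());
}
```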
diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template
index 6280ad48b430..4a692e97423c 100644
--- a/tests/conf/datanode-test.toml.template
+++ b/tests/conf/datanode-test.toml.template
@@ -3,6 +3,7 @@ mode = 'distributed'
 rpc_addr = '127.0.0.1:4100'
 rpc_hostname = '127.0.0.1'
 rpc_runtime_size = 8
+coordination = true
 
 [wal]
 file_size = '1GB'