From 381ffab9e75601a3c2b3fd6818ae664645fc679d Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Sun, 17 Sep 2023 14:02:05 +0000
Subject: [PATCH 1/6] feat: start services after first heartbeat response processed

---
 src/datanode/src/datanode.rs  |  6 +++++-
 src/datanode/src/heartbeat.rs | 15 ++++++++++++---
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 2393cd99a196..32d94f0927eb 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -43,6 +43,7 @@ use store_api::region_engine::RegionEngineRef;
 use store_api::region_request::{RegionOpenRequest, RegionRequest};
 use store_api::storage::RegionId;
 use tokio::fs;
+use tokio::sync::Notify;

 use crate::config::{DatanodeOptions, RegionEngineConfig};
 use crate::error::{
@@ -86,7 +87,10 @@ impl Datanode {
         if let Some(task) = &self.heartbeat_task {
             // Safety: The event_receiver must exist.
             let receiver = self.region_event_receiver.take().unwrap();
-            task.start(receiver).await?;
+            let notify = Arc::new(Notify::new());
+            task.start(receiver, notify.clone()).await?;
+            // Waits for first heartbeat response processed.
+            notify.notified().await;
         }
         Ok(())
     }
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 39e4310f9bd0..2fb2d5bbbe9e 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -28,7 +28,7 @@ use common_telemetry::{debug, error, info, trace, warn};
 use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
 use meta_client::MetaClientOptions;
 use snafu::ResultExt;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, Notify};
 use tokio::time::Instant;

 use self::handler::RegionHeartbeatResponseHandler;
@@ -96,6 +96,7 @@ impl HeartbeatTask {
         running: Arc<AtomicBool>,
         handler_executor: HeartbeatResponseHandlerExecutorRef,
         mailbox: MailboxRef,
+        mut notify: Option<Arc<Notify>>,
     ) -> Result<HeartbeatSender> {
         let client_id = meta_client.id();

@@ -111,11 +112,13 @@
                 if let Some(msg) = res.mailbox_message.as_ref() {
                     info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
                 }
-
                 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
                 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
                     error!(e; "Error while handling heartbeat response");
                 }
+                if let Some(notify) = notify.take() {
+                    notify.notify_one();
+                }
                 if !running.load(Ordering::Acquire) {
                     info!("Heartbeat task shutdown");
                 }
@@ -137,7 +140,11 @@
     }

     /// Start heartbeat task, spawn background task.
-    pub async fn start(&self, mut event_receiver: RegionServerEventReceiver) -> Result<()> {
+    pub async fn start(
+        &self,
+        mut event_receiver: RegionServerEventReceiver,
+        notify: Arc<Notify>,
+    ) -> Result<()> {
         let running = self.running.clone();
         if running
             .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
@@ -167,6 +174,7 @@ impl HeartbeatTask {
             running.clone(),
             handler_executor.clone(),
             mailbox.clone(),
+            Some(notify),
         )
         .await?;

@@ -256,6 +264,7 @@ impl HeartbeatTask {
                         running.clone(),
                         handler_executor.clone(),
                         mailbox.clone(),
+                        None,
                     )
                     .await
                     {

From 6eea7e911e514612fbac58c187f0e41463468fa7 Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Sun, 17 Sep 2023 14:34:34 +0000
Subject: [PATCH 2/6] refactor: watch changes in RegionAliveKeeper

---
 src/datanode/src/alive_keeper.rs | 69 +++++++++++++++++++++++++++++---
 src/datanode/src/heartbeat.rs    | 32 ++-------------------
 2 files changed, 66 insertions(+), 35 deletions(-)

diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs
index 5f5fe9be1395..aed0abd5bd2a 100644
--- a/src/datanode/src/alive_keeper.rs
+++ b/src/datanode/src/alive_keeper.rs
@@ -34,6 +34,8 @@ use tokio::sync::{mpsc, Mutex};
 use tokio::task::JoinHandle;
 use tokio::time::{Duration, Instant};

+use crate::error::{self, Result};
+use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
 use crate::region_server::RegionServer;

 const MAX_CLOSE_RETRY_TIMES: usize = 10;
@@ -54,7 +56,7 @@ pub struct RegionAliveKeeper {
     region_server: RegionServer,
     tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
     heartbeat_interval_millis: u64,
-    started: AtomicBool,
+    started: Arc<AtomicBool>,

     /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
     /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
@@ -69,7 +71,7 @@ impl RegionAliveKeeper {
             region_server,
             tasks: Arc::new(Mutex::new(HashMap::new())),
             heartbeat_interval_millis,
-            started: AtomicBool::new(false),
+            started: Arc::new(AtomicBool::new(false)),
             epoch: Instant::now(),
         }
     }
@@ -141,17 +143,72 @@ impl RegionAliveKeeper {
         deadline
     }

-    pub async fn start(&self) {
+    pub async fn start(
+        self: &Arc<Self>,
+        event_receiver: Option<RegionServerEventReceiver>,
+    ) -> Result<()> {
+        self.started.store(true, Ordering::Relaxed);
+
+        if let Some(mut event_receiver) = event_receiver {
+            let keeper = self.clone();
+            // Initializes the region alive keeper.
+            // It makes sure all opened regions are registered to `RegionAliveKeeper`.
+            loop {
+                match event_receiver.0.try_recv() {
+                    Ok(RegionServerEvent::Registered(region_id)) => {
+                        keeper.register_region(region_id).await;
+                    }
+                    Ok(RegionServerEvent::Deregistered(region_id)) => {
+                        keeper.deregister_region(region_id).await;
+                    }
+                    Err(mpsc::error::TryRecvError::Disconnected) => {
+                        return error::UnexpectedSnafu {
+                            violated: "RegionServerEventSender closed",
+                        }
+                        .fail()
+                    }
+                    Err(mpsc::error::TryRecvError::Empty) => {
+                        break;
+                    }
+                }
+            }
+            let running = self.started.clone();
+
+            // Watches changes
+            common_runtime::spawn_bg(async move {
+                loop {
+                    if !running.load(Ordering::Relaxed) {
+                        info!("RegionAliveKeeper stopped! Quits the watch loop!");
+                        break;
+                    }
+
+                    match event_receiver.0.recv().await {
+                        Some(RegionServerEvent::Registered(region_id)) => {
+                            keeper.register_region(region_id).await;
+                        }
+                        Some(RegionServerEvent::Deregistered(region_id)) => {
+                            keeper.deregister_region(region_id).await;
+                        }
+                        None => {
+                            info!("RegionServerEventSender closed! Quits the watch loop!");
Quits the watch loop!"); + break; + } + } + } + }); + } + let tasks = self.tasks.lock().await; for task in tasks.values() { task.start(self.heartbeat_interval_millis).await; } - self.started.store(true, Ordering::Relaxed); info!( "RegionAliveKeeper is started with region {:?}", tasks.keys().map(|x| x.to_string()).collect::>(), ); + + Ok(()) } pub fn epoch(&self) -> Instant { @@ -383,14 +440,14 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn region_alive_keeper() { let region_server = mock_region_server(); - let alive_keeper = RegionAliveKeeper::new(region_server, 300); + let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300)); let region_id = RegionId::new(1, 2); // register a region before starting alive_keeper.register_region(region_id).await; assert!(alive_keeper.find_handle(region_id).await.is_some()); - alive_keeper.start().await; + alive_keeper.start(None).await.unwrap(); // started alive keeper should assign deadline to this region let deadline = alive_keeper.deadline(region_id).await.unwrap(); diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 2fb2d5bbbe9e..c2fa5820e436 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -35,7 +35,7 @@ use self::handler::RegionHeartbeatResponseHandler; use crate::alive_keeper::RegionAliveKeeper; use crate::config::DatanodeOptions; use crate::error::{self, MetaClientInitSnafu, Result}; -use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver}; +use crate::event_listener::RegionServerEventReceiver; use crate::region_server::RegionServer; pub(crate) mod handler; @@ -142,7 +142,7 @@ impl HeartbeatTask { /// Start heartbeat task, spawn background task. pub async fn start( &self, - mut event_receiver: RegionServerEventReceiver, + event_receiver: RegionServerEventReceiver, notify: Arc, ) -> Result<()> { let running = self.running.clone(); @@ -159,8 +159,6 @@ impl HeartbeatTask { let addr = resolve_addr(&self.server_addr, &self.server_hostname); info!("Starting heartbeat to Metasrv with interval {interval}. 

-        self.region_alive_keeper.start().await;
-
         let meta_client = self.meta_client.clone();

         let region_server_clone = self.region_server.clone();
@@ -184,31 +182,7 @@ impl HeartbeatTask {
         });

         let epoch = self.region_alive_keeper.epoch();
-        let keeper = self.region_alive_keeper.clone();
-
-        common_runtime::spawn_bg(async move {
-            loop {
-                if !running.load(Ordering::Relaxed) {
-                    info!("shutdown heartbeat task");
-                    break;
-                }
-
-                match event_receiver.0.recv().await {
-                    Some(RegionServerEvent::Registered(region_id)) => {
-                        keeper.register_region(region_id).await;
-                    }
-                    Some(RegionServerEvent::Deregistered(region_id)) => {
-                        keeper.deregister_region(region_id).await;
-                    }
-                    None => {
-                        info!("region server event sender closed!");
-                        break;
-                    }
-                }
-            }
-        });
-
-        let running = self.running.clone();
+        self.region_alive_keeper.start(Some(event_receiver)).await?;

         common_runtime::spawn_bg(async move {
             let sleep = tokio::time::sleep(Duration::from_millis(0));

From 649e7e37a44ba82ba7e3f294a85b387461e8aee9 Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Sun, 17 Sep 2023 15:09:43 +0000
Subject: [PATCH 3/6] feat: add coordination to DatanodeOptions

---
 config/datanode.example.toml  |  3 +++
 src/datanode/src/config.rs    |  2 ++
 src/datanode/src/datanode.rs  | 17 +++++++++++++----
 src/datanode/src/heartbeat.rs |  4 ++--
 4 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index c1baa7eb9217..808cc855d5a1 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -8,6 +8,9 @@ rpc_addr = "127.0.0.1:3001"
 rpc_hostname = "127.0.0.1"
 # The number of gRPC server worker threads, 8 by default.
 rpc_runtime_size = 8
+# Start services after regions are coordinated.
+# It will block the datanode from starting if it can't receive heartbeats from the metasrv.
+coordination = false

 [heartbeat]
 # Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 0b85eb2e765c..9940224e65d9 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -319,6 +319,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
 pub struct DatanodeOptions {
     pub mode: Mode,
     pub node_id: Option<u64>,
+    pub coordination: bool,
     pub rpc_addr: String,
     pub rpc_hostname: Option<String>,
     pub rpc_runtime_size: usize,
@@ -338,6 +339,7 @@ impl Default for DatanodeOptions {
         Self {
             mode: Mode::Standalone,
             node_id: None,
+            coordination: false,
             rpc_addr: "127.0.0.1:3001".to_string(),
             rpc_hostname: None,
             rpc_runtime_size: 8,
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 32d94f0927eb..9bbee69a95f3 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -87,10 +87,19 @@ impl Datanode {
         if let Some(task) = &self.heartbeat_task {
             // Safety: The event_receiver must exist.
             let receiver = self.region_event_receiver.take().unwrap();
-            let notify = Arc::new(Notify::new());
-            task.start(receiver, notify.clone()).await?;
-            // Waits for first heartbeat response processed.
-            notify.notified().await;
+
+            if let Some(notify) = {
+                let notify = if self.opts.coordination {
+                    Some(Arc::new(Notify::new()))
+                } else {
+                    None
+                };
+                task.start(receiver, notify.clone()).await?;
+                notify
+            } {
+                // Waits for first heartbeat response processed.
+                notify.notified().await;
+            }
         }
         Ok(())
     }
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index c2fa5820e436..2b7cb04c5cbc 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -143,7 +143,7 @@ impl HeartbeatTask {
     pub async fn start(
         &self,
         event_receiver: RegionServerEventReceiver,
-        notify: Arc<Notify>,
+        notify: Option<Arc<Notify>>,
     ) -> Result<()> {
         let running = self.running.clone();
         if running
@@ -172,7 +172,7 @@ impl HeartbeatTask {
             running.clone(),
             handler_executor.clone(),
             mailbox.clone(),
-            Some(notify),
+            notify,
         )
         .await?;

From 7ca6aeb8c8aadfae00442e41a1e858d6143c9976 Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Mon, 18 Sep 2023 04:54:15 +0000
Subject: [PATCH 4/6] chore: apply suggestions from CR

---
 src/datanode/src/datanode.rs | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 9bbee69a95f3..32d2e52a2ca3 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -71,6 +71,7 @@ pub struct Datanode {
     region_event_receiver: Option<RegionServerEventReceiver>,
     region_server: RegionServer,
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
+    coordinated_notifier: Option<Arc<Notify>>,
 }

 impl Datanode {
@@ -78,6 +79,7 @@ impl Datanode {
         info!("Starting datanode instance...");

         self.start_heartbeat().await?;
+        self.wait_coordinated().await;

         let _ = self.greptimedb_telemetry_task.start();
         self.start_services().await
@@ -88,22 +90,19 @@ impl Datanode {
             // Safety: The event_receiver must exist.
             let receiver = self.region_event_receiver.take().unwrap();

-            if let Some(notify) = {
-                let notify = if self.opts.coordination {
-                    Some(Arc::new(Notify::new()))
-                } else {
-                    None
-                };
-                task.start(receiver, notify.clone()).await?;
-                notify
-            } {
-                // Waits for first heartbeat response processed.
-                notify.notified().await;
-            }
+            task.start(receiver, self.coordinated_notifier.clone())
+                .await?;
         }
         Ok(())
     }

+    /// If `coordinated_notifier` exists, it waits for all regions to be coordinated.
+    pub async fn wait_coordinated(&mut self) {
+        if let Some(notifier) = self.coordinated_notifier.take() {
+            notifier.notified().await;
+        }
+    }
+
     /// Start services of datanode. This method call will block until services are shutdown.
     pub async fn start_services(&mut self) -> Result<()> {
         if let Some(service) = self.services.as_mut() {
@@ -249,6 +248,12 @@ impl DatanodeBuilder {
         )
         .await;

+        let coordinated_notifier = if self.opts.coordination {
+            Some(Arc::new(Notify::new()))
+        } else {
+            None
+        };
+
         Ok(Datanode {
             opts: self.opts,
             services,
@@ -256,6 +261,7 @@ impl DatanodeBuilder {
             region_server,
             greptimedb_telemetry_task,
             region_event_receiver,
+            coordinated_notifier,
         })
     }

From 0d9db20e9ae0c2638ef0d770cfe2c0c8b00e81dc Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Mon, 18 Sep 2023 07:00:11 +0000
Subject: [PATCH 5/6] chore: apply suggestions from CR

---
 src/datanode/src/datanode.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 32d2e52a2ca3..4bba303ea52d 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -248,7 +248,7 @@ impl DatanodeBuilder {
         )
         .await;

-        let coordinated_notifier = if self.opts.coordination {
+        let coordinated_notifier = if self.opts.coordination && matches!(mode, Mode::Distributed) {
             Some(Arc::new(Notify::new()))
         } else {
             None

From f5d4a2dbef40347fd1fd90f3cea84207ae1c3199 Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Mon, 18 Sep 2023 08:01:05 +0000
Subject: [PATCH 6/6] chore: enable coordination in sqlness

---
 tests/conf/datanode-test.toml.template | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template
index 6280ad48b430..4a692e97423c 100644
--- a/tests/conf/datanode-test.toml.template
+++ b/tests/conf/datanode-test.toml.template
@@ -3,6 +3,7 @@ mode = 'distributed'
 rpc_addr = '127.0.0.1:4100'
 rpc_hostname = '127.0.0.1'
 rpc_runtime_size = 8
+coordination = true

 [wal]
 file_size = '1GB'
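
Note: the series above gates service startup on a `tokio::sync::Notify` handshake. The heartbeat loop calls `notify_one()` once the first heartbeat response has been handled, and `wait_coordinated` blocks on `notified().await` before `start_services` runs. The standalone sketch below shows only that handshake, outside the GreptimeDB types; `handle_first_heartbeat_response` and its sleep are hypothetical stand-ins for the real heartbeat handling, not code from the patches.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    // Shared notifier, the counterpart of `coordinated_notifier` in the patches.
    let coordinated = Arc::new(Notify::new());

    let signaller = coordinated.clone();
    tokio::spawn(async move {
        // Hypothetical stand-in for receiving and handling the first
        // heartbeat response from the metasrv.
        handle_first_heartbeat_response().await;
        // `notify_one` stores a permit, so the waiter is released even if
        // it only starts waiting after this call.
        signaller.notify_one();
    });

    // Counterpart of `wait_coordinated`: block until the signal arrives,
    // then start serving.
    coordinated.notified().await;
    println!("first heartbeat response handled, starting services");
}

async fn handle_first_heartbeat_response() {
    // Placeholder for the work done by the heartbeat response handlers.
    tokio::time::sleep(Duration::from_millis(50)).await;
}

Because a permit set by `notify_one()` is not lost when no task is waiting yet, it does not matter whether the heartbeat task signals before or after `wait_coordinated` starts waiting, which is what makes splitting `start_heartbeat` and `wait_coordinated` into two calls (patch 4) safe.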