diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 9bbee69a95f3..f85edf87c80b 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -71,6 +71,7 @@ pub struct Datanode { region_event_receiver: Option, region_server: RegionServer, greptimedb_telemetry_task: Arc, + coordinated_notifier: Option>, } impl Datanode { @@ -78,6 +79,7 @@ impl Datanode { info!("Starting datanode instance..."); self.start_heartbeat().await?; + self.wait_coordinated().await; let _ = self.greptimedb_telemetry_task.start(); self.start_services().await @@ -88,22 +90,24 @@ impl Datanode { // Safety: The event_receiver must exist. let receiver = self.region_event_receiver.take().unwrap(); - if let Some(notify) = { - let notify = if self.opts.coordination { - Some(Arc::new(Notify::new())) - } else { - None - }; - task.start(receiver, notify.clone()).await?; - notify - } { - // Waits for first heartbeat response processed. - notify.notified().await; - } + self.coordinated_notifier = if self.opts.coordination { + Some(Arc::new(Notify::new())) + } else { + None + }; + task.start(receiver, self.coordinated_notifier.clone()) + .await?; } Ok(()) } + /// If `coordinated_notifier` exists, it waits for all regions to be coordinated. + pub async fn wait_coordinated(&mut self) { + if let Some(notifier) = self.coordinated_notifier.take() { + notifier.notified().await; + } + } + /// Start services of datanode. This method call will block until services are shutdown. pub async fn start_services(&mut self) -> Result<()> { if let Some(service) = self.services.as_mut() {