From 381ffab9e75601a3c2b3fd6818ae664645fc679d Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 17 Sep 2023 14:02:05 +0000 Subject: [PATCH] feat: start services after first heartbeat response processed --- src/datanode/src/datanode.rs | 6 +++++- src/datanode/src/heartbeat.rs | 15 ++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 2393cd99a196..32d94f0927eb 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -43,6 +43,7 @@ use store_api::region_engine::RegionEngineRef; use store_api::region_request::{RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; use tokio::fs; +use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig}; use crate::error::{ @@ -86,7 +87,10 @@ impl Datanode { if let Some(task) = &self.heartbeat_task { // Safety: The event_receiver must exist. let receiver = self.region_event_receiver.take().unwrap(); - task.start(receiver).await?; + let notify = Arc::new(Notify::new()); + task.start(receiver, notify.clone()).await?; + // Waits for first heartbeat response processed. + notify.notified().await; } Ok(()) } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 39e4310f9bd0..2fb2d5bbbe9e 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -28,7 +28,7 @@ use common_telemetry::{debug, error, info, trace, warn}; use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder}; use meta_client::MetaClientOptions; use snafu::ResultExt; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Notify}; use tokio::time::Instant; use self::handler::RegionHeartbeatResponseHandler; @@ -96,6 +96,7 @@ impl HeartbeatTask { running: Arc, handler_executor: HeartbeatResponseHandlerExecutorRef, mailbox: MailboxRef, + mut notify: Option>, ) -> Result { let client_id = meta_client.id(); @@ -111,11 +112,13 @@ impl HeartbeatTask { if let Some(msg) = res.mailbox_message.as_ref() { info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}"); } - let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res); if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await { error!(e; "Error while handling heartbeat response"); } + if let Some(notify) = notify.take() { + notify.notify_one(); + } if !running.load(Ordering::Acquire) { info!("Heartbeat task shutdown"); } @@ -137,7 +140,11 @@ impl HeartbeatTask { } /// Start heartbeat task, spawn background task. - pub async fn start(&self, mut event_receiver: RegionServerEventReceiver) -> Result<()> { + pub async fn start( + &self, + mut event_receiver: RegionServerEventReceiver, + notify: Arc, + ) -> Result<()> { let running = self.running.clone(); if running .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) @@ -167,6 +174,7 @@ impl HeartbeatTask { running.clone(), handler_executor.clone(), mailbox.clone(), + Some(notify), ) .await?; @@ -256,6 +264,7 @@ impl HeartbeatTask { running.clone(), handler_executor.clone(), mailbox.clone(), + None, ) .await {