Skip to content

Commit

Permalink
feat: start services after first heartbeat response processed
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 17, 2023
1 parent 4a82926 commit 381ffab
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
6 changes: 5 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use store_api::region_engine::RegionEngineRef;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
Expand Down Expand Up @@ -86,7 +87,10 @@ impl Datanode {
if let Some(task) = &self.heartbeat_task {
// Safety: The event_receiver must exist.
let receiver = self.region_event_receiver.take().unwrap();
task.start(receiver).await?;
let notify = Arc::new(Notify::new());
task.start(receiver, notify.clone()).await?;
// Waits for first heartbeat response processed.
notify.notified().await;
}
Ok(())
}
Expand Down
15 changes: 12 additions & 3 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Notify};
use tokio::time::Instant;

use self::handler::RegionHeartbeatResponseHandler;
Expand Down Expand Up @@ -96,6 +96,7 @@ impl HeartbeatTask {
running: Arc<AtomicBool>,
handler_executor: HeartbeatResponseHandlerExecutorRef,
mailbox: MailboxRef,
mut notify: Option<Arc<Notify>>,
) -> Result<HeartbeatSender> {
let client_id = meta_client.id();

Expand All @@ -111,11 +112,13 @@ impl HeartbeatTask {
if let Some(msg) = res.mailbox_message.as_ref() {
info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
}

let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
error!(e; "Error while handling heartbeat response");
}
if let Some(notify) = notify.take() {
notify.notify_one();
}
if !running.load(Ordering::Acquire) {
info!("Heartbeat task shutdown");
}
Expand All @@ -137,7 +140,11 @@ impl HeartbeatTask {
}

/// Start heartbeat task, spawn background task.
pub async fn start(&self, mut event_receiver: RegionServerEventReceiver) -> Result<()> {
pub async fn start(
&self,
mut event_receiver: RegionServerEventReceiver,
notify: Arc<Notify>,
) -> Result<()> {
let running = self.running.clone();
if running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
Expand Down Expand Up @@ -167,6 +174,7 @@ impl HeartbeatTask {
running.clone(),
handler_executor.clone(),
mailbox.clone(),
Some(notify),
)
.await?;

Expand Down Expand Up @@ -256,6 +264,7 @@ impl HeartbeatTask {
running.clone(),
handler_executor.clone(),
mailbox.clone(),
None,
)
.await
{
Expand Down

0 comments on commit 381ffab

Please sign in to comment.