Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 18, 2023
1 parent 649e7e3 commit 23497d8
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ pub struct Datanode {
region_event_receiver: Option<RegionServerEventReceiver>,
region_server: RegionServer,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
coordinated_notifier: Option<Arc<Notify>>,
}

impl Datanode {
pub async fn start(&mut self) -> Result<()> {
info!("Starting datanode instance...");

self.start_heartbeat().await?;
self.wait_coordinated().await;

let _ = self.greptimedb_telemetry_task.start();
self.start_services().await
Expand All @@ -88,22 +90,24 @@ impl Datanode {
// Safety: The event_receiver must exist.
let receiver = self.region_event_receiver.take().unwrap();

if let Some(notify) = {
let notify = if self.opts.coordination {
Some(Arc::new(Notify::new()))
} else {
None
};
task.start(receiver, notify.clone()).await?;
notify
} {
// Waits for first heartbeat response processed.
notify.notified().await;
}
self.coordinated_notifier = if self.opts.coordination {
Some(Arc::new(Notify::new()))
} else {
None
};
task.start(receiver, self.coordinated_notifier.clone())
.await?;
}
Ok(())
}

/// If `coordinated_notifier` exists, it waits for all regions to be coordinated.
pub async fn wait_coordinated(&mut self) {
if let Some(notifier) = self.coordinated_notifier.take() {
notifier.notified().await;
}
}

/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
if let Some(service) = self.services.as_mut() {
Expand Down

0 comments on commit 23497d8

Please sign in to comment.