Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: start services after first heartbeat response processed #2424

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"
# The number of gRPC server worker threads, 8 by default.
rpc_runtime_size = 8
# Start services after regions are coordinated.
# It will block the datanode start if it can't receive the heartbeat from metasrv.
coordination = false

[heartbeat]
# Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
Expand Down
69 changes: 63 additions & 6 deletions src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};

use crate::error::{self, Result};
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
use crate::region_server::RegionServer;

const MAX_CLOSE_RETRY_TIMES: usize = 10;
Expand All @@ -54,7 +56,7 @@ pub struct RegionAliveKeeper {
region_server: RegionServer,
tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
heartbeat_interval_millis: u64,
started: AtomicBool,
started: Arc<AtomicBool>,

/// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
/// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
Expand All @@ -69,7 +71,7 @@ impl RegionAliveKeeper {
region_server,
tasks: Arc::new(Mutex::new(HashMap::new())),
heartbeat_interval_millis,
started: AtomicBool::new(false),
started: Arc::new(AtomicBool::new(false)),
epoch: Instant::now(),
}
}
Expand Down Expand Up @@ -141,17 +143,72 @@ impl RegionAliveKeeper {
deadline
}

pub async fn start(&self) {
pub async fn start(
self: &Arc<Self>,
event_receiver: Option<RegionServerEventReceiver>,
) -> Result<()> {
self.started.store(true, Ordering::Relaxed);

if let Some(mut event_receiver) = event_receiver {
let keeper = self.clone();
// Initializers region alive keeper.
// It makes sure all opened regions are registered to `RegionAliveKeeper.`
loop {
match event_receiver.0.try_recv() {
Ok(RegionServerEvent::Registered(region_id)) => {
keeper.register_region(region_id).await;
}
Ok(RegionServerEvent::Deregistered(region_id)) => {
keeper.deregister_region(region_id).await;
}
Err(mpsc::error::TryRecvError::Disconnected) => {
return error::UnexpectedSnafu {
violated: "RegionServerEventSender closed",
}
.fail()
}
Err(mpsc::error::TryRecvError::Empty) => {
break;
}
}
}
let running = self.started.clone();

// Watches changes
common_runtime::spawn_bg(async move {
loop {
if !running.load(Ordering::Relaxed) {
info!("RegionAliveKeeper stopped! Quits the watch loop!");
break;
}

match event_receiver.0.recv().await {
Some(RegionServerEvent::Registered(region_id)) => {
keeper.register_region(region_id).await;
}
Some(RegionServerEvent::Deregistered(region_id)) => {
keeper.deregister_region(region_id).await;
}
None => {
info!("RegionServerEventSender closed! Quits the watch loop!");
break;
}
}
}
});
}

let tasks = self.tasks.lock().await;
for task in tasks.values() {
task.start(self.heartbeat_interval_millis).await;
}
self.started.store(true, Ordering::Relaxed);

info!(
"RegionAliveKeeper is started with region {:?}",
tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
);

Ok(())
}

pub fn epoch(&self) -> Instant {
Expand Down Expand Up @@ -383,14 +440,14 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn region_alive_keeper() {
let region_server = mock_region_server();
let alive_keeper = RegionAliveKeeper::new(region_server, 300);
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300));
let region_id = RegionId::new(1, 2);

// register a region before starting
alive_keeper.register_region(region_id).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());

alive_keeper.start().await;
alive_keeper.start(None).await.unwrap();

// started alive keeper should assign deadline to this region
let deadline = alive_keeper.deadline(region_id).await.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
pub struct DatanodeOptions {
pub mode: Mode,
pub node_id: Option<u64>,
pub coordination: bool,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
Expand All @@ -338,6 +339,7 @@ impl Default for DatanodeOptions {
Self {
mode: Mode::Standalone,
node_id: None,
coordination: false,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
Expand Down
21 changes: 20 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use store_api::region_engine::RegionEngineRef;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
Expand Down Expand Up @@ -70,13 +71,15 @@ pub struct Datanode {
region_event_receiver: Option<RegionServerEventReceiver>,
region_server: RegionServer,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
coordinated_notifier: Option<Arc<Notify>>,
}

impl Datanode {
pub async fn start(&mut self) -> Result<()> {
info!("Starting datanode instance...");

self.start_heartbeat().await?;
self.wait_coordinated().await;

let _ = self.greptimedb_telemetry_task.start();
self.start_services().await
Expand All @@ -86,11 +89,20 @@ impl Datanode {
if let Some(task) = &self.heartbeat_task {
// Safety: The event_receiver must exist.
let receiver = self.region_event_receiver.take().unwrap();
task.start(receiver).await?;

task.start(receiver, self.coordinated_notifier.clone())
.await?;
}
Ok(())
}

/// If `coordinated_notifier` exists, it waits for all regions to be coordinated.
pub async fn wait_coordinated(&mut self) {
if let Some(notifier) = self.coordinated_notifier.take() {
notifier.notified().await;
}
}

/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
if let Some(service) = self.services.as_mut() {
Expand Down Expand Up @@ -236,13 +248,20 @@ impl DatanodeBuilder {
)
.await;

let coordinated_notifier = if self.opts.coordination && matches!(mode, Mode::Distributed) {
Some(Arc::new(Notify::new()))
} else {
None
};

Ok(Datanode {
opts: self.opts,
services,
heartbeat_task,
region_server,
greptimedb_telemetry_task,
region_event_receiver,
coordinated_notifier,
})
}

Expand Down
45 changes: 14 additions & 31 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Notify};
use tokio::time::Instant;

use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
use crate::event_listener::RegionServerEventReceiver;
use crate::region_server::RegionServer;

pub(crate) mod handler;
Expand Down Expand Up @@ -96,6 +96,7 @@ impl HeartbeatTask {
running: Arc<AtomicBool>,
handler_executor: HeartbeatResponseHandlerExecutorRef,
mailbox: MailboxRef,
mut notify: Option<Arc<Notify>>,
) -> Result<HeartbeatSender> {
let client_id = meta_client.id();

Expand All @@ -111,11 +112,13 @@ impl HeartbeatTask {
if let Some(msg) = res.mailbox_message.as_ref() {
info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
}

let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
error!(e; "Error while handling heartbeat response");
}
if let Some(notify) = notify.take() {
notify.notify_one();
}
if !running.load(Ordering::Acquire) {
info!("Heartbeat task shutdown");
}
Expand All @@ -137,7 +140,11 @@ impl HeartbeatTask {
}

/// Start heartbeat task, spawn background task.
pub async fn start(&self, mut event_receiver: RegionServerEventReceiver) -> Result<()> {
pub async fn start(
&self,
event_receiver: RegionServerEventReceiver,
notify: Option<Arc<Notify>>,
) -> Result<()> {
let running = self.running.clone();
if running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
Expand All @@ -152,8 +159,6 @@ impl HeartbeatTask {
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");

self.region_alive_keeper.start().await;

let meta_client = self.meta_client.clone();
let region_server_clone = self.region_server.clone();

Expand All @@ -167,6 +172,7 @@ impl HeartbeatTask {
running.clone(),
handler_executor.clone(),
mailbox.clone(),
notify,
)
.await?;

Expand All @@ -176,31 +182,7 @@ impl HeartbeatTask {
});
let epoch = self.region_alive_keeper.epoch();

let keeper = self.region_alive_keeper.clone();

common_runtime::spawn_bg(async move {
loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown heartbeat task");
break;
}

match event_receiver.0.recv().await {
Some(RegionServerEvent::Registered(region_id)) => {
keeper.register_region(region_id).await;
}
Some(RegionServerEvent::Deregistered(region_id)) => {
keeper.deregister_region(region_id).await;
}
None => {
info!("region server event sender closed!");
break;
}
}
}
});

let running = self.running.clone();
self.region_alive_keeper.start(Some(event_receiver)).await?;
zhongzc marked this conversation as resolved.
Show resolved Hide resolved

common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
Expand Down Expand Up @@ -256,6 +238,7 @@ impl HeartbeatTask {
running.clone(),
handler_executor.clone(),
mailbox.clone(),
None,
)
.await
{
Expand Down
1 change: 1 addition & 0 deletions tests/conf/datanode-test.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mode = 'distributed'
rpc_addr = '127.0.0.1:4100'
rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8
coordination = true

[wal]
file_size = '1GB'
Expand Down
Loading