Skip to content

Commit

Permalink
feat: heartbeat request with header (GreptimeTeam#2412)
Browse files Browse the repository at this point in the history
* feat: heartbeat request with header

* chore: frontend send heartbeat with a longer interval
  • Loading branch information
fengjiachun authored and paomian committed Oct 19, 2023
1 parent 41b44f3 commit af888c8
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl Default for DatanodeOptions {
storage: StorageConfig::default(),
region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
heartbeat: HeartbeatOptions::datanode_default(),
enable_telemetry: true,
}
}
Expand Down
20 changes: 15 additions & 5 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role};
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, RequestHeader, Role};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
Expand Down Expand Up @@ -169,7 +169,17 @@ impl HeartbeatTask {
)
.await?;

let req_header = Some(RequestHeader {
member_id: node_id,
role: Role::Datanode as i32,
..Default::default()
});
let self_peer = Some(Peer {
addr: addr.clone(),
..Default::default()
});
let epoch = self.region_alive_keeper.epoch();

common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
Expand All @@ -184,9 +194,9 @@ impl HeartbeatTask {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let peer = Some(Peer { id: node_id, addr: addr.clone() });
let req = HeartbeatRequest {
peer,
header: req_header.clone(),
peer: self_peer.clone(),
mailbox_message: Some(message),
..Default::default()
};
Expand All @@ -202,12 +212,12 @@ impl HeartbeatTask {
}
}
_ = &mut sleep => {
let peer = Some(Peer { id: node_id, addr: addr.clone() });
let region_stats = Self::load_region_stats(&region_server_clone).await;
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let req = HeartbeatRequest {
peer,
header: req_header.clone(),
peer: self_peer.clone(),
region_stats,
duration_since_epoch,
node_epoch,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Default for FrontendOptions {
Self {
mode: Mode::Standalone,
node_id: None,
heartbeat: HeartbeatOptions::default(),
heartbeat: HeartbeatOptions::frontend_default(),
http_options: HttpOptions::default(),
grpc_options: GrpcOptions::default(),
mysql_options: MysqlOptions::default(),
Expand Down
22 changes: 16 additions & 6 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use api::v1::meta::HeartbeatRequest;
use api::v1::meta::{HeartbeatRequest, RequestHeader, Role};
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
Expand Down Expand Up @@ -44,13 +44,13 @@ pub struct HeartbeatTask {
impl HeartbeatTask {
pub fn new(
meta_client: Arc<MetaClient>,
heartbeat: HeartbeatOptions,
heartbeat_opts: HeartbeatOptions,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
meta_client,
report_interval: heartbeat.interval_millis,
retry_interval: heartbeat.retry_interval_millis,
report_interval: heartbeat_opts.interval_millis,
retry_interval: heartbeat_opts.retry_interval_millis,
resp_handler_executor,
}
}
Expand Down Expand Up @@ -109,7 +109,12 @@ impl HeartbeatTask {
) {
let report_interval = self.report_interval;

let _handle = common_runtime::spawn_bg(async move {
let req_header = Some(RequestHeader {
role: Role::Frontend as i32,
..Default::default()
});

common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

Expand All @@ -120,6 +125,7 @@ impl HeartbeatTask {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
header: req_header.clone(),
mailbox_message: Some(message),
..Default::default()
};
Expand All @@ -137,7 +143,11 @@ impl HeartbeatTask {
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
Some(HeartbeatRequest::default())
let req = HeartbeatRequest {
header: req_header.clone(),
..Default::default()
};
Some(req)
}
};

Expand Down
4 changes: 3 additions & 1 deletion src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ impl TryFrom<HeartbeatRequest> for Stat {
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
cluster_id: header.cluster_id,
id: peer.id,
// datanode id
id: header.member_id,
// datanode address
addr: peer.addr,
rcus: region_stats.iter().map(|s| s.rcus).sum(),
wcus: region_stats.iter().map(|s| s.wcus).sum(),
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/handler/response_header_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ pub struct ResponseHeaderHandler;

#[async_trait::async_trait]
impl HeartbeatHandler for ResponseHeaderHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
fn is_acceptable(&self, _role: Role) -> bool {
true
}

async fn handle(
Expand Down
20 changes: 18 additions & 2 deletions src/servers/src/heartbeat_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,34 @@

use serde::{Deserialize, Serialize};

pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 5000;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HeartbeatOptions {
pub interval_millis: u64,
pub retry_interval_millis: u64,
}

impl HeartbeatOptions {
pub fn datanode_default() -> Self {
Default::default()
}

pub fn frontend_default() -> Self {
Self {
// Frontend can send heartbeat with a longer interval.
interval_millis: HEARTBEAT_INTERVAL_MILLIS * 10,
retry_interval_millis: HEARTBEAT_INTERVAL_MILLIS,
}
}
}

impl Default for HeartbeatOptions {
fn default() -> Self {
Self {
interval_millis: 5000,
retry_interval_millis: 5000,
interval_millis: HEARTBEAT_INTERVAL_MILLIS,
retry_interval_millis: HEARTBEAT_INTERVAL_MILLIS,
}
}
}

0 comments on commit af888c8

Please sign in to comment.