diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 2f1ac2796aa6..6051ca4b030a 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -347,7 +347,7 @@ impl Default for DatanodeOptions { storage: StorageConfig::default(), region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())], logging: LoggingOptions::default(), - heartbeat: HeartbeatOptions::default(), + heartbeat: HeartbeatOptions::datanode_default(), enable_telemetry: true, } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 1b08166d7b0b..74dffcc240b5 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role}; +use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, RequestHeader, Role}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ @@ -169,7 +169,17 @@ impl HeartbeatTask { ) .await?; + let req_header = Some(RequestHeader { + member_id: node_id, + role: Role::Datanode as i32, + ..Default::default() + }); + let self_peer = Some(Peer { + addr: addr.clone(), + ..Default::default() + }); let epoch = self.region_alive_keeper.epoch(); + common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); @@ -184,9 +194,9 @@ impl HeartbeatTask { if let Some(message) = message { match outgoing_message_to_mailbox_message(message) { Ok(message) => { - let peer = Some(Peer { id: node_id, addr: addr.clone() }); let req = HeartbeatRequest { - peer, + header: req_header.clone(), + peer: self_peer.clone(), mailbox_message: Some(message), ..Default::default() }; @@ -202,12 +212,12 @@ impl HeartbeatTask { } } _ = &mut sleep => { - let peer = Some(Peer { id: node_id, addr: addr.clone() }); let region_stats = Self::load_region_stats(®ion_server_clone).await; let now = Instant::now(); let duration_since_epoch = (now - epoch).as_millis() as u64; let req = HeartbeatRequest { - peer, + header: req_header.clone(), + peer: self_peer.clone(), region_stats, duration_since_epoch, node_epoch, diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index ef40db631268..1a354e92e07c 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -48,7 +48,7 @@ impl Default for FrontendOptions { Self { mode: Mode::Standalone, node_id: None, - heartbeat: HeartbeatOptions::default(), + heartbeat: HeartbeatOptions::frontend_default(), http_options: HttpOptions::default(), grpc_options: GrpcOptions::default(), mysql_options: MysqlOptions::default(), diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 6991757b1eb8..162442d80b1b 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::v1::meta::HeartbeatRequest; +use api::v1::meta::{HeartbeatRequest, RequestHeader, Role}; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, }; @@ -44,13 +44,13 @@ pub struct HeartbeatTask { impl HeartbeatTask { pub fn new( meta_client: Arc, - heartbeat: HeartbeatOptions, + heartbeat_opts: HeartbeatOptions, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Self { HeartbeatTask { meta_client, - report_interval: heartbeat.interval_millis, - retry_interval: heartbeat.retry_interval_millis, + report_interval: heartbeat_opts.interval_millis, + retry_interval: heartbeat_opts.retry_interval_millis, resp_handler_executor, } } @@ -109,7 +109,12 @@ impl HeartbeatTask { ) { let report_interval = self.report_interval; - let _handle = common_runtime::spawn_bg(async move { + let req_header = Some(RequestHeader { + role: Role::Frontend as i32, + ..Default::default() + }); + + common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); @@ -120,6 +125,7 @@ impl HeartbeatTask { match outgoing_message_to_mailbox_message(message) { Ok(message) => { let req = HeartbeatRequest { + header: req_header.clone(), mailbox_message: Some(message), ..Default::default() }; @@ -137,7 +143,11 @@ impl HeartbeatTask { } _ = &mut sleep => { sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval)); - Some(HeartbeatRequest::default()) + let req = HeartbeatRequest { + header: req_header.clone(), + ..Default::default() + }; + Some(req) } }; diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 298360bbb7eb..3c35f7d0f48e 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -103,7 +103,9 @@ impl TryFrom for Stat { Ok(Self { timestamp_millis: time_util::current_time_millis(), cluster_id: header.cluster_id, - id: peer.id, + // datanode id + id: header.member_id, + // datanode address addr: peer.addr, rcus: region_stats.iter().map(|s| s.rcus).sum(), wcus: region_stats.iter().map(|s| s.wcus).sum(), diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 741596baaac0..2926fcc25c94 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -22,8 +22,8 @@ pub struct ResponseHeaderHandler; #[async_trait::async_trait] impl HeartbeatHandler for ResponseHeaderHandler { - fn is_acceptable(&self, role: Role) -> bool { - role == Role::Datanode + fn is_acceptable(&self, _role: Role) -> bool { + true } async fn handle( diff --git a/src/servers/src/heartbeat_options.rs b/src/servers/src/heartbeat_options.rs index 02167fd69a9e..966cb75faf0f 100644 --- a/src/servers/src/heartbeat_options.rs +++ b/src/servers/src/heartbeat_options.rs @@ -14,6 +14,8 @@ use serde::{Deserialize, Serialize}; +pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 5000; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct HeartbeatOptions { @@ -21,11 +23,25 @@ pub struct HeartbeatOptions { pub retry_interval_millis: u64, } +impl HeartbeatOptions { + pub fn datanode_default() -> Self { + Default::default() + } + + pub fn frontend_default() -> Self { + Self { + // Frontend can send heartbeat with a longer interval. + interval_millis: HEARTBEAT_INTERVAL_MILLIS * 10, + retry_interval_millis: HEARTBEAT_INTERVAL_MILLIS, + } + } +} + impl Default for HeartbeatOptions { fn default() -> Self { Self { - interval_millis: 5000, - retry_interval_millis: 5000, + interval_millis: HEARTBEAT_INTERVAL_MILLIS, + retry_interval_millis: HEARTBEAT_INTERVAL_MILLIS, } } }