diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 1b08166d7b0b..ca1465366219 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role}; +use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, RequestHeader, Role}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ @@ -169,6 +169,16 @@ impl HeartbeatTask { ) .await?; + let req_header = Some(RequestHeader { + member_id: node_id, + role: Role::Datanode as i32, + ..Default::default() + }); + let self_peer = Some(Peer { + addr: addr.clone(), + ..Default::default() + }); + let epoch = self.region_alive_keeper.epoch(); common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); @@ -184,9 +194,9 @@ impl HeartbeatTask { if let Some(message) = message { match outgoing_message_to_mailbox_message(message) { Ok(message) => { - let peer = Some(Peer { id: node_id, addr: addr.clone() }); let req = HeartbeatRequest { - peer, + header: req_header.clone(), + peer: self_peer.clone(), mailbox_message: Some(message), ..Default::default() }; @@ -202,12 +212,12 @@ impl HeartbeatTask { } } _ = &mut sleep => { - let peer = Some(Peer { id: node_id, addr: addr.clone() }); let region_stats = Self::load_region_stats(®ion_server_clone).await; let now = Instant::now(); let duration_since_epoch = (now - epoch).as_millis() as u64; let req = HeartbeatRequest { - peer, + header: req_header.clone(), + peer: self_peer.clone(), region_stats, duration_since_epoch, node_epoch, diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 6991757b1eb8..8fdcad68dcb4 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::v1::meta::HeartbeatRequest; +use api::v1::meta::{HeartbeatRequest, RequestHeader, Role}; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, }; @@ -109,6 +109,11 @@ impl HeartbeatTask { ) { let report_interval = self.report_interval; + let req_header = Some(RequestHeader { + role: Role::Frontend as i32, + ..Default::default() + }); + let _handle = common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); @@ -120,6 +125,7 @@ impl HeartbeatTask { match outgoing_message_to_mailbox_message(message) { Ok(message) => { let req = HeartbeatRequest { + header: req_header.clone(), mailbox_message: Some(message), ..Default::default() }; @@ -137,7 +143,11 @@ impl HeartbeatTask { } _ = &mut sleep => { sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval)); - Some(HeartbeatRequest::default()) + let req = HeartbeatRequest { + header: req_header.clone(), + ..Default::default() + }; + Some(req) } }; diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 298360bbb7eb..3c35f7d0f48e 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -103,7 +103,9 @@ impl TryFrom for Stat { Ok(Self { timestamp_millis: time_util::current_time_millis(), cluster_id: header.cluster_id, - id: peer.id, + // datanode id + id: header.member_id, + // datanode address addr: peer.addr, rcus: region_stats.iter().map(|s| s.rcus).sum(), wcus: region_stats.iter().map(|s| s.wcus).sum(), diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 741596baaac0..2926fcc25c94 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -22,8 +22,8 @@ pub struct ResponseHeaderHandler; #[async_trait::async_trait] impl HeartbeatHandler for ResponseHeaderHandler { - fn is_acceptable(&self, role: Role) -> bool { - role == Role::Datanode + fn is_acceptable(&self, _role: Role) -> bool { + true } async fn handle(