From 589557e5aaee5ce254343fc6aae07ff0230def88 Mon Sep 17 00:00:00 2001 From: jiachun Date: Sat, 16 Sep 2023 19:31:47 +0800 Subject: [PATCH] fix: missing datanode id on keep lease --- src/datanode/src/heartbeat.rs | 11 ++--------- src/frontend/src/heartbeat.rs | 9 +-------- src/frontend/src/instance.rs | 3 ++- src/meta-srv/src/handler/keep_lease_handler.rs | 5 ++++- src/meta-srv/src/handler/node_stat.rs | 2 +- src/meta-srv/src/selector/load_based.rs | 4 ++-- 6 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 74dffcc240b5..4a371eba66f7 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, RequestHeader, Role}; +use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ @@ -169,14 +169,9 @@ impl HeartbeatTask { ) .await?; - let req_header = Some(RequestHeader { - member_id: node_id, - role: Role::Datanode as i32, - ..Default::default() - }); let self_peer = Some(Peer { + id: node_id, addr: addr.clone(), - ..Default::default() }); let epoch = self.region_alive_keeper.epoch(); @@ -195,7 +190,6 @@ impl HeartbeatTask { match outgoing_message_to_mailbox_message(message) { Ok(message) => { let req = HeartbeatRequest { - header: req_header.clone(), peer: self_peer.clone(), mailbox_message: Some(message), ..Default::default() @@ -216,7 +210,6 @@ impl HeartbeatTask { let now = Instant::now(); let duration_since_epoch = (now - epoch).as_millis() as u64; let req = HeartbeatRequest { - header: req_header.clone(), peer: self_peer.clone(), region_stats, duration_since_epoch, diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 162442d80b1b..4f96e5da70e2 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::v1::meta::{HeartbeatRequest, RequestHeader, Role}; +use api::v1::meta::HeartbeatRequest; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, }; @@ -109,11 +109,6 @@ impl HeartbeatTask { ) { let report_interval = self.report_interval; - let req_header = Some(RequestHeader { - role: Role::Frontend as i32, - ..Default::default() - }); - common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); @@ -125,7 +120,6 @@ impl HeartbeatTask { match outgoing_message_to_mailbox_message(message) { Ok(message) => { let req = HeartbeatRequest { - header: req_header.clone(), mailbox_message: Some(message), ..Default::default() }; @@ -144,7 +138,6 @@ impl HeartbeatTask { _ = &mut sleep => { sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval)); let req = HeartbeatRequest { - header: req_header.clone(), ..Default::default() }; Some(req) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index e59563b87b17..6c4a4c42b77e 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -245,7 +245,8 @@ impl Instance { let channel_manager = ChannelManager::with_config(channel_config); let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config); - let mut meta_client = MetaClientBuilder::new(0, 0, Role::Frontend) + let cluster_id = 0; // TODO(jeremy): read from config + let mut meta_client = MetaClientBuilder::new(cluster_id, 0, Role::Frontend) .enable_router() .enable_store() .enable_heartbeat() diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index 9d607a1fa1cf..dc669aee1c5e 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -37,12 +37,15 @@ impl HeartbeatHandler for KeepLeaseHandler { _acc: &mut HeartbeatAccumulator, ) -> Result<()> { let HeartbeatRequest { header, peer, .. } = req; + let Some(header) = &header else { + return Ok(()); + }; let Some(peer) = &peer else { return Ok(()); }; let key = LeaseKey { - cluster_id: header.as_ref().map_or(0, |h| h.cluster_id), + cluster_id: header.cluster_id, node_id: peer.id, }; let value = LeaseValue { diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 3c35f7d0f48e..d51c1c194dcd 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -104,7 +104,7 @@ impl TryFrom for Stat { timestamp_millis: time_util::current_time_millis(), cluster_id: header.cluster_id, // datanode id - id: header.member_id, + id: peer.id, // datanode address addr: peer.addr, rcus: region_stats.iter().map(|s| s.rcus).sum(), diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 5c612d538df6..ecafa97d1a5e 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -99,8 +99,8 @@ impl Selector for LoadBasedSelector { Ok(tuples .into_iter() - .map(|(stat_key, lease_val, _)| Peer { - id: stat_key.node_id, + .map(|(lease_key, lease_val, _)| Peer { + id: lease_key.node_id, addr: lease_val.node_addr, }) .collect())