From 21f75a77c1f6647acfa41a64210bd8719bdcdda7 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Tue, 12 Sep 2023 10:49:13 +0800 Subject: [PATCH] feat: filter out empty heartbeat req (#2345) * feat: filter out empty heartbeat request * fix: big mistake --- src/datanode/src/heartbeat.rs | 29 ++++++++----------- .../src/handler/collect_stats_handler.rs | 5 +++- src/meta-srv/src/handler/node_stat.rs | 5 ++++ 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index fabc04024a03..72c00192d857 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -196,11 +196,9 @@ impl HeartbeatTask { if let Some(message) = message { match outgoing_message_to_mailbox_message(message) { Ok(message) => { + let peer = Some(Peer { id: node_id, addr: addr.clone() }); let req = HeartbeatRequest { - peer: Some(Peer { - id: node_id, - addr: addr.clone(), - }), + peer, mailbox_message: Some(message), ..Default::default() }; @@ -216,19 +214,18 @@ impl HeartbeatTask { } } _ = &mut sleep => { - // TODO(jeremy): refactor load_status - let (_,region_stats) = Self::load_stats(®ion_server_clone).await; + let peer = Some(Peer { id: node_id, addr: addr.clone() }); + let region_stats = Self::load_region_stats(®ion_server_clone).await; + let now = Instant::now(); + let duration_since_epoch = (now - epoch).as_millis() as u64; let req = HeartbeatRequest { - peer: Some(Peer { - id: node_id, - addr: addr.clone(), - }), + peer, region_stats, - duration_since_epoch: (Instant::now() - epoch).as_millis() as u64, + duration_since_epoch, node_epoch, ..Default::default() }; - sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval)); + sleep.as_mut().reset(now + Duration::from_millis(interval)); Some(req) } }; @@ -260,9 +257,9 @@ impl HeartbeatTask { Ok(()) } - async fn load_stats(region_server: &RegionServer) -> (u64, Vec) { + async fn load_region_stats(region_server: &RegionServer) -> Vec { let region_ids = region_server.opened_region_ids(); - let region_stats = region_ids + region_ids .into_iter() .map(|region_id| RegionStat { // TODO(ruihang): scratch more info @@ -270,9 +267,7 @@ impl HeartbeatTask { engine: "MitoEngine".to_string(), ..Default::default() }) - .collect::>(); - - (region_stats.len() as _, region_stats) + .collect::>() } pub async fn close(&self) -> Result<()> { diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index 6f4efffcc883..b5bb94969211 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -42,7 +42,10 @@ impl HeartbeatHandler for CollectStatsHandler { match Stat::try_from(req.clone()) { Ok(stat) => { - let _ = acc.stat.insert(stat); + // If stat is empty, it means the request is a mailbox response + if !stat.is_empty() { + let _ = acc.stat.insert(stat); + } } Err(err) => { warn!("Incomplete heartbeat data: {:?}, err: {:?}", req, err); diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 20072b49418e..298360bbb7eb 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -52,6 +52,11 @@ pub struct RegionStat { } impl Stat { + #[inline] + pub fn is_empty(&self) -> bool { + self.region_stats.is_empty() + } + pub fn stat_key(&self) -> StatKey { StatKey { cluster_id: self.cluster_id,