Skip to content

Commit

Permalink
feat: filter out empty heartbeat req (GreptimeTeam#2345)
Browse files Browse the repository at this point in the history
* feat: filter out empty heartbeat request

* fix: big mistake
  • Loading branch information
fengjiachun authored and paomian committed Oct 19, 2023
1 parent 5bfc56a commit 21f75a7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
29 changes: 12 additions & 17 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,9 @@ impl HeartbeatTask {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let peer = Some(Peer { id: node_id, addr: addr.clone() });
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
addr: addr.clone(),
}),
peer,
mailbox_message: Some(message),
..Default::default()
};
Expand All @@ -216,19 +214,18 @@ impl HeartbeatTask {
}
}
_ = &mut sleep => {
// TODO(jeremy): refactor load_status
let (_,region_stats) = Self::load_stats(&region_server_clone).await;
let peer = Some(Peer { id: node_id, addr: addr.clone() });
let region_stats = Self::load_region_stats(&region_server_clone).await;
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
addr: addr.clone(),
}),
peer,
region_stats,
duration_since_epoch: (Instant::now() - epoch).as_millis() as u64,
duration_since_epoch,
node_epoch,
..Default::default()
};
sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval));
sleep.as_mut().reset(now + Duration::from_millis(interval));
Some(req)
}
};
Expand Down Expand Up @@ -260,19 +257,17 @@ impl HeartbeatTask {
Ok(())
}

async fn load_stats(region_server: &RegionServer) -> (u64, Vec<RegionStat>) {
async fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
let region_ids = region_server.opened_region_ids();
let region_stats = region_ids
region_ids
.into_iter()
.map(|region_id| RegionStat {
// TODO(ruihang): scratch more info
region_id: region_id.as_u64(),
engine: "MitoEngine".to_string(),
..Default::default()
})
.collect::<Vec<_>>();

(region_stats.len() as _, region_stats)
.collect::<Vec<_>>()
}

pub async fn close(&self) -> Result<()> {
Expand Down
5 changes: 4 additions & 1 deletion src/meta-srv/src/handler/collect_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ impl HeartbeatHandler for CollectStatsHandler {

match Stat::try_from(req.clone()) {
Ok(stat) => {
let _ = acc.stat.insert(stat);
// If stat is empty, it means the request is a mailbox response
if !stat.is_empty() {
let _ = acc.stat.insert(stat);
}
}
Err(err) => {
warn!("Incomplete heartbeat data: {:?}, err: {:?}", req, err);
Expand Down
5 changes: 5 additions & 0 deletions src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ pub struct RegionStat {
}

impl Stat {
#[inline]
pub fn is_empty(&self) -> bool {
self.region_stats.is_empty()
}

pub fn stat_key(&self) -> StatKey {
StatKey {
cluster_id: self.cluster_id,
Expand Down

0 comments on commit 21f75a7

Please sign in to comment.