From 3242457a5cbfa2d9d0cfd048affb757aa3804502 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 21 Nov 2023 03:46:35 +0000 Subject: [PATCH] fix: bring back `inactive_region_ids` --- src/meta-srv/src/handler.rs | 3 ++- src/meta-srv/src/handler/failure_handler.rs | 6 +++--- src/meta-srv/src/handler/node_stat.rs | 11 ++++------- src/meta-srv/src/handler/region_lease_handler.rs | 10 +++++++++- src/meta-srv/src/selector/weight_compute.rs | 7 ++++--- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 0b0fff204f60..912245b791c8 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -28,6 +28,7 @@ use common_telemetry::{debug, info, warn}; use dashmap::DashMap; use futures::future::join_all; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Notify, RwLock}; @@ -75,7 +76,7 @@ pub struct HeartbeatAccumulator { pub header: Option, pub instructions: Vec, pub stat: Option, - pub inactive_region_ids: HashSet, + pub inactive_region_ids: HashSet, pub region_lease: Option, } diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 717012896163..e44c84282396 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -20,7 +20,6 @@ use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; use common_catalog::consts::default_engine; use common_meta::RegionIdent; -use store_api::storage::RegionId; use crate::error::Result; use crate::failure_detector::PhiAccrualFailureDetectorOptions; @@ -86,7 +85,7 @@ impl HeartbeatHandler for RegionFailureHandler { .region_stats .iter() .map(|x| { - let region_id = RegionId::from(x.id); + let region_id = x.id; RegionIdent { cluster_id: stat.cluster_id, datanode_id: stat.id, @@ -108,6 +107,7 @@ impl HeartbeatHandler for RegionFailureHandler { #[cfg(test)] mod tests { use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; use super::*; use crate::handler::node_stat::{RegionStat, Stat}; @@ -133,7 +133,7 @@ mod tests { let acc = &mut HeartbeatAccumulator::default(); fn new_region_stat(region_id: u64) -> RegionStat { RegionStat { - id: region_id, + id: RegionId::from_u64(region_id), rcus: 0, wcus: 0, approximate_bytes: 0, diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 4e9343da35b7..3d9fe02e78cc 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -45,7 +45,7 @@ pub struct Stat { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RegionStat { /// The region_id. - pub id: u64, + pub id: RegionId, /// The read capacity units during this period pub rcus: i64, /// The write capacity units during this period @@ -75,13 +75,10 @@ impl Stat { /// Returns a tuple array containing [RegionId] and [RegionRole]. pub fn regions(&self) -> Vec<(RegionId, RegionRole)> { - self.region_stats - .iter() - .map(|s| (RegionId::from(s.id), s.role)) - .collect() + self.region_stats.iter().map(|s| (s.id, s.role)).collect() } - pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet) { + pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet) { if inactive_region_ids.is_empty() { return; } @@ -140,7 +137,7 @@ impl TryFrom for RegionStat { fn try_from(value: api::v1::meta::RegionStat) -> Result { Ok(Self { - id: value.region_id, + id: RegionId::from_u64(value.region_id), rcus: value.rcus, wcus: value.wcus, approximate_bytes: value.approximate_bytes, diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 7ef74713c892..ebf470bbf028 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -99,6 +99,7 @@ impl HeartbeatHandler for RegionLeaseHandler { let cluster_id = stat.cluster_id; let datanode_id = stat.id; let mut granted_regions = Vec::with_capacity(regions.len()); + let mut inactive_regions = HashSet::new(); let (leaders, followers): (Vec<_>, Vec<_>) = regions .into_iter() @@ -122,6 +123,7 @@ impl HeartbeatHandler for RegionLeaseHandler { &leaders, RegionRole::Leader, ); + inactive_regions.extend(closable); let followers = followers.into_iter().flatten().collect::>(); @@ -142,7 +144,9 @@ impl HeartbeatHandler for RegionLeaseHandler { &followers, RegionRole::Follower, ); + inactive_regions.extend(closable); + acc.inactive_region_ids = inactive_regions; acc.region_lease = Some(RegionLease { regions: granted_regions .into_iter() @@ -184,7 +188,7 @@ mod test { fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat { RegionStat { - id: region_id.as_u64(), + id: region_id, role, rcus: 0, wcus: 0, @@ -253,6 +257,7 @@ mod test { handler.handle(&req, ctx, acc).await.unwrap(); assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]); + assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); let acc = &mut HeartbeatAccumulator::default(); @@ -277,6 +282,7 @@ mod test { acc, vec![GrantedRegion::new(region_id, RegionRole::Follower)], ); + assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); let opening_region_id = RegionId::new(table_id, region_number + 2); let _guard = opening_region_keeper @@ -310,6 +316,7 @@ mod test { GrantedRegion::new(opening_region_id, RegionRole::Follower), ], ); + assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); } #[tokio::test] @@ -385,6 +392,7 @@ mod test { GrantedRegion::new(another_region_id, RegionRole::Leader), ], ); + assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id])); } fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec) { diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index 8971f73accbc..10e4b50035c4 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -94,6 +94,7 @@ mod tests { use api::v1::meta::Peer; use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; use super::{RegionNumsBasedWeightCompute, WeightCompute}; use crate::handler::node_stat::{RegionStat, Stat}; @@ -189,7 +190,7 @@ mod tests { addr: "127.0.0.1:3001".to_string(), region_num: 11, region_stats: vec![RegionStat { - id: 111, + id: RegionId::from_u64(111), rcus: 1, wcus: 1, approximate_bytes: 1, @@ -206,7 +207,7 @@ mod tests { addr: "127.0.0.1:3002".to_string(), region_num: 12, region_stats: vec![RegionStat { - id: 112, + id: RegionId::from_u64(112), rcus: 1, wcus: 1, approximate_bytes: 1, @@ -223,7 +224,7 @@ mod tests { addr: "127.0.0.1:3003".to_string(), region_num: 13, region_stats: vec![RegionStat { - id: 113, + id: RegionId::from_u64(113), rcus: 1, wcus: 1, approximate_bytes: 1,