Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bring back inactive_region_ids #2783

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_telemetry::{debug, info, warn};
use dashmap::DashMap;
use futures::future::join_all;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Notify, RwLock};

Expand Down Expand Up @@ -75,7 +76,7 @@ pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub instructions: Vec<Instruction>,
pub stat: Option<Stat>,
pub inactive_region_ids: HashSet<u64>,
pub inactive_region_ids: HashSet<RegionId>,
pub region_lease: Option<RegionLease>,
}

Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
use common_catalog::consts::default_engine;
use common_meta::RegionIdent;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
Expand Down Expand Up @@ -86,7 +85,7 @@ impl HeartbeatHandler for RegionFailureHandler {
.region_stats
.iter()
.map(|x| {
let region_id = RegionId::from(x.id);
let region_id = x.id;
RegionIdent {
cluster_id: stat.cluster_id,
datanode_id: stat.id,
Expand All @@ -108,6 +107,7 @@ impl HeartbeatHandler for RegionFailureHandler {
#[cfg(test)]
mod tests {
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
Expand All @@ -133,7 +133,7 @@ mod tests {
let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
id: RegionId::from_u64(region_id),
rcus: 0,
wcus: 0,
approximate_bytes: 0,
Expand Down
11 changes: 4 additions & 7 deletions src/meta-srv/src/handler/node_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct Stat {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
/// The region_id.
pub id: u64,
pub id: RegionId,
/// The read capacity units during this period
pub rcus: i64,
/// The write capacity units during this period
Expand Down Expand Up @@ -75,13 +75,10 @@ impl Stat {

/// Returns a tuple array containing [RegionId] and [RegionRole].
pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
self.region_stats
.iter()
.map(|s| (RegionId::from(s.id), s.role))
.collect()
self.region_stats.iter().map(|s| (s.id, s.role)).collect()
}

pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<u64>) {
pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
if inactive_region_ids.is_empty() {
return;
}
Expand Down Expand Up @@ -140,7 +137,7 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {

fn try_from(value: api::v1::meta::RegionStat) -> Result<Self, Self::Error> {
Ok(Self {
id: value.region_id,
id: RegionId::from_u64(value.region_id),
rcus: value.rcus,
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
let cluster_id = stat.cluster_id;
let datanode_id = stat.id;
let mut granted_regions = Vec::with_capacity(regions.len());
let mut inactive_regions = HashSet::new();

let (leaders, followers): (Vec<_>, Vec<_>) = regions
.into_iter()
Expand All @@ -122,6 +123,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
&leaders,
RegionRole::Leader,
);
inactive_regions.extend(closable);

let followers = followers.into_iter().flatten().collect::<Vec<_>>();

Expand All @@ -142,7 +144,9 @@ impl HeartbeatHandler for RegionLeaseHandler {
&followers,
RegionRole::Follower,
);
inactive_regions.extend(closable);

acc.inactive_region_ids = inactive_regions;
acc.region_lease = Some(RegionLease {
regions: granted_regions
.into_iter()
Expand Down Expand Up @@ -184,7 +188,7 @@ mod test {

fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
RegionStat {
id: region_id.as_u64(),
id: region_id,
role,
rcus: 0,
wcus: 0,
Expand Down Expand Up @@ -253,6 +257,7 @@ mod test {
handler.handle(&req, ctx, acc).await.unwrap();

assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));

let acc = &mut HeartbeatAccumulator::default();

Expand All @@ -277,6 +282,7 @@ mod test {
acc,
vec![GrantedRegion::new(region_id, RegionRole::Follower)],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));

let opening_region_id = RegionId::new(table_id, region_number + 2);
let _guard = opening_region_keeper
Expand Down Expand Up @@ -310,6 +316,7 @@ mod test {
GrantedRegion::new(opening_region_id, RegionRole::Follower),
],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
}

#[tokio::test]
Expand Down Expand Up @@ -385,6 +392,7 @@ mod test {
GrantedRegion::new(another_region_id, RegionRole::Leader),
],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
}

fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
Expand Down
7 changes: 4 additions & 3 deletions src/meta-srv/src/selector/weight_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ mod tests {

use api::v1::meta::Peer;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;

use super::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::handler::node_stat::{RegionStat, Stat};
Expand Down Expand Up @@ -189,7 +190,7 @@ mod tests {
addr: "127.0.0.1:3001".to_string(),
region_num: 11,
region_stats: vec![RegionStat {
id: 111,
id: RegionId::from_u64(111),
rcus: 1,
wcus: 1,
approximate_bytes: 1,
Expand All @@ -206,7 +207,7 @@ mod tests {
addr: "127.0.0.1:3002".to_string(),
region_num: 12,
region_stats: vec![RegionStat {
id: 112,
id: RegionId::from_u64(112),
rcus: 1,
wcus: 1,
approximate_bytes: 1,
Expand All @@ -223,7 +224,7 @@ mod tests {
addr: "127.0.0.1:3003".to_string(),
region_num: 13,
region_stats: vec![RegionStat {
id: 113,
id: RegionId::from_u64(113),
rcus: 1,
wcus: 1,
approximate_bytes: 1,
Expand Down
Loading