Skip to content

Commit

Permalink
refactor: simplify retain_active_regions
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Oct 31, 2023
1 parent 0052fa4 commit 9f18d1c
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 379 deletions.
116 changes: 36 additions & 80 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod file;
pub mod mito;
pub mod utils;

use std::collections::{HashMap, HashSet};

use common_catalog::consts::{FILE_ENGINE, MITO2_ENGINE};
use common_meta::key::TableMetadataManagerRef;
use snafu::ResultExt;
use store_api::region_engine::RegionRole;
use store_api::storage::{RegionId, TableId};

use self::mito::retain_active_regions;
use crate::error::{self, Result};

/// Region Lease Keeper removes any inactive [RegionRole::Leader] regions.
pub struct RegionLeaseKeeper {
table_metadata_manager: TableMetadataManagerRef,
}
Expand All @@ -39,46 +38,27 @@ impl RegionLeaseKeeper {
}

impl RegionLeaseKeeper {
/// Retains active mito regions(`datanode_regions`), returns inactive regions.
///
/// **For mito regions:**
///
/// - It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`.
/// - It removes a follower region if the `node_id` isn't one of the peers in `region_routes`.
///
/// **For file regions:**
///
/// - It removes a follower region if the `node_id` isn't one of the peers in `region_routes`.
///
/// **Common behaviors:**
/// Retains active [RegionRole::Leader] regions, returns inactive regions.
///
/// - It grants new lease if the `datanode_id` is the corresponding leader peer in `region_routes`.
/// - It removes a leader region if the `datanode_id` isn't the corresponding leader peer in `region_routes`.
/// - Expected as [RegionRole::Follower] regions.
/// - Unexpected [RegionRole::Leader] regions.
/// - It removes a region if the region's table metadata is not found.
pub async fn retain_active_regions(
&self,
_cluster_id: u64,
node_id: u64,
engine: &str,
datanode_regions: &mut Vec<(RegionId, RegionRole)>,
datanode_id: u64,
datanode_regions: &mut Vec<RegionId>,
) -> Result<HashSet<RegionId>> {
let table_route_manager = self.table_metadata_manager.table_route_manager();

let handler = match engine {
MITO2_ENGINE => mito::retain_active_regions,
FILE_ENGINE => file::retain_active_regions,
_ => {
return error::UnexpectedSnafu {
violated: format!("Unknown engine: {engine}"),
}
.fail()
}
};

let mut tables = HashMap::<TableId, Vec<(RegionId, RegionRole)>>::new();
let mut tables = HashMap::<TableId, Vec<RegionId>>::new();

// Group by `table_id`.
for (region_id, role) in datanode_regions.iter() {
for region_id in datanode_regions.iter() {
let table = tables.entry(region_id.table_id()).or_default();
table.push((*region_id, *role));
table.push(*region_id);
}

let table_ids = tables.keys().cloned().collect::<Vec<_>>();
Expand All @@ -96,14 +76,14 @@ impl RegionLeaseKeeper {
if let Some(metadata) = metadata_subset.get(table_id) {
let region_routes = &metadata.region_routes;

inactive_regions.extend(handler(node_id, regions, region_routes));
inactive_regions.extend(retain_active_regions(datanode_id, regions, region_routes));
} else {
// Removes table if metadata is not found.
inactive_regions.extend(regions.drain(..).map(|(region_id, _)| region_id));
inactive_regions.extend(regions.drain(..));
}
}

datanode_regions.retain(|(region_id, _)| !inactive_regions.contains(region_id));
datanode_regions.retain(|region_id| !inactive_regions.contains(region_id));

Ok(inactive_regions)
}
Expand All @@ -118,12 +98,10 @@ impl RegionLeaseKeeper {
mod tests {
use std::sync::Arc;

use common_catalog::consts::MITO2_ENGINE;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;

use super::RegionLeaseKeeper;
Expand All @@ -140,17 +118,16 @@ mod tests {

#[tokio::test]
async fn test_empty_table_routes() {
let node_id = 1;
let engine = MITO2_ENGINE;
let datanode_id = 1;
let region_number = 1u32;
let region_id = RegionId::from_u64(region_number as u64);

let keeper = new_test_keeper();

let mut datanode_regions = vec![(region_id, RegionRole::Leader)];
let mut datanode_regions = vec![region_id];

let removed = keeper
.retain_active_regions(0, node_id, engine, &mut datanode_regions)
.retain_active_regions(0, datanode_id, &mut datanode_regions)
.await
.unwrap();

Expand All @@ -161,12 +138,11 @@ mod tests {

#[tokio::test]
async fn test_retain_active_regions_simple() {
let node_id = 1;
let engine = MITO2_ENGINE;
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let peer = Peer::empty(node_id);
let peer = Peer::empty(datanode_id);
let table_info = new_test_table_info(table_id, vec![region_number]).into();

let region_routes = vec![RegionRoute {
Expand All @@ -183,22 +159,22 @@ mod tests {
.await
.unwrap();

// `inactive_regions` should be vec![region_id].
let mut datanode_regions = vec![(region_id, RegionRole::Leader)];
// `inactive_regions` should be empty.
let mut datanode_regions = vec![region_id];

let inactive_regions = keeper
.retain_active_regions(0, node_id, engine, &mut datanode_regions)
.retain_active_regions(0, datanode_id, &mut datanode_regions)
.await
.unwrap();

assert!(inactive_regions.is_empty());
assert_eq!(datanode_regions.len(), 1);

// `inactive_regions` should be empty, because the `region_id` is a potential leader.
let mut datanode_regions = vec![(region_id, RegionRole::Follower)];
// `inactive_regions` should be empty.
let mut datanode_regions = vec![];

let inactive_regions = keeper
.retain_active_regions(0, node_id, engine, &mut datanode_regions)
.retain_active_regions(0, datanode_id, &mut datanode_regions)
.await
.unwrap();

Expand All @@ -207,16 +183,15 @@ mod tests {

#[tokio::test]
async fn test_retain_active_regions_2() {
let node_id = 1;
let engine = MITO2_ENGINE;
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let another_region_id = RegionId::new(table_id, region_number + 1);
let unknown_region_id = RegionId::new(table_id + 1, region_number);

let peer = Peer::empty(node_id);
let another_peer = Peer::empty(node_id + 1);
let peer = Peer::empty(datanode_id);
let another_peer = Peer::empty(datanode_id + 1);

let table_info =
new_test_table_info(table_id, vec![region_number, region_number + 1]).into();
Expand All @@ -242,49 +217,30 @@ mod tests {
.await
.unwrap();

// Unexpected Leader region.
// `inactive_regions` should be vec![unknown_region_id].
let mut datanode_regions = vec![
(region_id, RegionRole::Leader),
(unknown_region_id, RegionRole::Follower),
];
let mut datanode_regions = vec![region_id, unknown_region_id];

let inactive_regions = keeper
.retain_active_regions(0, node_id, engine, &mut datanode_regions)
.retain_active_regions(0, datanode_id, &mut datanode_regions)
.await
.unwrap();

assert_eq!(inactive_regions.len(), 1);
assert!(inactive_regions.contains(&unknown_region_id));
assert_eq!(datanode_regions.len(), 1);

// Expected as Follower region.
// `inactive_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`.
let mut datanode_regions = vec![
(region_id, RegionRole::Follower),
(another_region_id, RegionRole::Follower),
];

let inactive_regions = keeper
.retain_active_regions(0, node_id, engine, &mut datanode_regions)
.await
.unwrap();

assert_eq!(inactive_regions.len(), 1);
assert!(inactive_regions.contains(&another_region_id));
assert_eq!(datanode_regions.len(), 1);

// `inactive_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`.
let mut datanode_regions = vec![
(region_id, RegionRole::Follower),
(another_region_id, RegionRole::Leader),
];
let mut datanode_regions = vec![another_region_id];

let inactive_regions = keeper
.retain_active_regions(0, node_id, engine, &mut datanode_regions)
.retain_active_regions(0, datanode_id, &mut datanode_regions)
.await
.unwrap();

assert_eq!(inactive_regions.len(), 1);
assert!(inactive_regions.contains(&another_region_id));
assert_eq!(datanode_regions.len(), 1);
assert!(datanode_regions.is_empty());
}
}
Loading

0 comments on commit 9f18d1c

Please sign in to comment.