From e0847640f705afe002a04dd4c5937d66935d6a13 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 23 Oct 2023 16:46:48 +0000 Subject: [PATCH 1/8] feat: introduce the region lease keeper --- src/common/meta/src/key.rs | 45 +-- src/common/meta/src/key/test_utils.rs | 57 ++++ src/common/meta/src/peer.rs | 8 + src/common/meta/src/rpc/router.rs | 69 +++- src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/handler/failure_handler.rs | 4 + src/meta-srv/src/handler/node_stat.rs | 26 +- src/meta-srv/src/lib.rs | 2 + src/meta-srv/src/region.rs | 17 + src/meta-srv/src/region/lease_keeper.rs | 309 ++++++++++++++++++ src/meta-srv/src/region/lease_keeper/file.rs | 167 ++++++++++ src/meta-srv/src/region/lease_keeper/mito.rs | 160 +++++++++ src/meta-srv/src/region/lease_keeper/utils.rs | 129 ++++++++ src/partition/src/manager.rs | 4 +- src/store-api/src/region_engine.rs | 12 +- 15 files changed, 965 insertions(+), 45 deletions(-) create mode 100644 src/common/meta/src/key/test_utils.rs create mode 100644 src/meta-srv/src/region.rs create mode 100644 src/meta-srv/src/region/lease_keeper.rs create mode 100644 src/meta-srv/src/region/lease_keeper/file.rs create mode 100644 src/meta-srv/src/region/lease_keeper/mito.rs create mode 100644 src/meta-srv/src/region/lease_keeper/utils.rs diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 6bb2c1bdabc5..c9522a9d64b5 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -54,6 +54,8 @@ pub mod table_region; // TODO(weny): removes it. 
#[allow(deprecated)] pub mod table_route; +#[cfg(any(test, feature = "testing"))] +pub mod test_utils; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; @@ -684,12 +686,11 @@ mod tests { use std::sync::Arc; use bytes::Bytes; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, SchemaBuilder}; use futures::TryStreamExt; - use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder}; + use table::metadata::{RawTableInfo, TableInfo}; use super::datanode_table::DatanodeTableKey; + use super::test_utils; use crate::ddl::utils::region_storage_path; use crate::key::datanode_table::RegionInfo; use crate::key::table_info::TableInfoValue; @@ -735,40 +736,6 @@ mod tests { assert_eq!(removed, to_removed_key(key)); } - fn new_test_table_info(region_numbers: impl Iterator) -> TableInfo { - let column_schemas = vec![ - ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), - ]; - let schema = SchemaBuilder::try_from(column_schemas) - .unwrap() - .version(123) - .build() - .unwrap(); - - let meta = TableMetaBuilder::default() - .schema(Arc::new(schema)) - .primary_key_indices(vec![0]) - .engine("engine") - .next_column_id(3) - .region_numbers(region_numbers.collect::>()) - .build() - .unwrap(); - TableInfoBuilder::default() - .table_id(10) - .table_version(5) - .name("mytable") - .meta(meta) - .build() - .unwrap() - } - fn new_test_region_route() -> RegionRoute { new_region_route(1, 2) } @@ -787,6 +754,10 @@ mod tests { } } + fn new_test_table_info(region_numbers: impl Iterator) -> TableInfo { + test_utils::new_test_table_info(10, region_numbers) + } + #[tokio::test] async fn test_create_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); diff --git a/src/common/meta/src/key/test_utils.rs 
b/src/common/meta/src/key/test_utils.rs new file mode 100644 index 000000000000..aa4fa21ea992 --- /dev/null +++ b/src/common/meta/src/key/test_utils.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use store_api::storage::TableId; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; + +pub fn new_test_table_info>( + table_id: TableId, + region_numbers: I, +) -> TableInfo { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), + ]; + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap(); + + let meta = TableMetaBuilder::default() + .schema(Arc::new(schema)) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .region_numbers(region_numbers.into_iter().collect::>()) + .build() + .unwrap(); + TableInfoBuilder::default() + .table_id(table_id) + .table_version(5) + .name("mytable") + .meta(meta) + .build() + .unwrap() +} diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs index 483250cbf1df..dd012a12a1c5 100644 --- a/src/common/meta/src/peer.rs +++ 
b/src/common/meta/src/peer.rs @@ -49,6 +49,14 @@ impl Peer { addr: addr.into(), } } + + #[cfg(any(test, feature = "testing"))] + pub fn empty(id: u64) -> Self { + Self { + id, + addr: String::new(), + } + } } impl Display for Peer { diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index ffc311fbe511..53189137a6b5 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -58,7 +58,10 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet { .collect() } -pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap { +/// Returns the HashMap<[RegionNumber], &[Peer]>; +/// +/// If the region doesn't have a leader peer, the [Region] will be omitted. +pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap { region_routes .iter() .filter_map(|x| { @@ -69,6 +72,50 @@ pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap>() } +/// Returns the HashMap<[RegionNumber], BTreeMap>. +pub fn convert_to_region_followers_map( + region_routes: &[RegionRoute], +) -> HashMap> { + region_routes + .iter() + .map(|x| { + ( + x.region.id.region_number(), + x.follower_peers + .iter() + .map(|peer| (peer.id, peer)) + .collect::>(), + ) + }) + .collect::>() +} + +/// Returns the HashMap<[RegionNumber], HashSet>. 
+pub fn convert_to_region_peers_map(region_routes: &[RegionRoute]) -> HashMap> { + let mut set = region_routes + .iter() + .map(|x| { + ( + x.region.id.region_number(), + x.follower_peers + .iter() + .map(|peer| (peer.id)) + .collect::>(), + ) + }) + .collect::>(); + + for route in region_routes { + if let Some(peer) = &route.leader_peer { + let entry = set.entry(route.region.id.region_number()).or_default(); + + entry.insert(peer.id); + } + } + + set +} + pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> { region_routes .iter() @@ -241,12 +288,16 @@ impl RegionRoute { pub struct RegionRoutes(pub Vec); impl RegionRoutes { - pub fn region_map(&self) -> HashMap { - convert_to_region_map(&self.0) + pub fn region_leader_map(&self) -> HashMap { + convert_to_region_leader_map(&self.0) + } + + pub fn region_followers_map(&self) -> HashMap> { + convert_to_region_followers_map(&self.0) } pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> { - self.region_map().get(®ion_number).copied() + self.region_leader_map().get(®ion_number).copied() } } @@ -258,6 +309,16 @@ pub struct Region { pub attrs: BTreeMap, } +impl Region { + #[cfg(any(test, feature = "testing"))] + pub fn new_test(id: RegionId) -> Self { + Self { + id, + ..Default::default() + } + } +} + impl From for Region { fn from(r: PbRegion) -> Self { Self { diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 4998969e5006..d6bb07a74b22 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -59,6 +59,7 @@ uuid.workspace = true [dev-dependencies] chrono.workspace = true client = { workspace = true, features = ["testing"] } +common-meta = { workspace = true, features = ["testing"] } common-procedure-test = { workspace = true } session = { workspace = true } tracing = "0.1" diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 85bb0c49aac3..d1c39d99e75f 100644 --- 
a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -102,6 +102,8 @@ impl HeartbeatHandler for RegionFailureHandler { #[cfg(test)] mod tests { + use store_api::region_engine::RegionRole; + use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; @@ -129,6 +131,8 @@ mod tests { wcus: 0, approximate_bytes: 0, approximate_rows: 0, + engine: default_engine().to_string(), + role: RegionRole::Follower, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 8601ec5dbf67..08152749e7f5 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -17,6 +17,7 @@ use std::collections::HashSet; use api::v1::meta::HeartbeatRequest; use common_time::util as time_util; use serde::{Deserialize, Serialize}; +use store_api::region_engine::RegionRole; use crate::error::{Error, InvalidHeartbeatRequestSnafu}; use crate::keys::StatKey; @@ -25,7 +26,9 @@ use crate::keys::StatKey; pub struct Stat { pub timestamp_millis: i64, pub cluster_id: u64, + // The datanode Id. pub id: u64, + // The datanode address. pub addr: String, /// The read capacity units during this period pub rcus: i64, @@ -38,8 +41,9 @@ pub struct Stat { pub node_epoch: u64, } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct RegionStat { + /// The region_id. pub id: u64, /// The read capacity units during this period pub rcus: i64, @@ -49,6 +53,24 @@ pub struct RegionStat { pub approximate_bytes: i64, /// Approximate number of rows in this region pub approximate_rows: i64, + /// The engine name. + pub engine: String, + /// The region role. 
+ pub role: RegionRole, +} + +impl Default for RegionStat { + fn default() -> Self { + Self { + id: 0, + rcus: 0, + wcus: 0, + approximate_bytes: 0, + approximate_rows: 0, + engine: String::new(), + role: RegionRole::Follower, + } + } } impl Stat { @@ -132,6 +154,8 @@ impl TryFrom for RegionStat { wcus: value.wcus, approximate_bytes: value.approximate_bytes, approximate_rows: value.approximate_rows, + engine: value.engine.to_string(), + role: RegionRole::from(value.role()), }) } } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index e27196b478a7..18c966502b00 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -14,6 +14,7 @@ #![feature(async_closure)] #![feature(result_flattening)] +#![feature(extract_if)] pub mod bootstrap; mod cache_invalidator; @@ -31,6 +32,7 @@ mod metrics; pub mod mocks; pub mod procedure; pub mod pubsub; +pub mod region; pub mod selector; pub mod service; pub mod table_meta_alloc; diff --git a/src/meta-srv/src/region.rs b/src/meta-srv/src/region.rs new file mode 100644 index 000000000000..a3106bd57935 --- /dev/null +++ b/src/meta-srv/src/region.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod lease_keeper; + +pub use lease_keeper::RegionLeaseKeeper; diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs new file mode 100644 index 000000000000..53bc56092832 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -0,0 +1,309 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod file; +pub mod mito; +pub mod utils; + +use std::collections::{HashMap, HashSet}; + +use common_catalog::consts::{FILE_ENGINE, MITO2_ENGINE}; +use common_meta::key::TableMetadataManagerRef; +use futures::future::try_join_all; +use snafu::ResultExt; +use store_api::region_engine::RegionRole; +use store_api::storage::{RegionId, TableId}; + +use crate::error::{self, Result}; + +pub struct RegionLeaseKeeper { + table_metadata_manager: TableMetadataManagerRef, +} + +impl RegionLeaseKeeper { + pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { + Self { + table_metadata_manager, + } + } +} + +impl RegionLeaseKeeper { + /// Retains active regions(`datanode_regions`), returns inactive regions. + /// + /// **For mito regions:** + /// + /// - It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. + /// - It removes a follower region if the `node_id` isn't one of the peers in `region_routes`.
+ /// + /// **For file regions:** + /// + /// - It removes a follower region if the `node_id` isn't one of the peers in `region_routes`. + /// + /// **Common behaviors:** + /// + /// - It removes a region if the region's table metadata is not found. + pub async fn retain_active_regions( + &self, + _cluster_id: u64, + node_id: u64, + engine: &str, + datanode_regions: &mut Vec<(RegionId, RegionRole)>, + ) -> Result> { + let table_route_manager = self.table_metadata_manager.table_route_manager(); + + let handler = match engine { + MITO2_ENGINE => mito::retain_active_regions, + FILE_ENGINE => file::retain_active_regions, + _ => { + return error::UnexpectedSnafu { + violated: format!("Unknown engine: {engine}"), + } + .fail() + } + }; + + let mut tables = HashMap::>::new(); + + // Group by `table_id`. + for (region_id, role) in datanode_regions.iter() { + let table = tables.entry(region_id.table_id()).or_default(); + table.push((*region_id, *role)); + } + + let table_ids: Vec = tables.keys().cloned().collect::>(); + let metadata = try_join_all( + table_ids + .iter() + .map(|table_id| table_route_manager.get(*table_id)), + ) + .await + .context(error::TableMetadataManagerSnafu)?; + + // All tables' metadata. + let tables_metadata = table_ids + .into_iter() + .zip(metadata) + .collect::>>(); + + let mut inactive_regions = HashSet::new(); + + // Removes inactive regions. + for (table_id, metadata) in tables_metadata { + if let Some(metadata) = metadata { + // Safety: Value must exist. + let regions = tables.get_mut(&table_id).unwrap(); + let region_routes = &metadata.region_routes; + + inactive_regions.extend(handler(node_id, regions, region_routes)); + } else { + // Removes table if metadata is not found. + // Safety: Value must exist. 
+ let regions = tables + .remove(&table_id) + .unwrap() + .into_iter() + .map(|(region_id, _)| region_id); + + inactive_regions.extend(regions); + } + } + + let _ = datanode_regions + .extract_if(|(region_id, _)| inactive_regions.contains(region_id)) + .collect::>(); + + Ok(inactive_regions) + } + + #[cfg(test)] + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_catalog::consts::MITO2_ENGINE; + use common_meta::key::test_utils::new_test_table_info; + use common_meta::key::TableMetadataManager; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; + + use super::RegionLeaseKeeper; + use crate::service::store::kv::KvBackendAdapter; + use crate::service::store::memory::MemStore; + + fn new_test_keeper() -> RegionLeaseKeeper { + let store = KvBackendAdapter::wrap(Arc::new(MemStore::new())); + + let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); + + RegionLeaseKeeper::new(table_metadata_manager) + } + + #[tokio::test] + async fn test_empty_table_routes() { + let node_id = 1; + let engine = MITO2_ENGINE; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + + let keeper = new_test_keeper(); + + let mut datanode_regions = vec![(region_id, RegionRole::Leader)]; + + let removed = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert!(datanode_regions.is_empty()); + assert_eq!(removed.len(), 1); + assert!(removed.contains(®ion_id)); + } + + #[tokio::test] + async fn test_retain_active_regions_simple() { + let node_id = 1; + let engine = MITO2_ENGINE; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let peer = Peer::empty(node_id); + let table_info = 
new_test_table_info(table_id, vec![region_number]).into(); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + // `inactive_regions` should be empty, because the `region_id` is an active leader region of the `peer`. + let mut datanode_regions = vec![(region_id, RegionRole::Leader)]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert!(inactive_regions.is_empty()); + assert_eq!(datanode_regions.len(), 1); + + // `inactive_regions` should be empty, because the `region_id` is a potential leader. + let mut datanode_regions = vec![(region_id, RegionRole::Follower)]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert!(inactive_regions.is_empty()); + } + + #[tokio::test] + async fn test_retain_active_regions_2() { + let node_id = 1; + let engine = MITO2_ENGINE; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let unknown_region_id = RegionId::new(table_id + 1, region_number); + + let peer = Peer::empty(node_id); + let another_peer = Peer::empty(node_id + 1); + + let table_info = + new_test_table_info(table_id, vec![region_number, region_number + 1]).into(); + + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![another_peer.clone()], + }, + ]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + +
table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + // `inactive_regions` should be vec![unknown_region_id]. + let mut datanode_regions = vec![ + (region_id, RegionRole::Leader), + (unknown_region_id, RegionRole::Follower), + ]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&unknown_region_id)); + assert_eq!(datanode_regions.len(), 1); + + // `inactive_regions` should be vec![another_region_id], because the `another_region_id` is an active region of `another_peer`. + let mut datanode_regions = vec![ + (region_id, RegionRole::Follower), + (another_region_id, RegionRole::Follower), + ]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&another_region_id)); + assert_eq!(datanode_regions.len(), 1); + + // `inactive_regions` should be vec![another_region_id], because the `another_region_id` is an active region of `another_peer`.
+ let mut datanode_regions = vec![ + (region_id, RegionRole::Follower), + (another_region_id, RegionRole::Leader), + ]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&another_region_id)); + assert_eq!(datanode_regions.len(), 1); + } +} diff --git a/src/meta-srv/src/region/lease_keeper/file.rs b/src/meta-srv/src/region/lease_keeper/file.rs new file mode 100644 index 000000000000..c0148584359e --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper/file.rs @@ -0,0 +1,167 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use common_meta::rpc::router::{convert_to_region_peers_map, RegionRoute}; +use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; + +use super::utils::inactive_follower_regions; + +/// Retains active file regions(`datanode_regions`), returns inactive regions. +/// +/// It removes a region if the `node_id` isn't one of the peers in `region_routes`.
+pub fn retain_active_regions( + node_id: u64, + datanode_regions: &mut Vec<(RegionId, RegionRole)>, + region_routes: &[RegionRoute], +) -> HashSet { + let region_peers_map = convert_to_region_peers_map(region_routes); + + let inactive_region_ids = datanode_regions + .clone() + .into_iter() + .filter_map(|(region_id, _)| { + inactive_follower_regions(node_id, region_id, ®ion_peers_map) + }) + .collect::>(); + + let _ = datanode_regions + .extract_if(|(region_id, _)| inactive_region_ids.contains(region_id)) + .collect::>(); + inactive_region_ids +} + +#[cfg(test)] +mod tests { + + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; + + use crate::region::lease_keeper::file::retain_active_regions; + + #[test] + fn test_retain_active_regions() { + let node_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(node_id); + + let another_region_id = RegionId::from_u64(region_number as u64 + 1); + let another_peer = Peer::empty(node_id + 1); + + let datanode_regions = vec![(region_id, RegionRole::Follower)]; + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + + // `inactive_regions` should be empty, `region_id` is a active region of the `peer` + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: None, + follower_peers: vec![peer.clone()], + }]; + + // `inactive_regions` should be empty, `region_id` is a active region of the `peer` + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + let region_routes = vec![ + RegionRoute 
{ + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![peer.clone()], + }, + ]; + + // `inactive_regions` should be empty, `region_id` is a active region of the `peer` + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + // `inactive_regions` should be vec[`region_id`,`another_region_id`], + // both regions are the active region of the `another_peer`. + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(another_peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![another_peer.clone()], + }, + ]; + + let mut datanode_regions = vec![ + (region_id, RegionRole::Follower), + (another_region_id, RegionRole::Follower), + ]; + + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions, ®ion_routes); + + assert_eq!(inactive_regions.len(), 2); + assert!(inactive_regions.contains(®ion_id)); + assert!(inactive_regions.contains(&another_region_id)); + assert!(datanode_regions.is_empty()); + + // `inactive_regions` should be vec[`another_region_id`], + // `another_region_id` regions are the active region of the `another_peer`. 
+ let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![another_peer], + }, + ]; + + let mut datanode_regions = vec![ + (region_id, RegionRole::Follower), + (another_region_id, RegionRole::Follower), + ]; + + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions, ®ion_routes); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&another_region_id)); + assert_eq!(datanode_regions.len(), 1); + assert!(datanode_regions.contains(&(region_id, RegionRole::Follower))); + } +} diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs new file mode 100644 index 000000000000..fd3ed84e3fe3 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper/mito.rs @@ -0,0 +1,160 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use common_meta::rpc::router::{ + convert_to_region_leader_map, convert_to_region_peers_map, RegionRoute, +}; +use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; + +use crate::region::lease_keeper::utils::{inactive_follower_regions, inactive_leader_regions}; + +/// Retains active mito regions(`datanode_regions`), returns inactive regions. 
+/// +/// It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. +/// +/// It removes a follower region if the `node_id` isn't one of the peers in `region_routes`. +pub fn retain_active_regions( + node_id: u64, + datanode_regions: &mut Vec<(RegionId, RegionRole)>, + region_routes: &[RegionRoute], +) -> HashSet { + let region_leader_map = convert_to_region_leader_map(region_routes); + let region_peers_map = convert_to_region_peers_map(region_routes); + + let inactive_region_ids = datanode_regions + .clone() + .into_iter() + .filter_map(|(region_id, role)| match role { + RegionRole::Follower => { + inactive_follower_regions(node_id, region_id, ®ion_peers_map) + } + RegionRole::Leader => inactive_leader_regions(node_id, region_id, ®ion_leader_map), + }) + .collect::>(); + + let _ = datanode_regions + .extract_if(|(region_id, _)| inactive_region_ids.contains(region_id)) + .collect::>(); + + inactive_region_ids +} + +#[cfg(test)] +mod tests { + + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; + + use crate::region::lease_keeper::mito::retain_active_regions; + + #[test] + fn test_retain_active_regions() { + let node_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(node_id); + let another_peer = Peer::empty(node_id + 1); + + let datanode_regions = vec![(region_id, RegionRole::Leader)]; + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + + // `inactive_regions` should be empty, `region_id` is a active leader region of the `peer` + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + let mut retained_active_regions = datanode_regions.clone(); + + // 
`inactive_regions` should be vec![`region_id`]; + let inactive_regions = retain_active_regions(node_id, &mut retained_active_regions, &[]); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(®ion_id)); + assert!(retained_active_regions.is_empty()); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(another_peer.clone()), + follower_peers: vec![peer.clone()], + }]; + + let mut retained_active_regions = datanode_regions.clone(); + + // `inactive_regions` should be vec![`region_id`], `region_id` is RegionRole::Leader. + let inactive_regions = + retain_active_regions(node_id, &mut retained_active_regions, ®ion_routes); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(®ion_id)); + assert!(retained_active_regions.is_empty()); + + // `inactive_regions` should be empty, `region_id` is a active follower region of the `peer`. + let datanode_regions = vec![(region_id, RegionRole::Follower)]; + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + assert!(inactive_regions.is_empty()); + + let another_region_id = RegionId::from_u64(region_number as u64 + 1); + + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![peer], + }, + ]; + + let datanode_regions = vec![ + (region_id, RegionRole::Leader), + (another_region_id, RegionRole::Follower), + ]; + + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + // `inactive_regions` should be vec![another_region_id]. 
+ let datanode_regions = vec![ + (another_region_id, RegionRole::Leader), + (region_id, RegionRole::Follower), + ]; + + let mut retained_active_regions = datanode_regions.clone(); + + let inactive_regions = + retain_active_regions(node_id, &mut retained_active_regions, ®ion_routes); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&another_region_id)); + + assert_eq!(retained_active_regions.len(), 1); + assert!(retained_active_regions.contains(&(region_id, RegionRole::Follower))); + } +} diff --git a/src/meta-srv/src/region/lease_keeper/utils.rs b/src/meta-srv/src/region/lease_keeper/utils.rs new file mode 100644 index 000000000000..4cec956a6947 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper/utils.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use common_meta::peer::Peer; +use store_api::storage::RegionId; + +/// Returns Some(region_id) if it's a inactive leader region. +/// +/// It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. +pub fn inactive_leader_regions( + node_id: u64, + region_id: RegionId, + region_leader_map: &HashMap, +) -> Option { + if let Some(peer) = region_leader_map.get(®ion_id.region_number()) { + // TODO(weny): treats the leader peer as inactive if it's readonly or downgraded. 
+ if peer.id == node_id { + None + } else { + Some(region_id) + } + } else { + Some(region_id) + } +} + +/// Returns Some(region_id) if it's a inactive follower region. +/// +/// It removes a region if the `node_id` isn't one of the peers in `region_routes`. +pub fn inactive_follower_regions( + node_id: u64, + region_id: RegionId, + region_peer_map: &HashMap>, +) -> Option { + if let Some(peers) = region_peer_map.get(®ion_id.region_number()) { + if peers.contains(&node_id) { + None + } else { + Some(region_id) + } + } else { + Some(region_id) + } +} + +#[cfg(test)] +mod tests { + + use common_meta::peer::Peer; + use store_api::storage::RegionId; + + use super::*; + + #[test] + fn test_inactive_follower_regions() { + let node_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(node_id); + + let region_peers_map = [(region_number, HashSet::from([node_id]))].into(); + + // Should be None, `region_id` is a active region of `peer`. + assert_eq!( + None, + inactive_follower_regions(peer.id, region_id, ®ion_peers_map) + ); + + // Should be Some(`region_id`), region_followers_map is empty. + assert_eq!( + Some(region_id), + inactive_follower_regions(peer.id, region_id, &Default::default()) + ); + + let another_peer = Peer::empty(node_id + 1); + + let region_peers_map = [(region_number, HashSet::from([peer.id, another_peer.id]))].into(); + + // Should be None, `region_id` is a active region of `another_peer`. + assert_eq!( + None, + inactive_follower_regions(node_id, region_id, ®ion_peers_map) + ); + } + + #[test] + fn test_inactive_leader_regions() { + let node_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(node_id); + + let region_leader_map = [(region_number, &peer)].into(); + + // Should be None, `region_id` is a active region of `peer`. 
+ assert_eq!( + None, + inactive_leader_regions(node_id, region_id, ®ion_leader_map) + ); + + // Should be Some(`region_id`), the inactive_leader_regions is empty. + assert_eq!( + Some(region_id), + inactive_leader_regions(node_id, region_id, &Default::default()) + ); + + let another_peer = Peer::empty(node_id + 1); + let region_leader_map = [(region_number, &another_peer)].into(); + + // Should be Some(`region_id`), `region_id` is active region of `another_peer`. + assert_eq!( + Some(region_id), + inactive_leader_regions(node_id, region_id, ®ion_leader_map) + ); + } +} diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 6a7f74d30de2..41b3bef065f8 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -19,7 +19,7 @@ use api::v1::Rows; use common_meta::key::table_route::TableRouteManager; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router::{convert_to_region_map, RegionRoutes}; +use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoutes}; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; @@ -93,7 +93,7 @@ impl PartitionRuleManager { .context(error::FindTableRoutesSnafu { table_id })? 
.into_inner(); let mut datanodes = HashMap::with_capacity(regions.len()); - let region_map = convert_to_region_map(&route.region_routes); + let region_map = convert_to_region_leader_map(&route.region_routes); for region in regions.iter() { let datanode = *region_map.get(region).context(error::FindDatanodeSnafu { table_id, diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 7f6762562a10..3dbeaa6e74a5 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -21,12 +21,13 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; +use serde::{Deserialize, Serialize}; use crate::metadata::RegionMetadataRef; use crate::region_request::RegionRequest; use crate::storage::{RegionId, ScanRequest}; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum RegionRole { // Readonly region(mito2), Readonly region(file). 
Follower, @@ -43,6 +44,15 @@ impl From for PbRegionRole { } } +impl From for RegionRole { + fn from(value: PbRegionRole) -> Self { + match value { + PbRegionRole::Leader => RegionRole::Leader, + PbRegionRole::Follower => RegionRole::Follower, + } + } +} + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine From 38b784eeb1807ad03717b8d16adac656b068cb74 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 24 Oct 2023 14:05:05 +0000 Subject: [PATCH 2/8] chore: apply suggestions from CR --- src/common/meta/src/key/table_route.rs | 34 ++++++++++++++++ src/common/meta/src/rpc/router.rs | 17 ++------ src/meta-srv/src/lib.rs | 1 - src/meta-srv/src/region/lease_keeper.rs | 41 ++++++-------------- src/meta-srv/src/region/lease_keeper/file.rs | 5 +-- src/meta-srv/src/region/lease_keeper/mito.rs | 4 +- 6 files changed, 52 insertions(+), 50 deletions(-) diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 57abcf103362..885300c2edff 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::Display; use serde::{Deserialize, Serialize}; @@ -23,6 +24,7 @@ use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_P use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, RegionRoute}; +use crate::rpc::store::BatchGetRequest; pub struct TableRouteKey { pub table_id: TableId, @@ -197,6 +199,38 @@ impl TableRouteManager { .transpose() } + /// It may return a subset of the `table_ids`. 
+ pub async fn batch_get( + &self, + table_ids: &[TableId], + ) -> Result> { + let lookup_table = table_ids + .iter() + .map(|id| (TableRouteKey::new(*id).as_raw_key(), id)) + .collect::>(); + + let resp = self + .kv_backend + .batch_get(BatchGetRequest { + keys: lookup_table.keys().cloned().collect::>(), + }) + .await?; + + let values = resp + .kvs + .iter() + .map(|kv| { + Ok(( + // Safety: must exist. + **lookup_table.get(kv.key()).unwrap(), + TableRouteValue::try_from_raw_value(&kv.value)?, + )) + }) + .collect::>>()?; + + Ok(values) + } + #[cfg(test)] pub async fn get_removed( &self, diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 53189137a6b5..7faa0f4f8ca1 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -92,28 +92,19 @@ pub fn convert_to_region_followers_map( /// Returns the HashMap<[RegionNumber], HashSet>. pub fn convert_to_region_peers_map(region_routes: &[RegionRoute]) -> HashMap> { - let mut set = region_routes + region_routes .iter() .map(|x| { ( x.region.id.region_number(), x.follower_peers .iter() - .map(|peer| (peer.id)) + .chain(&x.leader_peer) + .map(|peer| peer.id) .collect::>(), ) }) - .collect::>(); - - for route in region_routes { - if let Some(peer) = &route.leader_peer { - let entry = set.entry(route.region.id.region_number()).or_default(); - - entry.insert(peer.id); - } - } - - set + .collect::>() } pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> { diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 18c966502b00..77f8f3f82145 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -14,7 +14,6 @@ #![feature(async_closure)] #![feature(result_flattening)] -#![feature(extract_if)] pub mod bootstrap; mod cache_invalidator; diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 53bc56092832..dc33abc6c9d0 100644 --- 
a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -20,7 +20,6 @@ use std::collections::{HashMap, HashSet}; use common_catalog::consts::{FILE_ENGINE, MITO2_ENGINE}; use common_meta::key::TableMetadataManagerRef; -use futures::future::try_join_all; use snafu::ResultExt; use store_api::region_engine::RegionRole; use store_api::storage::{RegionId, TableId}; @@ -82,47 +81,29 @@ impl RegionLeaseKeeper { table.push((*region_id, *role)); } - let table_ids: Vec = tables.keys().cloned().collect::>(); - let metadata = try_join_all( - table_ids - .iter() - .map(|table_id| table_route_manager.get(*table_id)), - ) - .await - .context(error::TableMetadataManagerSnafu)?; - - // All tables' metadata. - let tables_metadata = table_ids - .into_iter() - .zip(metadata) - .collect::>>(); + let table_ids = tables.keys().cloned().collect::>(); + + // The subset of all table metadata. + let metadata_subset = table_route_manager + .batch_get(&table_ids) + .await + .context(error::TableMetadataManagerSnafu)?; let mut inactive_regions = HashSet::new(); // Removes inactive regions. - for (table_id, metadata) in tables_metadata { - if let Some(metadata) = metadata { - // Safety: Value must exist. - let regions = tables.get_mut(&table_id).unwrap(); + for (table_id, regions) in &mut tables { + if let Some(metadata) = metadata_subset.get(table_id) { let region_routes = &metadata.region_routes; inactive_regions.extend(handler(node_id, regions, region_routes)); } else { // Removes table if metadata is not found. - // Safety: Value must exist. 
- let regions = tables - .remove(&table_id) - .unwrap() - .into_iter() - .map(|(region_id, _)| region_id); - - inactive_regions.extend(regions); + inactive_regions.extend(regions.drain(..).map(|(region_id, _)| region_id)); } } - let _ = datanode_regions - .extract_if(|(region_id, _)| inactive_regions.contains(region_id)) - .collect::>(); + datanode_regions.retain(|(region_id, _)| !inactive_regions.contains(region_id)); Ok(inactive_regions) } diff --git a/src/meta-srv/src/region/lease_keeper/file.rs b/src/meta-srv/src/region/lease_keeper/file.rs index c0148584359e..903ad968c6e6 100644 --- a/src/meta-srv/src/region/lease_keeper/file.rs +++ b/src/meta-srv/src/region/lease_keeper/file.rs @@ -38,9 +38,8 @@ pub fn retain_active_regions( }) .collect::>(); - let _ = datanode_regions - .extract_if(|(region_id, _)| inactive_region_ids.contains(region_id)) - .collect::>(); + datanode_regions.retain(|(region_id, _)| !inactive_region_ids.contains(region_id)); + inactive_region_ids } diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs index fd3ed84e3fe3..b131f181c00e 100644 --- a/src/meta-srv/src/region/lease_keeper/mito.rs +++ b/src/meta-srv/src/region/lease_keeper/mito.rs @@ -46,9 +46,7 @@ pub fn retain_active_regions( }) .collect::>(); - let _ = datanode_regions - .extract_if(|(region_id, _)| inactive_region_ids.contains(region_id)) - .collect::>(); + datanode_regions.retain(|(region_id, _)| !inactive_region_ids.contains(region_id)); inactive_region_ids } From 356fc8ace13d3ff8315c28a48f39cae30dbc7ef3 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 31 Oct 2023 04:39:40 +0000 Subject: [PATCH 3/8] refactor: simplify `retain_active_regions` --- src/meta-srv/src/region/lease_keeper.rs | 116 ++++-------- src/meta-srv/src/region/lease_keeper/file.rs | 166 ------------------ src/meta-srv/src/region/lease_keeper/mito.rs | 96 +++------- src/meta-srv/src/region/lease_keeper/utils.rs | 65 +------ 4 files changed, 64 insertions(+), 379 
deletions(-) delete mode 100644 src/meta-srv/src/region/lease_keeper/file.rs diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index dc33abc6c9d0..5eb91feaf17b 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -12,20 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod file; pub mod mito; pub mod utils; use std::collections::{HashMap, HashSet}; -use common_catalog::consts::{FILE_ENGINE, MITO2_ENGINE}; use common_meta::key::TableMetadataManagerRef; use snafu::ResultExt; -use store_api::region_engine::RegionRole; use store_api::storage::{RegionId, TableId}; +use self::mito::retain_active_regions; use crate::error::{self, Result}; +/// Region Lease Keeper removes any inactive [RegionRole::Leader] regions. pub struct RegionLeaseKeeper { table_metadata_manager: TableMetadataManagerRef, } @@ -39,46 +38,27 @@ impl RegionLeaseKeeper { } impl RegionLeaseKeeper { - /// Retains active mito regions(`datanode_regions`), returns inactive regions. - /// - /// **For mito regions:** - /// - /// - It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. - /// - It removes a follower region if the `node_id` isn't one of the peers in `region_routes`. - /// - /// **For file regions:** - /// - /// - It removes a follower region if the `node_id` isn't one of the peers in `region_routes`. - /// - /// **Common behaviors:** + /// Retains active [RegionRole::Leader] regions, returns inactive regions. /// + /// - It grants new lease if the `datanode_id` is the corresponding leader peer in `region_routes`. + /// - It removes a leader region if the `datanode_id` isn't the corresponding leader peer in `region_routes`. + /// - Expected as [RegionRole::Follower] regions. + /// - Unexpected [RegionRole::Leader] regions. /// - It removes a region if the region's table metadata is not found. 
pub async fn retain_active_regions( &self, _cluster_id: u64, - node_id: u64, - engine: &str, - datanode_regions: &mut Vec<(RegionId, RegionRole)>, + datanode_id: u64, + datanode_regions: &mut Vec, ) -> Result> { let table_route_manager = self.table_metadata_manager.table_route_manager(); - let handler = match engine { - MITO2_ENGINE => mito::retain_active_regions, - FILE_ENGINE => file::retain_active_regions, - _ => { - return error::UnexpectedSnafu { - violated: format!("Unknown engine: {engine}"), - } - .fail() - } - }; - - let mut tables = HashMap::>::new(); + let mut tables = HashMap::>::new(); // Group by `table_id`. - for (region_id, role) in datanode_regions.iter() { + for region_id in datanode_regions.iter() { let table = tables.entry(region_id.table_id()).or_default(); - table.push((*region_id, *role)); + table.push(*region_id); } let table_ids = tables.keys().cloned().collect::>(); @@ -96,14 +76,14 @@ impl RegionLeaseKeeper { if let Some(metadata) = metadata_subset.get(table_id) { let region_routes = &metadata.region_routes; - inactive_regions.extend(handler(node_id, regions, region_routes)); + inactive_regions.extend(retain_active_regions(datanode_id, regions, region_routes)); } else { // Removes table if metadata is not found. 
- inactive_regions.extend(regions.drain(..).map(|(region_id, _)| region_id)); + inactive_regions.extend(regions.drain(..)); } } - datanode_regions.retain(|(region_id, _)| !inactive_regions.contains(region_id)); + datanode_regions.retain(|region_id| !inactive_regions.contains(region_id)); Ok(inactive_regions) } @@ -118,12 +98,10 @@ impl RegionLeaseKeeper { mod tests { use std::sync::Arc; - use common_catalog::consts::MITO2_ENGINE; use common_meta::key::test_utils::new_test_table_info; use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; - use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use super::RegionLeaseKeeper; @@ -140,17 +118,16 @@ mod tests { #[tokio::test] async fn test_empty_table_routes() { - let node_id = 1; - let engine = MITO2_ENGINE; + let datanode_id = 1; let region_number = 1u32; let region_id = RegionId::from_u64(region_number as u64); let keeper = new_test_keeper(); - let mut datanode_regions = vec![(region_id, RegionRole::Leader)]; + let mut datanode_regions = vec![region_id]; let removed = keeper - .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .retain_active_regions(0, datanode_id, &mut datanode_regions) .await .unwrap(); @@ -161,12 +138,11 @@ mod tests { #[tokio::test] async fn test_retain_active_regions_simple() { - let node_id = 1; - let engine = MITO2_ENGINE; + let datanode_id = 1; let region_number = 1u32; let table_id = 10; let region_id = RegionId::new(table_id, region_number); - let peer = Peer::empty(node_id); + let peer = Peer::empty(datanode_id); let table_info = new_test_table_info(table_id, vec![region_number]).into(); let region_routes = vec![RegionRoute { @@ -183,22 +159,22 @@ mod tests { .await .unwrap(); - // `inactive_regions` should be vec![region_id]. - let mut datanode_regions = vec![(region_id, RegionRole::Leader)]; + // `inactive_regions` should be empty. 
+ let mut datanode_regions = vec![region_id]; let inactive_regions = keeper - .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .retain_active_regions(0, datanode_id, &mut datanode_regions) .await .unwrap(); assert!(inactive_regions.is_empty()); assert_eq!(datanode_regions.len(), 1); - // `inactive_regions` should be empty, because the `region_id` is a potential leader. - let mut datanode_regions = vec![(region_id, RegionRole::Follower)]; + // `inactive_regions` should be empty. + let mut datanode_regions = vec![]; let inactive_regions = keeper - .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .retain_active_regions(0, datanode_id, &mut datanode_regions) .await .unwrap(); @@ -207,16 +183,15 @@ mod tests { #[tokio::test] async fn test_retain_active_regions_2() { - let node_id = 1; - let engine = MITO2_ENGINE; + let datanode_id = 1; let region_number = 1u32; let table_id = 10; let region_id = RegionId::new(table_id, region_number); let another_region_id = RegionId::new(table_id, region_number + 1); let unknown_region_id = RegionId::new(table_id + 1, region_number); - let peer = Peer::empty(node_id); - let another_peer = Peer::empty(node_id + 1); + let peer = Peer::empty(datanode_id); + let another_peer = Peer::empty(datanode_id + 1); let table_info = new_test_table_info(table_id, vec![region_number, region_number + 1]).into(); @@ -242,14 +217,12 @@ mod tests { .await .unwrap(); + // Unexpected Leader region. // `inactive_regions` should be vec![unknown_region_id]. 
- let mut datanode_regions = vec![ - (region_id, RegionRole::Leader), - (unknown_region_id, RegionRole::Follower), - ]; + let mut datanode_regions = vec![region_id, unknown_region_id]; let inactive_regions = keeper - .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .retain_active_regions(0, datanode_id, &mut datanode_regions) .await .unwrap(); @@ -257,34 +230,17 @@ mod tests { assert!(inactive_regions.contains(&unknown_region_id)); assert_eq!(datanode_regions.len(), 1); + // Expected as Follower region. // `inactive_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. - let mut datanode_regions = vec![ - (region_id, RegionRole::Follower), - (another_region_id, RegionRole::Follower), - ]; - - let inactive_regions = keeper - .retain_active_regions(0, node_id, engine, &mut datanode_regions) - .await - .unwrap(); - - assert_eq!(inactive_regions.len(), 1); - assert!(inactive_regions.contains(&another_region_id)); - assert_eq!(datanode_regions.len(), 1); - - // `inactive_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. 
- let mut datanode_regions = vec![ - (region_id, RegionRole::Follower), - (another_region_id, RegionRole::Leader), - ]; + let mut datanode_regions = vec![another_region_id]; let inactive_regions = keeper - .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .retain_active_regions(0, datanode_id, &mut datanode_regions) .await .unwrap(); assert_eq!(inactive_regions.len(), 1); assert!(inactive_regions.contains(&another_region_id)); - assert_eq!(datanode_regions.len(), 1); + assert!(datanode_regions.is_empty()); } } diff --git a/src/meta-srv/src/region/lease_keeper/file.rs b/src/meta-srv/src/region/lease_keeper/file.rs deleted file mode 100644 index 903ad968c6e6..000000000000 --- a/src/meta-srv/src/region/lease_keeper/file.rs +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashSet; - -use common_meta::rpc::router::{convert_to_region_peers_map, RegionRoute}; -use store_api::region_engine::RegionRole; -use store_api::storage::RegionId; - -use super::utils::inactive_follower_regions; - -/// Retains active mito regions(`datanode_regions`), returns inactive regions. -/// -/// It removes a region if the `node_id` isn't one of the peers in `region_routes`. 
-pub fn retain_active_regions( - node_id: u64, - datanode_regions: &mut Vec<(RegionId, RegionRole)>, - region_routes: &[RegionRoute], -) -> HashSet { - let region_peers_map = convert_to_region_peers_map(region_routes); - - let inactive_region_ids = datanode_regions - .clone() - .into_iter() - .filter_map(|(region_id, _)| { - inactive_follower_regions(node_id, region_id, ®ion_peers_map) - }) - .collect::>(); - - datanode_regions.retain(|(region_id, _)| !inactive_region_ids.contains(region_id)); - - inactive_region_ids -} - -#[cfg(test)] -mod tests { - - use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; - use store_api::region_engine::RegionRole; - use store_api::storage::RegionId; - - use crate::region::lease_keeper::file::retain_active_regions; - - #[test] - fn test_retain_active_regions() { - let node_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(node_id); - - let another_region_id = RegionId::from_u64(region_number as u64 + 1); - let another_peer = Peer::empty(node_id + 1); - - let datanode_regions = vec![(region_id, RegionRole::Follower)]; - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - - // `inactive_regions` should be empty, `region_id` is a active region of the `peer` - let inactive_regions = - retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); - - assert!(inactive_regions.is_empty()); - - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: None, - follower_peers: vec![peer.clone()], - }]; - - // `inactive_regions` should be empty, `region_id` is a active region of the `peer` - let inactive_regions = - retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); - - assert!(inactive_regions.is_empty()); - - let region_routes = vec![ - RegionRoute { - region: 
Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(another_region_id), - leader_peer: None, - follower_peers: vec![peer.clone()], - }, - ]; - - // `inactive_regions` should be empty, `region_id` is a active region of the `peer` - let inactive_regions = - retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); - - assert!(inactive_regions.is_empty()); - - // `inactive_regions` should be vec[`region_id`,`another_region_id`], - // both regions are the active region of the `another_peer`. - let region_routes = vec![ - RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(another_peer.clone()), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(another_region_id), - leader_peer: None, - follower_peers: vec![another_peer.clone()], - }, - ]; - - let mut datanode_regions = vec![ - (region_id, RegionRole::Follower), - (another_region_id, RegionRole::Follower), - ]; - - let inactive_regions = - retain_active_regions(node_id, &mut datanode_regions, ®ion_routes); - - assert_eq!(inactive_regions.len(), 2); - assert!(inactive_regions.contains(®ion_id)); - assert!(inactive_regions.contains(&another_region_id)); - assert!(datanode_regions.is_empty()); - - // `inactive_regions` should be vec[`another_region_id`], - // `another_region_id` regions are the active region of the `another_peer`. 
- let region_routes = vec![ - RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(another_region_id), - leader_peer: None, - follower_peers: vec![another_peer], - }, - ]; - - let mut datanode_regions = vec![ - (region_id, RegionRole::Follower), - (another_region_id, RegionRole::Follower), - ]; - - let inactive_regions = - retain_active_regions(node_id, &mut datanode_regions, ®ion_routes); - - assert_eq!(inactive_regions.len(), 1); - assert!(inactive_regions.contains(&another_region_id)); - assert_eq!(datanode_regions.len(), 1); - assert!(datanode_regions.contains(&(region_id, RegionRole::Follower))); - } -} diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs index b131f181c00e..497ea8726704 100644 --- a/src/meta-srv/src/region/lease_keeper/mito.rs +++ b/src/meta-srv/src/region/lease_keeper/mito.rs @@ -14,39 +14,30 @@ use std::collections::HashSet; -use common_meta::rpc::router::{ - convert_to_region_leader_map, convert_to_region_peers_map, RegionRoute, -}; -use store_api::region_engine::RegionRole; +use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoute}; use store_api::storage::RegionId; -use crate::region::lease_keeper::utils::{inactive_follower_regions, inactive_leader_regions}; +use crate::region::lease_keeper::utils::inactive_leader_regions; -/// Retains active mito regions(`datanode_regions`), returns inactive regions. +/// Retains active [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions(`datanode_regions`), returns inactive regions. /// -/// It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. -/// -/// It removes a follower region if the `node_id` isn't one of the peers in `region_routes`. +/// It removes a leader region if the `datanode_id` isn't the corresponding leader peer in `region_routes`. 
+/// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. +/// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. pub fn retain_active_regions( - node_id: u64, - datanode_regions: &mut Vec<(RegionId, RegionRole)>, + datanode_id: u64, + datanode_regions: &mut Vec, region_routes: &[RegionRoute], ) -> HashSet { let region_leader_map = convert_to_region_leader_map(region_routes); - let region_peers_map = convert_to_region_peers_map(region_routes); let inactive_region_ids = datanode_regions .clone() .into_iter() - .filter_map(|(region_id, role)| match role { - RegionRole::Follower => { - inactive_follower_regions(node_id, region_id, ®ion_peers_map) - } - RegionRole::Leader => inactive_leader_regions(node_id, region_id, ®ion_leader_map), - }) + .filter_map(|region_id| inactive_leader_regions(datanode_id, region_id, ®ion_leader_map)) .collect::>(); - datanode_regions.retain(|(region_id, _)| !inactive_region_ids.contains(region_id)); + datanode_regions.retain(|region_id| !inactive_region_ids.contains(region_id)); inactive_region_ids } @@ -56,36 +47,38 @@ mod tests { use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; - use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use crate::region::lease_keeper::mito::retain_active_regions; #[test] fn test_retain_active_regions() { - let node_id = 1u64; + let datanode_id = 1u64; let region_number = 1u32; let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(node_id); - let another_peer = Peer::empty(node_id + 1); + let peer = Peer::empty(datanode_id); + let another_peer = Peer::empty(datanode_id + 1); - let datanode_regions = vec![(region_id, RegionRole::Leader)]; + let datanode_regions = vec![region_id]; let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(peer.clone()), ..Default::default() }]; + // Grants lease. 
// `inactive_regions` should be empty, `region_id` is a active leader region of the `peer` let inactive_regions = - retain_active_regions(node_id, &mut datanode_regions.clone(), &region_routes); + retain_active_regions(datanode_id, &mut datanode_regions.clone(), &region_routes); assert!(inactive_regions.is_empty()); let mut retained_active_regions = datanode_regions.clone(); + // Unexpected Leader region. // `inactive_regions` should be vec![`region_id`]; - let inactive_regions = retain_active_regions(node_id, &mut retained_active_regions, &[]); + let inactive_regions = + retain_active_regions(datanode_id, &mut retained_active_regions, &[]); assert_eq!(inactive_regions.len(), 1); assert!(inactive_regions.contains(&region_id)); @@ -99,60 +92,13 @@ mod tests { let mut retained_active_regions = datanode_regions.clone(); + // Expected as Follower region. // `inactive_regions` should be vec![`region_id`], `region_id` is RegionRole::Leader. let inactive_regions = - retain_active_regions(node_id, &mut retained_active_regions, &region_routes); + retain_active_regions(datanode_id, &mut retained_active_regions, &region_routes); assert_eq!(inactive_regions.len(), 1); assert!(inactive_regions.contains(&region_id)); assert!(retained_active_regions.is_empty()); - - // `inactive_regions` should be empty, `region_id` is a active follower region of the `peer`.
- let datanode_regions = vec![(region_id, RegionRole::Follower)]; - let inactive_regions = - retain_active_regions(node_id, &mut datanode_regions.clone(), &region_routes); - assert!(inactive_regions.is_empty()); - - let another_region_id = RegionId::from_u64(region_number as u64 + 1); - - let region_routes = vec![ - RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(another_region_id), - leader_peer: None, - follower_peers: vec![peer], - }, - ]; - - let datanode_regions = vec![ - (region_id, RegionRole::Leader), - (another_region_id, RegionRole::Follower), - ]; - - let inactive_regions = - retain_active_regions(node_id, &mut datanode_regions.clone(), &region_routes); - - assert!(inactive_regions.is_empty()); - - // `inactive_regions` should be vec![another_region_id]. - let datanode_regions = vec![ - (another_region_id, RegionRole::Leader), - (region_id, RegionRole::Follower), - ]; - - let mut retained_active_regions = datanode_regions.clone(); - - let inactive_regions = - retain_active_regions(node_id, &mut retained_active_regions, &region_routes); - - assert_eq!(inactive_regions.len(), 1); - assert!(inactive_regions.contains(&another_region_id)); - - assert_eq!(retained_active_regions.len(), 1); - assert!(retained_active_regions.contains(&(region_id, RegionRole::Follower))); } } diff --git a/src/meta-srv/src/region/lease_keeper/utils.rs b/src/meta-srv/src/region/lease_keeper/utils.rs index 4cec956a6947..7acd0150c797 100644 --- a/src/meta-srv/src/region/lease_keeper/utils.rs +++ b/src/meta-srv/src/region/lease_keeper/utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License.
-use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use common_meta::peer::Peer; use store_api::storage::RegionId; @@ -37,25 +37,6 @@ pub fn inactive_leader_regions( } } -/// Returns Some(region_id) if it's a inactive follower region. -/// -/// It removes a region if the `node_id` isn't one of the peers in `region_routes`. -pub fn inactive_follower_regions( - node_id: u64, - region_id: RegionId, - region_peer_map: &HashMap>, -) -> Option { - if let Some(peers) = region_peer_map.get(&region_id.region_number()) { - if peers.contains(&node_id) { - None - } else { - Some(region_id) - } - } else { - Some(region_id) - } -} - #[cfg(test)] mod tests { @@ -64,66 +45,34 @@ mod tests { use super::*; - #[test] - fn test_inactive_follower_regions() { - let node_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(node_id); - - let region_peers_map = [(region_number, HashSet::from([node_id]))].into(); - - // Should be None, `region_id` is a active region of `peer`. - assert_eq!( - None, - inactive_follower_regions(peer.id, region_id, &region_peers_map) - ); - - // Should be Some(`region_id`), region_followers_map is empty. - assert_eq!( - Some(region_id), - inactive_follower_regions(peer.id, region_id, &Default::default()) - ); - - let another_peer = Peer::empty(node_id + 1); - - let region_peers_map = [(region_number, HashSet::from([peer.id, another_peer.id]))].into(); - - // Should be None, `region_id` is a active region of `another_peer`.
- assert_eq!( - None, - inactive_follower_regions(node_id, region_id, &region_peers_map) - ); - } - #[test] fn test_inactive_leader_regions() { - let node_id = 1u64; + let datanode_id = 1u64; let region_number = 1u32; let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(node_id); + let peer = Peer::empty(datanode_id); let region_leader_map = [(region_number, &peer)].into(); // Should be None, `region_id` is a active region of `peer`. assert_eq!( None, - inactive_leader_regions(node_id, region_id, &region_leader_map) + inactive_leader_regions(datanode_id, region_id, &region_leader_map) ); // Should be Some(`region_id`), the inactive_leader_regions is empty. assert_eq!( Some(region_id), - inactive_leader_regions(node_id, region_id, &Default::default()) + inactive_leader_regions(datanode_id, region_id, &Default::default()) ); - let another_peer = Peer::empty(node_id + 1); + let another_peer = Peer::empty(datanode_id + 1); let region_leader_map = [(region_number, &another_peer)].into(); // Should be Some(`region_id`), `region_id` is active region of `another_peer`.
assert_eq!( Some(region_id), - inactive_leader_regions(node_id, region_id, &region_leader_map) + inactive_leader_regions(datanode_id, region_id, &region_leader_map) ); } } From 9f348a67946f53e0bd1ea1a255718662d4cc0a2f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 31 Oct 2023 07:30:38 +0000 Subject: [PATCH 4/8] refactor: remove Default of RegionStat --- src/datanode/src/heartbeat.rs | 4 +++- src/meta-srv/src/handler/node_stat.rs | 14 -------------- src/meta-srv/src/handler/region_lease_handler.rs | 8 +++++++- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index a829d969df86..4e2242288c05 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -293,7 +293,9 @@ impl HeartbeatTask { role: RegionRole::from(stat.role).into(), approximate_bytes, // TODO(ruihang): scratch more info - ..Default::default() + rcus: 0, + wcus: 0, + approximate_rows: 0, }; region_stats.push(region_stat); } diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 08152749e7f5..e48168be94a0 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -59,20 +59,6 @@ pub struct RegionStat { pub role: RegionRole, } -impl Default for RegionStat { - fn default() -> Self { - Self { - id: 0, - rcus: 0, - wcus: 0, - approximate_bytes: 0, - approximate_rows: 0, - engine: String::new(), - role: RegionRole::Follower, - } - } -} - impl Stat { #[inline] pub fn is_empty(&self) -> bool { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 5090d1e1b64b..fc156ff99149 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -72,6 +72,7 @@ mod test { use common_meta::key::TableMetadataManager; use common_meta::{distributed_time_constants, RegionIdent}; + use store_api::region_engine::RegionRole; use
store_api::storage::{RegionId, RegionNumber}; use super::*; @@ -110,7 +111,12 @@ mod test { let region_id = RegionId::new(table_id, region_number); RegionStat { id: region_id.as_u64(), - ..Default::default() + rcus: 0, + wcus: 0, + approximate_bytes: 0, + approximate_rows: 0, + engine: String::new(), + role: RegionRole::Leader, } }; acc.stat = Some(Stat { From a1a968c9c2437c22a6e102176ea6b8b07c977b3d Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 31 Oct 2023 07:35:03 +0000 Subject: [PATCH 5/8] chore: add todo comments --- src/meta-srv/src/region/lease_keeper.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 5eb91feaf17b..2418e8750fa7 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -64,6 +64,7 @@ impl RegionLeaseKeeper { let table_ids = tables.keys().cloned().collect::>(); // The subset of all table metadata. + // TODO: considers storing all active regions in meta's memory. let metadata_subset = table_route_manager .batch_get(&table_ids) .await From be45ec24e7be619a2a0c9cd27467574050bbf468 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 31 Oct 2023 08:18:26 +0000 Subject: [PATCH 6/8] chore: apply suggestions from CR --- src/common/meta/src/rpc/router.rs | 37 +++++++++---------------------- 1 file changed, 10 insertions(+), 27 deletions(-) diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 7faa0f4f8ca1..3f448472fceb 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -61,7 +61,7 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet { /// Returns the HashMap<[RegionNumber], &[Peer]>; /// /// If the region doesn't have a leader peer, the [Region] will be omitted. 
-pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap { +pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap { region_routes .iter() .filter_map(|x| { @@ -72,26 +72,10 @@ pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap>() } -/// Returns the HashMap<[RegionNumber], BTreeMap>. -pub fn convert_to_region_followers_map( - region_routes: &[RegionRoute], -) -> HashMap> { - region_routes - .iter() - .map(|x| { - ( - x.region.id.region_number(), - x.follower_peers - .iter() - .map(|peer| (peer.id, peer)) - .collect::>(), - ) - }) - .collect::>() -} - /// Returns the HashMap<[RegionNumber], HashSet>. -pub fn convert_to_region_peers_map(region_routes: &[RegionRoute]) -> HashMap> { +pub fn convert_to_region_peers_map( + region_routes: &[RegionRoute], +) -> HashMap> { region_routes .iter() .map(|x| { @@ -107,7 +91,10 @@ pub fn convert_to_region_peers_map(region_routes: &[RegionRoute]) -> HashMap>() } -pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> { +pub fn find_region_leader( + region_routes: &[RegionRoute], + region_number: RegionNumber, +) -> Option<&Peer> { region_routes .iter() .find(|x| x.region.id.region_number() == region_number) @@ -279,15 +266,11 @@ impl RegionRoute { pub struct RegionRoutes(pub Vec); impl RegionRoutes { - pub fn region_leader_map(&self) -> HashMap { + pub fn region_leader_map(&self) -> HashMap { convert_to_region_leader_map(&self.0) } - pub fn region_followers_map(&self) -> HashMap> { - convert_to_region_followers_map(&self.0) - } - - pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> { + pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> { self.region_leader_map().get(&region_number).copied() } } From 67d51a59b18f7c37f16ec0f00f53db28753203c6 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 2 Nov 2023 04:16:49 +0000 Subject: [PATCH 7/8] refactor: simplify `retain_active_regions`
--- src/common/meta/src/rpc/router.rs | 19 ---- src/meta-srv/src/region/lease_keeper.rs | 97 +++++++++---------- src/meta-srv/src/region/lease_keeper/mito.rs | 60 +++++------- src/meta-srv/src/region/lease_keeper/utils.rs | 8 +- 4 files changed, 75 insertions(+), 109 deletions(-) diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 3f448472fceb..8118e049e19c 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -72,25 +72,6 @@ pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap>() } -/// Returns the HashMap<[RegionNumber], HashSet>. -pub fn convert_to_region_peers_map( - region_routes: &[RegionRoute], -) -> HashMap> { - region_routes - .iter() - .map(|x| { - ( - x.region.id.region_number(), - x.follower_peers - .iter() - .chain(&x.leader_peer) - .map(|peer| peer.id) - .collect::>(), - ) - }) - .collect::>() -} - pub fn find_region_leader( region_routes: &[RegionRoute], region_number: RegionNumber, diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 2418e8750fa7..fc9f76503ccf 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -21,10 +21,9 @@ use common_meta::key::TableMetadataManagerRef; use snafu::ResultExt; use store_api::storage::{RegionId, TableId}; -use self::mito::retain_active_regions; +use self::mito::find_staled_leader_regions; use crate::error::{self, Result}; -/// Region Lease Keeper removes any inactive [RegionRole::Leader] regions. pub struct RegionLeaseKeeper { table_metadata_manager: TableMetadataManagerRef, } @@ -38,18 +37,17 @@ impl RegionLeaseKeeper { } impl RegionLeaseKeeper { - /// Retains active [RegionRole::Leader] regions, returns inactive regions. + /// Returns staled [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. /// - /// - It grants new lease if the `datanode_id` is the corresponding leader peer in `region_routes`. 
- /// - It removes a leader region if the `datanode_id` isn't the corresponding leader peer in `region_routes`. - /// - Expected as [RegionRole::Follower] regions. - /// - Unexpected [RegionRole::Leader] regions. - /// - It removes a region if the region's table metadata is not found. - pub async fn retain_active_regions( + /// - It returns a region if the `datanode_id` isn't the corresponding leader peer in `region_routes`. + /// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. + /// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. + /// - It returns a region if the region's table metadata is not found. + pub async fn find_staled_leader_regions( &self, _cluster_id: u64, datanode_id: u64, - datanode_regions: &mut Vec, + datanode_regions: &[RegionId], ) -> Result> { let table_route_manager = self.table_metadata_manager.table_route_manager(); @@ -70,23 +68,24 @@ impl RegionLeaseKeeper { .await .context(error::TableMetadataManagerSnafu)?; - let mut inactive_regions = HashSet::new(); + let mut staled_regions = HashSet::new(); - // Removes inactive regions. for (table_id, regions) in &mut tables { if let Some(metadata) = metadata_subset.get(table_id) { let region_routes = &metadata.region_routes; - inactive_regions.extend(retain_active_regions(datanode_id, regions, region_routes)); + staled_regions.extend(find_staled_leader_regions( + datanode_id, + regions, + region_routes, + )); } else { - // Removes table if metadata is not found. - inactive_regions.extend(regions.drain(..)); + // If table metadata is not found. 
+ staled_regions.extend(regions.drain(..)); } } - datanode_regions.retain(|region_id| !inactive_regions.contains(region_id)); - - Ok(inactive_regions) + Ok(staled_regions) } #[cfg(test)] @@ -125,20 +124,19 @@ mod tests { let keeper = new_test_keeper(); - let mut datanode_regions = vec![region_id]; + let datanode_regions = vec![region_id]; - let removed = keeper - .retain_active_regions(0, datanode_id, &mut datanode_regions) + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) .await .unwrap(); - assert!(datanode_regions.is_empty()); - assert_eq!(removed.len(), 1); - assert!(removed.contains(&region_id)); + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(&region_id)); } #[tokio::test] - async fn test_retain_active_regions_simple() { + async fn test_find_staled_regions_simple() { let datanode_id = 1; let region_number = 1u32; let table_id = 10; @@ -160,30 +158,29 @@ mod tests { .await .unwrap(); - // `inactive_regions` should be empty. - let mut datanode_regions = vec![region_id]; + // `staled_regions` should be empty. + let datanode_regions = vec![region_id]; - let inactive_regions = keeper - .retain_active_regions(0, datanode_id, &mut datanode_regions) + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) .await .unwrap(); - assert!(inactive_regions.is_empty()); - assert_eq!(datanode_regions.len(), 1); + assert!(staled_regions.is_empty()); - // `inactive_regions` should be empty. - let mut datanode_regions = vec![]; + // `staled_regions` should be empty.
+ let datanode_regions = vec![]; - let inactive_regions = keeper - .retain_active_regions(0, datanode_id, &mut datanode_regions) + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) .await .unwrap(); - assert!(inactive_regions.is_empty()); + assert!(staled_regions.is_empty()); } #[tokio::test] - async fn test_retain_active_regions_2() { + async fn test_find_staled_regions_2() { let datanode_id = 1; let region_number = 1u32; let table_id = 10; @@ -219,29 +216,27 @@ mod tests { .unwrap(); // Unexpected Leader region. - // `inactive_regions` should be vec![unknown_region_id]. - let mut datanode_regions = vec![region_id, unknown_region_id]; + // `staled_regions` should be vec![unknown_region_id]. + let datanode_regions = vec![region_id, unknown_region_id]; - let inactive_regions = keeper - .retain_active_regions(0, datanode_id, &mut datanode_regions) + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) .await .unwrap(); - assert_eq!(inactive_regions.len(), 1); - assert!(inactive_regions.contains(&unknown_region_id)); - assert_eq!(datanode_regions.len(), 1); + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(&unknown_region_id)); // Expected as Follower region. - // `inactive_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. - let mut datanode_regions = vec![another_region_id]; + // `staled_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. 
+ let datanode_regions = vec![another_region_id]; - let inactive_regions = keeper - .retain_active_regions(0, datanode_id, &mut datanode_regions) + let staled_regions = keeper + .find_staled_leader_regions(0, datanode_id, &datanode_regions) .await .unwrap(); - assert_eq!(inactive_regions.len(), 1); - assert!(inactive_regions.contains(&another_region_id)); - assert!(datanode_regions.is_empty()); + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(&another_region_id)); } } diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs index 497ea8726704..245329a3e7cc 100644 --- a/src/meta-srv/src/region/lease_keeper/mito.rs +++ b/src/meta-srv/src/region/lease_keeper/mito.rs @@ -17,29 +17,24 @@ use std::collections::HashSet; use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoute}; use store_api::storage::RegionId; -use crate::region::lease_keeper::utils::inactive_leader_regions; +use crate::region::lease_keeper::utils::staled_leader_regions; -/// Retains active [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions(`datanode_regions`), returns inactive regions. +/// Returns staled regions. /// -/// It removes a leader region if the `datanode_id` isn't the corresponding leader peer in `region_routes`. +/// It returns a region if the `datanode_id` isn't the corresponding leader peer in `region_routes`. /// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. /// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. 
-pub fn retain_active_regions( +pub fn find_staled_leader_regions( datanode_id: u64, - datanode_regions: &mut Vec, + datanode_regions: &[RegionId], region_routes: &[RegionRoute], ) -> HashSet { let region_leader_map = convert_to_region_leader_map(region_routes); - let inactive_region_ids = datanode_regions - .clone() - .into_iter() - .filter_map(|region_id| inactive_leader_regions(datanode_id, region_id, &region_leader_map)) - .collect::>(); - - datanode_regions.retain(|region_id| !inactive_region_ids.contains(region_id)); - - inactive_region_ids + datanode_regions + .iter() + .filter_map(|region_id| staled_leader_regions(datanode_id, *region_id, &region_leader_map)) + .collect::>() } #[cfg(test)] @@ -49,10 +44,10 @@ mod tests { use common_meta::rpc::router::{Region, RegionRoute}; use store_api::storage::RegionId; - use crate::region::lease_keeper::mito::retain_active_regions; + use crate::region::lease_keeper::mito::find_staled_leader_regions; #[test] - fn test_retain_active_regions() { + fn test_find_staled_regions() { let datanode_id = 1u64; let region_number = 1u32; let region_id = RegionId::from_u64(region_number as u64); @@ -67,22 +62,18 @@ mod tests { }]; // Grants lease. - // `inactive_regions` should be empty, `region_id` is a active leader region of the `peer` - let inactive_regions = - retain_active_regions(datanode_id, &mut datanode_regions.clone(), &region_routes); - - assert!(inactive_regions.is_empty()); + // `staled_regions` should be empty, `region_id` is a active leader region of the `peer` + let staled_regions = + find_staled_leader_regions(datanode_id, &datanode_regions, &region_routes); - let mut retained_active_regions = datanode_regions.clone(); + assert!(staled_regions.is_empty()); // Unexpected Leader region.
- // `inactive_regions` should be vec![`region_id`]; - let inactive_regions = - retain_active_regions(datanode_id, &mut retained_active_regions, &[]); + // `staled_regions` should be vec![`region_id`]; + let staled_regions = find_staled_leader_regions(datanode_id, &datanode_regions, &[]); - assert_eq!(inactive_regions.len(), 1); - assert!(inactive_regions.contains(&region_id)); - assert!(retained_active_regions.is_empty()); + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(&region_id)); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), @@ -90,15 +81,14 @@ mod tests { follower_peers: vec![peer.clone()], }]; - let mut retained_active_regions = datanode_regions.clone(); + let retained_active_regions = datanode_regions.clone(); // Expected as Follower region. - // `inactive_regions` should be vec![`region_id`], `region_id` is RegionRole::Leader. - let inactive_regions = - retain_active_regions(datanode_id, &mut retained_active_regions, &region_routes); + // `staled_regions` should be vec![`region_id`], `region_id` is RegionRole::Leader. + let staled_regions = + find_staled_leader_regions(datanode_id, &retained_active_regions, &region_routes); - assert_eq!(inactive_regions.len(), 1); - assert!(inactive_regions.contains(&region_id)); - assert!(retained_active_regions.is_empty()); + assert_eq!(staled_regions.len(), 1); + assert!(staled_regions.contains(&region_id)); } } diff --git a/src/meta-srv/src/region/lease_keeper/utils.rs b/src/meta-srv/src/region/lease_keeper/utils.rs index 7acd0150c797..dfc82066cdc5 100644 --- a/src/meta-srv/src/region/lease_keeper/utils.rs +++ b/src/meta-srv/src/region/lease_keeper/utils.rs @@ -20,7 +20,7 @@ use store_api::storage::RegionId; /// Returns Some(region_id) if it's a inactive leader region. /// /// It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`.
-pub fn inactive_leader_regions( +pub fn staled_leader_regions( node_id: u64, region_id: RegionId, region_leader_map: &HashMap, @@ -57,13 +57,13 @@ mod tests { // Should be None, `region_id` is a active region of `peer`. assert_eq!( None, - inactive_leader_regions(datanode_id, region_id, &region_leader_map) + staled_leader_regions(datanode_id, region_id, &region_leader_map) ); // Should be Some(`region_id`), the inactive_leader_regions is empty. assert_eq!( Some(region_id), - inactive_leader_regions(datanode_id, region_id, &Default::default()) + staled_leader_regions(datanode_id, region_id, &Default::default()) ); let another_peer = Peer::empty(datanode_id + 1); @@ -72,7 +72,7 @@ mod tests { // Should be Some(`region_id`), `region_id` is active region of `another_peer`. assert_eq!( Some(region_id), - inactive_leader_regions(datanode_id, region_id, &region_leader_map) + staled_leader_regions(datanode_id, region_id, &region_leader_map) ); } } From 32ec24d62d3d4ed94f4922140b076fc0d73d03ec Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 2 Nov 2023 04:22:15 +0000 Subject: [PATCH 8/8] fix: fix ci --- src/meta-srv/src/region/lease_keeper.rs | 1 + src/meta-srv/src/region/lease_keeper/mito.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index fc9f76503ccf..bad7719ac8e2 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -204,6 +204,7 @@ mod tests { region: Region::new_test(another_region_id), leader_peer: None, follower_peers: vec![another_peer.clone()], + leader_status: None, }, ]; diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs index 245329a3e7cc..0a6e53013af9 100644 --- a/src/meta-srv/src/region/lease_keeper/mito.rs +++ b/src/meta-srv/src/region/lease_keeper/mito.rs @@ -79,6 +79,7 @@ mod tests { region: Region::new_test(region_id), leader_peer: Some(another_peer.clone()), follower_peers:
vec![peer.clone()], + leader_status: None, }]; let retained_active_regions = datanode_regions.clone();