From f30024504281c40c655f13b059464a899166397d Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 23 Oct 2023 16:46:48 +0000 Subject: [PATCH] feat: introduce the region lease keeper --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/common/meta/src/key.rs | 45 +-- src/common/meta/src/key/test_utils.rs | 57 ++++ src/common/meta/src/peer.rs | 8 + src/common/meta/src/rpc/router.rs | 69 +++- src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/handler/failure_handler.rs | 4 + src/meta-srv/src/handler/node_stat.rs | 26 +- src/meta-srv/src/lib.rs | 2 + src/meta-srv/src/region.rs | 17 + src/meta-srv/src/region/lease_keeper.rs | 309 ++++++++++++++++++ src/meta-srv/src/region/lease_keeper/file.rs | 167 ++++++++++ src/meta-srv/src/region/lease_keeper/mito.rs | 160 +++++++++ src/meta-srv/src/region/lease_keeper/utils.rs | 129 ++++++++ src/partition/src/manager.rs | 4 +- src/store-api/src/region_engine.rs | 28 ++ 17 files changed, 984 insertions(+), 46 deletions(-) create mode 100644 src/common/meta/src/key/test_utils.rs create mode 100644 src/meta-srv/src/region.rs create mode 100644 src/meta-srv/src/region/lease_keeper.rs create mode 100644 src/meta-srv/src/region/lease_keeper/file.rs create mode 100644 src/meta-srv/src/region/lease_keeper/mito.rs create mode 100644 src/meta-srv/src/region/lease_keeper/utils.rs diff --git a/Cargo.lock b/Cargo.lock index b96060df31d7..78c9ff712697 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4180,7 +4180,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1f1dd532a111e3834cc3019c5605e2993ffb9dc3#1f1dd532a111e3834cc3019c5605e2993ffb9dc3" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9fa3c42d5aaf38d8056dffd8273583b04f62c06c#9fa3c42d5aaf38d8056dffd8273583b04f62c06c" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index 49f275710d4b..bc7c60bf7e1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ derive_builder = "0.12" etcd-client = "0.11" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1f1dd532a111e3834cc3019c5605e2993ffb9dc3" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9fa3c42d5aaf38d8056dffd8273583b04f62c06c" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index d1a980d7023e..92f85591a92f 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -54,6 +54,8 @@ pub mod table_region; // TODO(weny): removes it. #[allow(deprecated)] pub mod table_route; +#[cfg(any(test, feature = "testing"))] +pub mod test_utils; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; @@ -684,12 +686,11 @@ mod tests { use std::sync::Arc; use bytes::Bytes; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, SchemaBuilder}; use futures::TryStreamExt; - use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder}; + use table::metadata::{RawTableInfo, TableInfo}; use super::datanode_table::DatanodeTableKey; + use super::test_utils; use crate::ddl::utils::region_storage_path; use crate::key::datanode_table::RegionInfo; use crate::key::table_info::TableInfoValue; @@ -735,40 +736,6 @@ mod tests { assert_eq!(removed, to_removed_key(key)); } - fn new_test_table_info(region_numbers: impl Iterator) -> TableInfo { - let column_schemas = vec![ - ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true), - ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), - ]; - let schema = SchemaBuilder::try_from(column_schemas) - .unwrap() - .version(123) - .build() - .unwrap(); - - let meta = TableMetaBuilder::default() - .schema(Arc::new(schema)) - .primary_key_indices(vec![0]) - .engine("engine") - .next_column_id(3) - .region_numbers(region_numbers.collect::>()) - .build() - .unwrap(); - TableInfoBuilder::default() - .table_id(10) - .table_version(5) - .name("mytable") - .meta(meta) - .build() - .unwrap() - } - fn new_test_region_route() -> RegionRoute { new_region_route(1, 2) } @@ -786,6 +753,10 @@ mod tests { } } + fn new_test_table_info(region_numbers: impl Iterator) -> TableInfo { + test_utils::new_test_table_info(10, region_numbers) + } + #[tokio::test] async fn test_create_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); diff --git a/src/common/meta/src/key/test_utils.rs b/src/common/meta/src/key/test_utils.rs new file mode 100644 index 000000000000..aa4fa21ea992 --- /dev/null +++ b/src/common/meta/src/key/test_utils.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use store_api::storage::TableId; +use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; + +pub fn new_test_table_info>( + table_id: TableId, + region_numbers: I, +) -> TableInfo { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), + ]; + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap(); + + let meta = TableMetaBuilder::default() + .schema(Arc::new(schema)) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .region_numbers(region_numbers.into_iter().collect::>()) + .build() + .unwrap(); + TableInfoBuilder::default() + .table_id(table_id) + .table_version(5) + .name("mytable") + .meta(meta) + .build() + .unwrap() +} diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs index 483250cbf1df..dd012a12a1c5 100644 --- a/src/common/meta/src/peer.rs +++ b/src/common/meta/src/peer.rs @@ -49,6 +49,14 @@ impl Peer { addr: addr.into(), } } + + #[cfg(any(test, feature = "testing"))] + pub fn empty(id: u64) -> Self { + Self { + id, + addr: String::new(), + } + } } impl Display for Peer { diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 63d4880c231e..09a59161eb66 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -104,7 +104,10 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet { .collect() } -pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap { +/// Returns the HashMap<[RegionNumber], &[Peer]>; +/// +/// If the region doesn't have a leader peer, the [Region] will be omitted. +pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap { region_routes .iter() .filter_map(|x| { @@ -115,6 +118,50 @@ pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap>() } +/// Returns the HashMap<[RegionNumber], BTreeMap>. +pub fn convert_to_region_followers_map( + region_routes: &[RegionRoute], +) -> HashMap> { + region_routes + .iter() + .map(|x| { + ( + x.region.id.region_number(), + x.follower_peers + .iter() + .map(|peer| (peer.id, peer)) + .collect::>(), + ) + }) + .collect::>() +} + +/// Returns the HashMap<[RegionNumber], HashSet>. +pub fn convert_to_region_peers_map(region_routes: &[RegionRoute]) -> HashMap> { + let mut set = region_routes + .iter() + .map(|x| { + ( + x.region.id.region_number(), + x.follower_peers + .iter() + .map(|peer| (peer.id)) + .collect::>(), + ) + }) + .collect::>(); + + for route in region_routes { + if let Some(peer) = &route.leader_peer { + let entry = set.entry(route.region.id.region_number()).or_default(); + + entry.insert(peer.id); + } + } + + set +} + pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> { region_routes .iter() @@ -348,12 +395,16 @@ pub struct RegionRoute { pub struct RegionRoutes(pub Vec); impl RegionRoutes { - pub fn region_map(&self) -> HashMap { - convert_to_region_map(&self.0) + pub fn region_leader_map(&self) -> HashMap { + convert_to_region_leader_map(&self.0) + } + + pub fn region_followers_map(&self) -> HashMap> { + convert_to_region_followers_map(&self.0) } pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> { - self.region_map().get(®ion_number).copied() + self.region_leader_map().get(®ion_number).copied() } } @@ -365,6 +416,16 @@ pub struct Region { pub attrs: BTreeMap, } +impl Region { + #[cfg(any(test, feature = "testing"))] + pub fn new_test(id: RegionId) -> Self { + Self { + id, + ..Default::default() + } + } +} + impl From for Region { fn from(r: PbRegion) -> Self { Self { diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 0836485e0a89..57663c657e2e 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -59,6 +59,7 @@ uuid.workspace = true [dev-dependencies] chrono.workspace = true client = { workspace = true, features = ["testing"] } +common-meta = { workspace = true, features = ["testing"] } common-procedure-test = { workspace = true } session = { workspace = true } tracing = "0.1" diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 85bb0c49aac3..d1c39d99e75f 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -102,6 +102,8 @@ impl HeartbeatHandler for RegionFailureHandler { #[cfg(test)] mod tests { + use store_api::region_engine::RegionRole; + use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; @@ -129,6 +131,8 @@ mod tests { wcus: 0, approximate_bytes: 0, approximate_rows: 0, + engine: default_engine().to_string(), + role: RegionRole::Follower, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 8601ec5dbf67..08152749e7f5 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -17,6 +17,7 @@ use std::collections::HashSet; use api::v1::meta::HeartbeatRequest; use common_time::util as time_util; use serde::{Deserialize, Serialize}; +use store_api::region_engine::RegionRole; use crate::error::{Error, InvalidHeartbeatRequestSnafu}; use crate::keys::StatKey; @@ -25,7 +26,9 @@ use crate::keys::StatKey; pub struct Stat { pub timestamp_millis: i64, pub cluster_id: u64, + // The datanode Id. pub id: u64, + // The datanode address. pub addr: String, /// The read capacity units during this period pub rcus: i64, @@ -38,8 +41,9 @@ pub struct Stat { pub node_epoch: u64, } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct RegionStat { + /// The region_id. pub id: u64, /// The read capacity units during this period pub rcus: i64, @@ -49,6 +53,24 @@ pub struct RegionStat { pub approximate_bytes: i64, /// Approximate number of rows in this region pub approximate_rows: i64, + /// The engine name. + pub engine: String, + /// The region role. + pub role: RegionRole, +} + +impl Default for RegionStat { + fn default() -> Self { + Self { + id: 0, + rcus: 0, + wcus: 0, + approximate_bytes: 0, + approximate_rows: 0, + engine: String::new(), + role: RegionRole::Follower, + } + } } impl Stat { @@ -132,6 +154,8 @@ impl TryFrom for RegionStat { wcus: value.wcus, approximate_bytes: value.approximate_bytes, approximate_rows: value.approximate_rows, + engine: value.engine.to_string(), + role: RegionRole::from(value.role()), }) } } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 9bea19c48fce..0066883c0791 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -14,6 +14,7 @@ #![feature(async_closure)] #![feature(result_flattening)] +#![feature(extract_if)] pub mod bootstrap; mod cache_invalidator; @@ -31,6 +32,7 @@ mod metrics; pub mod mocks; pub mod procedure; pub mod pubsub; +pub mod region; pub mod selector; pub mod service; pub mod table_meta_alloc; diff --git a/src/meta-srv/src/region.rs b/src/meta-srv/src/region.rs new file mode 100644 index 000000000000..a3106bd57935 --- /dev/null +++ b/src/meta-srv/src/region.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod lease_keeper; + +pub use lease_keeper::RegionLeaseKeeper; diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs new file mode 100644 index 000000000000..95ed1fc0f5e9 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -0,0 +1,309 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod file; +pub mod mito; +pub mod utils; + +use std::collections::{HashMap, HashSet}; + +use common_catalog::consts::{FILE_ENGINE, MITO2_ENGINE}; +use common_meta::key::TableMetadataManagerRef; +use futures::future::try_join_all; +use snafu::ResultExt; +use store_api::region_engine::RegionRole; +use store_api::storage::{RegionId, TableId}; + +use crate::error::{self, Result}; + +pub struct RegionLeaseKeeper { + table_metadata_manager: TableMetadataManagerRef, +} + +impl RegionLeaseKeeper { + pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { + Self { + table_metadata_manager, + } + } +} + +impl RegionLeaseKeeper { + /// Retains active mito regions(`datanode_regions`), returns inactive regions. + /// + /// **For mito regions:** + /// + /// - It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. + /// - It removes a follower region if the `node_id` isn't one of the peers in `region_routes`. + /// + /// **For file regions:** + /// + /// - It removes a follower region if the `node_id` isn't one of the peers in `region_routes`. + /// + /// **For all regions:** + /// + /// - It removes a region if the region's table metadata is not found. + pub async fn retain_active_regions( + &self, + _cluster_id: u64, + node_id: u64, + engine: &str, + datanode_regions: &mut Vec<(RegionId, RegionRole)>, + ) -> Result> { + let table_route_manager = self.table_metadata_manager.table_route_manager(); + + let handler = match engine { + MITO2_ENGINE => mito::retain_active_regions, + FILE_ENGINE => file::retain_active_regions, + _ => { + return error::UnexpectedSnafu { + violated: format!("Unknown engine: {engine}"), + } + .fail() + } + }; + + let mut tables = HashMap::>::new(); + + // Group by `table_id`. + for (region_id, role) in datanode_regions.iter() { + let table = tables.entry(region_id.table_id()).or_default(); + table.push((*region_id, *role)); + } + + let table_ids: Vec = tables.keys().cloned().collect::>(); + let metadata = try_join_all( + table_ids + .iter() + .map(|table_id| table_route_manager.get(*table_id)), + ) + .await + .context(error::TableMetadataManagerSnafu)?; + + // All tables' metadata. + let tables_metadata = table_ids + .into_iter() + .zip(metadata) + .collect::>>(); + + let mut inactive_regions = HashSet::new(); + + // Removes inactive regions. + for (table_id, metadata) in tables_metadata { + if let Some(metadata) = metadata { + // Safety: Value must exist. + let regions = tables.get_mut(&table_id).unwrap(); + let region_routes = &metadata.region_routes; + + inactive_regions.extend(handler(node_id, regions, region_routes)); + } else { + // Removes table if metadata is not found. + // Safety: Value must exist. + let regions = tables + .remove(&table_id) + .unwrap() + .into_iter() + .map(|(region_id, _)| region_id); + + inactive_regions.extend(regions); + } + } + + let _ = datanode_regions + .extract_if(|(region_id, _)| inactive_regions.contains(region_id)) + .collect::>(); + + Ok(inactive_regions) + } + + #[cfg(test)] + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_catalog::consts::MITO2_ENGINE; + use common_meta::key::test_utils::new_test_table_info; + use common_meta::key::TableMetadataManager; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; + + use super::RegionLeaseKeeper; + use crate::service::store::kv::KvBackendAdapter; + use crate::service::store::memory::MemStore; + + fn new_test_keeper() -> RegionLeaseKeeper { + let store = KvBackendAdapter::wrap(Arc::new(MemStore::new())); + + let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); + + RegionLeaseKeeper::new(table_metadata_manager) + } + + #[tokio::test] + async fn test_empty_table_routes() { + let node_id = 1; + let engine = MITO2_ENGINE; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + + let keeper = new_test_keeper(); + + let mut datanode_regions = vec![(region_id, RegionRole::Leader)]; + + let removed = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert!(datanode_regions.is_empty()); + assert_eq!(removed.len(), 1); + assert!(removed.contains(®ion_id)); + } + + #[tokio::test] + async fn test_retain_active_regions_simple() { + let node_id = 1; + let engine = MITO2_ENGINE; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let peer = Peer::empty(node_id); + let table_info = new_test_table_info(table_id, vec![region_number]).into(); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + // `inactive_regions` should be vec![region_id]. + let mut datanode_regions = vec![(region_id, RegionRole::Leader)]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert!(inactive_regions.is_empty()); + assert_eq!(datanode_regions.len(), 1); + + // `inactive_regions` should be empty, because the `region_id` is a potential leader. + let mut datanode_regions = vec![(region_id, RegionRole::Follower)]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert!(inactive_regions.is_empty()); + } + + #[tokio::test] + async fn test_retain_active_regions_2() { + let node_id = 1; + let engine = MITO2_ENGINE; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let unknown_region_id = RegionId::new(table_id + 1, region_number); + + let peer = Peer::empty(node_id); + let another_peer = Peer::empty(node_id + 1); + + let table_info = + new_test_table_info(table_id, vec![region_number, region_number + 1]).into(); + + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![another_peer.clone()], + }, + ]; + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + // `inactive_regions` should be vec![unknown_region_id]. + let mut datanode_regions = vec![ + (region_id, RegionRole::Leader), + (unknown_region_id, RegionRole::Follower), + ]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&unknown_region_id)); + assert_eq!(datanode_regions.len(), 1); + + // `inactive_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. + let mut datanode_regions = vec![ + (region_id, RegionRole::Follower), + (another_region_id, RegionRole::Follower), + ]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&another_region_id)); + assert_eq!(datanode_regions.len(), 1); + + // `inactive_regions` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. + let mut datanode_regions = vec![ + (region_id, RegionRole::Follower), + (another_region_id, RegionRole::Leader), + ]; + + let inactive_regions = keeper + .retain_active_regions(0, node_id, engine, &mut datanode_regions) + .await + .unwrap(); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&another_region_id)); + assert_eq!(datanode_regions.len(), 1); + } +} diff --git a/src/meta-srv/src/region/lease_keeper/file.rs b/src/meta-srv/src/region/lease_keeper/file.rs new file mode 100644 index 000000000000..c0148584359e --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper/file.rs @@ -0,0 +1,167 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use common_meta::rpc::router::{convert_to_region_peers_map, RegionRoute}; +use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; + +use super::utils::inactive_follower_regions; + +/// Retains active mito regions(`datanode_regions`), returns inactive regions. +/// +/// It removes a region if the `node_id` isn't one of the peers in `region_routes`. +pub fn retain_active_regions( + node_id: u64, + datanode_regions: &mut Vec<(RegionId, RegionRole)>, + region_routes: &[RegionRoute], +) -> HashSet { + let region_peers_map = convert_to_region_peers_map(region_routes); + + let inactive_region_ids = datanode_regions + .clone() + .into_iter() + .filter_map(|(region_id, _)| { + inactive_follower_regions(node_id, region_id, ®ion_peers_map) + }) + .collect::>(); + + let _ = datanode_regions + .extract_if(|(region_id, _)| inactive_region_ids.contains(region_id)) + .collect::>(); + inactive_region_ids +} + +#[cfg(test)] +mod tests { + + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; + + use crate::region::lease_keeper::file::retain_active_regions; + + #[test] + fn test_retain_active_regions() { + let node_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(node_id); + + let another_region_id = RegionId::from_u64(region_number as u64 + 1); + let another_peer = Peer::empty(node_id + 1); + + let datanode_regions = vec![(region_id, RegionRole::Follower)]; + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + + // `inactive_regions` should be empty, `region_id` is a active region of the `peer` + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: None, + follower_peers: vec![peer.clone()], + }]; + + // `inactive_regions` should be empty, `region_id` is a active region of the `peer` + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![peer.clone()], + }, + ]; + + // `inactive_regions` should be empty, `region_id` is a active region of the `peer` + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + // `inactive_regions` should be vec[`region_id`,`another_region_id`], + // both regions are the active region of the `another_peer`. + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(another_peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![another_peer.clone()], + }, + ]; + + let mut datanode_regions = vec![ + (region_id, RegionRole::Follower), + (another_region_id, RegionRole::Follower), + ]; + + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions, ®ion_routes); + + assert_eq!(inactive_regions.len(), 2); + assert!(inactive_regions.contains(®ion_id)); + assert!(inactive_regions.contains(&another_region_id)); + assert!(datanode_regions.is_empty()); + + // `inactive_regions` should be vec[`another_region_id`], + // `another_region_id` regions are the active region of the `another_peer`. + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![another_peer], + }, + ]; + + let mut datanode_regions = vec![ + (region_id, RegionRole::Follower), + (another_region_id, RegionRole::Follower), + ]; + + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions, ®ion_routes); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&another_region_id)); + assert_eq!(datanode_regions.len(), 1); + assert!(datanode_regions.contains(&(region_id, RegionRole::Follower))); + } +} diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs new file mode 100644 index 000000000000..fd3ed84e3fe3 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper/mito.rs @@ -0,0 +1,160 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use common_meta::rpc::router::{ + convert_to_region_leader_map, convert_to_region_peers_map, RegionRoute, +}; +use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; + +use crate::region::lease_keeper::utils::{inactive_follower_regions, inactive_leader_regions}; + +/// Retains active mito regions(`datanode_regions`), returns inactive regions. +/// +/// It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. +/// +/// It removes a follower region if the `node_id` isn't one of the peers in `region_routes`. +pub fn retain_active_regions( + node_id: u64, + datanode_regions: &mut Vec<(RegionId, RegionRole)>, + region_routes: &[RegionRoute], +) -> HashSet { + let region_leader_map = convert_to_region_leader_map(region_routes); + let region_peers_map = convert_to_region_peers_map(region_routes); + + let inactive_region_ids = datanode_regions + .clone() + .into_iter() + .filter_map(|(region_id, role)| match role { + RegionRole::Follower => { + inactive_follower_regions(node_id, region_id, ®ion_peers_map) + } + RegionRole::Leader => inactive_leader_regions(node_id, region_id, ®ion_leader_map), + }) + .collect::>(); + + let _ = datanode_regions + .extract_if(|(region_id, _)| inactive_region_ids.contains(region_id)) + .collect::>(); + + inactive_region_ids +} + +#[cfg(test)] +mod tests { + + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::region_engine::RegionRole; + use store_api::storage::RegionId; + + use crate::region::lease_keeper::mito::retain_active_regions; + + #[test] + fn test_retain_active_regions() { + let node_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(node_id); + let another_peer = Peer::empty(node_id + 1); + + let datanode_regions = vec![(region_id, RegionRole::Leader)]; + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + + // `inactive_regions` should be empty, `region_id` is a active leader region of the `peer` + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + let mut retained_active_regions = datanode_regions.clone(); + + // `inactive_regions` should be vec![`region_id`]; + let inactive_regions = retain_active_regions(node_id, &mut retained_active_regions, &[]); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(®ion_id)); + assert!(retained_active_regions.is_empty()); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(another_peer.clone()), + follower_peers: vec![peer.clone()], + }]; + + let mut retained_active_regions = datanode_regions.clone(); + + // `inactive_regions` should be vec![`region_id`], `region_id` is RegionRole::Leader. + let inactive_regions = + retain_active_regions(node_id, &mut retained_active_regions, ®ion_routes); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(®ion_id)); + assert!(retained_active_regions.is_empty()); + + // `inactive_regions` should be empty, `region_id` is a active follower region of the `peer`. + let datanode_regions = vec![(region_id, RegionRole::Follower)]; + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + assert!(inactive_regions.is_empty()); + + let another_region_id = RegionId::from_u64(region_number as u64 + 1); + + let region_routes = vec![ + RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }, + RegionRoute { + region: Region::new_test(another_region_id), + leader_peer: None, + follower_peers: vec![peer], + }, + ]; + + let datanode_regions = vec![ + (region_id, RegionRole::Leader), + (another_region_id, RegionRole::Follower), + ]; + + let inactive_regions = + retain_active_regions(node_id, &mut datanode_regions.clone(), ®ion_routes); + + assert!(inactive_regions.is_empty()); + + // `inactive_regions` should be vec![another_region_id]. + let datanode_regions = vec![ + (another_region_id, RegionRole::Leader), + (region_id, RegionRole::Follower), + ]; + + let mut retained_active_regions = datanode_regions.clone(); + + let inactive_regions = + retain_active_regions(node_id, &mut retained_active_regions, ®ion_routes); + + assert_eq!(inactive_regions.len(), 1); + assert!(inactive_regions.contains(&another_region_id)); + + assert_eq!(retained_active_regions.len(), 1); + assert!(retained_active_regions.contains(&(region_id, RegionRole::Follower))); + } +} diff --git a/src/meta-srv/src/region/lease_keeper/utils.rs b/src/meta-srv/src/region/lease_keeper/utils.rs new file mode 100644 index 000000000000..4cec956a6947 --- /dev/null +++ b/src/meta-srv/src/region/lease_keeper/utils.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use common_meta::peer::Peer; +use store_api::storage::RegionId; + +/// Returns Some(region_id) if it's a inactive leader region. +/// +/// It removes a leader region if the `node_id` isn't the corresponding leader peer in `region_routes`. +pub fn inactive_leader_regions( + node_id: u64, + region_id: RegionId, + region_leader_map: &HashMap, +) -> Option { + if let Some(peer) = region_leader_map.get(®ion_id.region_number()) { + // TODO(weny): treats the leader peer as inactive if it's readonly or downgraded. + if peer.id == node_id { + None + } else { + Some(region_id) + } + } else { + Some(region_id) + } +} + +/// Returns Some(region_id) if it's a inactive follower region. +/// +/// It removes a region if the `node_id` isn't one of the peers in `region_routes`. +pub fn inactive_follower_regions( + node_id: u64, + region_id: RegionId, + region_peer_map: &HashMap>, +) -> Option { + if let Some(peers) = region_peer_map.get(®ion_id.region_number()) { + if peers.contains(&node_id) { + None + } else { + Some(region_id) + } + } else { + Some(region_id) + } +} + +#[cfg(test)] +mod tests { + + use common_meta::peer::Peer; + use store_api::storage::RegionId; + + use super::*; + + #[test] + fn test_inactive_follower_regions() { + let node_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(node_id); + + let region_peers_map = [(region_number, HashSet::from([node_id]))].into(); + + // Should be None, `region_id` is a active region of `peer`. + assert_eq!( + None, + inactive_follower_regions(peer.id, region_id, ®ion_peers_map) + ); + + // Should be Some(`region_id`), region_followers_map is empty. + assert_eq!( + Some(region_id), + inactive_follower_regions(peer.id, region_id, &Default::default()) + ); + + let another_peer = Peer::empty(node_id + 1); + + let region_peers_map = [(region_number, HashSet::from([peer.id, another_peer.id]))].into(); + + // Should be None, `region_id` is a active region of `another_peer`. + assert_eq!( + None, + inactive_follower_regions(node_id, region_id, ®ion_peers_map) + ); + } + + #[test] + fn test_inactive_leader_regions() { + let node_id = 1u64; + let region_number = 1u32; + let region_id = RegionId::from_u64(region_number as u64); + let peer = Peer::empty(node_id); + + let region_leader_map = [(region_number, &peer)].into(); + + // Should be None, `region_id` is a active region of `peer`. + assert_eq!( + None, + inactive_leader_regions(node_id, region_id, ®ion_leader_map) + ); + + // Should be Some(`region_id`), the inactive_leader_regions is empty. + assert_eq!( + Some(region_id), + inactive_leader_regions(node_id, region_id, &Default::default()) + ); + + let another_peer = Peer::empty(node_id + 1); + let region_leader_map = [(region_number, &another_peer)].into(); + + // Should be Some(`region_id`), `region_id` is active region of `another_peer`. + assert_eq!( + Some(region_id), + inactive_leader_regions(node_id, region_id, ®ion_leader_map) + ); + } +} diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 08e38a640180..8ce75a72e49b 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -19,7 +19,7 @@ use api::v1::Rows; use common_meta::key::table_route::TableRouteManager; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router::{convert_to_region_map, RegionRoutes}; +use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoutes}; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; @@ -91,7 +91,7 @@ impl PartitionRuleManager { .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); let mut datanodes = HashMap::with_capacity(regions.len()); - let region_map = convert_to_region_map(&route.region_routes); + let region_map = convert_to_region_leader_map(&route.region_routes); for region in regions.iter() { let datanode = *region_map.get(region).context(error::FindDatanodeSnafu { table_id, diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 504f9e5faa43..9ae874cd1c7b 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -16,15 +16,43 @@ use std::sync::Arc; +use api::greptime_proto::v1::meta::RegionRole as PbRegionRole; use async_trait::async_trait; use common_error::ext::BoxedError; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; +use serde::{Deserialize, Serialize}; use crate::metadata::RegionMetadataRef; use crate::region_request::RegionRequest; use crate::storage::{RegionId, ScanRequest}; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum RegionRole { + // Writable region(mito2), Readonly region(file). + Follower, + // Readonly region. + Leader, +} + +impl From for PbRegionRole { + fn from(value: RegionRole) -> Self { + match value { + RegionRole::Follower => PbRegionRole::Follower, + RegionRole::Leader => PbRegionRole::Leader, + } + } +} + +impl From for RegionRole { + fn from(value: PbRegionRole) -> Self { + match value { + PbRegionRole::Leader => RegionRole::Follower, + PbRegionRole::Follower => RegionRole::Leader, + } + } +} + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine