From fd98745828c7bf5a958d96e4ca9d27d21dc492c1 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 5 Dec 2023 06:58:35 +0000 Subject: [PATCH] refactor: simplify lease keeper --- Cargo.lock | 1 + src/common/meta/Cargo.toml | 1 + src/common/meta/src/key/table_route.rs | 10 + src/common/meta/src/rpc/router.rs | 6 +- .../src/handler/region_lease_handler.rs | 132 +--- src/meta-srv/src/region/lease_keeper.rs | 586 ++++++++---------- src/meta-srv/src/region/lease_keeper/mito.rs | 122 ---- src/meta-srv/src/region/lease_keeper/utils.rs | 340 ---------- 8 files changed, 308 insertions(+), 890 deletions(-) delete mode 100644 src/meta-srv/src/region/lease_keeper/mito.rs delete mode 100644 src/meta-srv/src/region/lease_keeper/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 7c68686f897e..b80fc3ca06a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1799,6 +1799,7 @@ dependencies = [ "common-telemetry", "common-time", "datatypes", + "derive_builder 0.12.0", "etcd-client", "futures", "humantime-serde", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index b645fd75edb9..891076f9149c 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -24,6 +24,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true datatypes.workspace = true +derive_builder.workspace = true etcd-client.workspace = true futures.workspace = true humantime-serde.workspace = true diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 65178d2f0157..3eef0680b3c7 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::fmt::Display; use serde::{Deserialize, Serialize}; +use store_api::storage::RegionId; use table::metadata::TableId; use super::DeserializedValueWithBytes; @@ -50,12 +51,21 @@ impl TableRouteValue { } } + /// Returns a new version [TableRouteValue] with `region_routes`. 
pub fn update(&self, region_routes: Vec) -> Self { Self { region_routes, version: self.version + 1, } } + + /// Returns the corresponding [RegionRoute]. + pub fn region_route(&self, region_id: RegionId) -> Option { + self.region_routes + .iter() + .find(|route| route.region.id == region_id) + .cloned() + } } impl TableMetaKey for TableRouteKey { diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 6eca316cd496..edd500fa1e15 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -18,6 +18,7 @@ use api::v1::meta::{ Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable, TableRoute as PbTableRoute, }; +use derive_builder::Builder; use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use snafu::OptionExt; @@ -231,12 +232,15 @@ impl From for PbTable { } } -#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)] +#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)] pub struct RegionRoute { pub region: Region, + #[builder(setter(into, strip_option))] pub leader_peer: Option, + #[builder(setter(into, strip_option), default)] pub follower_peers: Vec, /// `None` by default. + #[builder(setter(into, strip_option), default)] #[serde(default, skip_serializing_if = "Option::is_none")] pub leader_status: Option, } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 24066f29765e..0df071f29c2f 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -12,26 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; use common_meta::key::TableMetadataManagerRef; -use common_telemetry::info; -use store_api::region_engine::{GrantedRegion, RegionRole}; -use store_api::storage::RegionId; +use store_api::region_engine::GrantedRegion; use crate::error::Result; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef}; +use crate::region::lease_keeper::{ + OpeningRegionKeeperRef, RegionLeaseKeeperRef, RenewRegionLeasesResponse, +}; use crate::region::RegionLeaseKeeper; pub struct RegionLeaseHandler { region_lease_seconds: u64, region_lease_keeper: RegionLeaseKeeperRef, - opening_region_keeper: OpeningRegionKeeperRef, } impl RegionLeaseHandler { @@ -40,42 +38,12 @@ impl RegionLeaseHandler { table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: OpeningRegionKeeperRef, ) -> Self { - let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager); + let region_lease_keeper = + RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper.clone()); Self { region_lease_seconds, region_lease_keeper: Arc::new(region_lease_keeper), - opening_region_keeper, - } - } -} - -fn flip_role(role: RegionRole) -> RegionRole { - match role { - RegionRole::Follower => RegionRole::Leader, - RegionRole::Leader => RegionRole::Follower, - } -} - -/// Grants lease of regions. -/// -/// - If a region is in an `operable` set, it will be granted an `flip_role(current)`([RegionRole]); -/// otherwise, it will be granted a `current`([RegionRole]). -/// - If a region is in a `closeable` set, it won't be granted. 
-fn grant( - granted_regions: &mut Vec, - operable: &HashSet, - closeable: &HashSet, - regions: &[RegionId], - current: RegionRole, -) { - for region in regions { - if operable.contains(region) { - granted_regions.push(GrantedRegion::new(*region, flip_role(current))); - } else if closeable.contains(region) { - // Filters out the closeable regions. - } else { - granted_regions.push(GrantedRegion::new(*region, current)) } } } @@ -99,79 +67,33 @@ impl HeartbeatHandler for RegionLeaseHandler { let regions = stat.regions(); let cluster_id = stat.cluster_id; let datanode_id = stat.id; - let mut granted_regions = Vec::with_capacity(regions.len()); - let mut inactive_regions = HashSet::new(); - - let (leaders, followers): (Vec<_>, Vec<_>) = regions - .into_iter() - .map(|(id, role)| match role { - RegionRole::Follower => (None, Some(id)), - RegionRole::Leader => (Some(id), None), - }) - .unzip(); - - let leaders = leaders.into_iter().flatten().collect::>(); - let (downgradable, closeable) = self + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = self .region_lease_keeper - .find_staled_leader_regions(cluster_id, datanode_id, &leaders) + .renew_region_leases(cluster_id, datanode_id, ®ions) .await?; - grant( - &mut granted_regions, - &downgradable, - &closeable, - &leaders, - RegionRole::Leader, - ); - if !closeable.is_empty() { - info!( - "Granting region lease, found closeable leader regions: {:?} on datanode {}", - closeable, datanode_id - ); - } - inactive_regions.extend(closeable); - - let followers = followers.into_iter().flatten().collect::>(); - - let (upgradeable, closeable) = self - .region_lease_keeper - .find_staled_follower_regions(cluster_id, datanode_id, &followers) - .await?; - - // If a region is opening, it will be filtered out from the closeable regions set. 
- let closeable = self - .opening_region_keeper - .filter_opening_regions(datanode_id, closeable); - - grant( - &mut granted_regions, - &upgradeable, - &closeable, - &followers, - RegionRole::Follower, - ); - if !closeable.is_empty() { - info!( - "Granting region lease, found closeable follower regions {:?} on datanode {}", - closeable, datanode_id - ); - } - inactive_regions.extend(closeable); + let renewed = renewed + .into_iter() + .map(|(region_id, region_role)| { + GrantedRegion { + region_id, + region_role, + } + .into() + }) + .collect::>(); acc.region_lease = Some(RegionLease { - regions: granted_regions - .into_iter() - .map(Into::into) - .collect::>(), + regions: renewed, duration_since_epoch: req.duration_since_epoch, lease_seconds: self.region_lease_seconds, - closeable_region_ids: inactive_regions - .iter() - .map(|region| region.as_u64()) - .collect(), + closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(), }); - acc.inactive_region_ids = inactive_regions; + acc.inactive_region_ids = non_exists; Ok(HandleControl::Continue) } @@ -179,7 +101,7 @@ impl HeartbeatHandler for RegionLeaseHandler { #[cfg(test)] mod test { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_meta::distributed_time_constants; @@ -188,6 +110,7 @@ mod test { use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use super::*; @@ -200,7 +123,8 @@ mod test { let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); - RegionLeaseKeeper::new(table_metadata_manager) + let opening_keeper = Arc::new(OpeningRegionKeeper::default()); + RegionLeaseKeeper::new(table_metadata_manager, opening_keeper) } fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat { diff --git 
a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index f29495e1e5f8..fb8c3901d084 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -12,36 +12,75 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod mito; -pub mod utils; - use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManagerRef; +use common_meta::rpc::router::RegionRoute; use common_meta::DatanodeId; -use common_telemetry::warn; use snafu::ResultExt; +use store_api::region_engine::RegionRole; use store_api::storage::{RegionId, TableId}; -use self::mito::find_staled_leader_regions; use crate::error::{self, Result}; -use crate::metrics; -use crate::region::lease_keeper::utils::find_staled_follower_regions; pub type RegionLeaseKeeperRef = Arc; +/// Renews lease of regions. pub struct RegionLeaseKeeper { table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: OpeningRegionKeeperRef, +} + +/// The result of region lease renewal, +/// contains the renewed region leases and [RegionId] of non-existing regions. +pub struct RenewRegionLeasesResponse { + pub renewed: HashMap, + pub non_exists: HashSet, } impl RegionLeaseKeeper { - pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { + pub fn new( + table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: OpeningRegionKeeperRef, + ) -> Self { Self { table_metadata_manager, + opening_region_keeper, + } + } +} + +fn renew_region_lease_via_region_route( + region_route: &RegionRoute, + datanode_id: DatanodeId, + region_id: RegionId, +) -> Option<(RegionId, RegionRole)> { + // If it's a leader region on this datanode. 
+ if let Some(leader) = ®ion_route.leader_peer { + if leader.id == datanode_id { + let region_role = if region_route.is_leader_downgraded() { + RegionRole::Follower + } else { + RegionRole::Leader + }; + + return Some((region_id, region_role)); } } + + // If it's a follower region on this datanode. + if region_route + .follower_peers + .iter() + .any(|peer| peer.id == datanode_id) + { + return Some((region_id, RegionRole::Follower)); + } + + // The region doesn't belong to this datanode. + None } impl RegionLeaseKeeper { @@ -73,100 +112,67 @@ impl RegionLeaseKeeper { Ok(metadata_subset) } - /// Returns downgradable regions, and closeable regions. - /// - /// - Downgradable regions: - /// Region's peer(`datanode_id`) is the corresponding downgraded leader peer in `region_routes`. - /// - /// - closeable regions: - /// - It returns a region if it's peer(`datanode_id`) isn't the corresponding leader peer in `region_routes`. - /// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. - /// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. - /// - It returns a region if the region's table metadata is not found. - pub async fn find_staled_leader_regions( + /// Returns [None] if specific region doesn't belong to the datanode. + fn renew_region_lease( &self, - _cluster_id: u64, - datanode_id: u64, - datanode_regions: &[RegionId], - ) -> Result<(HashSet, HashSet)> { - let tables = self.collect_tables(datanode_regions); - let table_ids = tables.keys().copied().collect::>(); - - let metadata_subset = { - let _timer = metrics::METRIC_META_LOAD_LEADER_METADATA.start_timer(); - self.collect_tables_metadata(&table_ids).await? 
- }; - - let mut closeable_set = HashSet::new(); - let mut downgradable_set = HashSet::new(); - - for (table_id, regions) in tables { - if let Some(metadata) = metadata_subset.get(&table_id) { - let region_routes = &metadata.region_routes; - - let (downgradable, closeable) = - find_staled_leader_regions(datanode_id, &regions, region_routes); + table_metadata: &HashMap<TableId, TableRouteValue>, + datanode_id: DatanodeId, + region_id: RegionId, + role: RegionRole, + ) -> Option<(RegionId, RegionRole)> { + // Renews the lease if it's an opening region. + if self.opening_region_keeper.contains(datanode_id, region_id) { + return Some((region_id, role)); + } - downgradable_set.extend(downgradable); - closeable_set.extend(closeable); - } else { - warn!( - "The table {} metadata is not found, appends closeable leader regions: {:?}", - table_id, regions - ); - // If table metadata is not found. - closeable_set.extend(regions); + if let Some(table_route) = table_metadata.get(&region_id.table_id()) { + if let Some(region_route) = table_route.region_route(region_id) { + return renew_region_lease_via_region_route(&region_route, datanode_id, region_id); } } - Ok((downgradable_set, closeable_set)) + None } - /// Returns upgradable regions, and closeable regions. + /// Renews the lease of regions for a specific datanode. /// - /// Upgradable regions: - /// - Region's peer(`datanode_id`) is the corresponding leader peer in `region_routes`. + /// The lease of regions will be renewed if: + /// - The region of the specific datanode exists in [TableRouteValue]. + /// - The region of the specific datanode is opening. /// - /// closeable regions: - /// - Region's peer(`datanode_id`) isn't the corresponding leader/follower peer in `region_routes`. - /// - Region's table metadata is not found. - pub async fn find_staled_follower_regions( + /// Otherwise the lease of regions won't be renewed.
+ pub async fn renew_region_leases( &self, _cluster_id: u64, - datanode_id: u64, - datanode_regions: &[RegionId], - ) -> Result<(HashSet, HashSet)> { - let tables = self.collect_tables(datanode_regions); + datanode_id: DatanodeId, + regions: &[(RegionId, RegionRole)], + ) -> Result { + let region_ids = regions + .iter() + .map(|(region_id, _)| *region_id) + .collect::>(); + let tables = self.collect_tables(®ion_ids); let table_ids = tables.keys().copied().collect::>(); - - let metadata_subset = { - let _timer = metrics::METRIC_META_LOAD_FOLLOWER_METADATA.start_timer(); - self.collect_tables_metadata(&table_ids).await? - }; - - let mut upgradable_set = HashSet::new(); - let mut closeable_set = HashSet::new(); - - for (table_id, regions) in tables { - if let Some(metadata) = metadata_subset.get(&table_id) { - let region_routes = &metadata.region_routes; - - let (upgradable, closeable) = - find_staled_follower_regions(datanode_id, ®ions, region_routes); - - upgradable_set.extend(upgradable); - closeable_set.extend(closeable); - } else { - warn!( - "The table {} metadata is not found, appends closeable followers regions: {:?}", - table_id, regions - ); - // If table metadata is not found. 
- closeable_set.extend(regions); + let table_metadata = self.collect_tables_metadata(&table_ids).await?; + + let mut renewed = HashMap::new(); + let mut non_exists = HashSet::new(); + + for &(region, role) in regions { + match self.renew_region_lease(&table_metadata, datanode_id, region, role) { + Some((region, role)) => { + let _ = renewed.insert(region, role); + } + None => { + let _ = non_exists.insert(region); + } } } - Ok((upgradable_set, closeable_set)) + Ok(RenewRegionLeasesResponse { + renewed, + non_exists, + }) } #[cfg(test)] @@ -259,310 +265,244 @@ impl OpeningRegionKeeper { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_meta::key::test_utils::new_test_table_info; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use common_meta::rpc::router::{Region, RegionRouteBuilder, RegionStatus}; + use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use table::metadata::RawTableInfo; - use super::{OpeningRegionKeeper, RegionLeaseKeeper}; + use super::{renew_region_lease_via_region_route, OpeningRegionKeeper, RegionLeaseKeeper}; + use crate::region::lease_keeper::RenewRegionLeasesResponse; fn new_test_keeper() -> RegionLeaseKeeper { let store = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(store)); - RegionLeaseKeeper::new(table_metadata_manager) + let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); + RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper) } - #[tokio::test] - async fn test_empty_table_routes() { - let datanode_id = 1; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); + #[test] + fn test_opening_region_keeper() { + let keeper = OpeningRegionKeeper::new(); - let keeper = 
new_test_keeper(); + let guard = keeper.register(1, RegionId::from_u64(1)).unwrap(); + assert!(keeper.register(1, RegionId::from_u64(1)).is_none()); + let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap(); - let datanode_regions = vec![region_id]; + let output = keeper.filter_opening_regions( + 1, + HashSet::from([ + RegionId::from_u64(1), + RegionId::from_u64(2), + RegionId::from_u64(3), + ]), + ); + assert_eq!(output.len(), 1); + assert!(output.contains(&RegionId::from_u64(3))); - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); + assert_eq!(keeper.len(), 2); + drop(guard); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); - assert!(downgradable.is_empty()); + assert_eq!(keeper.len(), 1); - let (upgradable, closeable) = keeper - .find_staled_follower_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); + assert!(keeper.contains(1, RegionId::from_u64(2))); + drop(guard2); - assert!(upgradable.is_empty()); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); + assert!(keeper.is_empty()); } - #[tokio::test] - async fn test_find_closeable_regions_simple() { - let datanode_id = 1; - let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let peer = Peer::empty(datanode_id); - let table_info = new_test_table_info(table_id, vec![region_number]).into(); - - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - - let keeper = new_test_keeper(); - let table_metadata_manager = keeper.table_metadata_manager(); - - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - - // `closeable` should be empty. 
- let datanode_regions = vec![region_id]; - - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await + #[test] + fn test_renew_region_lease_via_region_route() { + let region_id = RegionId::new(1024, 1); + let leader_peer_id = 1024; + let follower_peer_id = 2048; + let mut region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .follower_peers(vec![Peer::empty(follower_peer_id)]) + .build() .unwrap(); - assert!(closeable.is_empty()); - assert!(downgradable.is_empty()); - - // `closeable` should be empty. - let datanode_regions = vec![]; + // The region doesn't belong to the datanode. + for region_id in [RegionId::new(1024, 2), region_id] { + assert!(renew_region_lease_via_region_route(®ion_route, 1, region_id).is_none()); + } - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); + // The leader region on the datanode. + assert_eq!( + renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id), + Some((region_id, RegionRole::Leader)) + ); + // The follower region on the datanode. + assert_eq!( + renew_region_lease_via_region_route(®ion_route, follower_peer_id, region_id), + Some((region_id, RegionRole::Follower)) + ); - assert!(closeable.is_empty()); - assert!(downgradable.is_empty()); + region_route.leader_status = Some(RegionStatus::Downgraded); + // The downgraded leader region on the datanode. 
+ assert_eq!( + renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id), + Some((region_id, RegionRole::Follower)) + ); } #[tokio::test] - async fn test_find_closeable_regions_2() { - let datanode_id = 1; - let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let another_region_id = RegionId::new(table_id, region_number + 1); - let unknown_region_id = RegionId::new(table_id + 1, region_number); - - let peer = Peer::empty(datanode_id); - let another_peer = Peer::empty(datanode_id + 1); - - let table_info = - new_test_table_info(table_id, vec![region_number, region_number + 1]).into(); - - let region_routes = vec![ - RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(another_region_id), - leader_peer: None, - follower_peers: vec![another_peer.clone()], - leader_status: None, - }, - ]; - + async fn test_renew_region_leases_non_exists_regions() { let keeper = new_test_keeper(); - let table_metadata_manager = keeper.table_metadata_manager(); - - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - - // Unexpected Leader region. - // `closeable` should be vec![unknown_region_id]. - let datanode_regions = vec![region_id, unknown_region_id]; - - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); - - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(&unknown_region_id)); - assert!(downgradable.is_empty()); - - // Expected as Follower region. - // `closeable` should be vec![another_region_id], because the `another_region_id` is a active region of `another_peer`. 
- let datanode_regions = vec![another_region_id]; - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases( + 0, + 1, + &[ + (RegionId::new(1024, 1), RegionRole::Follower), + (RegionId::new(1024, 2), RegionRole::Leader), + ], + ) .await .unwrap(); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(&another_region_id)); - assert!(downgradable.is_empty()); + assert!(renewed.is_empty()); + assert_eq!( + non_exists, + HashSet::from([RegionId::new(1024, 1), RegionId::new(1024, 2)]) + ); } #[tokio::test] - async fn test_find_staled_leader_region_downgraded() { - let datanode_id = 1; + async fn test_renew_region_leases_basic() { let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let another_region_id = RegionId::new(table_id, region_number + 1); - let peer = Peer::empty(datanode_id); + let table_id = 1024; let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - leader_status: Some(RegionStatus::Downgraded), - ..Default::default() - }]; - let keeper = new_test_keeper(); - let table_metadata_manager = keeper.table_metadata_manager(); - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - - // `upgradable` should be empty, `closeable` should be empty. 
- let datanode_regions = vec![region_id, another_region_id]; - - let (downgradable, closeable) = keeper - .find_staled_leader_regions(0, datanode_id, &datanode_regions) - .await + let region_id = RegionId::new(table_id, 1); + let leader_peer_id = 1024; + let follower_peer_id = 2048; + let region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .follower_peers(vec![Peer::empty(follower_peer_id)]) + .build() .unwrap(); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(&another_region_id)); - assert_eq!(downgradable.len(), 1); - assert!(downgradable.contains(®ion_id)); - } - - #[tokio::test] - async fn test_find_staled_follower_regions() { - let datanode_id = 1; - let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let peer = Peer::empty(datanode_id); - let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); - - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - let keeper = new_test_keeper(); let table_metadata_manager = keeper.table_metadata_manager(); - table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, vec![region_route.clone()]) .await .unwrap(); - // `upgradable` should be vec![region_id], `closeable` should be empty. - let datanode_regions = vec![region_id]; - - let (upgradable, closeable) = keeper - .find_staled_follower_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); + // The region doesn't belong to the datanode. 
+ for region_id in [RegionId::new(1024, 2), region_id] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, 1, &[(region_id, RegionRole::Follower)]) + .await + .unwrap(); + assert!(renewed.is_empty()); + assert_eq!(non_exists, HashSet::from([region_id])); + } - assert!(closeable.is_empty()); - assert_eq!(upgradable.len(), 1); - assert!(upgradable.contains(®ion_id)); + // The leader region on the datanode. + for role in [RegionRole::Leader, RegionRole::Follower] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, leader_peer_id, &[(region_id, role)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Leader)])); + } - // `upgradable` should be empty, `closeable` should be vec![region_id]. - let datanode_regions = vec![region_id]; + // The follower region on the datanode. + for role in [RegionRole::Leader, RegionRole::Follower] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, follower_peer_id, &[(region_id, role)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Follower)])); + } - let (upgradable, closeable) = keeper - .find_staled_follower_regions(0, datanode_id + 1, &datanode_regions) - .await + let opening_region_id = RegionId::new(2048, 1); + let _guard = keeper + .opening_region_keeper + .register(leader_peer_id, opening_region_id) .unwrap(); - assert!(upgradable.is_empty()); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); + // The opening region on the datanode. + // NOTES: The procedure lock will ensure only one opening leader. 
+ for role in [RegionRole::Leader, RegionRole::Follower] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, leader_peer_id, &[(opening_region_id, role)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!(renewed, HashMap::from([(opening_region_id, role)])); + } } #[tokio::test] - async fn test_find_staled_region_downgraded() { - let datanode_id = 1; + async fn test_renew_region_leases_with_downgrade_leader() { let region_number = 1u32; - let table_id = 10; - let region_id = RegionId::new(table_id, region_number); - let peer = Peer::empty(datanode_id); + let table_id = 1024; let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - leader_status: Some(RegionStatus::Downgraded), - ..Default::default() - }]; + let region_id = RegionId::new(table_id, 1); + let leader_peer_id = 1024; + let follower_peer_id = 2048; + let region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .follower_peers(vec![Peer::empty(follower_peer_id)]) + .leader_status(RegionStatus::Downgraded) + .build() + .unwrap(); - let datanode_regions = vec![region_id]; let keeper = new_test_keeper(); let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, vec![region_route.clone()]) .await .unwrap(); - let (upgradable, closeable) = keeper - .find_staled_follower_regions(0, datanode_id, &datanode_regions) - .await - .unwrap(); - assert!(upgradable.is_empty()); - assert!(closeable.is_empty()); - } - - #[test] - fn test_opening_region_keeper() { - let keeper = OpeningRegionKeeper::new(); - - let guard = keeper.register(1, RegionId::from_u64(1)).unwrap(); - assert!(keeper.register(1, 
RegionId::from_u64(1)).is_none()); - let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap(); - - let output = keeper.filter_opening_regions( - 1, - HashSet::from([ - RegionId::from_u64(1), - RegionId::from_u64(2), - RegionId::from_u64(3), - ]), - ); - assert_eq!(output.len(), 1); - assert!(output.contains(&RegionId::from_u64(3))); - - assert_eq!(keeper.len(), 2); - drop(guard); - - assert_eq!(keeper.len(), 1); - - assert!(keeper.contains(1, RegionId::from_u64(2))); - drop(guard2); - - assert!(keeper.is_empty()); + // The leader region on the datanode. + for role in [RegionRole::Leader, RegionRole::Follower] { + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(0, follower_peer_id, &[(region_id, role)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Follower)])); + } } } diff --git a/src/meta-srv/src/region/lease_keeper/mito.rs b/src/meta-srv/src/region/lease_keeper/mito.rs deleted file mode 100644 index ee3ebe44246b..000000000000 --- a/src/meta-srv/src/region/lease_keeper/mito.rs +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashSet; - -use common_meta::rpc::router::{ - convert_to_region_leader_map, convert_to_region_leader_status_map, RegionRoute, -}; -use store_api::storage::RegionId; - -use super::utils::downgradable_leader_regions; -use crate::region::lease_keeper::utils::closeable_leader_region; - -/// Returns Downgradable regions and closeable regions. -/// -/// - Downgradable regions: -/// Region's peer(`datanode_id`) is the corresponding downgraded leader peer in `region_routes`. -/// -/// - closeable regions: -/// Region's peer(`datanode_id`) isn't the corresponding leader peer in `region_routes`. -/// - Expected as [RegionRole::Follower](store_api::region_engine::RegionRole::Follower) regions. -/// - Unexpected [RegionRole::Leader](store_api::region_engine::RegionRole::Leader) regions. -pub fn find_staled_leader_regions( - datanode_id: u64, - datanode_regions: &[RegionId], - region_routes: &[RegionRoute], -) -> (HashSet, HashSet) { - let region_leader_map = convert_to_region_leader_map(region_routes); - let region_leader_status_map = convert_to_region_leader_status_map(region_routes); - - let (downgradable, closeable): (HashSet<_>, HashSet<_>) = datanode_regions - .iter() - .map(|region_id| { - ( - downgradable_leader_regions( - datanode_id, - *region_id, - ®ion_leader_map, - ®ion_leader_status_map, - ), - closeable_leader_region(datanode_id, *region_id, ®ion_leader_map), - ) - }) - .unzip(); - - let downgradable = downgradable.into_iter().flatten().collect(); - let closeable = closeable.into_iter().flatten().collect(); - - (downgradable, closeable) -} - -#[cfg(test)] -mod tests { - - use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; - use store_api::storage::RegionId; - - use crate::region::lease_keeper::mito::find_staled_leader_regions; - - #[test] - fn test_find_staled_regions() { - let datanode_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let peer = 
Peer::empty(datanode_id); - let another_peer = Peer::empty(datanode_id + 1); - - let datanode_regions = vec![region_id]; - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - - // Grants lease. - // `closeable` should be empty, `region_id` is a active leader region of the `peer` - let (downgradable, closeable) = - find_staled_leader_regions(datanode_id, &datanode_regions, ®ion_routes); - - assert!(closeable.is_empty()); - assert!(downgradable.is_empty()); - - // Unexpected Leader region. - // `closeable` should be vec![`region_id`]; - let (downgradable, closeable) = - find_staled_leader_regions(datanode_id, &datanode_regions, &[]); - - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); - assert!(downgradable.is_empty()); - - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(another_peer.clone()), - follower_peers: vec![peer.clone()], - leader_status: None, - }]; - - let retained_active_regions = datanode_regions.clone(); - - // Expected as Follower region. - // `closeable` should be vec![`region_id`], `region_id` is RegionRole::Leader. - let (downgradable, closeable) = - find_staled_leader_regions(datanode_id, &retained_active_regions, ®ion_routes); - - assert!(downgradable.is_empty()); - assert_eq!(closeable.len(), 1); - assert!(closeable.contains(®ion_id)); - } -} diff --git a/src/meta-srv/src/region/lease_keeper/utils.rs b/src/meta-srv/src/region/lease_keeper/utils.rs deleted file mode 100644 index 835f37f67dee..000000000000 --- a/src/meta-srv/src/region/lease_keeper/utils.rs +++ /dev/null @@ -1,340 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{HashMap, HashSet}; - -use common_meta::peer::Peer; -use common_meta::rpc::router::{ - convert_to_region_leader_map, convert_to_region_leader_status_map, convert_to_region_peer_map, - RegionRoute, RegionStatus, -}; -use store_api::storage::{RegionId, RegionNumber}; - -/// Returns Some(region_id) if it's not a leader region in `region_route`. -/// -/// It removes a leader region if its peer(`node_id`) isn't the corresponding leader peer in `region_routes`. -pub fn closeable_leader_region( - node_id: u64, - region_id: RegionId, - region_leader_map: &HashMap, -) -> Option { - let region_number = region_id.region_number(); - if let Some(peer) = region_leader_map.get(®ion_number) { - if peer.id == node_id { - None - } else { - Some(region_id) - } - } else { - Some(region_id) - } -} - -/// Returns Some(region_id) if its peer(`node_id`) a downgrade leader region peer in `region_route`. -pub fn downgradable_leader_regions( - node_id: u64, - region_id: RegionId, - region_leader_map: &HashMap, - region_leader_status: &HashMap, -) -> Option { - let region_number = region_id.region_number(); - let leader_status = region_leader_status.get(®ion_number); - let downgraded = matches!(leader_status, Some(RegionStatus::Downgraded)); - - if let Some(peer) = region_leader_map.get(®ion_number) { - if peer.id == node_id && downgraded { - Some(region_id) - } else { - None - } - } else { - None - } -} - -/// Returns upgradable regions, and closeable regions. 
-/// -/// Upgradable regions: -/// - Region's peer(`datanode_id`) is the corresponding leader peer in `region_routes`. -/// -/// closeable regions: -/// - Region's peer(`datanode_id`) isn't the corresponding leader/follower peer in `region_routes`. -pub fn find_staled_follower_regions( - datanode_id: u64, - datanode_regions: &[RegionId], - region_routes: &[RegionRoute], -) -> (HashSet, HashSet) { - let region_leader_map = convert_to_region_leader_map(region_routes); - let region_leader_status_map = convert_to_region_leader_status_map(region_routes); - let region_peer_map = convert_to_region_peer_map(region_routes); - - let (upgradable, closeable): (HashSet>, HashSet>) = - datanode_regions - .iter() - .map(|region_id| { - ( - upgradable_follower_region( - datanode_id, - *region_id, - ®ion_leader_map, - ®ion_leader_status_map, - ), - closeable_region(datanode_id, *region_id, ®ion_peer_map), - ) - }) - .unzip(); - - let upgradable = upgradable.into_iter().flatten().collect(); - let closeable = closeable.into_iter().flatten().collect(); - - (upgradable, closeable) -} - -/// Returns Some(region) if its peer(`node_id`) a leader region peer in `region_routes`. -pub fn upgradable_follower_region( - node_id: u64, - region_id: RegionId, - region_leader_map: &HashMap, - region_leader_status: &HashMap, -) -> Option { - let region_number = region_id.region_number(); - let leader_status = region_leader_status.get(®ion_number); - let downgraded = matches!(leader_status, Some(RegionStatus::Downgraded)); - - if let Some(peer) = region_leader_map.get(®ion_number) { - if peer.id == node_id && !downgraded { - Some(region_id) - } else { - None - } - } else { - None - } -} - -/// Returns Some(region) if its peer(`node_id) is't a leader or follower region peer in `region_routes`. 
-pub fn closeable_region( - node_id: u64, - region_id: RegionId, - region_peer_map: &HashMap>, -) -> Option { - if let Some(set) = region_peer_map.get(®ion_id.region_number()) { - if set.get(&node_id).is_some() { - None - } else { - Some(region_id) - } - } else { - Some(region_id) - } -} - -#[cfg(test)] -mod tests { - - use common_meta::peer::Peer; - use store_api::storage::RegionId; - - use super::*; - - #[test] - fn test_closeable_leader_region() { - let datanode_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(datanode_id); - - let region_leader_map = [(region_number, &peer)].into(); - - // Should be None, `region_id` is an active region of `peer`. - assert_eq!( - None, - closeable_leader_region(datanode_id, region_id, ®ion_leader_map,) - ); - - // Should be Some(`region_id`), incorrect datanode_id. - assert_eq!( - Some(region_id), - closeable_leader_region(datanode_id + 1, region_id, ®ion_leader_map,) - ); - - // Should be Some(`region_id`), the inactive_leader_regions is empty. - assert_eq!( - Some(region_id), - closeable_leader_region(datanode_id, region_id, &Default::default(),) - ); - - let another_peer = Peer::empty(datanode_id + 1); - let region_leader_map = [(region_number, &another_peer)].into(); - - // Should be Some(`region_id`), `region_id` is active region of `another_peer`. - assert_eq!( - Some(region_id), - closeable_leader_region(datanode_id, region_id, ®ion_leader_map,) - ); - } - - #[test] - fn test_downgradable_region() { - let datanode_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let peer = Peer::empty(datanode_id); - - let region_leader_map = [(region_number, &peer)].into(); - let region_leader_status_map = [(region_number, RegionStatus::Downgraded)].into(); - - // Should be Some(region_id), `region_id` is a downgraded leader region. 
- assert_eq!( - Some(region_id), - downgradable_leader_regions( - datanode_id, - region_id, - ®ion_leader_map, - ®ion_leader_status_map - ) - ); - - // Should be None, `region_id` is a leader region. - assert_eq!( - None, - downgradable_leader_regions( - datanode_id, - region_id, - ®ion_leader_map, - &Default::default(), - ) - ); - - // Should be None, incorrect datanode_id. - assert_eq!( - None, - downgradable_leader_regions( - datanode_id + 1, - region_id, - ®ion_leader_map, - ®ion_leader_status_map - ) - ); - - // Should be None, incorrect datanode_id. - assert_eq!( - None, - downgradable_leader_regions( - datanode_id + 1, - region_id, - ®ion_leader_map, - &Default::default(), - ) - ); - } - - #[test] - fn test_closeable_follower_region() { - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let another_region_id = RegionId::from_u64(region_number as u64 + 1); - let region_peer_map = [(region_number, HashSet::from([1, 2, 3]))].into(); - - // Should be None. - assert_eq!(None, closeable_region(1, region_id, ®ion_peer_map)); - - // Should be Some(`region_id`), incorrect `datanode_id`. - assert_eq!( - Some(region_id), - closeable_region(4, region_id, ®ion_peer_map) - ); - - // Should be Some(`another_region_id`), `another_region_id` doesn't exist. - assert_eq!( - Some(another_region_id), - closeable_region(1, another_region_id, ®ion_peer_map) - ); - - // Should be Some(`another_region_id`), `another_region_id` doesn't exist, incorrect `datanode_id`. 
- assert_eq!( - Some(another_region_id), - closeable_region(4, another_region_id, ®ion_peer_map) - ); - } - - #[test] - fn test_upgradable_follower_region() { - let datanode_id = 1u64; - let region_number = 1u32; - let region_id = RegionId::from_u64(region_number as u64); - let another_region_id = RegionId::from_u64(region_number as u64 + 1); - let peer = Peer::empty(datanode_id); - - let region_leader_map = [(region_number, &peer)].into(); - let region_leader_status = HashMap::new(); - - // Should be Some(region_id), `region_id` is a leader region. - assert_eq!( - Some(region_id), - upgradable_follower_region( - datanode_id, - region_id, - ®ion_leader_map, - ®ion_leader_status - ) - ); - - let downgraded_leader = [(region_number, RegionStatus::Downgraded)].into(); - - // Should be None, `region_id` is a downgraded leader region. - assert_eq!( - None, - upgradable_follower_region( - datanode_id, - region_id, - ®ion_leader_map, - &downgraded_leader - ) - ); - - // Should be None, incorrect `datanode_id`. - assert_eq!( - None, - upgradable_follower_region( - datanode_id + 1, - region_id, - ®ion_leader_map, - ®ion_leader_status - ) - ); - - // Should be None, incorrect `datanode_id`, `another_region_id` doesn't exist. - assert_eq!( - None, - upgradable_follower_region( - datanode_id, - another_region_id, - ®ion_leader_map, - ®ion_leader_status - ) - ); - - // Should be None, incorrect `datanode_id`, `another_region_id` doesn't exist, incorrect `datanode_id`. - assert_eq!( - None, - upgradable_follower_region( - datanode_id + 1, - another_region_id, - ®ion_leader_map, - ®ion_leader_status - ) - ); - } -}