diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 8b1f38614754..d907a24a78c7 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; use common_meta::key::TableMetadataManagerRef; use store_api::region_engine::{GrantedRegion, RegionRole}; +use store_api::storage::RegionId; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; @@ -41,6 +43,37 @@ impl RegionLeaseHandler { } } +fn flip_role(role: RegionRole) -> RegionRole { + match role { + RegionRole::Follower => RegionRole::Leader, + RegionRole::Leader => RegionRole::Follower, + } +} + +/// Grants lease of regions. +/// +/// - If a region is in an `operable` set, it will be granted an `operation(initial)`([RegionRole]); +/// otherwise, it will be granted a `initial`([RegionRole]). +/// - If a region is in a `closable` set, it won't be granted. +fn grant( + granted_regions: &mut Vec, + operable: &HashSet, + closable: &HashSet, + regions: &[RegionId], + initial: RegionRole, + operation: impl Fn(RegionRole) -> RegionRole, +) { + for region in regions { + if operable.contains(region) { + granted_regions.push(GrantedRegion::new(*region, operation(initial))); + } else if closable.contains(region) { + // Filters out the closable regions. + } else { + granted_regions.push(GrantedRegion::new(*region, initial)) + } + } +} + #[async_trait] impl HeartbeatHandler for RegionLeaseHandler { fn is_acceptable(&self, role: Role) -> bool { @@ -77,15 +110,14 @@ impl HeartbeatHandler for RegionLeaseHandler { .find_staled_leader_regions(cluster_id, datanode_id, &leaders) .await?; - for leader in leaders { - if downgradable.contains(&leader) { - granted_regions.push(GrantedRegion::new(leader, RegionRole::Follower)) - } else if closable.contains(&leader) { - // Filters out the closable regions. - } else { - granted_regions.push(GrantedRegion::new(leader, RegionRole::Leader)) - } - } + grant( + &mut granted_regions, + &downgradable, + &closable, + &leaders, + RegionRole::Leader, + flip_role, + ); let followers = followers.into_iter().flatten().collect::>(); @@ -94,15 +126,14 @@ impl HeartbeatHandler for RegionLeaseHandler { .find_staled_follower_regions(cluster_id, datanode_id, &followers) .await?; - for follower in followers { - if upgradeable.contains(&follower) { - granted_regions.push(GrantedRegion::new(follower, RegionRole::Leader)) - } else if closable.contains(&follower) { - // Filters out the closable regions. - } else { - granted_regions.push(GrantedRegion::new(follower, RegionRole::Follower)) - } - } + grant( + &mut granted_regions, + &upgradeable, + &closable, + &followers, + RegionRole::Follower, + flip_role, + ); acc.region_lease = Some(RegionLease { regions: granted_regions