Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 13, 2023
1 parent 80e6049 commit d6a35b3
Showing 1 changed file with 49 additions and 18 deletions.
67 changes: 49 additions & 18 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
Expand All @@ -41,6 +43,37 @@ impl RegionLeaseHandler {
}
}

fn flip_role(role: RegionRole) -> RegionRole {
match role {
RegionRole::Follower => RegionRole::Leader,
RegionRole::Leader => RegionRole::Follower,
}
}

/// Grants lease of regions.
///
/// - If a region is in an `operable` set, it will be granted an `operation(initial)`([RegionRole]);
/// otherwise, it will be granted a `initial`([RegionRole]).
/// - If a region is in a `closable` set, it won't be granted.
fn grant(
granted_regions: &mut Vec<GrantedRegion>,
operable: &HashSet<RegionId>,
closable: &HashSet<RegionId>,
regions: &[RegionId],
initial: RegionRole,
operation: impl Fn(RegionRole) -> RegionRole,
) {
for region in regions {
if operable.contains(region) {
granted_regions.push(GrantedRegion::new(*region, operation(initial)));
} else if closable.contains(region) {
// Filters out the closable regions.
} else {
granted_regions.push(GrantedRegion::new(*region, initial))
}
}
}

#[async_trait]
impl HeartbeatHandler for RegionLeaseHandler {
fn is_acceptable(&self, role: Role) -> bool {
Expand Down Expand Up @@ -77,15 +110,14 @@ impl HeartbeatHandler for RegionLeaseHandler {
.find_staled_leader_regions(cluster_id, datanode_id, &leaders)
.await?;

for leader in leaders {
if downgradable.contains(&leader) {
granted_regions.push(GrantedRegion::new(leader, RegionRole::Follower))
} else if closable.contains(&leader) {
// Filters out the closable regions.
} else {
granted_regions.push(GrantedRegion::new(leader, RegionRole::Leader))
}
}
grant(
&mut granted_regions,
&downgradable,
&closable,
&leaders,
RegionRole::Leader,
flip_role,
);

let followers = followers.into_iter().flatten().collect::<Vec<_>>();

Expand All @@ -94,15 +126,14 @@ impl HeartbeatHandler for RegionLeaseHandler {
.find_staled_follower_regions(cluster_id, datanode_id, &followers)
.await?;

for follower in followers {
if upgradeable.contains(&follower) {
granted_regions.push(GrantedRegion::new(follower, RegionRole::Leader))
} else if closable.contains(&follower) {
// Filters out the closable regions.
} else {
granted_regions.push(GrantedRegion::new(follower, RegionRole::Follower))
}
}
grant(
&mut granted_regions,
&upgradeable,
&closable,
&followers,
RegionRole::Follower,
flip_role,
);

acc.region_lease = Some(RegionLease {
regions: granted_regions
Expand Down

0 comments on commit d6a35b3

Please sign in to comment.