Skip to content

Commit

Permalink
refactor: simplify lease keeper
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 5, 2023
1 parent 7155bfa commit fd98745
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 890 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true
humantime-serde.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use table::metadata::TableId;

use super::DeserializedValueWithBytes;
Expand Down Expand Up @@ -50,12 +51,21 @@ impl TableRouteValue {
}
}

/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
version: self.version + 1,
}
}

/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
}
}

impl TableMetaKey for TableRouteKey {
Expand Down
6 changes: 5 additions & 1 deletion src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use api::v1::meta::{
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
TableRoute as PbTableRoute,
};
use derive_builder::Builder;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use snafu::OptionExt;
Expand Down Expand Up @@ -231,12 +232,15 @@ impl From<Table> for PbTable {
}
}

#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
pub region: Region,
#[builder(setter(into, strip_option))]
pub leader_peer: Option<Peer>,
#[builder(setter(into, strip_option), default)]
pub follower_peers: Vec<Peer>,
/// `None` by default.
#[builder(setter(into, strip_option), default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub leader_status: Option<RegionStatus>,
}
Expand Down
132 changes: 28 additions & 104 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::info;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;
use store_api::region_engine::GrantedRegion;

use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef};
use crate::region::lease_keeper::{
OpeningRegionKeeperRef, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
};
use crate::region::RegionLeaseKeeper;

pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
opening_region_keeper: OpeningRegionKeeperRef,
}

impl RegionLeaseHandler {
Expand All @@ -40,42 +38,12 @@ impl RegionLeaseHandler {
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
) -> Self {
let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager);
let region_lease_keeper =
RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper.clone());

Self {
region_lease_seconds,
region_lease_keeper: Arc::new(region_lease_keeper),
opening_region_keeper,
}
}
}

fn flip_role(role: RegionRole) -> RegionRole {
match role {
RegionRole::Follower => RegionRole::Leader,
RegionRole::Leader => RegionRole::Follower,
}
}

/// Grants lease of regions.
///
/// - If a region is in an `operable` set, it will be granted an `flip_role(current)`([RegionRole]);
/// otherwise, it will be granted a `current`([RegionRole]).
/// - If a region is in a `closeable` set, it won't be granted.
fn grant(
granted_regions: &mut Vec<GrantedRegion>,
operable: &HashSet<RegionId>,
closeable: &HashSet<RegionId>,
regions: &[RegionId],
current: RegionRole,
) {
for region in regions {
if operable.contains(region) {
granted_regions.push(GrantedRegion::new(*region, flip_role(current)));
} else if closeable.contains(region) {
// Filters out the closeable regions.
} else {
granted_regions.push(GrantedRegion::new(*region, current))
}
}
}
Expand All @@ -99,87 +67,41 @@ impl HeartbeatHandler for RegionLeaseHandler {
let regions = stat.regions();
let cluster_id = stat.cluster_id;
let datanode_id = stat.id;
let mut granted_regions = Vec::with_capacity(regions.len());
let mut inactive_regions = HashSet::new();

let (leaders, followers): (Vec<_>, Vec<_>) = regions
.into_iter()
.map(|(id, role)| match role {
RegionRole::Follower => (None, Some(id)),
RegionRole::Leader => (Some(id), None),
})
.unzip();

let leaders = leaders.into_iter().flatten().collect::<Vec<_>>();

let (downgradable, closeable) = self
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = self
.region_lease_keeper
.find_staled_leader_regions(cluster_id, datanode_id, &leaders)
.renew_region_leases(cluster_id, datanode_id, &regions)
.await?;

grant(
&mut granted_regions,
&downgradable,
&closeable,
&leaders,
RegionRole::Leader,
);
if !closeable.is_empty() {
info!(
"Granting region lease, found closeable leader regions: {:?} on datanode {}",
closeable, datanode_id
);
}
inactive_regions.extend(closeable);

let followers = followers.into_iter().flatten().collect::<Vec<_>>();

let (upgradeable, closeable) = self
.region_lease_keeper
.find_staled_follower_regions(cluster_id, datanode_id, &followers)
.await?;

// If a region is opening, it will be filtered out from the closeable regions set.
let closeable = self
.opening_region_keeper
.filter_opening_regions(datanode_id, closeable);

grant(
&mut granted_regions,
&upgradeable,
&closeable,
&followers,
RegionRole::Follower,
);
if !closeable.is_empty() {
info!(
"Granting region lease, found closeable follower regions {:?} on datanode {}",
closeable, datanode_id
);
}
inactive_regions.extend(closeable);
let renewed = renewed
.into_iter()
.map(|(region_id, region_role)| {
GrantedRegion {
region_id,
region_role,
}
.into()
})
.collect::<Vec<_>>();

acc.region_lease = Some(RegionLease {
regions: granted_regions
.into_iter()
.map(Into::into)
.collect::<Vec<_>>(),
regions: renewed,
duration_since_epoch: req.duration_since_epoch,
lease_seconds: self.region_lease_seconds,
closeable_region_ids: inactive_regions
.iter()
.map(|region| region.as_u64())
.collect(),
closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
});
acc.inactive_region_ids = inactive_regions;
acc.inactive_region_ids = non_exists;

Ok(HandleControl::Continue)
}
}

#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use common_meta::distributed_time_constants;
Expand All @@ -188,6 +110,7 @@ mod test {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute, RegionStatus};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;

use super::*;
Expand All @@ -200,7 +123,8 @@ mod test {

let table_metadata_manager = Arc::new(TableMetadataManager::new(store));

RegionLeaseKeeper::new(table_metadata_manager)
let opening_keeper = Arc::new(OpeningRegionKeeper::default());
RegionLeaseKeeper::new(table_metadata_manager, opening_keeper)
}

fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
Expand Down
Loading

0 comments on commit fd98745

Please sign in to comment.