Skip to content

Commit

Permalink
feat: grant lease to opening regions (#2752)
Browse files Browse the repository at this point in the history
* feat: add OpeningRegionKeeper

* feat: grant lease to opening regions
  • Loading branch information
WenyXu authored Nov 17, 2023
1 parent 1420353 commit b28af94
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 4 deletions.
53 changes: 51 additions & 2 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,27 @@ use store_api::storage::RegionId;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::lease_keeper::RegionLeaseKeeperRef;
use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef};
use crate::region::RegionLeaseKeeper;

pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
opening_region_keeper: OpeningRegionKeeperRef,
}

impl RegionLeaseHandler {
pub fn new(region_lease_seconds: u64, table_metadata_manager: TableMetadataManagerRef) -> Self {
pub fn new(
region_lease_seconds: u64,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
) -> Self {
let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager);

Self {
region_lease_seconds,
region_lease_keeper: Arc::new(region_lease_keeper),
opening_region_keeper,
}
}
}
Expand Down Expand Up @@ -124,6 +130,11 @@ impl HeartbeatHandler for RegionLeaseHandler {
.find_staled_follower_regions(cluster_id, datanode_id, &followers)
.await?;

// If a region is opening, it will be filtered out from the closable regions set.
let closable = self
.opening_region_keeper
.filter_opening_regions(datanode_id, closable);

grant(
&mut granted_regions,
&upgradeable,
Expand Down Expand Up @@ -161,6 +172,7 @@ mod test {
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::region::lease_keeper::OpeningRegionKeeper;

fn new_test_keeper() -> RegionLeaseKeeper {
let store = Arc::new(MemoryKvBackend::new());
Expand Down Expand Up @@ -230,9 +242,12 @@ mod test {
..Default::default()
};

let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());

let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
);

handler.handle(&req, ctx, acc).await.unwrap();
Expand Down Expand Up @@ -262,6 +277,39 @@ mod test {
acc,
vec![GrantedRegion::new(region_id, RegionRole::Follower)],
);

let opening_region_id = RegionId::new(table_id, region_number + 2);
let _guard = opening_region_keeper
.register(follower_peer.id, opening_region_id)
.unwrap();

let acc = &mut HeartbeatAccumulator::default();

acc.stat = Some(Stat {
cluster_id,
id: follower_peer.id,
region_stats: vec![
new_empty_region_stat(region_id, RegionRole::Follower),
new_empty_region_stat(another_region_id, RegionRole::Follower),
new_empty_region_stat(opening_region_id, RegionRole::Follower),
],
..Default::default()
});

handler.handle(&req, ctx, acc).await.unwrap();

assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
);

assert_region_lease(
acc,
vec![
GrantedRegion::new(region_id, RegionRole::Follower),
GrantedRegion::new(opening_region_id, RegionRole::Follower),
],
);
}

#[tokio::test]
Expand Down Expand Up @@ -325,6 +373,7 @@ mod test {
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
Default::default(),
);

handler.handle(&req, ctx, acc).await.unwrap();
Expand Down
6 changes: 6 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::region::lease_keeper::OpeningRegionKeeperRef;
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
Expand Down Expand Up @@ -241,6 +242,7 @@ pub struct MetaSrv {
mailbox: MailboxRef,
ddl_executor: DdlTaskExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,

plugins: Plugins,
Expand Down Expand Up @@ -403,6 +405,10 @@ impl MetaSrv {
&self.table_metadata_manager
}

pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef {
&self.opening_region_keeper
}

pub fn publish(&self) -> Option<PublishRef> {
self.plugins.get::<PublishRef>()
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use crate::metasrv::{
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::pubsub::PublishRef;
use crate::region::lease_keeper::OpeningRegionKeeper;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend};
Expand Down Expand Up @@ -197,6 +198,7 @@ impl MetaSrvBuilder {
&table_id_sequence,
);
let _ = ddl_manager.try_start();
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());

let handler_group = match handler_group {
Some(handler_group) => handler_group,
Expand Down Expand Up @@ -231,6 +233,7 @@ impl MetaSrvBuilder {
let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
);

let group = HeartbeatHandlerGroup::new(pushers);
Expand Down Expand Up @@ -283,6 +286,7 @@ impl MetaSrvBuilder {
)
.await,
plugins: plugins.unwrap_or_else(Plugins::default),
opening_region_keeper,
})
}
}
Expand Down
111 changes: 109 additions & 2 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ pub mod mito;
pub mod utils;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::DatanodeId;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};

Expand Down Expand Up @@ -156,8 +157,84 @@ impl RegionLeaseKeeper {
}
}

#[derive(Debug, Clone)]
pub struct OpeningRegionGuard {
datanode_id: DatanodeId,
region_id: RegionId,
inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
}

impl Drop for OpeningRegionGuard {
fn drop(&mut self) {
let mut inner = self.inner.write().unwrap();
inner.remove(&(self.datanode_id, self.region_id));
}
}

pub type OpeningRegionKeeperRef = Arc<OpeningRegionKeeper>;

#[derive(Debug, Clone, Default)]
/// Tracks opening regions.
pub struct OpeningRegionKeeper {
inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
}

impl OpeningRegionKeeper {
pub fn new() -> Self {
Default::default()
}

/// Returns [OpeningRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist.
pub fn register(
&self,
datanode_id: DatanodeId,
region_id: RegionId,
) -> Option<OpeningRegionGuard> {
let mut inner = self.inner.write().unwrap();

if inner.insert((datanode_id, region_id)) {
Some(OpeningRegionGuard {
datanode_id,
region_id,
inner: self.inner.clone(),
})
} else {
None
}
}

/// Returns true if the keeper contains a (`datanoe_id`, `region_id`) tuple.
pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool {
let inner = self.inner.read().unwrap();
inner.contains(&(datanode_id, region_id))
}

/// Returns a set of filtered out regions that are opening.
pub fn filter_opening_regions(
&self,
datanode_id: DatanodeId,
mut region_ids: HashSet<RegionId>,
) -> HashSet<RegionId> {
let inner = self.inner.read().unwrap();
region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id)));

region_ids
}

/// Returns number of element in tracking set.
pub fn len(&self) -> usize {
let inner = self.inner.read().unwrap();
inner.len()
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;

use common_meta::key::test_utils::new_test_table_info;
Expand All @@ -168,7 +245,7 @@ mod tests {
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;

use super::RegionLeaseKeeper;
use super::{OpeningRegionKeeper, RegionLeaseKeeper};

fn new_test_keeper() -> RegionLeaseKeeper {
let store = Arc::new(MemoryKvBackend::new());
Expand Down Expand Up @@ -433,4 +510,34 @@ mod tests {
assert!(upgradable.is_empty());
assert!(closable.is_empty());
}

#[test]
fn test_opening_region_keeper() {
let keeper = OpeningRegionKeeper::new();

let guard = keeper.register(1, RegionId::from_u64(1)).unwrap();
assert!(keeper.register(1, RegionId::from_u64(1)).is_none());
let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap();

let output = keeper.filter_opening_regions(
1,
HashSet::from([
RegionId::from_u64(1),
RegionId::from_u64(2),
RegionId::from_u64(3),
]),
);
assert_eq!(output.len(), 1);
assert!(output.contains(&RegionId::from_u64(3)));

assert_eq!(keeper.len(), 2);
drop(guard);

assert_eq!(keeper.len(), 1);

assert!(keeper.contains(1, RegionId::from_u64(2)));
drop(guard2);

assert!(keeper.is_empty());
}
}

0 comments on commit b28af94

Please sign in to comment.