diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 24827ef40f17..7ef74713c892 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -24,21 +24,27 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -use crate::region::lease_keeper::RegionLeaseKeeperRef; +use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef}; use crate::region::RegionLeaseKeeper; pub struct RegionLeaseHandler { region_lease_seconds: u64, region_lease_keeper: RegionLeaseKeeperRef, + opening_region_keeper: OpeningRegionKeeperRef, } impl RegionLeaseHandler { - pub fn new(region_lease_seconds: u64, table_metadata_manager: TableMetadataManagerRef) -> Self { + pub fn new( + region_lease_seconds: u64, + table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: OpeningRegionKeeperRef, + ) -> Self { let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager); Self { region_lease_seconds, region_lease_keeper: Arc::new(region_lease_keeper), + opening_region_keeper, } } } @@ -124,6 +130,11 @@ impl HeartbeatHandler for RegionLeaseHandler { .find_staled_follower_regions(cluster_id, datanode_id, &followers) .await?; + // If a region is opening, it will be filtered out from the closable regions set. + let closable = self + .opening_region_keeper + .filter_opening_regions(datanode_id, closable); + grant( &mut granted_regions, &upgradeable, @@ -161,6 +172,7 @@ mod test { use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; + use crate::region::lease_keeper::OpeningRegionKeeper; fn new_test_keeper() -> RegionLeaseKeeper { let store = Arc::new(MemoryKvBackend::new()); @@ -230,9 +242,12 @@ mod test { ..Default::default() }; + let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); + let handler = RegionLeaseHandler::new( distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), + opening_region_keeper.clone(), ); handler.handle(&req, ctx, acc).await.unwrap(); @@ -262,6 +277,39 @@ mod test { acc, vec![GrantedRegion::new(region_id, RegionRole::Follower)], ); + + let opening_region_id = RegionId::new(table_id, region_number + 2); + let _guard = opening_region_keeper + .register(follower_peer.id, opening_region_id) + .unwrap(); + + let acc = &mut HeartbeatAccumulator::default(); + + acc.stat = Some(Stat { + cluster_id, + id: follower_peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Follower), + new_empty_region_stat(another_region_id, RegionRole::Follower), + new_empty_region_stat(opening_region_id, RegionRole::Follower), + ], + ..Default::default() + }); + + handler.handle(&req, ctx, acc).await.unwrap(); + + assert_eq!( + acc.region_lease.as_ref().unwrap().lease_seconds, + distributed_time_constants::REGION_LEASE_SECS + ); + + assert_region_lease( + acc, + vec![ + GrantedRegion::new(region_id, RegionRole::Follower), + GrantedRegion::new(opening_region_id, RegionRole::Follower), + ], + ); } #[tokio::test] @@ -325,6 +373,7 @@ mod test { let handler = RegionLeaseHandler::new( distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), + Default::default(), ); handler.handle(&req, ctx, acc).await.unwrap(); diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index c7fb8c7529eb..0db67bd0b1e2 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -46,6 +46,7 @@ use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; +use crate::region::lease_keeper::OpeningRegionKeeperRef; use crate::selector::{Selector, SelectorType}; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::LeaderCachedKvBackend; @@ -241,6 +242,7 @@ pub struct MetaSrv { mailbox: MailboxRef, ddl_executor: DdlTaskExecutorRef, table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: OpeningRegionKeeperRef, greptimedb_telemetry_task: Arc, plugins: Plugins, @@ -403,6 +405,10 @@ impl MetaSrv { &self.table_metadata_manager } + pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef { + &self.opening_region_keeper + } + pub fn publish(&self) -> Option { self.plugins.get::() } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 12550f34f60e..92b259c10cdd 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -52,6 +52,7 @@ use crate::metasrv::{ }; use crate::procedure::region_failover::RegionFailoverManager; use crate::pubsub::PublishRef; +use crate::region::lease_keeper::OpeningRegionKeeper; use crate::selector::lease_based::LeaseBasedSelector; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend}; @@ -197,6 +198,7 @@ impl MetaSrvBuilder { &table_id_sequence, ); let _ = ddl_manager.try_start(); + let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); let handler_group = match handler_group { Some(handler_group) => handler_group, @@ -231,6 +233,7 @@ impl MetaSrvBuilder { let region_lease_handler = RegionLeaseHandler::new( distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), + opening_region_keeper.clone(), ); let group = HeartbeatHandlerGroup::new(pushers); @@ -283,6 +286,7 @@ impl MetaSrvBuilder { ) .await, plugins: plugins.unwrap_or_else(Plugins::default), + opening_region_keeper, }) } } diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 76f5f57dd866..993f40c08181 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -16,10 +16,11 @@ pub mod mito; pub mod utils; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManagerRef; +use common_meta::DatanodeId; use snafu::ResultExt; use store_api::storage::{RegionId, TableId}; @@ -156,8 +157,84 @@ impl RegionLeaseKeeper { } } +#[derive(Debug, Clone)] +pub struct OpeningRegionGuard { + datanode_id: DatanodeId, + region_id: RegionId, + inner: Arc>>, +} + +impl Drop for OpeningRegionGuard { + fn drop(&mut self) { + let mut inner = self.inner.write().unwrap(); + inner.remove(&(self.datanode_id, self.region_id)); + } +} + +pub type OpeningRegionKeeperRef = Arc; + +#[derive(Debug, Clone, Default)] +/// Tracks opening regions. +pub struct OpeningRegionKeeper { + inner: Arc>>, +} + +impl OpeningRegionKeeper { + pub fn new() -> Self { + Default::default() + } + + /// Returns [OpeningRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist. + pub fn register( + &self, + datanode_id: DatanodeId, + region_id: RegionId, + ) -> Option { + let mut inner = self.inner.write().unwrap(); + + if inner.insert((datanode_id, region_id)) { + Some(OpeningRegionGuard { + datanode_id, + region_id, + inner: self.inner.clone(), + }) + } else { + None + } + } + + /// Returns true if the keeper contains a (`datanoe_id`, `region_id`) tuple. + pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool { + let inner = self.inner.read().unwrap(); + inner.contains(&(datanode_id, region_id)) + } + + /// Returns a set of filtered out regions that are opening. + pub fn filter_opening_regions( + &self, + datanode_id: DatanodeId, + mut region_ids: HashSet, + ) -> HashSet { + let inner = self.inner.read().unwrap(); + region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id))); + + region_ids + } + + /// Returns number of element in tracking set. + pub fn len(&self) -> usize { + let inner = self.inner.read().unwrap(); + inner.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + #[cfg(test)] mod tests { + use std::collections::HashSet; use std::sync::Arc; use common_meta::key::test_utils::new_test_table_info; @@ -168,7 +245,7 @@ mod tests { use store_api::storage::RegionId; use table::metadata::RawTableInfo; - use super::RegionLeaseKeeper; + use super::{OpeningRegionKeeper, RegionLeaseKeeper}; fn new_test_keeper() -> RegionLeaseKeeper { let store = Arc::new(MemoryKvBackend::new()); @@ -433,4 +510,34 @@ mod tests { assert!(upgradable.is_empty()); assert!(closable.is_empty()); } + + #[test] + fn test_opening_region_keeper() { + let keeper = OpeningRegionKeeper::new(); + + let guard = keeper.register(1, RegionId::from_u64(1)).unwrap(); + assert!(keeper.register(1, RegionId::from_u64(1)).is_none()); + let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap(); + + let output = keeper.filter_opening_regions( + 1, + HashSet::from([ + RegionId::from_u64(1), + RegionId::from_u64(2), + RegionId::from_u64(3), + ]), + ); + assert_eq!(output.len(), 1); + assert!(output.contains(&RegionId::from_u64(3))); + + assert_eq!(keeper.len(), 2); + drop(guard); + + assert_eq!(keeper.len(), 1); + + assert!(keeper.contains(1, RegionId::from_u64(2))); + drop(guard2); + + assert!(keeper.is_empty()); + } }