diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 044715b32381..1e0930f4efee 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -194,6 +194,13 @@ impl CreateTableProcedure { pub async fn on_datanode_create_regions(&mut self) -> Result { let table_route = self.table_route()?.clone(); let request_builder = self.new_region_request_builder(None)?; + // Registers opening regions + let guards = self + .creator + .register_opening_regions(&self.context, &table_route.region_routes)?; + if !guards.is_empty() { + self.creator.opening_regions = guards; + } self.create_regions(&table_route.region_routes, request_builder) .await } @@ -203,14 +210,6 @@ impl CreateTableProcedure { region_routes: &[RegionRoute], request_builder: CreateRequestBuilder, ) -> Result { - // Registers opening regions - let guards = self - .creator - .register_opening_regions(&self.context, region_routes)?; - if !guards.is_empty() { - self.creator.opening_regions = guards; - } - let create_table_data = &self.creator.data; // Safety: the region_wal_options must be allocated let region_wal_options = self.region_wal_options()?; diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 385f46818a47..5236a09767de 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -17,6 +17,7 @@ #![feature(async_closure)] #![feature(let_chains)] #![feature(extract_if)] +#![feature(hash_extract_if)] pub mod cache_invalidator; pub mod cluster; diff --git a/src/common/meta/src/region_keeper.rs b/src/common/meta/src/region_keeper.rs index 1002dcc6b8d0..53d163a07a44 100644 --- a/src/common/meta/src/region_keeper.rs +++ b/src/common/meta/src/region_keeper.rs @@ -83,16 +83,18 @@ impl MemoryRegionKeeper { inner.contains(&(datanode_id, region_id)) } - /// Returns a set of filtered out regions that are opening. - pub fn filter_opening_regions( + /// Extracts all operating regions from `region_ids` and returns operating regions. + pub fn extract_operating_regions( &self, datanode_id: DatanodeId, - mut region_ids: HashSet, + region_ids: &mut HashSet, ) -> HashSet { let inner = self.inner.read().unwrap(); - region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id))); + let operating_regions = region_ids + .extract_if(|region_id| inner.contains(&(datanode_id, *region_id))) + .collect::>(); - region_ids + operating_regions } /// Returns number of element in tracking set. @@ -122,25 +124,23 @@ mod tests { assert!(keeper.register(1, RegionId::from_u64(1)).is_none()); let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap(); - let output = keeper.filter_opening_regions( - 1, - HashSet::from([ - RegionId::from_u64(1), - RegionId::from_u64(2), - RegionId::from_u64(3), - ]), - ); - assert_eq!(output.len(), 1); - assert!(output.contains(&RegionId::from_u64(3))); + let mut regions = HashSet::from([ + RegionId::from_u64(1), + RegionId::from_u64(2), + RegionId::from_u64(3), + ]); + let output = keeper.extract_operating_regions(1, &mut regions); + assert_eq!(output.len(), 2); + assert!(output.contains(&RegionId::from_u64(1))); + assert!(output.contains(&RegionId::from_u64(2))); assert_eq!(keeper.len(), 2); - drop(guard); + drop(guard); assert_eq!(keeper.len(), 1); - assert!(keeper.contains(1, RegionId::from_u64(2))); - drop(guard2); + drop(guard2); assert!(keeper.is_empty()); } } diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 3e15360e8d7e..a1c42fe49651 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -90,19 +90,7 @@ fn renew_region_lease_via_region_route( } impl RegionLeaseKeeper { - fn collect_tables(&self, datanode_regions: &[RegionId]) -> HashMap> { - let mut tables = HashMap::>::new(); - - // Group by `table_id`. - for region_id in datanode_regions.iter() { - let table = tables.entry(region_id.table_id()).or_default(); - table.push(*region_id); - } - - tables - } - - async fn collect_tables_metadata( + async fn collect_table_metadata( &self, table_ids: &[TableId], ) -> Result> { @@ -131,12 +119,12 @@ impl RegionLeaseKeeper { fn renew_region_lease( &self, table_metadata: &HashMap, + operating_regions: &HashSet, datanode_id: DatanodeId, region_id: RegionId, role: RegionRole, ) -> Option<(RegionId, RegionRole)> { - // Renews the lease if it's a opening region or deleting region. - if self.memory_region_keeper.contains(datanode_id, region_id) { + if operating_regions.contains(®ion_id) { return Some((region_id, role)); } @@ -152,6 +140,23 @@ impl RegionLeaseKeeper { None } + async fn collect_metadata( + &self, + datanode_id: DatanodeId, + mut region_ids: HashSet, + ) -> Result<(HashMap, HashSet)> { + // Filters out operating region first, improves the cache hit rate(reduce expensive remote fetches). + let operating_regions = self + .memory_region_keeper + .extract_operating_regions(datanode_id, &mut region_ids); + let table_ids = region_ids + .into_iter() + .map(|region_id| region_id.table_id()) + .collect::>(); + let table_metadata = self.collect_table_metadata(&table_ids).await?; + Ok((table_metadata, operating_regions)) + } + /// Renews the lease of regions for specific datanode. /// /// The lease of regions will be renewed if: @@ -169,16 +174,20 @@ impl RegionLeaseKeeper { let region_ids = regions .iter() .map(|(region_id, _)| *region_id) - .collect::>(); - let tables = self.collect_tables(®ion_ids); - let table_ids = tables.keys().copied().collect::>(); - let table_metadata = self.collect_tables_metadata(&table_ids).await?; - + .collect::>(); + let (table_metadata, operating_regions) = + self.collect_metadata(datanode_id, region_ids).await?; let mut renewed = HashMap::new(); let mut non_exists = HashSet::new(); for &(region, role) in regions { - match self.renew_region_lease(&table_metadata, datanode_id, region, role) { + match self.renew_region_lease( + &table_metadata, + &operating_regions, + datanode_id, + region, + role, + ) { Some((region, renewed_role)) => { renewed.insert(region, renewed_role); } @@ -290,6 +299,58 @@ mod tests { ); } + #[tokio::test] + async fn test_collect_metadata() { + let region_number = 1u32; + let table_id = 1024; + let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + + let region_id = RegionId::new(table_id, 1); + let leader_peer_id = 1024; + let follower_peer_id = 2048; + let region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .follower_peers(vec![Peer::empty(follower_peer_id)]) + .build() + .unwrap(); + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::physical(vec![region_route]), + HashMap::default(), + ) + .await + .unwrap(); + let opening_region_id = RegionId::new(1025, 1); + let _guard = keeper + .memory_region_keeper + .register(leader_peer_id, opening_region_id) + .unwrap(); + let another_opening_region_id = RegionId::new(1025, 2); + let _guard2 = keeper + .memory_region_keeper + .register(follower_peer_id, another_opening_region_id) + .unwrap(); + + let (metadata, regions) = keeper + .collect_metadata( + leader_peer_id, + HashSet::from([region_id, opening_region_id]), + ) + .await + .unwrap(); + assert_eq!( + metadata.keys().cloned().collect::>(), + vec![region_id.table_id()] + ); + assert!(regions.contains(&opening_region_id)); + assert_eq!(regions.len(), 1); + } + #[tokio::test] async fn test_renew_region_leases_basic() { let region_number = 1u32;