Skip to content

Commit

Permalink
fix: retrieve all info instead of checking on demand (#3846)
Browse files Browse the repository at this point in the history
* fix: renew region lease bug

* refactor: only register regions once

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored May 6, 2024
1 parent 7d447c2 commit 6e12e1b
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 47 deletions.
15 changes: 7 additions & 8 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ impl CreateTableProcedure {
/// Dispatches region creation to the datanodes for every region route of the
/// table being created.
///
/// Before sending the requests, the regions are registered as "opening" in the
/// creator's region keeper; the returned guards are stored on the creator so
/// the registration stays alive for the rest of the procedure.
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let table_route = self.table_route()?.clone();
let request_builder = self.new_region_request_builder(None)?;
// Registers opening regions
let guards = self
.creator
.register_opening_regions(&self.context, &table_route.region_routes)?;
// NOTE(review): an empty guard set presumably means the regions were already
// registered by an earlier attempt of this step — keep the existing guards
// instead of overwriting them with nothing. TODO confirm against
// `register_opening_regions`.
if !guards.is_empty() {
self.creator.opening_regions = guards;
}
self.create_regions(&table_route.region_routes, request_builder)
.await
}
Expand All @@ -203,14 +210,6 @@ impl CreateTableProcedure {
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
// Registers opening regions
let guards = self
.creator
.register_opening_regions(&self.context, region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
}

let create_table_data = &self.creator.data;
// Safety: the region_wal_options must be allocated
let region_wal_options = self.region_wal_options()?;
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#![feature(async_closure)]
#![feature(let_chains)]
#![feature(extract_if)]
#![feature(hash_extract_if)]

pub mod cache_invalidator;
pub mod cluster;
Expand Down
36 changes: 18 additions & 18 deletions src/common/meta/src/region_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,18 @@ impl MemoryRegionKeeper {
inner.contains(&(datanode_id, region_id))
}

/// Returns a set of filtered out regions that are opening.
pub fn filter_opening_regions(
/// Extracts all operating regions from `region_ids` and returns operating regions.
pub fn extract_operating_regions(
&self,
datanode_id: DatanodeId,
mut region_ids: HashSet<RegionId>,
region_ids: &mut HashSet<RegionId>,
) -> HashSet<RegionId> {
let inner = self.inner.read().unwrap();
region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id)));
let operating_regions = region_ids
.extract_if(|region_id| inner.contains(&(datanode_id, *region_id)))
.collect::<HashSet<_>>();

region_ids
operating_regions
}

/// Returns number of element in tracking set.
Expand Down Expand Up @@ -122,25 +124,23 @@ mod tests {
assert!(keeper.register(1, RegionId::from_u64(1)).is_none());
let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap();

let output = keeper.filter_opening_regions(
1,
HashSet::from([
RegionId::from_u64(1),
RegionId::from_u64(2),
RegionId::from_u64(3),
]),
);
assert_eq!(output.len(), 1);
assert!(output.contains(&RegionId::from_u64(3)));
let mut regions = HashSet::from([
RegionId::from_u64(1),
RegionId::from_u64(2),
RegionId::from_u64(3),
]);
let output = keeper.extract_operating_regions(1, &mut regions);
assert_eq!(output.len(), 2);

assert!(output.contains(&RegionId::from_u64(1)));
assert!(output.contains(&RegionId::from_u64(2)));
assert_eq!(keeper.len(), 2);
drop(guard);

drop(guard);
assert_eq!(keeper.len(), 1);

assert!(keeper.contains(1, RegionId::from_u64(2)));
drop(guard2);

drop(guard2);
assert!(keeper.is_empty());
}
}
103 changes: 82 additions & 21 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,7 @@ fn renew_region_lease_via_region_route(
}

impl RegionLeaseKeeper {
/// Groups the datanode's regions by their table id.
fn collect_tables(&self, datanode_regions: &[RegionId]) -> HashMap<TableId, Vec<RegionId>> {
// Fold the region list into a `table_id -> regions` map.
datanode_regions
.iter()
.fold(HashMap::<TableId, Vec<RegionId>>::new(), |mut tables, region_id| {
tables
.entry(region_id.table_id())
.or_default()
.push(*region_id);
tables
})
}

async fn collect_tables_metadata(
async fn collect_table_metadata(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, TableRouteValue>> {
Expand Down Expand Up @@ -131,12 +119,12 @@ impl RegionLeaseKeeper {
fn renew_region_lease(
&self,
table_metadata: &HashMap<TableId, TableRouteValue>,
operating_regions: &HashSet<RegionId>,
datanode_id: DatanodeId,
region_id: RegionId,
role: RegionRole,
) -> Option<(RegionId, RegionRole)> {
// Renews the lease if it's a opening region or deleting region.
if self.memory_region_keeper.contains(datanode_id, region_id) {
if operating_regions.contains(&region_id) {
return Some((region_id, role));
}

Expand All @@ -152,6 +140,23 @@ impl RegionLeaseKeeper {
None
}

/// Collects the metadata needed to renew region leases for a single datanode.
///
/// Splits `region_ids` into two parts and returns them as a tuple:
/// - the table routes of the non-operating regions, keyed by table id;
/// - the regions currently tracked as operating (e.g. opening/deleting) by the
///   in-memory region keeper, which were extracted from `region_ids`.
async fn collect_metadata(
&self,
datanode_id: DatanodeId,
mut region_ids: HashSet<RegionId>,
) -> Result<(HashMap<TableId, TableRouteValue>, HashSet<RegionId>)> {
// Filters out operating regions first, which improves the cache hit rate
// (reduces expensive remote fetches).
let operating_regions = self
.memory_region_keeper
.extract_operating_regions(datanode_id, &mut region_ids);
// Only the remaining (non-operating) regions need their table routes fetched.
let table_ids = region_ids
.into_iter()
.map(|region_id| region_id.table_id())
.collect::<Vec<_>>();
let table_metadata = self.collect_table_metadata(&table_ids).await?;
Ok((table_metadata, operating_regions))
}

/// Renews the lease of regions for specific datanode.
///
/// The lease of regions will be renewed if:
Expand All @@ -169,16 +174,20 @@ impl RegionLeaseKeeper {
let region_ids = regions
.iter()
.map(|(region_id, _)| *region_id)
.collect::<Vec<_>>();
let tables = self.collect_tables(&region_ids);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
let table_metadata = self.collect_tables_metadata(&table_ids).await?;

.collect::<HashSet<_>>();
let (table_metadata, operating_regions) =
self.collect_metadata(datanode_id, region_ids).await?;
let mut renewed = HashMap::new();
let mut non_exists = HashSet::new();

for &(region, role) in regions {
match self.renew_region_lease(&table_metadata, datanode_id, region, role) {
match self.renew_region_lease(
&table_metadata,
&operating_regions,
datanode_id,
region,
role,
) {
Some((region, renewed_role)) => {
renewed.insert(region, renewed_role);
}
Expand Down Expand Up @@ -290,6 +299,58 @@ mod tests {
);
}

#[tokio::test]
async fn test_collect_metadata() {
// Build a physical table (id 1024) with one region whose leader and
// follower live on different datanodes.
let table_id = 1024;
let table_info: RawTableInfo = new_test_table_info(table_id, vec![1u32]).into();
let region_id = RegionId::new(table_id, 1);
let leader_peer_id = 1024;
let follower_peer_id = 2048;
let region_route = RegionRouteBuilder::default()
.region(Region::new_test(region_id))
.leader_peer(Peer::empty(leader_peer_id))
.follower_peers(vec![Peer::empty(follower_peer_id)])
.build()
.unwrap();

let keeper = new_test_keeper();
keeper
.table_metadata_manager()
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();

// Register one opening region on each datanode; the guards must stay alive
// until `collect_metadata` runs.
let opening_region_id = RegionId::new(1025, 1);
let _leader_guard = keeper
.memory_region_keeper
.register(leader_peer_id, opening_region_id)
.unwrap();
let follower_opening_region_id = RegionId::new(1025, 2);
let _follower_guard = keeper
.memory_region_keeper
.register(follower_peer_id, follower_opening_region_id)
.unwrap();

// Collecting for the leader extracts its opening region and fetches table
// metadata only for the routed region.
let (metadata, regions) = keeper
.collect_metadata(
leader_peer_id,
HashSet::from([region_id, opening_region_id]),
)
.await
.unwrap();
let table_ids = metadata.keys().cloned().collect::<Vec<_>>();
assert_eq!(table_ids, vec![region_id.table_id()]);
assert_eq!(regions.len(), 1);
assert!(regions.contains(&opening_region_id));
}

#[tokio::test]
async fn test_renew_region_leases_basic() {
let region_number = 1u32;
Expand Down

0 comments on commit 6e12e1b

Please sign in to comment.