Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Oct 24, 2023
1 parent 4e1778e commit bce0879
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 40 deletions.
34 changes: 34 additions & 0 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
Expand All @@ -23,6 +24,7 @@ use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_P
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute};
use crate::rpc::store::BatchGetRequest;

pub struct TableRouteKey {
pub table_id: TableId,
Expand Down Expand Up @@ -197,6 +199,38 @@ impl TableRouteManager {
.transpose()
}

/// It may return a subset of the `table_ids`.
pub async fn batch_get(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, TableRouteValue>> {
let lookup_table = table_ids
.iter()
.map(|id| (TableRouteKey::new(*id).as_raw_key(), id))
.collect::<HashMap<_, _>>();

let resp = self
.kv_backend
.batch_get(BatchGetRequest {
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
})
.await?;

let values = resp
.kvs
.iter()
.map(|kv| {
Ok((
// Safety: must exist.
**lookup_table.get(kv.key()).unwrap(),
TableRouteValue::try_from_raw_value(&kv.value)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;

Ok(values)
}

#[cfg(test)]
pub async fn get_removed(
&self,
Expand Down
17 changes: 4 additions & 13 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,28 +138,19 @@ pub fn convert_to_region_followers_map(

/// Returns the HashMap<[RegionNumber], HashSet<DatanodeId>>.
pub fn convert_to_region_peers_map(region_routes: &[RegionRoute]) -> HashMap<u32, HashSet<u64>> {
let mut set = region_routes
region_routes
.iter()
.map(|x| {
(
x.region.id.region_number(),
x.follower_peers
.iter()
.map(|peer| (peer.id))
.chain(&x.leader_peer)
.map(|peer| peer.id)
.collect::<HashSet<u64>>(),
)
})
.collect::<HashMap<_, _>>();

for route in region_routes {
if let Some(peer) = &route.leader_peer {
let entry = set.entry(route.region.id.region_number()).or_default();

entry.insert(peer.id);
}
}

set
.collect::<HashMap<_, _>>()
}

pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> {
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#![feature(async_closure)]
#![feature(result_flattening)]
#![feature(extract_if)]

pub mod bootstrap;
mod cache_invalidator;
Expand Down
30 changes: 10 additions & 20 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::collections::{HashMap, HashSet};

use common_catalog::consts::{FILE_ENGINE, MITO2_ENGINE};
use common_meta::key::TableMetadataManagerRef;
use futures::future::try_join_all;
use snafu::ResultExt;
use store_api::region_engine::RegionRole;
use store_api::storage::{RegionId, TableId};
Expand Down Expand Up @@ -82,26 +81,19 @@ impl RegionLeaseKeeper {
table.push((*region_id, *role));
}

let table_ids: Vec<TableId> = tables.keys().cloned().collect::<Vec<_>>();
let metadata = try_join_all(
table_ids
.iter()
.map(|table_id| table_route_manager.get(*table_id)),
)
.await
.context(error::TableMetadataManagerSnafu)?;

// All tables' metadata.
let tables_metadata = table_ids
.into_iter()
.zip(metadata)
.collect::<HashMap<TableId, Option<_>>>();
let table_ids = tables.keys().cloned().collect::<Vec<_>>();

// The subset of all table metadata.
let metadata_subset = table_route_manager
.batch_get(&table_ids)
.await
.context(error::TableMetadataManagerSnafu)?;

let mut inactive_regions = HashSet::new();

// Removes inactive regions.
for (table_id, metadata) in tables_metadata {
if let Some(metadata) = metadata {
for table_id in table_ids {
if let Some(metadata) = metadata_subset.get(&table_id) {
// Safety: Value must exist.
let regions = tables.get_mut(&table_id).unwrap();
let region_routes = &metadata.region_routes;
Expand All @@ -120,9 +112,7 @@ impl RegionLeaseKeeper {
}
}

let _ = datanode_regions
.extract_if(|(region_id, _)| inactive_regions.contains(region_id))
.collect::<Vec<_>>();
datanode_regions.retain(|(region_id, _)| !inactive_regions.contains(region_id));

Ok(inactive_regions)
}
Expand Down
5 changes: 2 additions & 3 deletions src/meta-srv/src/region/lease_keeper/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ pub fn retain_active_regions(
})
.collect::<HashSet<_>>();

let _ = datanode_regions
.extract_if(|(region_id, _)| inactive_region_ids.contains(region_id))
.collect::<Vec<_>>();
datanode_regions.retain(|(region_id, _)| !inactive_region_ids.contains(region_id));

inactive_region_ids
}

Expand Down
4 changes: 1 addition & 3 deletions src/meta-srv/src/region/lease_keeper/mito.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ pub fn retain_active_regions(
})
.collect::<HashSet<_>>();

let _ = datanode_regions
.extract_if(|(region_id, _)| inactive_region_ids.contains(region_id))
.collect::<Vec<_>>();
datanode_regions.retain(|(region_id, _)| !inactive_region_ids.contains(region_id));

inactive_region_ids
}
Expand Down

0 comments on commit bce0879

Please sign in to comment.