diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index a48e46913173..5d3f0e447ce8 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -45,6 +45,7 @@ use crate::error::{ }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; +use crate::key::table_route::TableRouteValue; use crate::key::DeserializedValueWithBytes; use crate::metrics; use crate::rpc::ddl::AlterTableTask; @@ -184,7 +185,7 @@ impl AlterTableProcedure { let table_id = self.data.table_id(); let table_ref = self.data.table_ref(); - let table_route = self + let TableRouteValue { region_routes, .. } = self .context .table_metadata_manager .table_route_manager() @@ -194,14 +195,13 @@ impl AlterTableProcedure { table_name: table_ref.to_string(), })? .into_inner(); - let region_routes = table_route.region_routes(); - let leaders = find_leaders(region_routes); + let leaders = find_leaders(®ion_routes); let mut alter_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { let requester = self.context.datanode_manager.datanode(&datanode).await; - let regions = find_leader_regions(region_routes, &datanode); + let regions = find_leader_regions(®ion_routes, &datanode); for region in regions { let region_id = RegionId::new(table_id, region); diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 94c6cdf0a06a..6076e6125294 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -307,7 +307,7 @@ impl DropTableData { } fn region_routes(&self) -> &Vec { - self.table_route_value.region_routes() + &self.table_route_value.region_routes } fn table_info(&self) -> &RawTableInfo { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 471de7ac852f..b1821047370e 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -279,7 +279,7 @@ async fn handle_truncate_table_task( table_name: table_ref.to_string(), })?; - let table_route = table_route_value.into_inner().region_routes().clone(); + let table_route = table_route_value.into_inner().region_routes; let id = ddl_manager .submit_truncate_table_task( diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index d0e24c309b2e..d86880d9b339 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -478,7 +478,7 @@ impl TableMetadataManager { .build_delete_txn(table_id, table_info_value)?; // Deletes datanode table key value pairs. - let distribution = region_distribution(table_route_value.region_routes())?; + let distribution = region_distribution(&table_route_value.region_routes)?; let delete_datanode_txn = self .datanode_table_manager() .build_delete_txn(table_id, distribution)?; @@ -603,7 +603,7 @@ impl TableMetadataManager { ) -> Result<()> { // Updates the datanode table key value pairs. let current_region_distribution = - region_distribution(current_table_route_value.region_routes())?; + region_distribution(¤t_table_route_value.region_routes)?; let new_region_distribution = region_distribution(&new_region_routes)?; let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( @@ -651,7 +651,7 @@ impl TableMetadataManager { where F: Fn(&RegionRoute) -> Option>, { - let mut new_region_routes = current_table_route_value.region_routes().clone(); + let mut new_region_routes = current_table_route_value.region_routes.clone(); let mut updated = 0; for route in &mut new_region_routes { @@ -836,7 +836,7 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); - let region_routes = &vec![region_route.clone()]; + let region_routes = vec![region_route.clone()]; let table_info: RawTableInfo = new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); // creates metadata. @@ -879,7 +879,7 @@ mod tests { table_info ); assert_eq!( - remote_table_route.unwrap().into_inner().region_routes(), + remote_table_route.unwrap().into_inner().region_routes, region_routes ); } @@ -889,7 +889,7 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); let region_route = new_test_region_route(); - let region_routes = &vec![region_route.clone()]; + let region_routes = vec![region_route.clone()]; let table_info: RawTableInfo = new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); let table_id = table_info.ident.table_id; @@ -960,7 +960,7 @@ mod tests { .unwrap() .unwrap() .into_inner(); - assert_eq!(removed_table_route.region_routes(), region_routes); + assert_eq!(removed_table_route.region_routes, region_routes); } #[tokio::test] @@ -1154,11 +1154,11 @@ mod tests { .unwrap(); assert_eq!( - updated_route_value.region_routes()[0].leader_status, + updated_route_value.region_routes[0].leader_status, Some(RegionStatus::Downgraded) ); assert_eq!( - updated_route_value.region_routes()[1].leader_status, + updated_route_value.region_routes[1].leader_status, Some(RegionStatus::Downgraded) ); } diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 852c17937c34..231c71ccba92 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -38,70 +38,42 @@ impl TableRouteKey { } #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] -pub enum TableRouteValue { - Physical(PhysicalTableRouteValue), - Logical(LogicalTableRouteValue), -} - -#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] -pub struct PhysicalTableRouteValue { +pub struct TableRouteValue { pub region_routes: Vec, version: u64, } -#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] -pub struct LogicalTableRouteValue { - // TODO(LFC): Add table route for MetricsEngine table. -} - impl TableRouteValue { pub fn new(region_routes: Vec) -> Self { - Self::Physical(PhysicalTableRouteValue { + Self { region_routes, version: 0, - }) + } } /// Returns a new version [TableRouteValue] with `region_routes`. pub fn update(&self, region_routes: Vec) -> Self { - let version = self.physical_table_route().version; - Self::Physical(PhysicalTableRouteValue { + Self { region_routes, - version: version + 1, - }) + version: self.version + 1, + } } /// Returns the version. /// /// For test purpose. - #[cfg(any(test, feature = "testing"))] + #[cfg(any(tets, feature = "testing"))] pub fn version(&self) -> u64 { - self.physical_table_route().version + self.version } /// Returns the corresponding [RegionRoute]. pub fn region_route(&self, region_id: RegionId) -> Option { - self.physical_table_route() - .region_routes + self.region_routes .iter() .find(|route| route.region.id == region_id) .cloned() } - - /// Gets the [RegionRoute]s of this [TableRouteValue::Physical]. - /// - /// # Panics - /// The route type is not the [TableRouteValue::Physical]. - pub fn region_routes(&self) -> &Vec { - &self.physical_table_route().region_routes - } - - fn physical_table_route(&self) -> &PhysicalTableRouteValue { - match self { - TableRouteValue::Physical(x) => x, - _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"), - } - } } impl TableMetaKey for TableRouteKey { @@ -297,7 +269,7 @@ impl TableRouteManager { ) -> Result> { self.get(table_id) .await? - .map(|table_route| region_distribution(table_route.region_routes())) + .map(|table_route| region_distribution(&table_route.into_inner().region_routes)) .transpose() } } diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index c2d06590aec2..84466eb19928 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -207,7 +207,7 @@ mod tests { .unwrap(); let should_downgraded = table_route_value - .region_routes() + .region_routes .iter() .find(|route| route.region.id.region_number() == failed_region.region_number) .unwrap(); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 23ade1a2a1fe..505f1cb55a51 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -85,7 +85,7 @@ impl UpdateRegionMetadata { .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; - let mut new_region_routes = table_route_value.region_routes().clone(); + let mut new_region_routes = table_route_value.region_routes.clone(); for region_route in new_region_routes.iter_mut() { if region_route.region.id.region_number() == failed_region.region_number { @@ -233,8 +233,7 @@ mod tests { .unwrap() .unwrap() .into_inner() - .region_routes() - .clone() + .region_routes } // Original region routes: @@ -396,8 +395,8 @@ mod tests { .unwrap() .into_inner(); - let peers = &extract_all_peers(table_route_value.region_routes()); - let actual = table_route_value.region_routes(); + let peers = &extract_all_peers(&table_route_value.region_routes); + let actual = &table_route_value.region_routes; let expected = &vec![ new_region_route(1, peers, 2), new_region_route(2, peers, 3), @@ -416,7 +415,7 @@ mod tests { .unwrap() .into_inner(); - let map = region_distribution(table_route_value.region_routes()).unwrap(); + let map = region_distribution(&table_route_value.region_routes).unwrap(); assert_eq!(map.len(), 2); assert_eq!(map.get(&2), Some(&vec![1, 3])); assert_eq!(map.get(&3), Some(&vec![2, 4])); diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index cd9b5bad5a5d..3ef5d46c6595 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -84,7 +84,7 @@ impl RegionMigrationStart { let table_route = ctx.get_table_route_value().await?; let region_route = table_route - .region_routes() + .region_routes .iter() .find(|route| route.region.id == region_id) .cloned() diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 6496c18ee516..1c95a2d393a7 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -377,7 +377,7 @@ impl ProcedureMigrationTestSuite { /// Verifies table metadata after region migration. pub(crate) async fn verify_table_metadata(&self) { let region_id = self.context.persistent_ctx.region_id; - let table_route = self + let region_routes = self .env .table_metadata_manager .table_route_manager() @@ -385,25 +385,22 @@ impl ProcedureMigrationTestSuite { .await .unwrap() .unwrap() - .into_inner(); - let region_routes = table_route.region_routes(); + .into_inner() + .region_routes; let expected_leader_id = self.context.persistent_ctx.to_peer.id; let removed_follower_id = self.context.persistent_ctx.from_peer.id; let region_route = region_routes - .iter() + .into_iter() .find(|route| route.region.id == region_id) .unwrap(); assert!(!region_route.is_leader_downgraded()); - assert_eq!( - region_route.leader_peer.as_ref().unwrap().id, - expected_leader_id - ); + assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id); assert!(!region_route .follower_peers - .iter() + .into_iter() .any(|route| route.id == removed_follower_id)) } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 05dbb1935f19..7deaddb5c27b 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -212,7 +212,7 @@ mod tests { // It should remain unchanged. assert_eq!(latest_table_route.version(), 0); - assert!(!latest_table_route.region_routes()[0].is_leader_downgraded()); + assert!(!latest_table_route.region_routes[0].is_leader_downgraded()); assert!(ctx.volatile_ctx.table_route.is_none()); } @@ -253,7 +253,7 @@ mod tests { .unwrap() .unwrap(); - assert!(latest_table_route.region_routes()[0].is_leader_downgraded()); + assert!(latest_table_route.region_routes[0].is_leader_downgraded()); assert!(ctx.volatile_ctx.table_route.is_none()); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index e7fa73dedf8d..6c1a2648535a 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -166,14 +166,15 @@ mod tests { state.rollback_downgraded_region(&mut ctx).await.unwrap(); - let table_route = table_metadata_manager + let region_routes = table_metadata_manager .table_route_manager() .get(table_id) .await .unwrap() .unwrap() - .into_inner(); - assert_eq!(&expected_region_routes, table_route.region_routes()); + .into_inner() + .region_routes; + assert_eq!(expected_region_routes, region_routes); } #[tokio::test] @@ -228,13 +229,14 @@ mod tests { assert!(ctx.volatile_ctx.table_route.is_none()); - let table_route = table_metadata_manager + let region_routes = table_metadata_manager .table_route_manager() .get(table_id) .await .unwrap() .unwrap() - .into_inner(); - assert_eq!(&expected_region_routes, table_route.region_routes()); + .into_inner() + .region_routes; + assert_eq!(expected_region_routes, region_routes); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index bb86280ba000..4886df0e5af4 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -33,7 +33,7 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_route_value = ctx.get_table_route_value().await?.clone(); - let mut region_routes = table_route_value.region_routes().clone(); + let mut region_routes = table_route_value.region_routes.clone(); let region_route = region_routes .iter_mut() .find(|route| route.region.id == region_id) @@ -81,7 +81,7 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_route_value = ctx.get_table_route_value().await?.clone(); - let region_routes = table_route_value.region_routes().clone(); + let region_routes = table_route_value.region_routes.clone(); let region_route = region_routes .into_iter() .find(|route| route.region.id == region_id) @@ -480,14 +480,14 @@ mod tests { let _ = next.as_any().downcast_ref::().unwrap(); - let table_route = table_metadata_manager + let region_routes = table_metadata_manager .table_route_manager() .get(table_id) .await .unwrap() .unwrap() - .into_inner(); - let region_routes = table_route.region_routes(); + .into_inner() + .region_routes; assert!(ctx.volatile_ctx.table_route.is_none()); assert!(ctx.volatile_ctx.opening_region_guard.is_none()); diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index a5f5beeacd35..114a48beff72 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -143,7 +143,7 @@ async fn get_leader_peer_ids( .context(error::TableMetadataManagerSnafu) .map(|route| { route.map_or_else(Vec::new, |route| { - find_leaders(route.region_routes()) + find_leaders(&route.region_routes) .into_iter() .map(|peer| peer.id) .collect() diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index ad15c62cc1dd..41b3bef065f8 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -19,7 +19,7 @@ use api::v1::Rows; use common_meta::key::table_route::TableRouteManager; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router::RegionRoutes; +use common_meta::rpc::router::{convert_to_region_leader_map, RegionRoutes}; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; @@ -76,10 +76,39 @@ impl PartitionRuleManager { .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); - Ok(RegionRoutes(route.region_routes().clone())) + Ok(RegionRoutes(route.region_routes)) } - pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { + /// Find datanodes of corresponding regions of given table. + pub async fn find_region_datanodes( + &self, + table_id: TableId, + regions: Vec, + ) -> Result>> { + let route = self + .table_route_manager + .get(table_id) + .await + .context(error::TableRouteManagerSnafu)? + .context(error::FindTableRoutesSnafu { table_id })? + .into_inner(); + let mut datanodes = HashMap::with_capacity(regions.len()); + let region_map = convert_to_region_leader_map(&route.region_routes); + for region in regions.iter() { + let datanode = *region_map.get(region).context(error::FindDatanodeSnafu { + table_id, + region: *region, + })?; + datanodes + .entry(datanode.clone()) + .or_insert_with(Vec::new) + .push(*region); + } + Ok(datanodes) + } + + /// Find all leader peers of given table. + pub async fn find_table_region_leaders(&self, table_id: TableId) -> Result> { let route = self .table_route_manager .get(table_id) @@ -87,15 +116,33 @@ impl PartitionRuleManager { .context(error::TableRouteManagerSnafu)? .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); - let region_routes = route.region_routes(); + let mut peers = Vec::with_capacity(route.region_routes.len()); + + for peer in &route.region_routes { + peers.push(peer.leader_peer.clone().with_context(|| FindLeaderSnafu { + region_id: peer.region.id, + table_id, + })?); + } + Ok(peers) + } + + pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { + let route = self + .table_route_manager + .get(table_id) + .await + .context(error::TableRouteManagerSnafu)? + .context(error::FindTableRoutesSnafu { table_id })? + .into_inner(); ensure!( - !region_routes.is_empty(), + !route.region_routes.is_empty(), error::FindTableRoutesSnafu { table_id } ); - let mut partitions = Vec::with_capacity(region_routes.len()); - for r in region_routes { + let mut partitions = Vec::with_capacity(route.region_routes.len()); + for r in route.region_routes.iter() { let partition = r .region .partition diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index e9731cc336fa..e997139b5357 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -521,7 +521,7 @@ CREATE TABLE {table_name} ( .unwrap() .into_inner(); - let region_to_dn_map = region_distribution(table_route_value.region_routes()) + let region_to_dn_map = region_distribution(&table_route_value.region_routes) .unwrap() .iter() .map(|(k, v)| (v[0], *k)) diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 05253dc0a236..ac5a2e4b3ca9 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -216,7 +216,7 @@ mod tests { .unwrap() .into_inner(); - let region_to_dn_map = region_distribution(table_route_value.region_routes()) + let region_to_dn_map = region_distribution(&table_route_value.region_routes) .unwrap() .iter() .map(|(k, v)| (v[0], *k))