diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 6a0a385e3e38..dd94668b5d84 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -332,10 +332,7 @@ pub enum Error { EmptyTopicPool { location: Location }, #[snafu(display("Unexpected table route type: {}", err_msg))] - UnexpectedTableRouteType { - location: Location, - err_msg: String, - }, + UnexpectedTableRouteType { location: Location, err_msg: String }, } pub type Result = std::result::Result; diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index eda184322874..57de421be202 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -897,7 +897,11 @@ mod tests { table_info ); assert_eq!( - remote_table_route.unwrap().into_inner().region_routes(), + remote_table_route + .unwrap() + .into_inner() + .region_routes() + .unwrap(), region_routes ); } @@ -978,7 +982,7 @@ mod tests { .unwrap() .unwrap() .into_inner(); - assert_eq!(removed_table_route.region_routes(), region_routes); + assert_eq!(removed_table_route.region_routes().unwrap(), region_routes); } #[tokio::test] @@ -1173,11 +1177,11 @@ mod tests { .unwrap(); assert_eq!( - updated_route_value.region_routes()[0].leader_status, + updated_route_value.region_routes().unwrap()[0].leader_status, Some(RegionStatus::Downgraded) ); assert_eq!( - updated_route_value.region_routes()[1].leader_status, + updated_route_value.region_routes().unwrap()[1].leader_status, Some(RegionStatus::Downgraded) ); } @@ -1271,7 +1275,8 @@ mod tests { let current_table_route_value = DeserializedValueWithBytes::from_inner( current_table_route_value .inner - .update(new_region_routes.clone()), + .update(new_region_routes.clone()) + .unwrap(), ); let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)]; // it should be ok. @@ -1295,13 +1300,16 @@ mod tests { // if the current_table_route_value is wrong, it should return an error. // The ABA problem. - let wrong_table_route_value = - DeserializedValueWithBytes::from_inner(current_table_route_value.update(vec![ - new_region_route(1, 1), - new_region_route(2, 2), - new_region_route(3, 3), - new_region_route(4, 4), - ])); + let wrong_table_route_value = DeserializedValueWithBytes::from_inner( + current_table_route_value + .update(vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), + new_region_route(4, 4), + ]) + .unwrap(), + ); assert!(table_metadata_manager .update_table_route( table_id, diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 8cf0337f430c..537457ad3291 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::fmt::Display; use serde::{Deserialize, Serialize}; -use snafu::{ResultExt, ensure}; +use snafu::{ensure, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; @@ -62,7 +62,7 @@ impl TableRouteValue { } /// Returns a new version [TableRouteValue] with `region_routes`. - /// + /// /// # Panics /// The route type is not the [TableRouteValue::Physical]. pub fn update(&self, region_routes: Vec) -> Result { @@ -82,7 +82,7 @@ impl TableRouteValue { /// Returns the version. /// /// For test purpose. - /// + /// /// # Panics /// The route type is not the [TableRouteValue::Physical]. #[cfg(any(test, feature = "testing"))] @@ -97,7 +97,7 @@ impl TableRouteValue { } /// Returns the corresponding [RegionRoute]. - /// + /// /// # Panics /// The route type is not the [TableRouteValue::Physical]. pub fn region_route(&self, region_id: RegionId) -> Result> { @@ -107,7 +107,8 @@ impl TableRouteValue { err_msg: "{self:?} is a non-physical TableRouteValue.", } ); - Ok(self.physical_table_route() + Ok(self + .physical_table_route() .region_routes .iter() .find(|route| route.region.id == region_id) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 4839de185294..a44b4a6f50f2 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -720,7 +720,7 @@ impl ErrorExt for Error { | Error::TableMetadataManager { source, .. } | Error::KvBackend { source, .. } | Error::UpdateTableRoute { source, .. } - | Error::GetFullTableInfo { source, .. } + | Error::GetFullTableInfo { source, .. } | Error::UnexpectedTableRouteType { source, .. } => source.status_code(), Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index c2d06590aec2..650c794126a6 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -208,6 +208,7 @@ mod tests { let should_downgraded = table_route_value .region_routes() + .unwrap() .iter() .find(|route| route.region.id.region_number() == failed_region.region_number) .unwrap(); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 53f7d15464b4..66f64dd597c6 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -85,9 +85,10 @@ impl UpdateRegionMetadata { .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; - let mut new_region_routes = table_route_value.region_routes() - .context(error::UnexpectedTableRouteTypeSnafu { - err_msg: "{self:?} is a non-physical TableRouteValue.", + let mut new_region_routes = table_route_value + .region_routes() + .context(error::UnexpectedTableRouteTypeSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", })? .clone(); @@ -238,6 +239,7 @@ mod tests { .unwrap() .into_inner() .region_routes() + .unwrap() .clone() } @@ -400,8 +402,8 @@ mod tests { .unwrap() .into_inner(); - let peers = &extract_all_peers(table_route_value.region_routes()); - let actual = table_route_value.region_routes(); + let peers = &extract_all_peers(table_route_value.region_routes().unwrap()); + let actual = table_route_value.region_routes().unwrap(); let expected = &vec![ new_region_route(1, peers, 2), new_region_route(2, peers, 3), @@ -420,7 +422,7 @@ mod tests { .unwrap() .into_inner(); - let map = region_distribution(table_route_value.region_routes()).unwrap(); + let map = region_distribution(table_route_value.region_routes().unwrap()).unwrap(); assert_eq!(map.len(), 2); assert_eq!(map.get(&2), Some(&vec![1, 3])); assert_eq!(map.get(&3), Some(&vec![2, 4])); diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 5299972236e0..deb08ce52641 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -735,7 +735,7 @@ mod tests { .unwrap() .version(); // Should be unchanged. - assert_eq!(table_routes_version, 0); + assert_eq!(table_routes_version.unwrap(), 0); } #[tokio::test] diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 30da1b7e4e29..4911aa2a517e 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -239,8 +239,8 @@ impl RegionMigrationManager { // Safety: checked before. let region_route = table_route .region_route(region_id) - .context(error::UnexpectedTableRouteTypeSnafu { - err_msg: "{self:?} is a non-physical TableRouteValue.", + .context(error::UnexpectedTableRouteTypeSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", })? .context(error::RegionRouteNotFoundSnafu { region_id })?; diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 95f0ed79bd66..097ed0bca737 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -18,8 +18,7 @@ use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; use common_procedure::Status; use serde::{Deserialize, Serialize}; -use snafu::OptionExt; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use super::migration_end::RegionMigrationEnd; @@ -86,8 +85,8 @@ impl RegionMigrationStart { let region_route = table_route .region_routes() - .context(error::UnexpectedTableRouteTypeSnafu { - err_msg: "{self:?} is a non-physical TableRouteValue.", + .context(error::UnexpectedTableRouteTypeSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", })? .iter() .find(|route| route.region.id == region_id) diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index f3a0183f233f..8255504c9a7a 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -419,7 +419,7 @@ impl ProcedureMigrationTestSuite { .unwrap() .unwrap() .into_inner(); - let region_routes = table_route.region_routes(); + let region_routes = table_route.region_routes().unwrap(); let expected_leader_id = self.context.persistent_ctx.to_peer.id; let removed_follower_id = self.context.persistent_ctx.from_peer.id; diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index cc67aa7ca8e9..818aadd9cda6 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -208,8 +208,8 @@ mod tests { .unwrap(); // It should remain unchanged. - assert_eq!(latest_table_route.version(), 0); - assert!(!latest_table_route.region_routes()[0].is_leader_downgraded()); + assert_eq!(latest_table_route.version().unwrap(), 0); + assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgraded()); assert!(ctx.volatile_ctx.table_route.is_none()); } @@ -249,7 +249,7 @@ mod tests { .unwrap() .unwrap(); - assert!(latest_table_route.region_routes()[0].is_leader_downgraded()); + assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgraded()); assert!(ctx.volatile_ctx.table_route.is_none()); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 7281737752a4..844188f2f1f9 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -170,7 +170,10 @@ mod tests { .unwrap() .unwrap() .into_inner(); - assert_eq!(&expected_region_routes, table_route.region_routes()); + assert_eq!( + &expected_region_routes, + table_route.region_routes().unwrap() + ); } #[tokio::test] @@ -231,6 +234,9 @@ mod tests { .unwrap() .unwrap() .into_inner(); - assert_eq!(&expected_region_routes, table_route.region_routes()); + assert_eq!( + &expected_region_routes, + table_route.region_routes().unwrap() + ); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 2028e177a37c..001e79b84e5f 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -33,9 +33,10 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_route_value = ctx.get_table_route_value().await?.clone(); - let mut region_routes = table_route_value.region_routes() - .context(error::UnexpectedTableRouteTypeSnafu { - err_msg: "{self:?} is a non-physical TableRouteValue.", + let mut region_routes = table_route_value + .region_routes() + .context(error::UnexpectedTableRouteTypeSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", })? .clone(); let region_route = region_routes @@ -85,9 +86,10 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_route_value = ctx.get_table_route_value().await?.clone(); - let region_routes = table_route_value.region_routes() - .context(error::UnexpectedTableRouteTypeSnafu { - err_msg: "{self:?} is a non-physical TableRouteValue.", + let region_routes = table_route_value + .region_routes() + .context(error::UnexpectedTableRouteTypeSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", })? .clone(); let region_route = region_routes @@ -473,7 +475,9 @@ mod tests { .unwrap() .unwrap() .into_inner(); - let region_routes = table_route.region_routes(); + let region_routes = table_route + .region_routes() + .expect("expected physical table route"); assert!(ctx.volatile_ctx.table_route.is_none()); assert!(ctx.volatile_ctx.opening_region_guard.is_none()); diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 095e24a4e937..e1e99b6e032c 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -143,10 +143,14 @@ async fn get_leader_peer_ids( .context(error::TableMetadataManagerSnafu) .map(|route| { route.map_or_else(Vec::new, |route| { - find_leaders(route.region_routes().expect("expected physical table route")) - .into_iter() - .map(|peer| peer.id) - .collect() + find_leaders( + route + .region_routes() + .expect("expected physical table route"), + ) + .into_iter() + .map(|peer| peer.id) + .collect() }) }) } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index f3845fe449ff..81bbc8f9ed2f 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -75,10 +75,12 @@ impl PartitionRuleManager { .context(error::TableRouteManagerSnafu)? .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); - let region_routes = route.region_routes() - .context(error::UnexpectedTableRouteTypeSnafu { - err_msg: "{self:?} is a non-physical TableRouteValue.", - })?; + let region_routes = + route + .region_routes() + .context(error::UnexpectedTableRouteTypeSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })?; Ok(RegionRoutes(region_routes.clone())) } @@ -90,10 +92,12 @@ impl PartitionRuleManager { .context(error::TableRouteManagerSnafu)? .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); - let region_routes = route.region_routes() - .context(error::UnexpectedTableRouteTypeSnafu { - err_msg: "{self:?} is a non-physical TableRouteValue.", - })?; + let region_routes = + route + .region_routes() + .context(error::UnexpectedTableRouteTypeSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })?; ensure!( !region_routes.is_empty(), diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index e9731cc336fa..24cd470c3905 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -521,11 +521,15 @@ CREATE TABLE {table_name} ( .unwrap() .into_inner(); - let region_to_dn_map = region_distribution(table_route_value.region_routes()) - .unwrap() - .iter() - .map(|(k, v)| (v[0], *k)) - .collect::>(); + let region_to_dn_map = region_distribution( + table_route_value + .region_routes() + .expect("physical table route"), + ) + .unwrap() + .iter() + .map(|(k, v)| (v[0], *k)) + .collect::>(); assert!(region_to_dn_map.len() <= instance.datanodes().len()); let stmt = QueryLanguageParser::parse_sql(&format!( diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 05253dc0a236..5b7ed080d9d9 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -216,11 +216,15 @@ mod tests { .unwrap() .into_inner(); - let region_to_dn_map = region_distribution(table_route_value.region_routes()) - .unwrap() - .iter() - .map(|(k, v)| (v[0], *k)) - .collect::>(); + let region_to_dn_map = region_distribution( + table_route_value + .region_routes() + .expect("region routes should be physical"), + ) + .unwrap() + .iter() + .map(|(k, v)| (v[0], *k)) + .collect::>(); assert!(region_to_dn_map.len() <= instance.datanodes().len()); let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap();