Skip to content

Commit

Permalink
add error defines and checks
Browse files Browse the repository at this point in the history
  • Loading branch information
AntiTopQuark committed Dec 28, 2023
1 parent bafa459 commit 25316e9
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl AlterTableProcedure {
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes();
let region_routes = table_route.region_routes()?;

let leaders = find_leaders(region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl CreateTableProcedure {
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = physical_table_route.region_routes();
let region_routes = physical_table_route.region_routes()?;

let request_builder = self.new_region_request_builder(Some(physical_table_id))?;

Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl DropTableProcedure {

/// Register dropping regions if doesn't exist.
fn register_dropping_regions(&mut self) -> Result<()> {
let region_routes = self.data.region_routes();
let region_routes = self.data.region_routes()?;

let dropping_regions = operating_leader_regions(region_routes);

Expand Down Expand Up @@ -190,7 +190,7 @@ impl DropTableProcedure {
pub async fn on_datanode_drop_regions(&self) -> Result<Status> {
let table_id = self.data.table_id();

let region_routes = &self.data.region_routes();
let region_routes = &self.data.region_routes()?;
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());

Expand Down Expand Up @@ -306,7 +306,7 @@ impl DropTableData {
self.task.table_ref()
}

fn region_routes(&self) -> &Vec<RegionRoute> {
fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
self.table_route_value.region_routes()
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async fn handle_truncate_table_task(
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;

let table_route = table_route_value.into_inner().region_routes().clone();
let table_route = table_route_value.into_inner().region_routes()?.clone();

let id = ddl_manager
.submit_truncate_table_task(
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@ pub enum Error {

#[snafu(display("The topic pool is empty"))]
EmptyTopicPool { location: Location },

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedTableRouteType {
location: Location,
err_msg: String,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -369,7 +375,8 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. } => StatusCode::Unexpected,
| EmptyTopicPool { .. }
| UnexpectedTableRouteType { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
Expand Down
10 changes: 5 additions & 5 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;

// Deletes datanode table key value pairs.
let distribution = region_distribution(table_route_value.region_routes())?;
let distribution = region_distribution(table_route_value.region_routes()?)?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
Expand Down Expand Up @@ -608,7 +608,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(current_table_route_value.region_routes())?;
region_distribution(current_table_route_value.region_routes()?)?;
let new_region_distribution = region_distribution(&new_region_routes)?;

let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
Expand All @@ -621,7 +621,7 @@ impl TableMetadataManager {
)?;

// Updates the table_route.
let new_table_route_value = current_table_route_value.update(new_region_routes);
let new_table_route_value = current_table_route_value.update(new_region_routes)?;

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
Expand Down Expand Up @@ -656,7 +656,7 @@ impl TableMetadataManager {
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
let mut new_region_routes = current_table_route_value.region_routes().clone();
let mut new_region_routes = current_table_route_value.region_routes()?.clone();

let mut updated = 0;
for route in &mut new_region_routes {
Expand All @@ -673,7 +673,7 @@ impl TableMetadataManager {
}

// Updates the table_route.
let new_table_route_value = current_table_route_value.update(new_region_routes);
let new_table_route_value = current_table_route_value.update(new_region_routes)?;

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
Expand Down
62 changes: 37 additions & 25 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{ResultExt, ensure};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu};
use crate::error::{Result, SerdeJsonSnafu, UnexpectedTableRouteTypeSnafu};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -65,15 +65,18 @@ impl TableRouteValue {
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
if (!self.is_physical()) {
panic!("Mistakenly been treated as a Physical TableRoute: {self:?}");
}
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
ensure!(
self.is_physical(),
UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
let version = self.physical_table_route().version;
Self::Physical(PhysicalTableRouteValue {
Ok(Self::Physical(PhysicalTableRouteValue {
region_routes,
version: version + 1,
})
}))
}

/// Returns the version.
Expand All @@ -83,26 +86,32 @@ impl TableRouteValue {
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
if (!self.is_physical()) {
panic!("Mistakenly been treated as a Physical TableRoute: {self:?}");
}
self.physical_table_route().version
pub fn version(&self) -> Result<u64> {
ensure!(
self.is_physical(),
UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(self.physical_table_route().version)
}

/// Returns the corresponding [RegionRoute].
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
if (!self.is_physical()) {
panic!("Mistakenly been treated as a Physical TableRoute: {self:?}");
}
self.physical_table_route()
pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
ensure!(
self.is_physical(),
UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(self.physical_table_route()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
.cloned())
}

/// Returns true if it's [TableRouteValue::Physical].
Expand All @@ -114,11 +123,14 @@ impl TableRouteValue {
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn region_routes(&self) -> &Vec<RegionRoute> {
if (!self.is_physical()) {
panic!("Mistakenly been treated as a Physical TableRoute: {self:?}");
}
&self.physical_table_route().region_routes
pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
ensure!(
self.is_physical(),
UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(&self.physical_table_route().region_routes)
}

fn physical_table_route(&self) -> &PhysicalTableRouteValue {
Expand Down Expand Up @@ -375,7 +387,7 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(table_route.region_routes()))
.map(|table_route| region_distribution(table_route.region_routes()?))
.transpose()
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,13 @@ pub enum Error {

#[snafu(display("Weight array is not set"))]
NotSetWeightArray { location: Location },

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedTableRouteType {
location: Location,
err_msg: String,
source: common_meta::error::Error,
},
}

impl Error {
Expand Down Expand Up @@ -713,7 +720,8 @@ impl ErrorExt for Error {
| Error::TableMetadataManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UpdateTableRoute { source, .. }
| Error::GetFullTableInfo { source, .. } => source.status_code(),
| Error::GetFullTableInfo { source, .. }
| Error::UnexpectedTableRouteType { source, .. } => source.status_code(),

Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;

let mut new_region_routes = table_route_value.region_routes().clone();
let mut new_region_routes = table_route_value.region_routes()
.context(error::UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.clone();

for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/procedure/region_migration/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ impl RegionMigrationManager {
// Safety: checked before.
let region_route = table_route
.region_route(region_id)
.context(error::UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.context(error::RegionRouteNotFoundSnafu { region_id })?;

if self.has_migrated(&region_route, &task)? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_meta::rpc::router::RegionRoute;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use snafu::ResultExt;
use store_api::storage::RegionId;

use super::migration_end::RegionMigrationEnd;
Expand Down Expand Up @@ -85,6 +86,9 @@ impl RegionMigrationStart {

let region_route = table_route
.region_routes()
.context(error::UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.iter()
.find(|route| route.region.id == region_id)
.cloned()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ impl UpdateMetadata {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();

let mut region_routes = table_route_value.region_routes().clone();
let mut region_routes = table_route_value.region_routes()
.context(error::UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.clone();
let region_route = region_routes
.iter_mut()
.find(|route| route.region.id == region_id)
Expand Down Expand Up @@ -81,7 +85,11 @@ impl UpdateMetadata {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();

let region_routes = table_route_value.region_routes().clone();
let region_routes = table_route_value.region_routes()
.context(error::UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.clone();
let region_route = region_routes
.into_iter()
.find(|route| route.region.id == region_id)
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl RegionLeaseKeeper {
}

if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
if let Some(region_route) = table_route.region_route(region_id) {
if let Ok(Some(region_route)) = table_route.region_route(region_id) {
return renew_region_lease_via_region_route(&region_route, datanode_id, region_id);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/selector/load_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async fn get_leader_peer_ids(
.context(error::TableMetadataManagerSnafu)
.map(|route| {
route.map_or_else(Vec::new, |route| {
find_leaders(route.region_routes())
find_leaders(route.region_routes().expect("expected physical table route"))
.into_iter()
.map(|peer| peer.id)
.collect()
Expand Down
8 changes: 8 additions & 0 deletions src/partition/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ pub enum Error {
region_id: RegionId,
location: Location,
},

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedTableRouteType {
location: Location,
err_msg: String,
source: common_meta::error::Error,
},
}

impl ErrorExt for Error {
Expand All @@ -138,6 +145,7 @@ impl ErrorExt for Error {
Error::FindDatanode { .. } => StatusCode::InvalidArguments,
Error::TableRouteManager { source, .. } => source.status_code(),
Error::MissingDefaultValue { .. } => StatusCode::Internal,
Error::UnexpectedTableRouteType { source, .. } => source.status_code(),
}
}

Expand Down
12 changes: 9 additions & 3 deletions src/partition/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ impl PartitionRuleManager {
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();

Ok(RegionRoutes(route.region_routes().clone()))
let region_routes = route.region_routes()
.context(error::UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?;
Ok(RegionRoutes(region_routes.clone()))
}

pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
Expand All @@ -87,7 +90,10 @@ impl PartitionRuleManager {
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let region_routes = route.region_routes();
let region_routes = route.region_routes()
.context(error::UnexpectedTableRouteTypeSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?;

ensure!(
!region_routes.is_empty(),
Expand Down

0 comments on commit 25316e9

Please sign in to comment.