Skip to content

Commit

Permalink
feat(TableRouteValue): add panic notes and type checks (#3031)
Browse files Browse the repository at this point in the history
* refactor(TableRouteValue): add panic notes and type checks

* chore: add deprecate develop branch warning

Signed-off-by: Ruihang Xia <[email protected]>

* add error defines and checks

* Update README.md

* update code format and fix tests

* update name of error

* delete unused note

* fix unsafe .expect() for region_route()

* update error name

* update unwrap

* update code format

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Ruihang Xia <[email protected]>
  • Loading branch information
AntiTopQuark and waynexia authored Dec 30, 2023
1 parent 69a5313 commit 4460af8
Show file tree
Hide file tree
Showing 23 changed files with 180 additions and 77 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl AlterTableProcedure {
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes();
let region_routes = table_route.region_routes()?;

let leaders = find_leaders(region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl CreateTableProcedure {
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = physical_table_route.region_routes();
let region_routes = physical_table_route.region_routes()?;

let request_builder = self.new_region_request_builder(Some(physical_table_id))?;

Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl DropTableProcedure {

/// Register dropping regions if doesn't exist.
fn register_dropping_regions(&mut self) -> Result<()> {
let region_routes = self.data.region_routes();
let region_routes = self.data.region_routes()?;

let dropping_regions = operating_leader_regions(region_routes);

Expand Down Expand Up @@ -190,7 +190,7 @@ impl DropTableProcedure {
pub async fn on_datanode_drop_regions(&self) -> Result<Status> {
let table_id = self.data.table_id();

let region_routes = &self.data.region_routes();
let region_routes = &self.data.region_routes()?;
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());

Expand Down Expand Up @@ -306,7 +306,7 @@ impl DropTableData {
self.task.table_ref()
}

fn region_routes(&self) -> &Vec<RegionRoute> {
fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
self.table_route_value.region_routes()
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async fn handle_truncate_table_task(
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;

let table_route = table_route_value.into_inner().region_routes().clone();
let table_route = table_route_value.into_inner().region_routes()?.clone();

let id = ddl_manager
.submit_truncate_table_task(
Expand Down
6 changes: 5 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ pub enum Error {

#[snafu(display("The topic pool is empty"))]
EmptyTopicPool { location: Location },

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedLogicalRouteTable { location: Location, err_msg: String },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -392,7 +395,8 @@ impl ErrorExt for Error {
| BuildKafkaPartitionClient { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. } => StatusCode::Unexpected,
| EmptyTopicPool { .. }
| UnexpectedLogicalRouteTable { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
Expand Down
42 changes: 25 additions & 17 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;

// Deletes datanode table key value pairs.
let distribution = region_distribution(table_route_value.region_routes())?;
let distribution = region_distribution(table_route_value.region_routes()?)?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
Expand Down Expand Up @@ -608,7 +608,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(current_table_route_value.region_routes())?;
region_distribution(current_table_route_value.region_routes()?)?;
let new_region_distribution = region_distribution(&new_region_routes)?;

let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
Expand All @@ -621,7 +621,7 @@ impl TableMetadataManager {
)?;

// Updates the table_route.
let new_table_route_value = current_table_route_value.update(new_region_routes);
let new_table_route_value = current_table_route_value.update(new_region_routes)?;

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
Expand Down Expand Up @@ -656,7 +656,7 @@ impl TableMetadataManager {
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
let mut new_region_routes = current_table_route_value.region_routes().clone();
let mut new_region_routes = current_table_route_value.region_routes()?.clone();

let mut updated = 0;
for route in &mut new_region_routes {
Expand All @@ -673,7 +673,7 @@ impl TableMetadataManager {
}

// Updates the table_route.
let new_table_route_value = current_table_route_value.update(new_region_routes);
let new_table_route_value = current_table_route_value.update(new_region_routes)?;

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
Expand Down Expand Up @@ -897,7 +897,11 @@ mod tests {
table_info
);
assert_eq!(
remote_table_route.unwrap().into_inner().region_routes(),
remote_table_route
.unwrap()
.into_inner()
.region_routes()
.unwrap(),
region_routes
);
}
Expand Down Expand Up @@ -978,7 +982,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner();
assert_eq!(removed_table_route.region_routes(), region_routes);
assert_eq!(removed_table_route.region_routes().unwrap(), region_routes);
}

#[tokio::test]
Expand Down Expand Up @@ -1173,11 +1177,11 @@ mod tests {
.unwrap();

assert_eq!(
updated_route_value.region_routes()[0].leader_status,
updated_route_value.region_routes().unwrap()[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert_eq!(
updated_route_value.region_routes()[1].leader_status,
updated_route_value.region_routes().unwrap()[1].leader_status,
Some(RegionStatus::Downgraded)
);
}
Expand Down Expand Up @@ -1271,7 +1275,8 @@ mod tests {
let current_table_route_value = DeserializedValueWithBytes::from_inner(
current_table_route_value
.inner
.update(new_region_routes.clone()),
.update(new_region_routes.clone())
.unwrap(),
);
let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
// it should be ok.
Expand All @@ -1295,13 +1300,16 @@ mod tests {

// if the current_table_route_value is wrong, it should return an error.
// The ABA problem.
let wrong_table_route_value =
DeserializedValueWithBytes::from_inner(current_table_route_value.update(vec![
new_region_route(1, 1),
new_region_route(2, 2),
new_region_route(3, 3),
new_region_route(4, 4),
]));
let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
current_table_route_value
.update(vec![
new_region_route(1, 1),
new_region_route(2, 2),
new_region_route(3, 3),
new_region_route(4, 4),
])
.unwrap(),
);
assert!(table_metadata_manager
.update_table_route(
table_id,
Expand Down
54 changes: 38 additions & 16 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu};
use crate::error::{Result, SerdeJsonSnafu, UnexpectedLogicalRouteTableSnafu};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -62,29 +62,48 @@ impl TableRouteValue {
}

/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
let version = self.physical_table_route().version;
Self::Physical(PhysicalTableRouteValue {
Ok(Self::Physical(PhysicalTableRouteValue {
region_routes,
version: version + 1,
})
}))
}

/// Returns the version.
///
/// For test purpose.
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
self.physical_table_route().version
pub fn version(&self) -> Result<u64> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(self.physical_table_route().version)
}

/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.physical_table_route()
pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(self
.physical_table_route()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
.cloned())
}

/// Returns true if it's [TableRouteValue::Physical].
Expand All @@ -93,11 +112,14 @@ impl TableRouteValue {
}

/// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.physical_table_route().region_routes
pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(&self.physical_table_route().region_routes)
}

fn physical_table_route(&self) -> &PhysicalTableRouteValue {
Expand Down Expand Up @@ -354,7 +376,7 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(table_route.region_routes()))
.map(|table_route| region_distribution(table_route.region_routes()?))
.transpose()
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,13 @@ pub enum Error {

#[snafu(display("Weight array is not set"))]
NotSetWeightArray { location: Location },

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedLogicalRouteTable {
location: Location,
err_msg: String,
source: common_meta::error::Error,
},
}

impl Error {
Expand Down Expand Up @@ -717,7 +724,8 @@ impl ErrorExt for Error {
| Error::TableMetadataManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UpdateTableRoute { source, .. }
| Error::GetFullTableInfo { source, .. } => source.status_code(),
| Error::GetFullTableInfo { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),

Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ mod tests {

let should_downgraded = table_route_value
.region_routes()
.unwrap()
.iter()
.find(|route| route.region.id.region_number() == failed_region.region_number)
.unwrap();
Expand Down
14 changes: 10 additions & 4 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;

let mut new_region_routes = table_route_value.region_routes().clone();
let mut new_region_routes = table_route_value
.region_routes()
.context(error::UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.clone();

for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
Expand Down Expand Up @@ -234,6 +239,7 @@ mod tests {
.unwrap()
.into_inner()
.region_routes()
.unwrap()
.clone()
}

Expand Down Expand Up @@ -396,8 +402,8 @@ mod tests {
.unwrap()
.into_inner();

let peers = &extract_all_peers(table_route_value.region_routes());
let actual = table_route_value.region_routes();
let peers = &extract_all_peers(table_route_value.region_routes().unwrap());
let actual = table_route_value.region_routes().unwrap();
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
Expand All @@ -416,7 +422,7 @@ mod tests {
.unwrap()
.into_inner();

let map = region_distribution(table_route_value.region_routes()).unwrap();
let map = region_distribution(table_route_value.region_routes().unwrap()).unwrap();
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![1, 3]));
assert_eq!(map.get(&3), Some(&vec![2, 4]));
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ mod tests {
.unwrap()
.version();
// Should be unchanged.
assert_eq!(table_routes_version, 0);
assert_eq!(table_routes_version.unwrap(), 0);
}

#[tokio::test]
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/procedure/region_migration/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ impl RegionMigrationManager {
// Safety: checked before.
let region_route = table_route
.region_route(region_id)
.context(error::UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.context(error::RegionRouteNotFoundSnafu { region_id })?;

if self.has_migrated(&region_route, &task)? {
Expand Down
Loading

0 comments on commit 4460af8

Please sign in to comment.