Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(TableRouteValue): add panic notes and type checks #3031

Merged
merged 14 commits into from
Dec 30, 2023
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl AlterTableProcedure {
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes();
let region_routes = table_route.region_routes()?;

let leaders = find_leaders(region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl CreateTableProcedure {
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = physical_table_route.region_routes();
let region_routes = physical_table_route.region_routes()?;

let request_builder = self.new_region_request_builder(Some(physical_table_id))?;

Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl DropTableProcedure {

/// Register dropping regions if doesn't exist.
fn register_dropping_regions(&mut self) -> Result<()> {
let region_routes = self.data.region_routes();
let region_routes = self.data.region_routes()?;

let dropping_regions = operating_leader_regions(region_routes);

Expand Down Expand Up @@ -190,7 +190,7 @@ impl DropTableProcedure {
pub async fn on_datanode_drop_regions(&self) -> Result<Status> {
let table_id = self.data.table_id();

let region_routes = &self.data.region_routes();
let region_routes = &self.data.region_routes()?;
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());

Expand Down Expand Up @@ -306,7 +306,7 @@ impl DropTableData {
self.task.table_ref()
}

fn region_routes(&self) -> &Vec<RegionRoute> {
fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
self.table_route_value.region_routes()
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async fn handle_truncate_table_task(
let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;

let table_route = table_route_value.into_inner().region_routes().clone();
let table_route = table_route_value.into_inner().region_routes()?.clone();

let id = ddl_manager
.submit_truncate_table_task(
Expand Down
6 changes: 5 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ pub enum Error {

#[snafu(display("The topic pool is empty"))]
EmptyTopicPool { location: Location },

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedLogicalRouteTable { location: Location, err_msg: String },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -369,7 +372,8 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. } => StatusCode::Unexpected,
| EmptyTopicPool { .. }
| UnexpectedLogicalRouteTable { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
Expand Down
42 changes: 25 additions & 17 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;

// Deletes datanode table key value pairs.
let distribution = region_distribution(table_route_value.region_routes())?;
let distribution = region_distribution(table_route_value.region_routes()?)?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
Expand Down Expand Up @@ -608,7 +608,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(current_table_route_value.region_routes())?;
region_distribution(current_table_route_value.region_routes()?)?;
let new_region_distribution = region_distribution(&new_region_routes)?;

let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
Expand All @@ -621,7 +621,7 @@ impl TableMetadataManager {
)?;

// Updates the table_route.
let new_table_route_value = current_table_route_value.update(new_region_routes);
let new_table_route_value = current_table_route_value.update(new_region_routes)?;

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
Expand Down Expand Up @@ -656,7 +656,7 @@ impl TableMetadataManager {
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
let mut new_region_routes = current_table_route_value.region_routes().clone();
let mut new_region_routes = current_table_route_value.region_routes()?.clone();

let mut updated = 0;
for route in &mut new_region_routes {
Expand All @@ -673,7 +673,7 @@ impl TableMetadataManager {
}

// Updates the table_route.
let new_table_route_value = current_table_route_value.update(new_region_routes);
let new_table_route_value = current_table_route_value.update(new_region_routes)?;

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
Expand Down Expand Up @@ -897,7 +897,11 @@ mod tests {
table_info
);
assert_eq!(
remote_table_route.unwrap().into_inner().region_routes(),
remote_table_route
.unwrap()
.into_inner()
.region_routes()
.unwrap(),
region_routes
);
}
Expand Down Expand Up @@ -978,7 +982,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner();
assert_eq!(removed_table_route.region_routes(), region_routes);
assert_eq!(removed_table_route.region_routes().unwrap(), region_routes);
}

#[tokio::test]
Expand Down Expand Up @@ -1173,11 +1177,11 @@ mod tests {
.unwrap();

assert_eq!(
updated_route_value.region_routes()[0].leader_status,
updated_route_value.region_routes().unwrap()[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert_eq!(
updated_route_value.region_routes()[1].leader_status,
updated_route_value.region_routes().unwrap()[1].leader_status,
Some(RegionStatus::Downgraded)
);
}
Expand Down Expand Up @@ -1271,7 +1275,8 @@ mod tests {
let current_table_route_value = DeserializedValueWithBytes::from_inner(
current_table_route_value
.inner
.update(new_region_routes.clone()),
.update(new_region_routes.clone())
.unwrap(),
);
let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
// it should be ok.
Expand All @@ -1295,13 +1300,16 @@ mod tests {

// if the current_table_route_value is wrong, it should return an error.
// The ABA problem.
let wrong_table_route_value =
DeserializedValueWithBytes::from_inner(current_table_route_value.update(vec![
new_region_route(1, 1),
new_region_route(2, 2),
new_region_route(3, 3),
new_region_route(4, 4),
]));
let wrong_table_route_value = DeserializedValueWithBytes::from_inner(
current_table_route_value
.update(vec![
new_region_route(1, 1),
new_region_route(2, 2),
new_region_route(3, 3),
new_region_route(4, 4),
])
.unwrap(),
);
assert!(table_metadata_manager
.update_table_route(
table_id,
Expand Down
54 changes: 38 additions & 16 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu};
use crate::error::{Result, SerdeJsonSnafu, UnexpectedLogicalRouteTableSnafu};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -62,29 +62,48 @@ impl TableRouteValue {
}

/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
let version = self.physical_table_route().version;
Self::Physical(PhysicalTableRouteValue {
Ok(Self::Physical(PhysicalTableRouteValue {
region_routes,
version: version + 1,
})
}))
}

/// Returns the version.
///
/// For test purpose.
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
self.physical_table_route().version
pub fn version(&self) -> Result<u64> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(self.physical_table_route().version)
}

/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.physical_table_route()
pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(self
.physical_table_route()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
.cloned())
}

/// Returns true if it's [TableRouteValue::Physical].
Expand All @@ -93,11 +112,14 @@ impl TableRouteValue {
}

/// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.physical_table_route().region_routes
pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
}
);
Ok(&self.physical_table_route().region_routes)
}

fn physical_table_route(&self) -> &PhysicalTableRouteValue {
Expand Down Expand Up @@ -354,7 +376,7 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(table_route.region_routes()))
.map(|table_route| region_distribution(table_route.region_routes()?))
.transpose()
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,13 @@ pub enum Error {

#[snafu(display("Weight array is not set"))]
NotSetWeightArray { location: Location },

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedLogicalRouteTable {
location: Location,
err_msg: String,
source: common_meta::error::Error,
},
}

impl Error {
Expand Down Expand Up @@ -713,7 +720,8 @@ impl ErrorExt for Error {
| Error::TableMetadataManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UpdateTableRoute { source, .. }
| Error::GetFullTableInfo { source, .. } => source.status_code(),
| Error::GetFullTableInfo { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),

Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ mod tests {

let should_downgraded = table_route_value
.region_routes()
.unwrap()
.iter()
.find(|route| route.region.id.region_number() == failed_region.region_number)
.unwrap();
Expand Down
14 changes: 10 additions & 4 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;

let mut new_region_routes = table_route_value.region_routes().clone();
let mut new_region_routes = table_route_value
.region_routes()
.context(error::UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.clone();

for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
Expand Down Expand Up @@ -234,6 +239,7 @@ mod tests {
.unwrap()
.into_inner()
.region_routes()
.unwrap()
.clone()
}

Expand Down Expand Up @@ -396,8 +402,8 @@ mod tests {
.unwrap()
.into_inner();

let peers = &extract_all_peers(table_route_value.region_routes());
let actual = table_route_value.region_routes();
let peers = &extract_all_peers(table_route_value.region_routes().unwrap());
let actual = table_route_value.region_routes().unwrap();
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
Expand All @@ -416,7 +422,7 @@ mod tests {
.unwrap()
.into_inner();

let map = region_distribution(table_route_value.region_routes()).unwrap();
let map = region_distribution(table_route_value.region_routes().unwrap()).unwrap();
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![1, 3]));
assert_eq!(map.get(&3), Some(&vec![2, 4]));
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ mod tests {
.unwrap()
.version();
// Should be unchanged.
assert_eq!(table_routes_version, 0);
assert_eq!(table_routes_version.unwrap(), 0);
}

#[tokio::test]
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/procedure/region_migration/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ impl RegionMigrationManager {
// Safety: checked before.
let region_route = table_route
.region_route(region_id)
.context(error::UnexpectedLogicalRouteTableSnafu {
err_msg: "{self:?} is a non-physical TableRouteValue.",
})?
.context(error::RegionRouteNotFoundSnafu { region_id })?;

if self.has_migrated(&region_route, &task)? {
Expand Down
Loading
Loading