Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: MetricsEngine table route (part 1) #2952

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::error::{
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
Expand Down Expand Up @@ -185,7 +184,7 @@ impl AlterTableProcedure {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();

let TableRouteValue { region_routes, .. } = self
let table_route = self
.context
.table_metadata_manager
.table_route_manager()
Expand All @@ -195,13 +194,14 @@ impl AlterTableProcedure {
table_name: table_ref.to_string(),
})?
.into_inner();
let region_routes = table_route.region_routes();

let leaders = find_leaders(&region_routes);
let leaders = find_leaders(region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(&region_routes, &datanode);
let regions = find_leader_regions(region_routes, &datanode);

for region in regions {
let region_id = RegionId::new(table_id, region);
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl DropTableData {
}

fn region_routes(&self) -> &Vec<RegionRoute> {
&self.table_route_value.region_routes
self.table_route_value.region_routes()
}

fn table_info(&self) -> &RawTableInfo {
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ async fn handle_truncate_table_task(
table_name: table_ref.to_string(),
})?;

let table_route = table_route_value.into_inner().region_routes;
let table_route = table_route_value.into_inner().region_routes().clone();

let id = ddl_manager
.submit_truncate_table_task(
Expand Down
18 changes: 9 additions & 9 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;

// Deletes datanode table key value pairs.
let distribution = region_distribution(&table_route_value.region_routes)?;
let distribution = region_distribution(table_route_value.region_routes())?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
Expand Down Expand Up @@ -593,7 +593,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(&current_table_route_value.region_routes)?;
region_distribution(current_table_route_value.region_routes())?;
let new_region_distribution = region_distribution(&new_region_routes)?;

let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
Expand Down Expand Up @@ -641,7 +641,7 @@ impl TableMetadataManager {
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
let mut new_region_routes = current_table_route_value.region_routes.clone();
let mut new_region_routes = current_table_route_value.region_routes().clone();

let mut updated = 0;
for route in &mut new_region_routes {
Expand Down Expand Up @@ -826,7 +826,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
// creates metadata.
Expand Down Expand Up @@ -869,7 +869,7 @@ mod tests {
table_info
);
assert_eq!(
remote_table_route.unwrap().into_inner().region_routes,
remote_table_route.unwrap().into_inner().region_routes(),
region_routes
);
}
Expand All @@ -879,7 +879,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
Expand Down Expand Up @@ -950,7 +950,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner();
assert_eq!(removed_table_route.region_routes, region_routes);
assert_eq!(removed_table_route.region_routes(), region_routes);
}

#[tokio::test]
Expand Down Expand Up @@ -1144,11 +1144,11 @@ mod tests {
.unwrap();

assert_eq!(
updated_route_value.region_routes[0].leader_status,
updated_route_value.region_routes()[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert_eq!(
updated_route_value.region_routes[1].leader_status,
updated_route_value.region_routes()[1].leader_status,
Some(RegionStatus::Downgraded)
);
}
Expand Down
48 changes: 38 additions & 10 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,42 +38,70 @@ impl TableRouteKey {
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TableRouteValue {
pub enum TableRouteValue {
Physical(PhysicalTableRouteValue),
Logical(LogicalTableRouteValue),
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct PhysicalTableRouteValue {
pub region_routes: Vec<RegionRoute>,
version: u64,
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
// TODO(LFC): Add table route for MetricsEngine table.
}

impl TableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
Self {
Self::Physical(PhysicalTableRouteValue {
region_routes,
version: 0,
}
})
}

/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
Self {
let version = self.physical_table_route().version;
Self::Physical(PhysicalTableRouteValue {
region_routes,
version: self.version + 1,
}
version: version + 1,
})
}

/// Returns the version.
///
/// For test purpose.
#[cfg(any(tets, feature = "testing"))]
#[cfg(any(test, feature = "testing"))]
pub fn version(&self) -> u64 {
self.version
self.physical_table_route().version
}

/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.region_routes
self.physical_table_route()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
}

/// Gets the [RegionRoute]s of this table route.
///
/// # Panics
/// The engine type of this table is not "`Mito`".
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.physical_table_route().region_routes
}

fn physical_table_route(&self) -> &PhysicalTableRouteValue {
match self {
TableRouteValue::Physical(x) => x,
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
}
}
}

impl TableMetaKey for TableRouteKey {
Expand Down Expand Up @@ -269,7 +297,7 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(&table_route.into_inner().region_routes))
.map(|table_route| region_distribution(table_route.region_routes()))
.transpose()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
.unwrap();

let should_downgraded = table_route_value
.region_routes
.region_routes()
.iter()
.find(|route| route.region.id.region_number() == failed_region.region_number)
.unwrap();
Expand Down
11 changes: 6 additions & 5 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;

let mut new_region_routes = table_route_value.region_routes.clone();
let mut new_region_routes = table_route_value.region_routes().clone();

for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
Expand Down Expand Up @@ -233,7 +233,8 @@ mod tests {
.unwrap()
.unwrap()
.into_inner()
.region_routes
.region_routes()
.clone()
}

// Original region routes:
Expand Down Expand Up @@ -395,8 +396,8 @@ mod tests {
.unwrap()
.into_inner();

let peers = &extract_all_peers(&table_route_value.region_routes);
let actual = &table_route_value.region_routes;
let peers = &extract_all_peers(table_route_value.region_routes());
let actual = table_route_value.region_routes();
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
Expand All @@ -415,7 +416,7 @@ mod tests {
.unwrap()
.into_inner();

let map = region_distribution(&table_route_value.region_routes).unwrap();
let map = region_distribution(table_route_value.region_routes()).unwrap();
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![1, 3]));
assert_eq!(map.get(&3), Some(&vec![2, 4]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl RegionMigrationStart {
let table_route = ctx.get_table_route_value().await?;

let region_route = table_route
.region_routes
.region_routes()
.iter()
.find(|route| route.region.id == region_id)
.cloned()
Expand Down
15 changes: 9 additions & 6 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,30 +377,33 @@ impl ProcedureMigrationTestSuite {
/// Verifies table metadata after region migration.
pub(crate) async fn verify_table_metadata(&self) {
let region_id = self.context.persistent_ctx.region_id;
let region_routes = self
let table_route = self
.env
.table_metadata_manager
.table_route_manager()
.get(region_id.table_id())
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
.into_inner();
let region_routes = table_route.region_routes();

let expected_leader_id = self.context.persistent_ctx.to_peer.id;
let removed_follower_id = self.context.persistent_ctx.from_peer.id;

let region_route = region_routes
.into_iter()
.iter()
.find(|route| route.region.id == region_id)
.unwrap();

assert!(!region_route.is_leader_downgraded());
assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id);
assert_eq!(
region_route.leader_peer.as_ref().unwrap().id,
expected_leader_id
);
assert!(!region_route
.follower_peers
.into_iter()
.iter()
.any(|route| route.id == removed_follower_id))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ mod tests {

// It should remain unchanged.
assert_eq!(latest_table_route.version(), 0);
assert!(!latest_table_route.region_routes[0].is_leader_downgraded());
assert!(!latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}

Expand Down Expand Up @@ -253,7 +253,7 @@ mod tests {
.unwrap()
.unwrap();

assert!(latest_table_route.region_routes[0].is_leader_downgraded());
assert!(latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,14 @@ mod tests {

state.rollback_downgraded_region(&mut ctx).await.unwrap();

let region_routes = table_metadata_manager
let table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
}

#[tokio::test]
Expand Down Expand Up @@ -229,14 +228,13 @@ mod tests {

assert!(ctx.volatile_ctx.table_route.is_none());

let region_routes = table_metadata_manager
let table_route = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
}
}
Loading
Loading