Skip to content

Commit

Permalink
Revert "feat: MetricsEngine table route (part 1) (GreptimeTeam#2952)"
Browse files Browse the repository at this point in the history
This reverts commit 6ac47e9.
  • Loading branch information
shuiyisong committed Dec 24, 2023
1 parent bb61e78 commit 48c4bc3
Show file tree
Hide file tree
Showing 16 changed files with 110 additions and 93 deletions.
8 changes: 4 additions & 4 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::error::{
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
Expand Down Expand Up @@ -184,7 +185,7 @@ impl AlterTableProcedure {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();

let table_route = self
let TableRouteValue { region_routes, .. } = self
.context
.table_metadata_manager
.table_route_manager()
Expand All @@ -194,14 +195,13 @@ impl AlterTableProcedure {
table_name: table_ref.to_string(),
})?
.into_inner();
let region_routes = table_route.region_routes();

let leaders = find_leaders(region_routes);
let leaders = find_leaders(&region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let regions = find_leader_regions(&region_routes, &datanode);

for region in regions {
let region_id = RegionId::new(table_id, region);
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl DropTableData {
}

fn region_routes(&self) -> &Vec<RegionRoute> {
self.table_route_value.region_routes()
&self.table_route_value.region_routes
}

fn table_info(&self) -> &RawTableInfo {
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ async fn handle_truncate_table_task(
table_name: table_ref.to_string(),
})?;

let table_route = table_route_value.into_inner().region_routes().clone();
let table_route = table_route_value.into_inner().region_routes;

let id = ddl_manager
.submit_truncate_table_task(
Expand Down
18 changes: 9 additions & 9 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl TableMetadataManager {
.build_delete_txn(table_id, table_info_value)?;

// Deletes datanode table key value pairs.
let distribution = region_distribution(table_route_value.region_routes())?;
let distribution = region_distribution(&table_route_value.region_routes)?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
Expand Down Expand Up @@ -603,7 +603,7 @@ impl TableMetadataManager {
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(current_table_route_value.region_routes())?;
region_distribution(&current_table_route_value.region_routes)?;
let new_region_distribution = region_distribution(&new_region_routes)?;

let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
Expand Down Expand Up @@ -651,7 +651,7 @@ impl TableMetadataManager {
where
F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
{
let mut new_region_routes = current_table_route_value.region_routes().clone();
let mut new_region_routes = current_table_route_value.region_routes.clone();

let mut updated = 0;
for route in &mut new_region_routes {
Expand Down Expand Up @@ -836,7 +836,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
// creates metadata.
Expand Down Expand Up @@ -879,7 +879,7 @@ mod tests {
table_info
);
assert_eq!(
remote_table_route.unwrap().into_inner().region_routes(),
remote_table_route.unwrap().into_inner().region_routes,
region_routes
);
}
Expand All @@ -889,7 +889,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
Expand Down Expand Up @@ -960,7 +960,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner();
assert_eq!(removed_table_route.region_routes(), region_routes);
assert_eq!(removed_table_route.region_routes, region_routes);
}

#[tokio::test]
Expand Down Expand Up @@ -1154,11 +1154,11 @@ mod tests {
.unwrap();

assert_eq!(
updated_route_value.region_routes()[0].leader_status,
updated_route_value.region_routes[0].leader_status,
Some(RegionStatus::Downgraded)
);
assert_eq!(
updated_route_value.region_routes()[1].leader_status,
updated_route_value.region_routes[1].leader_status,
Some(RegionStatus::Downgraded)
);
}
Expand Down
48 changes: 10 additions & 38 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,70 +38,42 @@ impl TableRouteKey {
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub enum TableRouteValue {
Physical(PhysicalTableRouteValue),
Logical(LogicalTableRouteValue),
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct PhysicalTableRouteValue {
pub struct TableRouteValue {
pub region_routes: Vec<RegionRoute>,
version: u64,
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
// TODO(LFC): Add table route for MetricsEngine table.
}

impl TableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
Self::Physical(PhysicalTableRouteValue {
Self {
region_routes,
version: 0,
})
}
}

/// Returns a new version [TableRouteValue] with `region_routes`.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
let version = self.physical_table_route().version;
Self::Physical(PhysicalTableRouteValue {
Self {
region_routes,
version: version + 1,
})
version: self.version + 1,
}
}

/// Returns the version.
///
/// For test purpose.
#[cfg(any(test, feature = "testing"))]
#[cfg(any(tets, feature = "testing"))]
pub fn version(&self) -> u64 {
self.physical_table_route().version
self.version
}

/// Returns the corresponding [RegionRoute].
pub fn region_route(&self, region_id: RegionId) -> Option<RegionRoute> {
self.physical_table_route()
.region_routes
self.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
}

/// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
///
/// # Panics
/// The route type is not the [TableRouteValue::Physical].
pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.physical_table_route().region_routes
}

fn physical_table_route(&self) -> &PhysicalTableRouteValue {
match self {
TableRouteValue::Physical(x) => x,
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
}
}
}

impl TableMetaKey for TableRouteKey {
Expand Down Expand Up @@ -297,7 +269,7 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(table_route.region_routes()))
.map(|table_route| region_distribution(&table_route.into_inner().region_routes))
.transpose()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
.unwrap();

let should_downgraded = table_route_value
.region_routes()
.region_routes
.iter()
.find(|route| route.region.id.region_number() == failed_region.region_number)
.unwrap();
Expand Down
11 changes: 5 additions & 6 deletions src/meta-srv/src/procedure/region_failover/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl UpdateRegionMetadata {
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;

let mut new_region_routes = table_route_value.region_routes().clone();
let mut new_region_routes = table_route_value.region_routes.clone();

for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
Expand Down Expand Up @@ -233,8 +233,7 @@ mod tests {
.unwrap()
.unwrap()
.into_inner()
.region_routes()
.clone()
.region_routes
}

// Original region routes:
Expand Down Expand Up @@ -396,8 +395,8 @@ mod tests {
.unwrap()
.into_inner();

let peers = &extract_all_peers(table_route_value.region_routes());
let actual = table_route_value.region_routes();
let peers = &extract_all_peers(&table_route_value.region_routes);
let actual = &table_route_value.region_routes;
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
Expand All @@ -416,7 +415,7 @@ mod tests {
.unwrap()
.into_inner();

let map = region_distribution(table_route_value.region_routes()).unwrap();
let map = region_distribution(&table_route_value.region_routes).unwrap();
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![1, 3]));
assert_eq!(map.get(&3), Some(&vec![2, 4]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl RegionMigrationStart {
let table_route = ctx.get_table_route_value().await?;

let region_route = table_route
.region_routes()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
.cloned()
Expand Down
15 changes: 6 additions & 9 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,33 +377,30 @@ impl ProcedureMigrationTestSuite {
/// Verifies table metadata after region migration.
pub(crate) async fn verify_table_metadata(&self) {
let region_id = self.context.persistent_ctx.region_id;
let table_route = self
let region_routes = self
.env
.table_metadata_manager
.table_route_manager()
.get(region_id.table_id())
.await
.unwrap()
.unwrap()
.into_inner();
let region_routes = table_route.region_routes();
.into_inner()
.region_routes;

let expected_leader_id = self.context.persistent_ctx.to_peer.id;
let removed_follower_id = self.context.persistent_ctx.from_peer.id;

let region_route = region_routes
.iter()
.into_iter()
.find(|route| route.region.id == region_id)
.unwrap();

assert!(!region_route.is_leader_downgraded());
assert_eq!(
region_route.leader_peer.as_ref().unwrap().id,
expected_leader_id
);
assert_eq!(region_route.leader_peer.unwrap().id, expected_leader_id);
assert!(!region_route
.follower_peers
.iter()
.into_iter()
.any(|route| route.id == removed_follower_id))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ mod tests {

// It should remain unchanged.
assert_eq!(latest_table_route.version(), 0);
assert!(!latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(!latest_table_route.region_routes[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}

Expand Down Expand Up @@ -253,7 +253,7 @@ mod tests {
.unwrap()
.unwrap();

assert!(latest_table_route.region_routes()[0].is_leader_downgraded());
assert!(latest_table_route.region_routes[0].is_leader_downgraded());
assert!(ctx.volatile_ctx.table_route.is_none());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,15 @@ mod tests {

state.rollback_downgraded_region(&mut ctx).await.unwrap();

let table_route = table_metadata_manager
let region_routes = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
}

#[tokio::test]
Expand Down Expand Up @@ -228,13 +229,14 @@ mod tests {

assert!(ctx.volatile_ctx.table_route.is_none());

let table_route = table_metadata_manager
let region_routes = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap()
.into_inner();
assert_eq!(&expected_region_routes, table_route.region_routes());
.into_inner()
.region_routes;
assert_eq!(expected_region_routes, region_routes);
}
}
Loading

0 comments on commit 48c4bc3

Please sign in to comment.