Skip to content

Commit

Permalink
chore: by comment
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jan 5, 2024
1 parent a25f839 commit f0b7587
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 54 deletions.
35 changes: 9 additions & 26 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ use table::requests::AlterKind;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_operate_region_error;
use crate::ddl::DdlContext;
use crate::error::{
self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result, TableRouteNotFoundSnafu,
};
use crate::error::{self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
Expand Down Expand Up @@ -190,33 +187,19 @@ impl AlterTableProcedure {

pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_route_manager = self.context.table_metadata_manager.table_route_manager();

let table_route = table_route_manager
.get(table_id)
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = match table_route {
TableRouteValue::Physical(x) => x.region_routes,
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route = table_route_manager
.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
physical_table_route.region_routes()?.clone()
}
};
let (_, physical_table_route) = self
.context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await?;

let leaders = find_leaders(&region_routes);
let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(&region_routes, &datanode);
let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);

for region in regions {
let region_id = RegionId::new(table_id, region);
Expand Down
48 changes: 22 additions & 26 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,33 +335,29 @@ async fn handle_alter_table_task(
table_name: table_ref.to_string(),
})?;

let table_route = ddl_manager
.table_metadata_manager
let physical_table_id = ddl_manager
.table_metadata_manager()
.table_route_manager()
.get(table_id)
.await?
.context(error::TableRouteNotFoundSnafu { table_id })?
.into_inner();

let physical_table_name = match table_route {
TableRouteValue::Physical(_) => None,
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_info = &ddl_manager
.table_metadata_manager()
.table_info_manager()
.get(physical_table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.table_info;
Some(TableName {
catalog_name: physical_table_info.catalog_name.clone(),
schema_name: physical_table_info.schema_name.clone(),
table_name: physical_table_info.name.clone(),
})
}
.get_physical_table_id(table_id)
.await?;

let physical_table_name = if physical_table_id == table_id {
None
} else {
let physical_table_info = &ddl_manager
.table_metadata_manager()
.table_info_manager()
.get(physical_table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.table_info;
Some(TableName {
catalog_name: physical_table_info.catalog_name.clone(),
schema_name: physical_table_info.schema_name.clone(),
table_name: physical_table_info.name.clone(),
})
};

let id = ddl_manager
Expand Down
54 changes: 52 additions & 2 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu, UnexpectedLogicalRouteTableSnafu};
use crate::error::{
Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -332,6 +334,54 @@ impl TableRouteManager {
.transpose()
}

pub async fn get_physical_table_id(
&self,
logical_or_physical_table_id: TableId,
) -> Result<TableId> {
let table_route = self
.get(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?
.into_inner();

match table_route {
TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
}
}

pub async fn get_physical_table_route(
&self,
logical_or_physical_table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)> {
let table_route = self
.get(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?
.into_inner();

match table_route {
TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route =
self.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
Ok((
physical_table_id,
physical_table_route.physical_table_route().clone(),
))
}
}
}

/// It may return a subset of the `table_ids`.
pub async fn batch_get(
&self,
Expand Down

0 comments on commit f0b7587

Please sign in to comment.