Skip to content

Commit

Permalink
feat: metric engine support alter
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jan 4, 2024
1 parent ec43b91 commit a25f839
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 12 deletions.
55 changes: 46 additions & 9 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::error::{
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
Expand All @@ -65,6 +66,7 @@ impl AlterTableProcedure {
cluster_id: u64,
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
context: DdlContext,
) -> Result<Self> {
let alter_kind = task
Expand All @@ -84,7 +86,13 @@ impl AlterTableProcedure {

Ok(Self {
context,
data: AlterTableData::new(task, table_info_value, cluster_id, next_column_id),
data: AlterTableData::new(
task,
table_info_value,
physical_table_name,
cluster_id,
next_column_id,
),
kind,
})
}
Expand Down Expand Up @@ -182,23 +190,33 @@ impl AlterTableProcedure {

pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_route_manager = self.context.table_metadata_manager.table_route_manager();

let table_route = self
.context
.table_metadata_manager
.table_route_manager()
let table_route = table_route_manager
.get(table_id)
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes()?;
let region_routes = match table_route {
TableRouteValue::Physical(x) => x.region_routes,
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route = table_route_manager
.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
physical_table_route.region_routes()?.clone()
}
};

let leaders = find_leaders(region_routes);
let leaders = find_leaders(&region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let regions = find_leader_regions(&region_routes, &datanode);

for region in regions {
let region_id = RegionId::new(table_id, region);
Expand Down Expand Up @@ -335,13 +353,24 @@ impl AlterTableProcedure {
}

fn lock_key_inner(&self) -> Vec<String> {
let mut lock_key = vec![];

if let Some(physical_table_name) = self.data.physical_table_name() {
let physical_table_key = common_catalog::format_full_table_name(
&physical_table_name.catalog_name,
&physical_table_name.schema_name,
&physical_table_name.table_name,
);
lock_key.push(physical_table_key);
}

let table_ref = self.data.table_ref();
let table_key = common_catalog::format_full_table_name(
table_ref.catalog,
table_ref.schema,
table_ref.table,
);
let mut lock_key = vec![table_key];
lock_key.push(table_key);

if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() {
lock_key.push(common_catalog::format_full_table_name(
Expand Down Expand Up @@ -415,6 +444,8 @@ pub struct AlterTableData {
task: AlterTableTask,
/// Table info value before alteration.
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
/// Physical table name, if the table to alter is a logical table.
physical_table_name: Option<TableName>,
cluster_id: u64,
/// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>,
Expand All @@ -424,13 +455,15 @@ impl AlterTableData {
pub fn new(
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
cluster_id: u64,
next_column_id: Option<ColumnId>,
) -> Self {
Self {
state: AlterTableState::Prepare,
task,
table_info_value,
physical_table_name,
cluster_id,
next_column_id,
}
Expand All @@ -447,6 +480,10 @@ impl AlterTableData {
fn table_info(&self) -> &RawTableInfo {
&self.table_info_value.table_info
}

fn physical_table_name(&self) -> Option<&TableName> {
self.physical_table_name.as_ref()
}
}

/// Creates region proto alter kind from `table_info` and `alter_kind`.
Expand Down
48 changes: 45 additions & 3 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use crate::rpc::ddl::{
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
use crate::table_name::TableName;

pub type DdlManagerRef = Arc<DdlManager>;

/// The [DdlManager] provides the ability to execute Ddl.
Expand Down Expand Up @@ -160,11 +162,17 @@ impl DdlManager {
cluster_id: u64,
alter_table_task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
) -> Result<ProcedureId> {
let context = self.create_context();

let procedure =
AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context)?;
let procedure = AlterTableProcedure::new(
cluster_id,
alter_table_task,
table_info_value,
physical_table_name,
context,
)?;

let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

Expand Down Expand Up @@ -327,8 +335,42 @@ async fn handle_alter_table_task(
table_name: table_ref.to_string(),
})?;

let table_route = ddl_manager
.table_metadata_manager
.table_route_manager()
.get(table_id)
.await?
.context(error::TableRouteNotFoundSnafu { table_id })?
.into_inner();

let physical_table_name = match table_route {
TableRouteValue::Physical(_) => None,
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_info = &ddl_manager
.table_metadata_manager()
.table_info_manager()
.get(physical_table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.table_info;
Some(TableName {
catalog_name: physical_table_info.catalog_name.clone(),
schema_name: physical_table_info.schema_name.clone(),
table_name: physical_table_info.name.clone(),
})
}
};

let id = ddl_manager
.submit_alter_table_task(cluster_id, alter_table_task, table_info_value)
.submit_alter_table_task(
cluster_id,
alter_table_task,
table_info_value,
physical_table_name,
)
.await?;

info!("Table: {table_id} is altered via procedure_id {id:?}");
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ fn test_create_alter_region_request() {
1,
alter_table_task,
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
None,
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
)
.unwrap();
Expand Down Expand Up @@ -383,6 +384,7 @@ async fn test_submit_alter_region_requests() {
1,
alter_table_task,
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)),
None,
context,
)
.unwrap();
Expand Down

0 comments on commit a25f839

Please sign in to comment.