From bd1a5dc265bca7d52c62694bb672edcf8be2c072 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Fri, 5 Jan 2024 17:46:39 +0800 Subject: [PATCH] feat: metric engine support alter (#3098) * feat: metric engine support alter * chore: by comment * feat: get physical table route for frontend --- src/common/meta/src/ddl/alter_table.rs | 48 ++++++++++++++++------- src/common/meta/src/ddl_manager.rs | 44 +++++++++++++++++++-- src/common/meta/src/key/table_route.rs | 54 +++++++++++++++++++++++++- src/meta-srv/src/procedure/tests.rs | 2 + src/partition/src/manager.rs | 39 ++++--------------- 5 files changed, 136 insertions(+), 51 deletions(-) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index e196ed70c6d6..b22f08ef9817 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -40,9 +40,7 @@ use table::requests::AlterKind; use crate::cache_invalidator::Context; use crate::ddl::utils::handle_operate_region_error; use crate::ddl::DdlContext; -use crate::error::{ - self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result, TableRouteNotFoundSnafu, -}; +use crate::error::{self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result}; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::DeserializedValueWithBytes; @@ -65,6 +63,7 @@ impl AlterTableProcedure { cluster_id: u64, task: AlterTableTask, table_info_value: DeserializedValueWithBytes, + physical_table_name: Option, context: DdlContext, ) -> Result { let alter_kind = task @@ -84,7 +83,13 @@ impl AlterTableProcedure { Ok(Self { context, - data: AlterTableData::new(task, table_info_value, cluster_id, next_column_id), + data: AlterTableData::new( + task, + table_info_value, + physical_table_name, + cluster_id, + next_column_id, + ), kind, }) } @@ -182,23 +187,19 @@ impl AlterTableProcedure { pub async fn submit_alter_region_requests(&mut self) -> Result { let table_id = self.data.table_id(); - - let table_route = self + let (_, physical_table_route) = self .context .table_metadata_manager .table_route_manager() - .get(table_id) - .await? - .context(TableRouteNotFoundSnafu { table_id })? - .into_inner(); - let region_routes = table_route.region_routes()?; + .get_physical_table_route(table_id) + .await?; - let leaders = find_leaders(region_routes); + let leaders = find_leaders(&physical_table_route.region_routes); let mut alter_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { let requester = self.context.datanode_manager.datanode(&datanode).await; - let regions = find_leader_regions(region_routes, &datanode); + let regions = find_leader_regions(&physical_table_route.region_routes, &datanode); for region in regions { let region_id = RegionId::new(table_id, region); @@ -335,13 +336,24 @@ impl AlterTableProcedure { } fn lock_key_inner(&self) -> Vec { + let mut lock_key = vec![]; + + if let Some(physical_table_name) = self.data.physical_table_name() { + let physical_table_key = common_catalog::format_full_table_name( + &physical_table_name.catalog_name, + &physical_table_name.schema_name, + &physical_table_name.table_name, + ); + lock_key.push(physical_table_key); + } + let table_ref = self.data.table_ref(); let table_key = common_catalog::format_full_table_name( table_ref.catalog, table_ref.schema, table_ref.table, ); - let mut lock_key = vec![table_key]; + lock_key.push(table_key); if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() { lock_key.push(common_catalog::format_full_table_name( @@ -415,6 +427,8 @@ pub struct AlterTableData { task: AlterTableTask, /// Table info value before alteration. table_info_value: DeserializedValueWithBytes, + /// Physical table name, if the table to alter is a logical table. + physical_table_name: Option, cluster_id: u64, /// Next column id of the table if the task adds columns to the table. next_column_id: Option, @@ -424,6 +438,7 @@ impl AlterTableData { pub fn new( task: AlterTableTask, table_info_value: DeserializedValueWithBytes, + physical_table_name: Option, cluster_id: u64, next_column_id: Option, ) -> Self { @@ -431,6 +446,7 @@ impl AlterTableData { state: AlterTableState::Prepare, task, table_info_value, + physical_table_name, cluster_id, next_column_id, } @@ -447,6 +463,10 @@ impl AlterTableData { fn table_info(&self) -> &RawTableInfo { &self.table_info_value.table_info } + + fn physical_table_name(&self) -> Option<&TableName> { + self.physical_table_name.as_ref() + } } /// Creates region proto alter kind from `table_info` and `alter_kind`. diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index fe5163b73098..28024b6602c4 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -46,6 +46,8 @@ use crate::rpc::ddl::{ TruncateTableTask, }; use crate::rpc::router::RegionRoute; +use crate::table_name::TableName; + pub type DdlManagerRef = Arc; /// The [DdlManager] provides the ability to execute Ddl. @@ -160,11 +162,17 @@ impl DdlManager { cluster_id: u64, alter_table_task: AlterTableTask, table_info_value: DeserializedValueWithBytes, + physical_table_name: Option, ) -> Result { let context = self.create_context(); - let procedure = - AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context)?; + let procedure = AlterTableProcedure::new( + cluster_id, + alter_table_task, + table_info_value, + physical_table_name, + context, + )?; let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -327,8 +335,38 @@ async fn handle_alter_table_task( table_name: table_ref.to_string(), })?; + let physical_table_id = ddl_manager + .table_metadata_manager() + .table_route_manager() + .get_physical_table_id(table_id) + .await?; + + let physical_table_name = if physical_table_id == table_id { + None + } else { + let physical_table_info = &ddl_manager + .table_metadata_manager() + .table_info_manager() + .get(physical_table_id) + .await? + .with_context(|| error::TableInfoNotFoundSnafu { + table_name: table_ref.to_string(), + })? + .table_info; + Some(TableName { + catalog_name: physical_table_info.catalog_name.clone(), + schema_name: physical_table_info.schema_name.clone(), + table_name: physical_table_info.name.clone(), + }) + }; + let id = ddl_manager - .submit_alter_table_task(cluster_id, alter_table_task, table_info_value) + .submit_alter_table_task( + cluster_id, + alter_table_task, + table_info_value, + physical_table_name, + ) .await?; info!("Table: {table_id} is altered via procedure_id {id:?}"); diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index c95a28de60fe..9c9270f22d84 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -16,12 +16,14 @@ use std::collections::HashMap; use std::fmt::Display; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use super::{DeserializedValueWithBytes, TableMetaValue}; -use crate::error::{Result, SerdeJsonSnafu, UnexpectedLogicalRouteTableSnafu}; +use crate::error::{ + Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, +}; use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX}; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::KvBackendRef; @@ -334,6 +336,54 @@ impl TableRouteManager { .transpose() } + pub async fn get_physical_table_id( + &self, + logical_or_physical_table_id: TableId, + ) -> Result { + let table_route = self + .get(logical_or_physical_table_id) + .await? + .context(TableRouteNotFoundSnafu { + table_id: logical_or_physical_table_id, + })? + .into_inner(); + + match table_route { + TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id), + TableRouteValue::Logical(x) => Ok(x.physical_table_id()), + } + } + + pub async fn get_physical_table_route( + &self, + logical_or_physical_table_id: TableId, + ) -> Result<(TableId, PhysicalTableRouteValue)> { + let table_route = self + .get(logical_or_physical_table_id) + .await? + .context(TableRouteNotFoundSnafu { + table_id: logical_or_physical_table_id, + })? + .into_inner(); + + match table_route { + TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)), + TableRouteValue::Logical(x) => { + let physical_table_id = x.physical_table_id(); + let physical_table_route = + self.get(physical_table_id) + .await? + .context(TableRouteNotFoundSnafu { + table_id: physical_table_id, + })?; + Ok(( + physical_table_id, + physical_table_route.physical_table_route().clone(), + )) + } + } + } + /// It may return a subset of the `table_ids`. pub async fn batch_get( &self, diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 9ffad3aa6cf9..d042cdc37378 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -313,6 +313,7 @@ fn test_create_alter_region_request() { 1, alter_table_task, DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())), + None, test_data::new_ddl_context(Arc::new(DatanodeClients::default())), ) .unwrap(); @@ -383,6 +384,7 @@ async fn test_submit_alter_region_requests() { 1, alter_table_task, DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)), + None, context, ) .unwrap(); diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 5b73ee8fedf7..4e424b595a0e 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::v1::Rows; -use common_meta::key::table_route::{TableRouteManager, TableRouteValue}; +use common_meta::key::table_route::TableRouteManager; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::rpc::router; @@ -29,7 +29,7 @@ use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::columns::RangeColumnsPartitionRule; -use crate::error::{FindLeaderSnafu, InvalidTableRouteDataSnafu, Result}; +use crate::error::{FindLeaderSnafu, Result}; use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; use crate::range::RangePartitionRule; use crate::splitter::RowSplitter; @@ -65,38 +65,13 @@ impl PartitionRuleManager { } } - /// Find table route of given table name. - async fn find_table_route(&self, table_id: TableId) -> Result { - let route = self + async fn find_region_routes(&self, table_id: TableId) -> Result> { + let (_, route) = self .table_route_manager - .get(table_id) + .get_physical_table_route(table_id) .await - .context(error::TableRouteManagerSnafu)? - .context(error::FindTableRoutesSnafu { table_id })? - .into_inner(); - Ok(route) - } - - async fn find_region_routes(&self, table_id: TableId) -> Result> { - let table_route = self.find_table_route(table_id).await?; - - let region_routes = match table_route { - TableRouteValue::Physical(x) => x.region_routes, - - TableRouteValue::Logical(x) => { - let TableRouteValue::Physical(physical_table_route) = - self.find_table_route(x.physical_table_id()).await? - else { - return InvalidTableRouteDataSnafu { - table_id: x.physical_table_id(), - err_msg: "expected to be a physical table route", - } - .fail(); - }; - physical_table_route.region_routes - } - }; - Ok(region_routes) + .context(error::TableRouteManagerSnafu)?; + Ok(route.region_routes) } pub async fn find_table_partitions(&self, table_id: TableId) -> Result> {