diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 1f77c1a678ae..18ac244f0970 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -67,7 +67,6 @@ impl AlterTableProcedure { cluster_id: u64, task: AlterTableTask, table_info_value: DeserializedValueWithBytes, - physical_table_info: Option<(TableId, TableName)>, context: DdlContext, ) -> Result { let alter_kind = task @@ -87,13 +86,7 @@ impl AlterTableProcedure { Ok(Self { context, - data: AlterTableData::new( - task, - table_info_value, - physical_table_info, - cluster_id, - next_column_id, - ), + data: AlterTableData::new(task, table_info_value, cluster_id, next_column_id), kind, }) } @@ -349,20 +342,6 @@ impl AlterTableProcedure { fn lock_key_inner(&self) -> Vec { let mut lock_key = vec![]; - - if let Some((physical_table_id, physical_table_name)) = self.data.physical_table_info() { - lock_key.push(CatalogLock::Read(&physical_table_name.catalog_name).into()); - lock_key.push( - SchemaLock::read( - &physical_table_name.catalog_name, - &physical_table_name.schema_name, - ) - .into(), - ); - // We must acquire the write lock since this may update the physical table schema - lock_key.push(TableLock::Write(*physical_table_id).into()) - } - let table_ref = self.data.table_ref(); let table_id = self.data.table_id(); lock_key.push(CatalogLock::Read(table_ref.catalog).into()); @@ -440,8 +419,6 @@ pub struct AlterTableData { task: AlterTableTask, /// Table info value before alteration. table_info_value: DeserializedValueWithBytes, - /// Physical table name, if the table to alter is a logical table. - physical_table_info: Option<(TableId, TableName)>, /// Next column id of the table if the task adds columns to the table. next_column_id: Option, } @@ -450,7 +427,6 @@ impl AlterTableData { pub fn new( task: AlterTableTask, table_info_value: DeserializedValueWithBytes, - physical_table_info: Option<(TableId, TableName)>, cluster_id: u64, next_column_id: Option, ) -> Self { @@ -458,7 +434,6 @@ impl AlterTableData { state: AlterTableState::Prepare, task, table_info_value, - physical_table_info, cluster_id, next_column_id, } @@ -475,10 +450,6 @@ impl AlterTableData { fn table_info(&self) -> &RawTableInfo { &self.table_info_value.table_info } - - fn physical_table_info(&self) -> Option<&(TableId, TableName)> { - self.physical_table_info.as_ref() - } } /// Creates region proto alter kind from `table_info` and `alter_kind`. diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index e63477f47562..72beec194f09 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -34,8 +34,10 @@ use crate::ddl::table_meta::TableMetadataAllocatorRef; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor}; use crate::error::{ - self, EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result, - SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu, + EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu, + QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, + TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, + UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu, }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; @@ -53,7 +55,6 @@ use crate::rpc::ddl::{ use crate::rpc::procedure; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; use crate::rpc::router::RegionRoute; -use crate::table_name::TableName; use crate::ClusterId; pub type DdlManagerRef = Arc; @@ -197,17 +198,11 @@ impl DdlManager { cluster_id: ClusterId, alter_table_task: AlterTableTask, table_info_value: DeserializedValueWithBytes, - physical_table_info: Option<(TableId, TableName)>, ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); - let procedure = AlterTableProcedure::new( - cluster_id, - alter_table_task, - table_info_value, - physical_table_info, - context, - )?; + let procedure = + AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context)?; let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -371,12 +366,11 @@ async fn handle_truncate_table_task( let (table_info_value, table_route_value) = table_metadata_manager.get_full_table_info(table_id).await?; - let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu { + let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu { table: table_ref.to_string(), })?; - let table_route_value = - table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?; + let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?; let table_route = table_route_value.into_inner().region_routes()?.clone(); @@ -418,50 +412,28 @@ async fn handle_alter_table_task( })? .table_id(); - let table_info_value = ddl_manager - .table_metadata_manager() - .table_info_manager() - .get(table_id) - .await? - .with_context(|| error::TableInfoNotFoundSnafu { - table: table_ref.to_string(), - })?; - - let physical_table_id = ddl_manager + let (table_info_value, table_route_value) = ddl_manager .table_metadata_manager() - .table_route_manager() - .get_physical_table_id(table_id) + .get_full_table_info(table_id) .await?; - let physical_table_info = if physical_table_id == table_id { - None - } else { - let physical_table_info = &ddl_manager - .table_metadata_manager() - .table_info_manager() - .get(physical_table_id) - .await? - .with_context(|| error::TableInfoNotFoundSnafu { - table: table_ref.to_string(), - })? - .table_info; - Some(( - physical_table_id, - TableName { - catalog_name: physical_table_info.catalog_name.clone(), - schema_name: physical_table_info.schema_name.clone(), - table_name: physical_table_info.name.clone(), - }, - )) - }; + let table_route_value = table_route_value + .context(TableRouteNotFoundSnafu { table_id })? + .into_inner(); + + ensure!( + table_route_value.is_physical(), + UnexpectedLogicalRouteTableSnafu { + err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref), + } + ); + + let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu { + table: table_ref.to_string(), + })?; let (id, _) = ddl_manager - .submit_alter_table_task( - cluster_id, - alter_table_task, - table_info_value, - physical_table_info, - ) + .submit_alter_table_task(cluster_id, alter_table_task, table_info_value) .await?; info!("Table: {table_id} is altered via procedure_id {id:?}"); @@ -490,7 +462,7 @@ async fn handle_drop_table_task( .get_physical_table_route(table_id) .await?; - let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu { + let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu { table: table_ref.to_string(), })?; @@ -704,15 +676,15 @@ impl ProcedureExecutor for DdlManager { _ctx: &ExecutorContext, pid: &str, ) -> Result { - let pid = ProcedureId::parse_str(pid) - .with_context(|_| error::ParseProcedureIdSnafu { key: pid })?; + let pid = + ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?; let state = self .procedure_manager .procedure_state(pid) .await - .context(error::QueryProcedureSnafu)? - .context(error::ProcedureNotFoundSnafu { + .context(QueryProcedureSnafu)? + .context(ProcedureNotFoundSnafu { pid: pid.to_string(), })?; diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index f1c7d1bfa86d..ed03e9c7cff3 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -356,7 +356,6 @@ impl TableMetadataManager { &self.kv_backend } - // TODO(ruihang): deprecate this pub async fn get_full_table_info( &self, table_id: TableId, @@ -368,17 +367,14 @@ impl TableMetadataManager { .table_route_manager .table_route_storage() .build_get_txn(table_id); - let (get_table_info_txn, table_info_decoder) = self.table_info_manager.build_get_txn(table_id); let txn = Txn::merge_all(vec![get_table_route_txn, get_table_info_txn]); + let res = self.kv_backend.txn(txn).await?; - let r = self.kv_backend.txn(txn).await?; - - let table_info_value = table_info_decoder(&r.responses)?; - - let table_route_value = table_route_decoder(&r.responses)?; + let table_info_value = table_info_decoder(&res.responses)?; + let table_route_value = table_route_decoder(&res.responses)?; Ok((table_info_value, table_route_value)) } diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index ae34f7eb19c8..a4797d9e1757 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -407,7 +407,6 @@ fn test_create_alter_region_request() { 1, alter_table_task, DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())), - None, test_data::new_ddl_context(Arc::new(DatanodeClients::default())), ) .unwrap(); @@ -478,7 +477,6 @@ async fn test_submit_alter_region_requests() { 1, alter_table_task, DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)), - None, context, ) .unwrap();