Skip to content

Commit

Permalink
feat: let alter table procedure can only alter physical table (#3613)
Browse files Browse the repository at this point in the history
* feat: let alter table procedure can only alter physicale table

* chore: rm unnecessary todo
  • Loading branch information
fengjiachun authored Mar 29, 2024
1 parent 7a19f66 commit 93da45f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 97 deletions.
31 changes: 1 addition & 30 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl AlterTableProcedure {
cluster_id: u64,
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_info: Option<(TableId, TableName)>,
context: DdlContext,
) -> Result<Self> {
let alter_kind = task
Expand All @@ -87,13 +86,7 @@ impl AlterTableProcedure {

Ok(Self {
context,
data: AlterTableData::new(
task,
table_info_value,
physical_table_info,
cluster_id,
next_column_id,
),
data: AlterTableData::new(task, table_info_value, cluster_id, next_column_id),
kind,
})
}
Expand Down Expand Up @@ -349,20 +342,6 @@ impl AlterTableProcedure {

fn lock_key_inner(&self) -> Vec<StringKey> {
let mut lock_key = vec![];

if let Some((physical_table_id, physical_table_name)) = self.data.physical_table_info() {
lock_key.push(CatalogLock::Read(&physical_table_name.catalog_name).into());
lock_key.push(
SchemaLock::read(
&physical_table_name.catalog_name,
&physical_table_name.schema_name,
)
.into(),
);
// We must acquire the write lock since this may update the physical table schema
lock_key.push(TableLock::Write(*physical_table_id).into())
}

let table_ref = self.data.table_ref();
let table_id = self.data.table_id();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
Expand Down Expand Up @@ -440,8 +419,6 @@ pub struct AlterTableData {
task: AlterTableTask,
/// Table info value before alteration.
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
/// Physical table name, if the table to alter is a logical table.
physical_table_info: Option<(TableId, TableName)>,
/// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>,
}
Expand All @@ -450,15 +427,13 @@ impl AlterTableData {
pub fn new(
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_info: Option<(TableId, TableName)>,
cluster_id: u64,
next_column_id: Option<ColumnId>,
) -> Self {
Self {
state: AlterTableState::Prepare,
task,
table_info_value,
physical_table_info,
cluster_id,
next_column_id,
}
Expand All @@ -475,10 +450,6 @@ impl AlterTableData {
fn table_info(&self) -> &RawTableInfo {
&self.table_info_value.table_info
}

fn physical_table_info(&self) -> Option<&(TableId, TableName)> {
self.physical_table_info.as_ref()
}
}

/// Creates region proto alter kind from `table_info` and `alter_kind`.
Expand Down
88 changes: 30 additions & 58 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
use crate::error::{
self, EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu,
EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu,
QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu,
TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
Expand All @@ -53,7 +55,6 @@ use crate::rpc::ddl::{
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::rpc::router::RegionRoute;
use crate::table_name::TableName;
use crate::ClusterId;

pub type DdlManagerRef = Arc<DdlManager>;
Expand Down Expand Up @@ -197,17 +198,11 @@ impl DdlManager {
cluster_id: ClusterId,
alter_table_task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_info: Option<(TableId, TableName)>,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();

let procedure = AlterTableProcedure::new(
cluster_id,
alter_table_task,
table_info_value,
physical_table_info,
context,
)?;
let procedure =
AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context)?;

let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

Expand Down Expand Up @@ -371,12 +366,11 @@ async fn handle_truncate_table_task(
let (table_info_value, table_route_value) =
table_metadata_manager.get_full_table_info(table_id).await?;

let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu {
let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
table: table_ref.to_string(),
})?;

let table_route_value =
table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?;
let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;

let table_route = table_route_value.into_inner().region_routes()?.clone();

Expand Down Expand Up @@ -418,50 +412,28 @@ async fn handle_alter_table_task(
})?
.table_id();

let table_info_value = ddl_manager
.table_metadata_manager()
.table_info_manager()
.get(table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: table_ref.to_string(),
})?;

let physical_table_id = ddl_manager
let (table_info_value, table_route_value) = ddl_manager
.table_metadata_manager()
.table_route_manager()
.get_physical_table_id(table_id)
.get_full_table_info(table_id)
.await?;

let physical_table_info = if physical_table_id == table_id {
None
} else {
let physical_table_info = &ddl_manager
.table_metadata_manager()
.table_info_manager()
.get(physical_table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: table_ref.to_string(),
})?
.table_info;
Some((
physical_table_id,
TableName {
catalog_name: physical_table_info.catalog_name.clone(),
schema_name: physical_table_info.schema_name.clone(),
table_name: physical_table_info.name.clone(),
},
))
};
let table_route_value = table_route_value
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();

ensure!(
table_route_value.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
}
);

let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
table: table_ref.to_string(),
})?;

let (id, _) = ddl_manager
.submit_alter_table_task(
cluster_id,
alter_table_task,
table_info_value,
physical_table_info,
)
.submit_alter_table_task(cluster_id, alter_table_task, table_info_value)
.await?;

info!("Table: {table_id} is altered via procedure_id {id:?}");
Expand Down Expand Up @@ -490,7 +462,7 @@ async fn handle_drop_table_task(
.get_physical_table_route(table_id)
.await?;

let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu {
let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
table: table_ref.to_string(),
})?;

Expand Down Expand Up @@ -704,15 +676,15 @@ impl ProcedureExecutor for DdlManager {
_ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse> {
let pid = ProcedureId::parse_str(pid)
.with_context(|_| error::ParseProcedureIdSnafu { key: pid })?;
let pid =
ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;

let state = self
.procedure_manager
.procedure_state(pid)
.await
.context(error::QueryProcedureSnafu)?
.context(error::ProcedureNotFoundSnafu {
.context(QueryProcedureSnafu)?
.context(ProcedureNotFoundSnafu {
pid: pid.to_string(),
})?;

Expand Down
10 changes: 3 additions & 7 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ impl TableMetadataManager {
&self.kv_backend
}

// TODO(ruihang): deprecate this
pub async fn get_full_table_info(
&self,
table_id: TableId,
Expand All @@ -368,17 +367,14 @@ impl TableMetadataManager {
.table_route_manager
.table_route_storage()
.build_get_txn(table_id);

let (get_table_info_txn, table_info_decoder) =
self.table_info_manager.build_get_txn(table_id);

let txn = Txn::merge_all(vec![get_table_route_txn, get_table_info_txn]);
let res = self.kv_backend.txn(txn).await?;

let r = self.kv_backend.txn(txn).await?;

let table_info_value = table_info_decoder(&r.responses)?;

let table_route_value = table_route_decoder(&r.responses)?;
let table_info_value = table_info_decoder(&res.responses)?;
let table_route_value = table_route_decoder(&res.responses)?;

Ok((table_info_value, table_route_value))
}
Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ fn test_create_alter_region_request() {
1,
alter_table_task,
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
None,
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
)
.unwrap();
Expand Down Expand Up @@ -478,7 +477,6 @@ async fn test_submit_alter_region_requests() {
1,
alter_table_task,
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)),
None,
context,
)
.unwrap();
Expand Down

0 comments on commit 93da45f

Please sign in to comment.