Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metric engine support alter #3098

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use table::requests::AlterKind;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_operate_region_error;
use crate::ddl::DdlContext;
use crate::error::{
self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result, TableRouteNotFoundSnafu,
};
use crate::error::{self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
Expand All @@ -65,6 +63,7 @@ impl AlterTableProcedure {
cluster_id: u64,
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
context: DdlContext,
) -> Result<Self> {
let alter_kind = task
Expand All @@ -84,7 +83,13 @@ impl AlterTableProcedure {

Ok(Self {
context,
data: AlterTableData::new(task, table_info_value, cluster_id, next_column_id),
data: AlterTableData::new(
task,
table_info_value,
physical_table_name,
cluster_id,
next_column_id,
),
kind,
})
}
Expand Down Expand Up @@ -182,23 +187,19 @@ impl AlterTableProcedure {

pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();

let table_route = self
let (_, physical_table_route) = self
.context
.table_metadata_manager
.table_route_manager()
.get(table_id)
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes()?;
.get_physical_table_route(table_id)
.await?;

let leaders = find_leaders(region_routes);
let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);

for region in regions {
let region_id = RegionId::new(table_id, region);
Expand Down Expand Up @@ -335,13 +336,24 @@ impl AlterTableProcedure {
}

fn lock_key_inner(&self) -> Vec<String> {
let mut lock_key = vec![];

if let Some(physical_table_name) = self.data.physical_table_name() {
let physical_table_key = common_catalog::format_full_table_name(
&physical_table_name.catalog_name,
&physical_table_name.schema_name,
&physical_table_name.table_name,
);
lock_key.push(physical_table_key);
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
}

let table_ref = self.data.table_ref();
let table_key = common_catalog::format_full_table_name(
table_ref.catalog,
table_ref.schema,
table_ref.table,
);
let mut lock_key = vec![table_key];
lock_key.push(table_key);

if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() {
lock_key.push(common_catalog::format_full_table_name(
Expand Down Expand Up @@ -415,6 +427,8 @@ pub struct AlterTableData {
task: AlterTableTask,
/// Table info value before alteration.
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
/// Physical table name, if the table to alter is a logical table.
physical_table_name: Option<TableName>,
cluster_id: u64,
/// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>,
Expand All @@ -424,13 +438,15 @@ impl AlterTableData {
pub fn new(
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
cluster_id: u64,
next_column_id: Option<ColumnId>,
) -> Self {
Self {
state: AlterTableState::Prepare,
task,
table_info_value,
physical_table_name,
cluster_id,
next_column_id,
}
Expand All @@ -447,6 +463,10 @@ impl AlterTableData {
fn table_info(&self) -> &RawTableInfo {
&self.table_info_value.table_info
}

fn physical_table_name(&self) -> Option<&TableName> {
self.physical_table_name.as_ref()
}
}

/// Creates region proto alter kind from `table_info` and `alter_kind`.
Expand Down
44 changes: 41 additions & 3 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use crate::rpc::ddl::{
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
use crate::table_name::TableName;

pub type DdlManagerRef = Arc<DdlManager>;

/// The [DdlManager] provides the ability to execute Ddl.
Expand Down Expand Up @@ -160,11 +162,17 @@ impl DdlManager {
cluster_id: u64,
alter_table_task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
) -> Result<ProcedureId> {
let context = self.create_context();

let procedure =
AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context)?;
let procedure = AlterTableProcedure::new(
cluster_id,
alter_table_task,
table_info_value,
physical_table_name,
context,
)?;

let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

Expand Down Expand Up @@ -327,8 +335,38 @@ async fn handle_alter_table_task(
table_name: table_ref.to_string(),
})?;

let physical_table_id = ddl_manager
.table_metadata_manager()
.table_route_manager()
.get_physical_table_id(table_id)
.await?;

let physical_table_name = if physical_table_id == table_id {
None
} else {
let physical_table_info = &ddl_manager
.table_metadata_manager()
.table_info_manager()
.get(physical_table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.table_info;
Some(TableName {
catalog_name: physical_table_info.catalog_name.clone(),
schema_name: physical_table_info.schema_name.clone(),
table_name: physical_table_info.name.clone(),
})
};

let id = ddl_manager
.submit_alter_table_task(cluster_id, alter_table_task, table_info_value)
.submit_alter_table_task(
cluster_id,
alter_table_task,
table_info_value,
physical_table_name,
)
.await?;

info!("Table: {table_id} is altered via procedure_id {id:?}");
Expand Down
54 changes: 52 additions & 2 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use std::collections::HashMap;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu, UnexpectedLogicalRouteTableSnafu};
use crate::error::{
Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -332,6 +334,54 @@ impl TableRouteManager {
.transpose()
}

pub async fn get_physical_table_id(
&self,
logical_or_physical_table_id: TableId,
) -> Result<TableId> {
let table_route = self
.get(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?
.into_inner();

match table_route {
TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
}
}

pub async fn get_physical_table_route(
&self,
logical_or_physical_table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)> {
let table_route = self
.get(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?
.into_inner();

match table_route {
TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route =
self.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
Ok((
physical_table_id,
physical_table_route.physical_table_route().clone(),
))
}
}
}

/// It may return a subset of the `table_ids`.
pub async fn batch_get(
&self,
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ fn test_create_alter_region_request() {
1,
alter_table_task,
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
None,
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
)
.unwrap();
Expand Down Expand Up @@ -383,6 +384,7 @@ async fn test_submit_alter_region_requests() {
1,
alter_table_task,
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)),
None,
context,
)
.unwrap();
Expand Down
39 changes: 7 additions & 32 deletions src/partition/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::v1::Rows;
use common_meta::key::table_route::{TableRouteManager, TableRouteValue};
use common_meta::key::table_route::TableRouteManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router;
Expand All @@ -29,7 +29,7 @@ use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, InvalidTableRouteDataSnafu, Result};
use crate::error::{FindLeaderSnafu, Result};
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::splitter::RowSplitter;
Expand Down Expand Up @@ -65,38 +65,13 @@ impl PartitionRuleManager {
}
}

/// Find table route of given table name.
async fn find_table_route(&self, table_id: TableId) -> Result<TableRouteValue> {
let route = self
async fn find_region_routes(&self, table_id: TableId) -> Result<Vec<RegionRoute>> {
let (_, route) = self
.table_route_manager
.get(table_id)
.get_physical_table_route(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
Ok(route)
}

async fn find_region_routes(&self, table_id: TableId) -> Result<Vec<RegionRoute>> {
let table_route = self.find_table_route(table_id).await?;

let region_routes = match table_route {
TableRouteValue::Physical(x) => x.region_routes,

TableRouteValue::Logical(x) => {
let TableRouteValue::Physical(physical_table_route) =
self.find_table_route(x.physical_table_id()).await?
else {
return InvalidTableRouteDataSnafu {
table_id: x.physical_table_id(),
err_msg: "expected to be a physical table route",
}
.fail();
};
physical_table_route.region_routes
}
};
Ok(region_routes)
.context(error::TableRouteManagerSnafu)?;
Ok(route.region_routes)
}

pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
Expand Down
Loading