Skip to content

Commit

Permalink
fix: alter table procedure forgets to update next column id (#2385)
Browse files Browse the repository at this point in the history
* feat: add more info to error messages

* feat: store next column id in procedure

* fix: update next column id for table info

* test: fix add col test

* chore: remove location from invalid request error

* test: update test

* test: fix test
  • Loading branch information
evenyag authored Sep 14, 2023
1 parent cc7eb3d commit da54a0c
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 73 deletions.
179 changes: 122 additions & 57 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use common_telemetry::{debug, info};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use store_api::storage::{ColumnId, RegionId};
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId, TableInfo};
Expand All @@ -54,6 +54,8 @@ use crate::table_name::TableName;
pub struct AlterTableProcedure {
context: DdlContext,
data: AlterTableData,
/// proto alter Kind.
kind: alter_request::Kind,
}

impl AlterTableProcedure {
Expand All @@ -64,17 +66,50 @@ impl AlterTableProcedure {
task: AlterTableTask,
table_info_value: TableInfoValue,
context: DdlContext,
) -> Self {
Self {
) -> Result<Self> {
let alter_kind = task
.alter_table
.kind
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'kind' is absent",
})?;
let (kind, next_column_id) =
create_proto_alter_kind(&table_info_value.table_info, alter_kind)?;

debug!(
"New AlterTableProcedure, kind: {:?}, next_column_id: {:?}",
kind, next_column_id
);

Ok(Self {
context,
data: AlterTableData::new(task, table_info_value, cluster_id),
}
data: AlterTableData::new(task, table_info_value, cluster_id, next_column_id),
kind,
})
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let alter_kind = data
.task
.alter_table
.kind
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'kind' is absent",
})
.map_err(ProcedureError::external)?;
let (kind, next_column_id) =
create_proto_alter_kind(&data.table_info_value.table_info, alter_kind)
.map_err(ProcedureError::external)?;
assert_eq!(data.next_column_id, next_column_id);

Ok(AlterTableProcedure { context, data })
Ok(AlterTableProcedure {
context,
data,
kind,
})
}

// Checks whether the table exists.
Expand Down Expand Up @@ -133,56 +168,10 @@ impl AlterTableProcedure {
pub fn create_alter_region_request(&self, region_id: RegionId) -> Result<AlterRequest> {
let table_info = self.data.table_info();

let kind =
match self.alter_kind()? {
Kind::AddColumns(x) => {
let mut next_column_id = table_info.meta.next_column_id;

let add_columns =
x.add_columns
.iter()
.map(|add_column| {
let column_def = add_column.column_def.as_ref().context(
InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
},
)?;

let column_id = next_column_id;
next_column_id += 1;

let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};

Ok(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
})
})
.collect::<Result<Vec<_>>>()?;

alter_request::Kind::AddColumns(AddColumns { add_columns })
}
Kind::DropColumns(x) => {
let drop_columns = x
.drop_columns
.iter()
.map(|x| DropColumn {
name: x.name.clone(),
})
.collect::<Vec<_>>();

alter_request::Kind::DropColumns(DropColumns { drop_columns })
}
Kind::RenameTable(_) => unreachable!(),
};

Ok(AlterRequest {
region_id: region_id.as_u64(),
schema_version: table_info.ident.version,
kind: Some(kind),
kind: Some(self.kind.clone()),
})
}

Expand Down Expand Up @@ -279,8 +268,11 @@ impl AlterTableProcedure {
})?;

let mut new_info = table_info.clone();
new_info.ident.version = table_info.ident.version + 1;
new_info.meta = new_meta;
new_info.ident.version = table_info.ident.version + 1;
if let Some(column_id) = self.data.next_column_id {
new_info.meta.next_column_id = new_info.meta.next_column_id.max(column_id);
}

if let AlterKind::RenameTable { new_table_name } = &request.alter_kind {
new_info.name = new_table_name.to_string();
Expand Down Expand Up @@ -308,7 +300,7 @@ impl AlterTableProcedure {
self.on_update_metadata_for_alter(new_info.into()).await?;
}

info!("Updated table metadata for table {table_id}");
info!("Updated table metadata for table {table_ref}, table_id: {table_id}");

self.data.state = AlterTableState::InvalidateTableCache;
Ok(Status::executing(true))
Expand Down Expand Up @@ -424,17 +416,26 @@ enum AlterTableState {
pub struct AlterTableData {
state: AlterTableState,
task: AlterTableTask,
/// Table info value before alteration.
table_info_value: TableInfoValue,
cluster_id: u64,
/// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>,
}

impl AlterTableData {
pub fn new(task: AlterTableTask, table_info_value: TableInfoValue, cluster_id: u64) -> Self {
pub fn new(
task: AlterTableTask,
table_info_value: TableInfoValue,
cluster_id: u64,
next_column_id: Option<ColumnId>,
) -> Self {
Self {
state: AlterTableState::Prepare,
task,
table_info_value,
cluster_id,
next_column_id,
}
}

Expand All @@ -450,3 +451,67 @@ impl AlterTableData {
&self.table_info_value.table_info
}
}

/// Creates region proto alter kind from `table_info` and `alter_kind`.
///
/// Returns the kind and next column id if it adds new columns.
///
/// # Panics
/// Panics if kind is rename.
pub fn create_proto_alter_kind(
table_info: &RawTableInfo,
alter_kind: &Kind,
) -> Result<(alter_request::Kind, Option<ColumnId>)> {
match alter_kind {
Kind::AddColumns(x) => {
let mut next_column_id = table_info.meta.next_column_id;

let add_columns = x
.add_columns
.iter()
.map(|add_column| {
let column_def =
add_column
.column_def
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
})?;

let column_id = next_column_id;
next_column_id += 1;

let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};

Ok(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
})
})
.collect::<Result<Vec<_>>>()?;

Ok((
alter_request::Kind::AddColumns(AddColumns { add_columns }),
Some(next_column_id),
))
}
Kind::DropColumns(x) => {
let drop_columns = x
.drop_columns
.iter()
.map(|x| DropColumn {
name: x.name.clone(),
})
.collect::<Vec<_>>();

Ok((
alter_request::Kind::DropColumns(DropColumns { drop_columns }),
None,
))
}
Kind::RenameTable(_) => unreachable!(),
}
}
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl DdlManager {
let context = self.create_context();

let procedure =
AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context);
AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context)?;

let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ impl StatementExecutor {
let engine = table.table_info().meta.engine.to_string();
self.verify_alter(table_id, table.table_info(), expr.clone())?;

info!(
"Table info before alter is {:?}, expr: {:?}",
table.table_info(),
expr
);

let req = SubmitDdlTaskRequest {
task: DdlTask::new_alter_table(expr.clone()),
};
Expand Down
6 changes: 4 additions & 2 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ fn test_create_alter_region_request() {
alter_table_task,
TableInfoValue::new(test_data::new_table_info()),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
);
)
.unwrap();

let region_id = RegionId::new(42, 1);
let alter_region_request = procedure.create_alter_region_request(region_id).unwrap();
Expand Down Expand Up @@ -358,7 +359,8 @@ async fn test_submit_alter_region_requests() {
alter_table_task,
TableInfoValue::new(table_info),
context,
);
)
.unwrap();

let expected_altered_regions = Arc::new(Mutex::new(HashSet::from([
RegionId::new(42, 1),
Expand Down
7 changes: 1 addition & 6 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,7 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"Invalid request to region {}, location: {}, reason: {}",
region_id,
location,
reason
))]
#[snafu(display("Invalid request to region {}, reason: {}", region_id, reason))]
InvalidRequest {
region_id: RegionId,
reason: String,
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ impl WriteRequest {
!has_null || column.column_schema.is_nullable(),
InvalidRequestSnafu {
region_id,
reason: format!("column {} is not null", column.column_schema.name),
reason: format!(
"column {} is not null but input has null",
column.column_schema.name
),
}
);
} else {
Expand Down Expand Up @@ -805,7 +808,7 @@ mod tests {

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "column ts is not null");
check_invalid_request(&err, "column ts is not null but input has null");
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum Error {
},

#[snafu(display(
"Unsupported expr in default constraint: {} for column: {}",
"Unsupported expr in default constraint: {:?} for column: {}",
expr,
column_name
))]
Expand Down
4 changes: 2 additions & 2 deletions src/store-api/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ impl RegionMetadata {
!id_names.contains_key(&col.column_id),
InvalidMetaSnafu {
reason: format!(
"column {} and {} have the same column id",
id_names[&col.column_id], col.column_schema.name
"column {} and {} have the same column id {}",
id_names[&col.column_id], col.column_schema.name, col.column_id,
),
}
);
Expand Down
Loading

0 comments on commit da54a0c

Please sign in to comment.