Skip to content

Commit

Permalink
feat(cdc): support constant default value for alter table ADD COLUMN (#…
Browse files Browse the repository at this point in the history
…18322) (#18369)

Co-authored-by: StrikeW <[email protected]>
  • Loading branch information
github-actions[bot] and StrikeW authored Sep 3, 2024
1 parent 92e45bb commit 5f2938a
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 67 deletions.
12 changes: 6 additions & 6 deletions e2e_test/source/cdc_inline/alter/cdc_table_alter.slt
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ select order_id, product_id, shipment_id from enriched_orders order by order_id;
system ok
mysql -e "
USE testdb1;
ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 0.0;
ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 1.1;
ALTER TABLE orders ADD COLUMN order_comment VARCHAR(255);
"

# alter cdc tables
statement ok
ALTER TABLE my_products ADD COLUMN weight DECIMAL;
ALTER TABLE my_products ADD COLUMN weight DECIMAL DEFAULT 1.1;

statement ok
ALTER TABLE my_orders ADD COLUMN order_comment VARCHAR;
Expand All @@ -148,9 +148,9 @@ sleep 3s
query ITTT
SELECT id,name,description,weight FROM my_products order by id limit 3
----
101 scooter Small 2-wheel scooter NULL
102 car battery 12V car battery NULL
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL
101 scooter Small 2-wheel scooter 1.1
102 car battery 12V car battery 1.1
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 1.1


# update mysql tables
Expand All @@ -169,7 +169,7 @@ SELECT id,name,description,weight FROM my_products order by id limit 3
----
101 scooter Small 2-wheel scooter 10.50
102 car battery 12V car battery 12.50
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 1.1

query ITTT
SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM my_orders order by order_id limit 2
Expand Down
19 changes: 16 additions & 3 deletions e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ mysql -e "
CREATE TABLE customers(
id BIGINT PRIMARY KEY,
modified DATETIME,
name VARCHAR(32),
custinfo JSON
);
INSERT INTO customers VALUES(1, NOW(), 'John', NULL);
INSERT INTO customers VALUES(2, NOW(), 'Doe', NULL);
ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
"

Expand All @@ -28,14 +31,15 @@ create source mysql_source with (
);

statement ok
create table rw_customers (id bigint, modified timestamp, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';
create table rw_customers (id bigint, modified timestamp, name varchar, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
name character varying false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
Expand All @@ -46,8 +50,8 @@ table description rw_customers NULL NULL
system ok
mysql -e "
USE mytest;
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255);
ALTER TABLE customers ADD COLUMN v2 double(5,2);
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255) DEFAULT 'hello';
ALTER TABLE customers ADD COLUMN v2 double(5,2) DEFAULT 88.9;
"

sleep 3s
Expand All @@ -58,13 +62,20 @@ describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
name character varying false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

query TTTT
select id,v1,v2,name from rw_customers order by id;
----
1 hello 88.9 John
2 hello 88.9 Doe

# rename column on upstream will not be replicated, since we do not support rename column
system ok
mysql -e "
Expand All @@ -81,6 +92,7 @@ describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
name character varying false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
Expand Down Expand Up @@ -112,6 +124,7 @@ query TTTT
describe rw_customers;
----
id bigint false NULL
name character varying false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
Expand Down
14 changes: 13 additions & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use itertools::Itertools;
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
};

use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
Expand Down Expand Up @@ -140,6 +140,18 @@ impl ColumnDesc {
}
}

pub fn named_with_default_value(
name: impl Into<String>,
column_id: ColumnId,
data_type: DataType,
default_val: DefaultColumnDesc,
) -> ColumnDesc {
ColumnDesc {
generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_val)),
..Self::named(name, column_id, data_type)
}
}

pub fn named_with_additional_column(
name: impl Into<String>,
column_id: ColumnId,
Expand Down
39 changes: 38 additions & 1 deletion src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ use risingwave_common::types::{
DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef,
ToOwnedDatum,
};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::additional_column::ColumnType;
use risingwave_pb::plan_common::DefaultColumnDesc;
use thiserror_ext::AsReport;

use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
Expand Down Expand Up @@ -221,7 +225,40 @@ pub fn parse_schema_change(
}
};

column_descs.push(ColumnDesc::named(name, ColumnId::placeholder(), data_type));
// handle default value expression, currently we only support constant expression
let column_desc = match col.access_object_field("defaultValueExpression") {
Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
let value_text = default_val_expr_str.as_string().unwrap();
let snapshot_value: Datum = Some(
ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
|err| {
tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
AccessError::TypeError {
expected: "constant expression".into(),
got: data_type.to_string(),
value: value_text,
}},
)?,
);
// equivalent to `Literal::to_expr_proto`
let default_val_expr_node = ExprNode {
function_type: ExprType::Unspecified as i32,
return_type: Some(data_type.to_protobuf()),
rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
};
ColumnDesc::named_with_default_value(
name,
ColumnId::placeholder(),
data_type,
DefaultColumnDesc {
expr: Some(default_val_expr_node),
snapshot_value: Some(snapshot_value.to_protobuf()),
},
)
}
_ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
};
column_descs.push(column_desc);
}
}

Expand Down
14 changes: 9 additions & 5 deletions src/connector/src/source/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::catalog::{
TABLE_NAME_COLUMN_NAME,
};
use risingwave_common::types::DataType;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion};

/// `SourceColumnDesc` is used to describe a column in the Source.
Expand Down Expand Up @@ -137,11 +138,14 @@ impl From<&ColumnDesc> for SourceColumnDesc {
version: _,
}: &ColumnDesc,
) -> Self {
debug_assert!(
generated_or_default_column.is_none(),
"source column should not be generated or default: {:?}",
generated_or_default_column.as_ref().unwrap()
);
if let Some(option) = generated_or_default_column {
debug_assert!(
matches!(option, GeneratedOrDefaultColumn::DefaultColumn(_)),
"source column should not be generated: {:?}",
generated_or_default_column.as_ref().unwrap()
)
}

Self {
name: name.clone(),
data_type: data_type.clone(),
Expand Down
25 changes: 15 additions & 10 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub async fn replace_table_with_definition(
definition,
original_catalog,
source_schema,
None,
)
.await?;

Expand All @@ -73,7 +74,7 @@ pub async fn replace_table_with_definition(
pub async fn get_new_table_definition_for_cdc_table(
session: &Arc<SessionImpl>,
table_name: ObjectName,
new_columns: Vec<ColumnCatalog>,
new_columns: &[ColumnCatalog],
) -> Result<(Statement, Arc<TableCatalog>)> {
let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

Expand All @@ -96,22 +97,24 @@ pub async fn get_new_table_definition_for_cdc_table(
"source schema should be None for CDC table"
);

let orig_column_map: HashMap<String, ColumnDef> = HashMap::from_iter(
original_columns
let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter(
original_catalog
.columns()
.iter()
.map(|col| (col.name.real_value(), col.clone())),
.map(|col| (col.name().to_string(), col.clone())),
);

// update the original columns with new version columns
let mut new_column_defs = vec![];
for col in new_columns {
// if the column exists in the original definitoins, use the original column definition.
for new_col in new_columns {
// if the column exists in the original catalog, use it to construct the column definition.
// since we don't support altering the column type right now
if let Some(original_col) = orig_column_map.get(col.name()) {
new_column_defs.push(original_col.clone());
if let Some(original_col) = orig_column_catalog.get(new_col.name()) {
let ty = to_ast_data_type(original_col.data_type())?;
new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![]));
} else {
let ty = to_ast_data_type(col.data_type())?;
new_column_defs.push(ColumnDef::new(col.name().into(), ty, None, vec![]));
let ty = to_ast_data_type(new_col.data_type())?;
new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![]));
}
}
*original_columns = new_column_defs;
Expand Down Expand Up @@ -162,6 +165,7 @@ pub async fn get_replace_table_plan(
definition: Statement,
original_catalog: &Arc<TableCatalog>,
source_schema: Option<ConnectorSchema>,
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
) -> Result<(
Option<Source>,
Table,
Expand Down Expand Up @@ -202,6 +206,7 @@ pub async fn get_replace_table_plan(
on_conflict,
with_version_column,
cdc_table_info,
new_version_columns,
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ pub(crate) async fn reparse_table_for_sink(
on_conflict,
with_version_column,
None,
None,
)
.await?;

Expand Down
Loading

0 comments on commit 5f2938a

Please sign in to comment.