diff --git a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt
index 6bea5dce2fe45..baecff00c09a4 100644
--- a/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt
+++ b/e2e_test/source/cdc_inline/alter/cdc_table_alter.slt
@@ -131,13 +131,13 @@ select order_id, product_id, shipment_id from enriched_orders order by order_id;
 system ok
 mysql -e "
   USE testdb1;
-  ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 0.0;
+  ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 1.1;
   ALTER TABLE orders ADD COLUMN order_comment VARCHAR(255);
 "

 # alter cdc tables
 statement ok
-ALTER TABLE my_products ADD COLUMN weight DECIMAL;
+ALTER TABLE my_products ADD COLUMN weight DECIMAL DEFAULT 1.1;

 statement ok
 ALTER TABLE my_orders ADD COLUMN order_comment VARCHAR;
@@ -148,9 +148,9 @@ sleep 3s
 query ITTT
 SELECT id,name,description,weight FROM my_products order by id limit 3
 ----
-101 scooter Small 2-wheel scooter NULL
-102 car battery 12V car battery NULL
-103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL
+101 scooter Small 2-wheel scooter 1.1
+102 car battery 12V car battery 1.1
+103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 1.1

 # update mysql tables
@@ -169,7 +169,7 @@ SELECT id,name,description,weight FROM my_products order by id limit 3
 ----
 101 scooter Small 2-wheel scooter 10.50
 102 car battery 12V car battery 12.50
-103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL
+103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 1.1

 query ITTT
 SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM my_orders order by order_id limit 2
diff --git a/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt b/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
index 3c386a2718479..f1c94be75ccf5 100644
--- a/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
+++ b/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
@@ -10,8 +10,11 @@ mysql -e "
  CREATE TABLE customers(
     id BIGINT PRIMARY KEY,
     modified DATETIME,
+    name VARCHAR(32),
     custinfo JSON
  );
+ INSERT INTO customers VALUES(1, NOW(), 'John', NULL);
+ INSERT INTO customers VALUES(2, NOW(), 'Doe', NULL);
  ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
 "

@@ -28,7 +31,7 @@ create source mysql_source with (
 );

 statement ok
-create table rw_customers (id bigint, modified timestamp, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';
+create table rw_customers (id bigint, modified timestamp, name varchar, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';

 # Name, Type, Is Hidden, Description
 query TTTT
@@ -36,6 +39,7 @@ describe rw_customers;
 ----
 id bigint false NULL
 modified timestamp without time zone false NULL
+name character varying false NULL
 custinfo jsonb false NULL
 primary key id NULL NULL
 distribution key id NULL NULL
@@ -46,8 +50,8 @@ table description rw_customers NULL NULL
 system ok
 mysql -e "
   USE mytest;
-  ALTER TABLE customers ADD COLUMN v1 VARCHAR(255);
-  ALTER TABLE customers ADD COLUMN v2 double(5,2);
+  ALTER TABLE customers ADD COLUMN v1 VARCHAR(255) DEFAULT 'hello';
+  ALTER TABLE customers ADD COLUMN v2 double(5,2) DEFAULT 88.9;
 "

 sleep 3s
@@ -58,6 +62,7 @@ describe rw_customers;
 ----
 id bigint false NULL
 modified timestamp without time zone false NULL
+name character varying false NULL
 custinfo jsonb false NULL
 v1 character varying false NULL
 v2 double precision false NULL
@@ -65,6 +70,12 @@ primary key id NULL NULL
 distribution key id NULL NULL
 table description rw_customers NULL NULL

+query TTTT
+select id,v1,v2,name from rw_customers order by id;
+----
+1 hello 88.9 John
+2 hello 88.9 Doe
+
 # rename column on upstream will not be replicated, since we do not support rename column
 system ok
 mysql -e "
@@ -81,6 +92,7 @@ describe rw_customers;
 ----
 id bigint false NULL
 modified timestamp without time zone false NULL
+name character varying false NULL
 custinfo jsonb false NULL
 v1 character varying false NULL
 v2 double precision false NULL
@@ -112,6 +124,7 @@ query TTTT
 describe rw_customers;
 ----
 id bigint false NULL
+name character varying false NULL
 custinfo jsonb false NULL
 primary key id NULL NULL
 distribution key id NULL NULL
diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs
index b3065defea2a2..f1d1123fbfd4e 100644
--- a/src/common/src/catalog/column.rs
+++ b/src/common/src/catalog/column.rs
@@ -18,7 +18,7 @@ use itertools::Itertools;
 use risingwave_pb::expr::ExprNode;
 use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
 use risingwave_pb::plan_common::{
-    AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
+    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
 };

 use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
@@ -140,6 +140,18 @@ impl ColumnDesc {
         }
     }

+    pub fn named_with_default_value(
+        name: impl Into<String>,
+        column_id: ColumnId,
+        data_type: DataType,
+        default_val: DefaultColumnDesc,
+    ) -> ColumnDesc {
+        ColumnDesc {
+            generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_val)),
+            ..Self::named(name, column_id, data_type)
+        }
+    }
+
     pub fn named_with_additional_column(
         name: impl Into<String>,
         column_id: ColumnId,
diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs
index 2dbe78cf32e25..a2c5742b87d44 100644
--- a/src/connector/src/parser/unified/debezium.rs
+++ b/src/connector/src/parser/unified/debezium.rs
@@ -18,8 +18,12 @@ use risingwave_common::types::{
     DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef,
     ToOwnedDatum,
 };
+use risingwave_common::util::value_encoding::DatumToProtoExt;
 use risingwave_connector_codec::decoder::AccessExt;
+use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
+use risingwave_pb::expr::ExprNode;
 use risingwave_pb::plan_common::additional_column::ColumnType;
+use risingwave_pb::plan_common::DefaultColumnDesc;
 use thiserror_ext::AsReport;

 use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
@@ -221,7 +225,40 @@ pub fn parse_schema_change(
             }
         };

-        column_descs.push(ColumnDesc::named(name, ColumnId::placeholder(), data_type));
+        // handle default value expression, currently we only support constant expression
+        let column_desc = match col.access_object_field("defaultValueExpression") {
+            Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
+                let value_text = default_val_expr_str.as_string().unwrap();
+                let snapshot_value: Datum = Some(
+                    ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
+                        |err| {
+                            tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
+                            AccessError::TypeError {
+                                expected: "constant expression".into(),
+                                got: data_type.to_string(),
+                                value: value_text,
+                            }},
+                    )?,
+                );
+                // equivalent to `Literal::to_expr_proto`
+                let default_val_expr_node = ExprNode {
+                    function_type: ExprType::Unspecified as i32,
+                    return_type: Some(data_type.to_protobuf()),
+                    rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
+                };
+                ColumnDesc::named_with_default_value(
+                    name,
+                    ColumnId::placeholder(),
+                    data_type,
+                    DefaultColumnDesc {
+                        expr: Some(default_val_expr_node),
+                        snapshot_value: Some(snapshot_value.to_protobuf()),
+                    },
+                )
+            }
+            _ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
+        };
+        column_descs.push(column_desc);
     }
 }
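The core of the `debezium.rs` change is hand-building the constant expression (per its own comment, "equivalent to `Literal::to_expr_proto`"). A minimal sketch of that construction in isolation, assuming the `risingwave_common`/`risingwave_pb` crates from this tree and an `anyhow` wrapper for the parse error (the helper name `default_desc_from_text` is invented for illustration):

```rust
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::DefaultColumnDesc;

// Hypothetical helper mirroring the hunk above: turn the textual default
// reported by Debezium (e.g. "1.1") into the constant expression plus
// snapshot value that gets stored on the new column.
fn default_desc_from_text(
    value_text: &str,
    data_type: &DataType,
) -> anyhow::Result<DefaultColumnDesc> {
    // `from_text` only understands constant literals; in the hunk above a
    // failure here surfaces as `AccessError::TypeError` (the `_` match arm
    // is for columns that carry no `defaultValueExpression` at all).
    let snapshot_value: Datum = Some(ScalarImpl::from_text(value_text, data_type)?);
    Ok(DefaultColumnDesc {
        expr: Some(ExprNode {
            function_type: ExprType::Unspecified as i32,
            return_type: Some(data_type.to_protobuf()),
            rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
        }),
        snapshot_value: Some(snapshot_value.to_protobuf()),
    })
}
```

For the e2e case above, `value_text` would be `"1.1"` and `data_type` a decimal type; storing the snapshot value alongside the expression lets the backfill fill existing rows without re-evaluating anything.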
diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs
index 731d0c4ff8ae8..67826129c8b82 100644
--- a/src/connector/src/source/manager.rs
+++ b/src/connector/src/source/manager.rs
@@ -19,6 +19,7 @@ use risingwave_common::catalog::{
     TABLE_NAME_COLUMN_NAME,
 };
 use risingwave_common::types::DataType;
+use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
 use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion};

 /// `SourceColumnDesc` is used to describe a column in the Source.
@@ -137,11 +138,14 @@ impl From<&ColumnDesc> for SourceColumnDesc {
             version: _,
         }: &ColumnDesc,
     ) -> Self {
-        debug_assert!(
-            generated_or_default_column.is_none(),
-            "source column should not be generated or default: {:?}",
-            generated_or_default_column.as_ref().unwrap()
-        );
+        if let Some(option) = generated_or_default_column {
+            debug_assert!(
+                matches!(option, GeneratedOrDefaultColumn::DefaultColumn(_)),
+                "source column should not be generated: {:?}",
+                generated_or_default_column.as_ref().unwrap()
+            )
+        }
+
         Self {
             name: name.clone(),
             data_type: data_type.clone(),
diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index f00ff35992b43..4fd624929a175 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -58,6 +58,7 @@ pub async fn replace_table_with_definition(
         definition,
         original_catalog,
         source_schema,
+        None,
    )
    .await?;

@@ -73,7 +74,7 @@
 pub async fn get_new_table_definition_for_cdc_table(
     session: &Arc<SessionImpl>,
     table_name: ObjectName,
-    new_columns: Vec<ColumnCatalog>,
+    new_columns: &[ColumnCatalog],
 ) -> Result<(Statement, Arc<TableCatalog>)> {
     let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

@@ -96,22 +97,24 @@ pub async fn get_new_table_definition_for_cdc_table(
         "source schema should be None for CDC table"
     );

-    let orig_column_map: HashMap<String, ColumnDef> = HashMap::from_iter(
-        original_columns
+    let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter(
+        original_catalog
+            .columns()
             .iter()
-            .map(|col| (col.name.real_value(), col.clone())),
+            .map(|col| (col.name().to_string(), col.clone())),
     );

     // update the original columns with new version columns
     let mut new_column_defs = vec![];
-    for col in new_columns {
-        // if the column exists in the original definitions, use the original column definition.
+    for new_col in new_columns {
+        // if the column exists in the original catalog, use it to construct the column definition.
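`get_new_table_definition_for_cdc_table` now rebuilds every column definition from name and type alone; defaults travel through the catalog (as `new_version_columns`) rather than through the regenerated SQL text. A hypothetical, self-contained rendering of the merge rule, with a plain `Col` struct standing in for `ColumnCatalog`/`ColumnDef`:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct Col {
    name: String,
    ty: String,
}

// Hypothetical stand-in for the loop above: every output definition is
// rebuilt from scratch (name + type, no options or defaults), preferring
// the original catalog's type when the column already exists.
fn merge_columns(original: &[Col], new_version: &[Col]) -> Vec<Col> {
    let orig: HashMap<&str, &Col> = original.iter().map(|c| (c.name.as_str(), c)).collect();
    new_version
        .iter()
        .map(|new_col| match orig.get(new_col.name.as_str()) {
            Some(original_col) => (*original_col).clone(),
            None => new_col.clone(),
        })
        .collect()
}

fn main() {
    let original = vec![
        Col { name: "id".into(), ty: "INT".into() },
        Col { name: "name".into(), ty: "VARCHAR".into() },
    ];
    let new_version = vec![
        Col { name: "id".into(), ty: "INT".into() },
        Col { name: "name".into(), ty: "VARCHAR".into() },
        Col { name: "weight".into(), ty: "NUMERIC".into() }, // added upstream
    ];
    assert_eq!(merge_columns(&original, &new_version).len(), 3);
}
```

Because the loop iterates only the new-version columns, a column dropped upstream falls out of the regenerated definition, and a newly added one takes its type from the new catalog.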
        // since we don't support altering the column type right now
-        if let Some(original_col) = orig_column_map.get(col.name()) {
-            new_column_defs.push(original_col.clone());
+        if let Some(original_col) = orig_column_catalog.get(new_col.name()) {
+            let ty = to_ast_data_type(original_col.data_type())?;
+            new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![]));
         } else {
-            let ty = to_ast_data_type(col.data_type())?;
-            new_column_defs.push(ColumnDef::new(col.name().into(), ty, None, vec![]));
+            let ty = to_ast_data_type(new_col.data_type())?;
+            new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![]));
         }
     }
     *original_columns = new_column_defs;
@@ -162,6 +165,7 @@ pub async fn get_replace_table_plan(
     definition: Statement,
     original_catalog: &Arc<TableCatalog>,
     source_schema: Option<ConnectorSchema>,
+    new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
 ) -> Result<(
     Option<Source>,
     Table,
@@ -202,6 +206,7 @@ pub async fn get_replace_table_plan(
         on_conflict,
         with_version_column,
         cdc_table_info,
+        new_version_columns,
     )
     .await?;
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index f0c1ed074ed1e..2c602e9e859d4 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -693,6 +693,7 @@ pub(crate) async fn reparse_table_for_sink(
         on_conflict,
         with_version_column,
         None,
+        None,
     )
     .await?;
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 825ee595b4b29..4937132d94790 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -27,6 +27,7 @@ use risingwave_common::catalog::{
     INITIAL_TABLE_VERSION_ID,
 };
 use risingwave_common::license::Feature;
+use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 use risingwave_common::util::value_encoding::DatumToProtoExt;
 use risingwave_connector::source::cdc::build_cdc_table_id;
@@ -749,10 +750,10 @@ fn gen_table_plan_inner(
 /// in create table workflow, the `table_id` is a placeholder will be filled in the Meta
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn gen_create_table_plan_for_cdc_table(
-    handler_args: HandlerArgs,
-    explain_options: ExplainOptions,
+    context: OptimizerContextRef,
     source: Arc<SourceCatalog>,
     external_table_name: String,
+    column_defs: Vec<ColumnDef>,
     mut columns: Vec<ColumnCatalog>,
     pk_names: Vec<String>,
     connect_properties: WithOptionsSecResolved,
@@ -760,12 +761,12 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
     on_conflict: Option<OnConflict>,
     with_version_column: Option<String>,
     include_column_options: IncludeOption,
-    resolved_table_name: String,
+    table_name: ObjectName,
+    resolved_table_name: String, // table name without schema prefix
     database_id: DatabaseId,
     schema_id: SchemaId,
     table_id: TableId,
 ) -> Result<(PlanRef, PbTable)> {
-    let context: OptimizerContextRef = OptimizerContext::new(handler_args, explain_options).into();
     let session = context.session_ctx().clone();

     // append additional columns to the end
@@ -780,9 +781,18 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
         c.column_desc.column_id = col_id_gen.generate(c.name())
     }

-    let (columns, pk_column_ids, _row_id_index) =
+    let (mut columns, pk_column_ids, _row_id_index) =
         bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

+    // NOTES: In auto schema change, default value is not provided in column definition.
+    bind_sql_column_constraints(
+        context.session_ctx(),
+        table_name.real_value(),
+        &mut columns,
+        column_defs,
+        &pk_column_ids,
+    )?;
+
     let definition = context.normalized_sql().to_owned();

     let pk_column_indices = {
@@ -985,7 +995,7 @@ pub(super) async fn handle_create_table_plan(
     let session = &handler_args.session;
     let db_name = session.database();
     let (schema_name, resolved_table_name) =
-        Binder::resolve_schema_qualified_name(db_name, table_name)?;
+        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
     let (database_id, schema_id) =
         session.get_database_and_schema_id_for_create(schema_name.clone())?;
@@ -1019,11 +1029,13 @@ pub(super) async fn handle_create_table_plan(
             )
             .await?;

+            let context: OptimizerContextRef =
+                OptimizerContext::new(handler_args, explain_options).into();
             let (plan, table) = gen_create_table_plan_for_cdc_table(
-                handler_args,
-                explain_options,
+                context,
                 source,
                 cdc_table.external_table_name.clone(),
+                column_defs,
                 columns,
                 pk_names,
                 connect_properties,
@@ -1031,6 +1043,7 @@ pub(super) async fn handle_create_table_plan(
                 on_conflict,
                 with_version_column,
                 include_column_options,
+                table_name,
                 resolved_table_name,
                 database_id,
                 schema_id,
@@ -1119,13 +1132,20 @@ fn sanity_check_for_cdc_table(
     Ok(())
 }

+struct CdcSchemaChangeArgs {
+    /// original table catalog
+    original_catalog: Arc<TableCatalog>,
+    /// new version table columns, only provided in auto schema change
+    new_version_columns: Option<Vec<ColumnCatalog>>,
+}
+
+/// Derive schema for cdc table when create a new Table or alter an existing Table
 async fn derive_schema_for_cdc_table(
     column_defs: &Vec<ColumnDef>,
     constraints: &Vec<TableConstraint>,
     connect_properties: WithOptionsSecResolved,
     need_auto_schema_map: bool,
-    // original table catalog available in auto schema change process
-    original_catalog: Option<Arc<TableCatalog>>,
+    schema_change_args: Option<CdcSchemaChangeArgs>,
 ) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
     // read cdc table schema from external db or parsing the schema from SQL definitions
     if need_auto_schema_map {
@@ -1157,14 +1177,32 @@ async fn derive_schema_for_cdc_table(
             table.pk_names().clone(),
         ))
     } else {
-        let columns = bind_sql_columns(column_defs)?;
-        // For table created by `create table t (*)` the constraint is empty, we need to
-        // retrieve primary key names from original table catalog if available
-        let pk_names = if let Some(original_catalog) = original_catalog {
-            original_catalog
+        let mut columns = bind_sql_columns(column_defs)?;
+        let pk_names = if let Some(args) = schema_change_args {
+            // If new_version_columns is provided, we are in the process of auto schema change.
+            // update the default value column since the default value column is not set in the
+            // column sql definition.
+            if let Some(new_version_columns) = args.new_version_columns {
+                for (col, new_version_col) in columns
+                    .iter_mut()
+                    .zip_eq_fast(new_version_columns.into_iter())
+                {
+                    assert_eq!(col.name(), new_version_col.name());
+                    col.column_desc.generated_or_default_column =
+                        new_version_col.column_desc.generated_or_default_column;
+                }
+            }
+
+            // For table created by `create table t (*)` the constraint is empty, we need to
+            // retrieve primary key names from original table catalog if available
+            args.original_catalog
                 .pk
                 .iter()
-                .map(|x| original_catalog.columns[x.column_index].name().to_string())
+                .map(|x| {
+                    args.original_catalog.columns[x.column_index]
+                        .name()
+                        .to_string()
+                })
                 .collect()
         } else {
             bind_sql_pk_names(column_defs, constraints)?
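The default-value hand-off in `derive_schema_for_cdc_table` relies on the bound columns and `new_version_columns` arriving in the same order; `zip_eq_fast` plus the `assert_eq!` on names turns any drift into a loud failure rather than a silent mispairing of defaults. A small sketch of that contract, assuming `risingwave_common` from this tree:

```rust
use risingwave_common::util::iter_util::ZipEqFast;

fn main() {
    let bound_columns = ["id", "name", "weight"];
    let new_version_columns = ["id", "name", "weight"];

    // Pairs the two lists element by element; in the spirit of itertools'
    // `zip_eq`, a length mismatch between the two `ExactSizeIterator`s is
    // treated as a bug rather than being truncated away.
    for (col, new_version_col) in bound_columns.iter().zip_eq_fast(new_version_columns.iter()) {
        // Mirrors the `assert_eq!(col.name(), new_version_col.name())` above,
        // checked before `generated_or_default_column` is copied over.
        assert_eq!(col, new_version_col);
    }
}
```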
@@ -1288,6 +1326,7 @@ pub async fn generate_stream_graph_for_table(
     on_conflict: Option<OnConflict>,
     with_version_column: Option<String>,
     cdc_table_info: Option<CdcTableInfo>,
+    new_version_columns: Option<Vec<ColumnCatalog>>,
 ) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
     use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
@@ -1341,22 +1380,28 @@ pub async fn generate_stream_graph_for_table(
         &constraints,
         connect_properties.clone(),
         false,
-        Some(original_catalog.clone()),
+        Some(CdcSchemaChangeArgs {
+            original_catalog: original_catalog.clone(),
+            new_version_columns,
+        }),
     )
     .await?;

+    let context: OptimizerContextRef =
+        OptimizerContext::new(handler_args, ExplainOptions::default()).into();
     let (plan, table) = gen_create_table_plan_for_cdc_table(
-        handler_args,
-        ExplainOptions::default(),
+        context,
         source,
         cdc_table.external_table_name.clone(),
+        column_defs,
         columns,
         pk_names,
         connect_properties,
         col_id_gen,
         on_conflict,
         with_version_column,
-        vec![], // empty include options
+        IncludeOption::default(),
+        table_name,
         resolved_table_name,
         database_id,
         schema_id,
diff --git a/src/frontend/src/rpc/mod.rs b/src/frontend/src/rpc/mod.rs
index 257695cc99e48..b0472a431c2dd 100644
--- a/src/frontend/src/rpc/mod.rs
+++ b/src/frontend/src/rpc/mod.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use itertools::Itertools;
 use pgwire::pg_server::{BoxedError, SessionManager};
 use risingwave_pb::ddl_service::{ReplaceTablePlan, TableSchemaChange};
 use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
@@ -90,16 +91,22 @@ async fn get_new_table_plan(
     // get a session object for the corresponding user and database
     let session = session_mgr.create_dummy_session(database_id, owner)?;

-    let new_columns = table_change.columns.into_iter().map(|c| c.into()).collect();
+    let new_version_columns = table_change
+        .columns
+        .into_iter()
+        .map(|c| c.into())
+        .collect_vec();
     let table_name = ObjectName::from(vec![table_name.as_str().into()]);
     let (new_table_definition, original_catalog) =
-        get_new_table_definition_for_cdc_table(&session, table_name.clone(), new_columns).await?;
+        get_new_table_definition_for_cdc_table(&session, table_name.clone(), &new_version_columns)
+            .await?;
     let (_, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
         &session,
         table_name,
         new_table_definition,
         &original_catalog,
         None,
+        Some(new_version_columns),
     )
     .await?;
diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 1e8e1c9641d8a..641bc392f7228 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -19,6 +19,7 @@ use anyhow::anyhow;
 use rand::seq::SliceRandom;
 use rand::thread_rng;
 use risingwave_common::catalog::ColumnCatalog;
+use risingwave_common::types::DataType;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_connector::sink::catalog::SinkId;
 use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
@@ -969,40 +970,41 @@ impl DdlService for DdlServiceImpl {
                 for table in tables {
                     // Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
                     // is a subset of the other.
-                    let original_column_names: HashSet<String> = HashSet::from_iter(
-                        table
-                            .columns
-                            .iter()
-                            .map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
-                    );
-                    let new_column_names: HashSet<String> = HashSet::from_iter(
-                        table_change
-                            .columns
-                            .iter()
-                            .map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
-                    );
-                    if !(original_column_names.is_subset(&new_column_names)
-                        || original_column_names.is_superset(&new_column_names))
+                    let original_columns: HashSet<(String, DataType)> =
+                        HashSet::from_iter(table.columns.iter().map(|col| {
+                            let col = ColumnCatalog::from(col.clone());
+                            let data_type = col.data_type().clone();
+                            (col.column_desc.name, data_type)
+                        }));
+                    let new_columns: HashSet<(String, DataType)> =
+                        HashSet::from_iter(table_change.columns.iter().map(|col| {
+                            let col = ColumnCatalog::from(col.clone());
+                            let data_type = col.data_type().clone();
+                            (col.column_desc.name, data_type)
+                        }));
+
+                    if !(original_columns.is_subset(&new_columns)
+                        || original_columns.is_superset(&new_columns))
                     {
                         tracing::warn!(target: "auto_schema_change",
                             table_id = table.id,
                             cdc_table_id = table.cdc_table_id,
                             upstraem_ddl = table_change.upstream_ddl,
-                            original_columns = ?original_column_names,
-                            new_columns = ?new_column_names,
+                            original_columns = ?original_columns,
+                            new_columns = ?new_columns,
                             "New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported");
                         return Err(Status::invalid_argument(
                             "New columns should be a subset or superset of the original columns",
                         ));
                     }
                     // skip the schema change if there is no change to original columns
-                    if original_column_names == new_column_names {
+                    if original_columns == new_columns {
                         tracing::warn!(target: "auto_schema_change",
                             table_id = table.id,
                             cdc_table_id = table.cdc_table_id,
                             upstraem_ddl = table_change.upstream_ddl,
-                            original_columns = ?original_column_names,
-                            new_columns = ?new_column_names,
+                            original_columns = ?original_columns,
+                            new_columns = ?new_columns,
                             "No change to columns, skipping the schema change");
                         continue;
                     }
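Comparing `(name, type)` pairs instead of bare column names means an upstream type change no longer masquerades as "no change" (the name sets would still be equal) and now trips the subset/superset guard. A hypothetical miniature of the check, with plain strings standing in for `DataType`:

```rust
use std::collections::HashSet;

fn main() {
    let original: HashSet<(String, String)> =
        HashSet::from([("id".into(), "bigint".into()), ("v1".into(), "varchar".into())]);
    // Upstream altered v1's type: the name sets are identical, but the
    // (name, type) sets are no longer comparable by inclusion.
    let new: HashSet<(String, String)> =
        HashSet::from([("id".into(), "bigint".into()), ("v1".into(), "int".into())]);

    // Neither set contains the other, so the guard above rejects the change;
    // a pure ADD COLUMN or DROP COLUMN still passes as subset/superset.
    assert!(!(original.is_subset(&new) || original.is_superset(&new)));
}
```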