From 577fbcc623e4e161323d1514ee36b8255fccd9fd Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Mon, 4 Nov 2024 19:28:01 +0800
Subject: [PATCH 01/11] remove intermediate func

---
 risedev.yml                                   |  4 +-
 .../src/handler/alter_table_column.rs         | 33 ++-------
 .../src/handler/alter_table_with_sr.rs        | 70 +++++++++++--------
 3 files changed, 49 insertions(+), 58 deletions(-)

diff --git a/risedev.yml b/risedev.yml
index 0ec9b8b09af79..fcb24fde20714 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -51,8 +51,8 @@ profile:
   #   - use: compactor
 
   # If you want to create source from Kafka, uncomment the following lines
-  # - use: kafka
-  #   persist-data: true
+  - use: kafka
+    persist-data: true
 
   # To enable Confluent schema registry, uncomment the following line
   # - use: schema-registry

diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index 88e886ad667bf..a75ebfb7fecec 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -46,31 +46,6 @@ use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_w
 use crate::session::SessionImpl;
 use crate::{Binder, TableCatalog, WithOptions};
 
-pub async fn replace_table_with_definition(
-    session: &Arc<SessionImpl>,
-    table_name: ObjectName,
-    definition: Statement,
-    original_catalog: &Arc<TableCatalog>,
-    source_schema: Option<ConnectorSchema>,
-) -> Result<()> {
-    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
-        session,
-        table_name,
-        definition,
-        original_catalog,
-        source_schema,
-        None,
-    )
-    .await?;
-
-    let catalog_writer = session.catalog_writer()?;
-
-    catalog_writer
-        .replace_table(source, table, graph, col_index_mapping, job_type)
-        .await?;
-    Ok(())
-}
-
 /// Used in auto schema change process
 pub async fn get_new_table_definition_for_cdc_table(
     session: &Arc<SessionImpl>,
@@ -455,15 +430,21 @@ pub async fn handle_alter_table_column(
         _ => unreachable!(),
     };
 
-    replace_table_with_definition(
+    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
         &session,
         table_name,
         definition,
         &original_catalog,
         source_schema,
+        None,
     )
     .await?;
 
+    let catalog_writer = session.catalog_writer()?;
+
+    catalog_writer
+        .replace_table(source, table, graph, col_index_mapping, job_type)
+        .await?;
     Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
 }

diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs
index d932246759e22..4cabfd9948809 100644
--- a/src/frontend/src/handler/alter_table_with_sr.rs
+++ b/src/frontend/src/handler/alter_table_with_sr.rs
@@ -14,18 +14,16 @@
 use anyhow::{anyhow, Context};
 use fancy_regex::Regex;
-use pgwire::pg_response::StatementType;
+use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::bail_not_implemented;
 use risingwave_sqlparser::ast::{ConnectorSchema, ObjectName, Statement};
 use risingwave_sqlparser::parser::Parser;
 use thiserror_ext::AsReport;
 
 use super::alter_source_with_sr::alter_definition_format_encode;
-use super::alter_table_column::{
-    fetch_table_catalog_for_alter, replace_table_with_definition, schema_has_schema_registry,
-};
+use super::alter_table_column::{fetch_table_catalog_for_alter, schema_has_schema_registry};
 use super::util::SourceSchemaCompatExt;
-use super::{HandlerArgs, RwPgResponse};
+use super::{get_replace_table_plan, HandlerArgs, RwPgResponse};
 use crate::error::{ErrorCode, Result};
 use crate::TableCatalog;
 
@@ -76,31 +74,43 @@ pub async fn handle_refresh_schema(
         .try_into()
        .unwrap();
 
-    let result = replace_table_with_definition(
-        &session,
-        table_name,
-        definition,
-        &original_table,
-        Some(connector_schema),
-    )
-    .await;
-
-    match result {
-        Ok(_) => Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE)),
-        Err(e) => {
-            let report = e.to_report_string();
-            // This is a workaround for reporting errors when columns to drop is referenced by generated column.
-            // Finding the actual columns to drop requires generating `PbSource` from the sql definition
-            // and fetching schema from schema registry, which will cause a lot of unnecessary refactor.
-            // Here we match the error message to yield when failing to bind generated column exprs.
-            let re = Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
-            let captures = re.captures(&report).map_err(anyhow::Error::from)?;
-            if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
-                Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
-                    gen_col_name.as_str())).into())
-            } else {
-                Err(e)
+    let (source, table, graph, col_index_mapping, job_type) = {
+        let result = get_replace_table_plan(
+            &session,
+            table_name,
+            definition,
+            &original_table,
+            Some(connector_schema),
+            None,
+        )
+        .await;
+        match result {
+            Ok((source, table, graph, col_index_mapping, job_type)) => {
+                Ok((source, table, graph, col_index_mapping, job_type))
+            }
+            Err(e) => {
+                let report = e.to_report_string();
+                // NOTE(yuhao): This is a workaround for reporting errors when the columns to drop are referenced by a generated column.
+                // Finding the actual columns to drop requires generating `PbSource` from the SQL definition
+                // and fetching the schema from the schema registry, which would cause a lot of unnecessary refactoring.
+                // Here we match on the error message yielded when binding the generated column exprs fails.
+                let re =
+                    Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
+                let captures = re.captures(&report).map_err(anyhow::Error::from)?;
+                if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
+                    Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
+                        gen_col_name.as_str())).into())
+                } else {
+                    Err(e)
+                }
             }
         }
-    }
+    }?;
+    let catalog_writer = session.catalog_writer()?;
+
+    catalog_writer
+        .replace_table(source, table, graph, col_index_mapping, job_type)
+        .await?;
+
+    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
 }

From 6e237f564968b8640b9299f3574428fd80c18868 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Mon, 4 Nov 2024 19:54:16 +0800
Subject: [PATCH 02/11] fmt

---
 src/frontend/src/handler/alter_table_column.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index da5cf88ea8cc5..ddd24bd9a4102 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -436,7 +436,7 @@ pub async fn handle_alter_table_column(
         definition,
         &original_catalog,
         format_encode,
-        None
+        None,
     )
     .await?;
 
From e63b32b92b62db106e73a7a8d53888eeac54a142 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Tue, 5 Nov 2024 18:34:30 +0800
Subject: [PATCH 03/11] refactor bind cdc table schema

---
 src/frontend/src/handler/create_table.rs | 148 +++++++++++------------
 1 file changed, 71 insertions(+), 77 deletions(-)

diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 1e5dc489c1a0c..6c8e5415654d7 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -1031,19 +1031,15 @@ pub(super) async fn handle_create_table_plan(
             )?;
             source.clone()
         };
-        let cdc_with_options = derive_with_options_for_cdc_table(
+        let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
             &source.with_properties,
             cdc_table.external_table_name.clone(),
         )?;
 
-        let (columns, pk_names) = derive_schema_for_cdc_table(
-            &column_defs,
-            &constraints,
-            cdc_with_options.clone(),
-            wildcard_idx.is_some(),
-            None,
-        )
-        .await?;
+        let (columns, pk_names) = match wildcard_idx {
+            Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
+            None => bind_cdc_table_schema(&column_defs, &constraints, None)?,
+        };
 
         let context: OptimizerContextRef = OptimizerContext::new(handler_args, explain_options).into();
@@ -1156,76 +1152,77 @@ struct CdcSchemaChangeArgs {
 }
 
 /// Derive schema for cdc table when create a new Table or alter an existing Table
-async fn derive_schema_for_cdc_table(
+async fn bind_cdc_table_schema_externally(
+    cdc_with_options: WithOptionsSecResolved,
+) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
+    // read the cdc table schema from the external database
+    Feature::CdcTableSchemaMap.check_available().map_err(
+        |err: risingwave_common::license::FeatureNotAvailable| {
+            ErrorCode::NotSupported(
+                err.to_report_string(),
+                "Please define the schema manually".to_owned(),
+            )
+        },
+    )?;
+    let (options, secret_refs) = cdc_with_options.into_parts();
+    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
+        .context("failed to extract external table config")?;
+
+    let table = ExternalTableImpl::connect(config)
+        .await
+        .context("failed to auto derive table schema")?;
+    Ok((
+        table
+            .column_descs()
+            .iter()
+            .cloned()
+            .map(|column_desc| ColumnCatalog {
+                column_desc,
+                is_hidden: false,
+            })
+            .collect(),
+        table.pk_names().clone(),
+    ))
+}
+
+/// Derive schema for a cdc table when creating a new table or altering an existing one
+fn bind_cdc_table_schema(
     column_defs: &Vec<ColumnDef>,
     constraints: &Vec<TableConstraint>,
-    cdc_with_options: WithOptionsSecResolved,
-    need_auto_schema_map: bool,
     schema_change_args: Option<CdcSchemaChangeArgs>,
 ) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
-    // read cdc table schema from external db or parsing the schema from SQL definitions
-    if need_auto_schema_map {
-        Feature::CdcTableSchemaMap
-            .check_available()
-            .map_err(|err| {
-                ErrorCode::NotSupported(
-                    err.to_report_string(),
-                    "Please define the schema manually".to_owned(),
-                )
-            })?;
-        let (options, secret_refs) = cdc_with_options.into_parts();
-        let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
-            .context("failed to extract external table config")?;
-
-        let table = ExternalTableImpl::connect(config)
-            .await
-            .context("failed to auto derive table schema")?;
-        Ok((
-            table
-                .column_descs()
-                .iter()
-                .cloned()
-                .map(|column_desc| ColumnCatalog {
-                    column_desc,
-                    is_hidden: false,
-                })
-                .collect(),
-            table.pk_names().clone(),
-        ))
-    } else {
-        let mut columns = bind_sql_columns(column_defs)?;
-        let pk_names = if let Some(args) = schema_change_args {
-            // If new_version_columns is provided, we are in the process of auto schema change.
-            // update the default value column since the default value column is not set in the
-            // column sql definition.
-            if let Some(new_version_columns) = args.new_version_columns {
-                for (col, new_version_col) in columns
-                    .iter_mut()
-                    .zip_eq_fast(new_version_columns.into_iter())
-                {
-                    assert_eq!(col.name(), new_version_col.name());
-                    col.column_desc.generated_or_default_column =
-                        new_version_col.column_desc.generated_or_default_column;
-                }
+    let mut columns = bind_sql_columns(column_defs)?;
+    let pk_names = if let Some(args) = schema_change_args {
+        // If new_version_columns is provided, we are in the process of auto schema change.
+        // Update the default value columns, since the default values are not set in the
+        // column SQL definition.
+        if let Some(new_version_columns) = args.new_version_columns {
+            for (col, new_version_col) in columns
+                .iter_mut()
+                .zip_eq_fast(new_version_columns.into_iter())
+            {
+                assert_eq!(col.name(), new_version_col.name());
+                col.column_desc.generated_or_default_column =
+                    new_version_col.column_desc.generated_or_default_column;
+            }
+        }
 
-            // For table created by `create table t (*)` the constraint is empty, we need to
-            // retrieve primary key names from original table catalog if available
-            args.original_catalog
-                .pk
-                .iter()
-                .map(|x| {
-                    args.original_catalog.columns[x.column_index]
-                        .name()
-                        .to_string()
-                })
-                .collect()
-        } else {
-            bind_sql_pk_names(column_defs, constraints)?
-        };
+        // For tables created by `create table t (*)` the constraint is empty; we need to
+        // retrieve primary key names from the original table catalog if available
+        args.original_catalog
+            .pk
+            .iter()
+            .map(|x| {
+                args.original_catalog.columns[x.column_index]
+                    .name()
+                    .to_string()
+            })
+            .collect()
+    } else {
+        bind_sql_pk_names(column_defs, constraints)?
+    };
 
-        Ok((columns, pk_names))
-    }
+    Ok((columns, pk_names))
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -1387,17 +1384,14 @@ pub async fn generate_stream_graph_for_replace_table(
         cdc_table.external_table_name.clone(),
     )?;
 
-    let (columns, pk_names) = derive_schema_for_cdc_table(
+    let (columns, pk_names) = bind_cdc_table_schema(
         &column_defs,
         &constraints,
-        cdc_with_options.clone(),
-        false,
         Some(CdcSchemaChangeArgs {
             original_catalog: original_catalog.clone(),
             new_version_columns,
         }),
-    )
-    .await?;
+    )?;
 
     let context: OptimizerContextRef =
         OptimizerContext::new(handler_args, ExplainOptions::default()).into();

From 322cbf060830ec9b5b038c4658c2c050859367f4 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Tue, 5 Nov 2024 19:14:26 +0800
Subject: [PATCH 04/11] some rename

---
 .../src/handler/alter_table_column.rs         | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index ddd24bd9a4102..798d1e4790d80 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -138,8 +138,8 @@ fn to_ast_data_type(ty: &DataType) -> Result<AstDataType> {
 pub async fn get_replace_table_plan(
     session: &Arc<SessionImpl>,
     table_name: ObjectName,
-    definition: Statement,
-    original_catalog: &Arc<TableCatalog>,
+    new_definition: Statement,
+    old_catalog: &Arc<TableCatalog>,
     format_encode: Option<ConnectorSchema>,
     new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
 ) -> Result<(
     Option<PbSource>,
@@ -150,8 +150,8 @@ pub async fn get_replace_table_plan(
     TableJobType,
 )> {
     // Create handler args as if we're creating a new table with the altered definition.
-    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
-    let col_id_gen = ColumnIdGenerator::new_alter(original_catalog);
+    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
+    let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
     let Statement::CreateTable {
         columns,
         constraints,
@@ -162,15 +162,15 @@ pub async fn get_replace_table_plan(
         wildcard_idx,
         cdc_table_info,
         ..
-    } = definition
+    } = new_definition
     else {
-        panic!("unexpected statement type: {:?}", definition);
+        panic!("unexpected statement type: {:?}", new_definition);
     };
 
     let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
         session,
         table_name,
-        original_catalog,
+        old_catalog,
         format_encode,
         handler_args.clone(),
         col_id_gen,
@@ -188,7 +188,7 @@ pub async fn get_replace_table_plan(
 
     // Calculate the mapping from the original columns to the new columns.
     let col_index_mapping = ColIndexMapping::new(
-        original_catalog
+        old_catalog
             .columns()
             .iter()
             .map(|old_c| {
@@ -200,7 +200,7 @@ pub async fn get_replace_table_plan(
         table.columns.len(),
     );
 
-    let incoming_sink_ids: HashSet<_> = original_catalog.incoming_sinks.iter().copied().collect();
+    let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();
 
     let target_columns = table
         .columns
@@ -220,7 +220,7 @@ pub async fn get_replace_table_plan(
 
     // Set some fields ourselves so that the meta service does not need to maintain them.
    let mut table = table;
     table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
-    table.maybe_vnode_count = VnodeCount::set(original_catalog.vnode_count()).to_protobuf();
+    table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();
 
     Ok((source, table, graph, col_index_mapping, job_type))
 }

From 47a190f37d6f34a9ffe4d8b175d83f5d28e93ffd Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Tue, 5 Nov 2024 20:45:31 +0800
Subject: [PATCH 05/11] do not pass format encode because it is already in the
 table definition

---
 src/frontend/src/handler/alter_table_column.rs | 18 ++++++++----------
 .../src/handler/alter_table_with_sr.rs         | 12 +++---------
 src/frontend/src/rpc/mod.rs                    |  1 -
 3 files changed, 11 insertions(+), 20 deletions(-)

diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index 798d1e4790d80..1d6fbc1036a98 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -140,7 +140,6 @@ pub async fn get_replace_table_plan(
     table_name: ObjectName,
     new_definition: Statement,
     old_catalog: &Arc<TableCatalog>,
-    format_encode: Option<ConnectorSchema>,
     new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
 ) -> Result<(
     Option<PbSource>,
@@ -161,12 +160,17 @@ pub async fn get_replace_table_plan(
         with_version_column,
         wildcard_idx,
         cdc_table_info,
+        format_encode,
         ..
     } = new_definition
     else {
         panic!("unexpected statement type: {:?}", new_definition);
     };
 
+    let format_encode = format_encode
+        .clone()
+        .map(|format_encode| format_encode.into_v2_with_warning());
+
     let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
         session,
         table_name,
@@ -307,6 +311,7 @@ pub async fn handle_alter_table_column(
     else {
         panic!("unexpected statement: {:?}", definition);
     };
+
     let format_encode = format_encode
         .clone()
         .map(|format_encode| format_encode.into_v2_with_warning());
@@ -430,15 +435,8 @@ pub async fn handle_alter_table_column(
         _ => unreachable!(),
     };
 
-    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
-        &session,
-        table_name,
-        definition,
-        &original_catalog,
-        format_encode,
-        None,
-    )
-    .await?;
+    let (source, table, graph, col_index_mapping, job_type) =
+        get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?;
 
     let catalog_writer = session.catalog_writer()?;
 
diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs
index 0f353fd969a10..5fcafa68f80e5 100644
--- a/src/frontend/src/handler/alter_table_with_sr.rs
+++ b/src/frontend/src/handler/alter_table_with_sr.rs
@@ -64,6 +64,7 @@ pub async fn handle_refresh_schema(
         format_encode.unwrap()
     };
 
+    // NOTE(st1page): since we have not implemented altering the format/encode of a table, this is actually of no use.
    let definition = alter_definition_format_encode(
         &original_table.definition,
         format_encode.row_options.clone(),
@@ -75,15 +76,8 @@ pub async fn handle_refresh_schema(
         .unwrap();
 
     let (source, table, graph, col_index_mapping, job_type) = {
-        let result = get_replace_table_plan(
-            &session,
-            table_name,
-            definition,
-            &original_table,
-            Some(format_encode),
-            None,
-        )
-        .await;
+        let result =
+            get_replace_table_plan(&session, table_name, definition, &original_table, None).await;
         match result {
             Ok((source, table, graph, col_index_mapping, job_type)) => {
                 Ok((source, table, graph, col_index_mapping, job_type))
             }

diff --git a/src/frontend/src/rpc/mod.rs b/src/frontend/src/rpc/mod.rs
index b0472a431c2dd..2f5207ca6e928 100644
--- a/src/frontend/src/rpc/mod.rs
+++ b/src/frontend/src/rpc/mod.rs
@@ -105,7 +105,6 @@ async fn get_new_table_plan(
         table_name,
         new_table_definition,
         &original_catalog,
-        None,
         Some(new_version_columns),
     )
     .await?;

From d6fdbe07e33765f24bf41a17ca1d732b9e9d350f Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Thu, 7 Nov 2024 14:52:23 +0800
Subject: [PATCH 06/11] refactor bind pk and cdc table schema change: unparse
 pk constraint

---
 .../src/handler/alter_table_column.rs     | 21 ++++++-
 src/frontend/src/handler/create_source.rs |  5 +-
 src/frontend/src/handler/create_table.rs  | 57 ++++++-------------
 3 files changed, 40 insertions(+), 43 deletions(-)

diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index 1d6fbc1036a98..d078a41e258cd 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -30,7 +30,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
 use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
 use risingwave_sqlparser::ast::{
     AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode,
-    FormatEncodeOptions, ObjectName, Statement, StructField,
+    FormatEncodeOptions, Ident, ObjectName, Statement, StructField, TableConstraint,
 };
 use risingwave_sqlparser::parser::Parser;
 
@@ -43,6 +43,7 @@ use crate::catalog::table_catalog::TableType;
 use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{Expr, ExprImpl, InputRef, Literal};
 use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
+use crate::handler::create_table::bind_table_constraints;
 use crate::session::SessionImpl;
 use crate::{Binder, TableCatalog, WithOptions};
 
@@ -59,9 +60,11 @@ pub async fn get_new_table_definition_for_cdc_table(
         .context("unable to parse original table definition")?
         .try_into()
         .unwrap();
+
     let Statement::CreateTable {
         columns: original_columns,
         format_encode,
+        constraints,
         ..
    } = &mut definition
     else {
@@ -73,6 +76,22 @@ pub async fn get_new_table_definition_for_cdc_table(
         "source schema should be None for CDC table"
     );
 
+    if bind_table_constraints(constraints).unwrap().is_empty() {
+        // For tables created by `create table t (*)` the constraint is empty; we need to
+        // retrieve primary key names from the original table catalog if available
+        let pk_names: Vec<_> = original_catalog
+            .pk
+            .iter()
+            .map(|x| original_catalog.columns[x.column_index].name().to_string())
+            .collect();
+
+        constraints.push(TableConstraint::Unique {
+            name: None,
+            columns: pk_names.iter().map(Ident::new_unchecked).collect(),
+            is_primary: true,
+        });
+    }
+
     let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter(
         original_catalog
             .columns()

diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 9c37799422a59..438e3cea1a736 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -78,7 +78,7 @@ use crate::error::{Result, RwError};
 use crate::expr::Expr;
 use crate::handler::create_table::{
     bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns,
-    bind_sql_pk_names, ensure_table_constraints_supported, ColumnIdGenerator,
+    bind_sql_pk_names, bind_table_constraints, ColumnIdGenerator,
 };
 use crate::handler::util::SourceSchemaCompatExt;
 use crate::handler::HandlerArgs;
@@ -1523,8 +1523,7 @@ pub async fn bind_create_source_or_table_with_connector(
         }
     }
 
-    ensure_table_constraints_supported(&constraints)?;
-    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, &constraints)?;
+    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;
 
     let columns_from_sql = bind_sql_columns(sql_columns_defs)?;
 
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 6c8e5415654d7..d6cc912d935d0 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -360,25 +360,30 @@ pub fn bind_sql_column_constraints(
     Ok(())
 }
 
-pub fn ensure_table_constraints_supported(table_constraints: &[TableConstraint]) -> Result<()> {
+/// Currently we only support the PRIMARY KEY table constraint, so this just returns the PK column names if one exists
+pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
+    let mut pk_column_names = vec![];
+
     for constraint in table_constraints {
         match constraint {
             TableConstraint::Unique {
                 name: _,
-                columns: _,
+                columns,
                 is_primary: true,
-            } => {}
+            } => {
+                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
+            }
             _ => bail_not_implemented!("table constraint \"{}\"", constraint),
         }
     }
-    Ok(())
+    Ok(pk_column_names)
 }
 
 pub fn bind_sql_pk_names(
     columns_defs: &[ColumnDef],
-    table_constraints: &[TableConstraint],
+    pk_names_from_table_constraints: Vec<String>,
 ) -> Result<Vec<String>> {
-    let mut pk_column_names = vec![];
+    let mut pk_column_names = pk_names_from_table_constraints;
 
     for column in columns_defs {
         for option_def in &column.options {
@@ -391,19 +396,6 @@ pub fn bind_sql_pk_names(
             }
         }
     }
 
-    for constraint in table_constraints {
-        if let TableConstraint::Unique {
-            name: _,
-            columns,
-            is_primary: true,
-        } = constraint
-        {
-            if !pk_column_names.is_empty() {
-                return Err(multiple_pk_definition_err());
-            }
-            pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
-        }
-    }
 
     Ok(pk_column_names)
 }
@@ -585,8 +577,7 @@ pub(crate) fn gen_create_table_plan_without_source(
     with_version_column: Option<String>,
     version: Option<TableVersion>,
 ) -> Result<(PlanRef, PbTable)> {
-    ensure_table_constraints_supported(&constraints)?;
-    let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
+    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
 
     let (mut columns, pk_column_ids, row_id_index) =
         bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
@@ -1192,7 +1183,7 @@ fn bind_cdc_table_schema(
     schema_change_args: Option<CdcSchemaChangeArgs>,
 ) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
     let mut columns = bind_sql_columns(column_defs)?;
-    let pk_names = if let Some(args) = schema_change_args {
+    if let Some(args) = schema_change_args {
         // If new_version_columns is provided, we are in the process of auto schema change.
         // Update the default value columns, since the default values are not set in the
         // column SQL definition.
@@ -1206,21 +1197,8 @@ fn bind_cdc_table_schema(
                 new_version_col.column_desc.generated_or_default_column;
             }
         }
-
-        // For tables created by `create table t (*)` the constraint is empty; we need to
-        // retrieve primary key names from the original table catalog if available
-        args.original_catalog
-            .pk
-            .iter()
-            .map(|x| {
-                args.original_catalog.columns[x.column_index]
-                    .name()
-                    .to_string()
-            })
-            .collect()
-    } else {
-        bind_sql_pk_names(column_defs, constraints)?
-    };
+    }
+    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
 
     Ok((columns, pk_names))
 }
@@ -1613,8 +1591,9 @@ mod tests {
             for c in &mut columns {
                 c.column_desc.column_id = col_id_gen.generate(c.name())
             }
-            ensure_table_constraints_supported(&constraints)?;
-            let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
+
+            let pk_names =
+                bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
             let (_, pk_column_ids, _) = bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
             Ok(pk_column_ids)

From 8354538719a58a06ed63cf1f75e92f53116f4994 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Thu, 7 Nov 2024 16:16:43 +0800
Subject: [PATCH 07/11] remove cdc args helper

---
 src/frontend/src/handler/create_table.rs | 44 ++++++++----------
 1 file changed, 15 insertions(+), 29 deletions(-)

diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index d6cc912d935d0..ce4626ca146a5 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -1135,13 +1135,6 @@ fn sanity_check_for_cdc_table(
     Ok(())
 }
 
-struct CdcSchemaChangeArgs {
-    /// original table catalog
-    original_catalog: Arc<TableCatalog>,
-    /// new version table columns, only provided in auto schema change
-    new_version_columns: Option<Vec<ColumnCatalog>>,
-}
-
 /// Derive schema for cdc table when create a new Table or alter an existing Table
 async fn bind_cdc_table_schema_externally(
@@ -1180,24 +1173,23 @@ async fn bind_cdc_table_schema_externally(
 fn bind_cdc_table_schema(
     column_defs: &Vec<ColumnDef>,
     constraints: &Vec<TableConstraint>,
-    schema_change_args: Option<CdcSchemaChangeArgs>,
+    new_version_columns: Option<Vec<ColumnCatalog>>,
 ) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
     let mut columns = bind_sql_columns(column_defs)?;
-    if let Some(args) = schema_change_args {
-        // If new_version_columns is provided, we are in the process of auto schema change.
-        // Update the default value columns, since the default values are not set in the
-        // column SQL definition.
-        if let Some(new_version_columns) = args.new_version_columns {
-            for (col, new_version_col) in columns
-                .iter_mut()
-                .zip_eq_fast(new_version_columns.into_iter())
-            {
-                assert_eq!(col.name(), new_version_col.name());
-                col.column_desc.generated_or_default_column =
-                    new_version_col.column_desc.generated_or_default_column;
-            }
+    // If new_version_columns is provided, we are in the process of auto schema change.
+    // Update the default value columns, since the default values are not set in the
+    // column SQL definition.
+    if let Some(new_version_columns) = new_version_columns {
+        for (col, new_version_col) in columns
+            .iter_mut()
+            .zip_eq_fast(new_version_columns.into_iter())
+        {
+            assert_eq!(col.name(), new_version_col.name());
+            col.column_desc.generated_or_default_column =
+                new_version_col.column_desc.generated_or_default_column;
         }
     }
+
     let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
 
     Ok((columns, pk_names))
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -1362,14 +1354,8 @@ pub async fn generate_stream_graph_for_replace_table(
         cdc_table.external_table_name.clone(),
     )?;
 
-    let (columns, pk_names) = bind_cdc_table_schema(
-        &column_defs,
-        &constraints,
-        Some(CdcSchemaChangeArgs {
-            original_catalog: original_catalog.clone(),
-            new_version_columns,
-        }),
-    )?;
+    let (columns, pk_names) =
+        bind_cdc_table_schema(&column_defs, &constraints, new_version_columns)?;
 
     let context: OptimizerContextRef =
         OptimizerContext::new(handler_args, ExplainOptions::default()).into();

From 046335d26d21d753dfd4ef28f0d1c8f5b70f9f45 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Thu, 7 Nov 2024 16:17:46 +0800
Subject: [PATCH 08/11] fix risedev yml

---
 risedev.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/risedev.yml b/risedev.yml
index ee5bd7b5628e0..8e3668dcb49c2 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -51,8 +51,8 @@ profile:
   #   - use: compactor
 
   # If you want to create source from Kafka, uncomment the following lines
-  - use: kafka
-    persist-data: true
+  # - use: kafka
+  #   persist-data: true
 
   # To enable Confluent schema registry, uncomment the following line
   # - use: schema-registry

From 1e7c9c4f2de0dd30d3bf19d6e842ccea307f8784 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Thu, 7 Nov 2024 16:41:22 +0800
Subject: [PATCH 09/11] add multiple pk test

---
 src/frontend/src/handler/create_table.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index ce4626ca146a5..89aca1eeff80a 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -371,7 +371,10 @@ pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
                 is_primary: true,
             } => {
-                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
+                if !pk_column_names.is_empty() {
+                    return Err(multiple_pk_definition_err());
+                }
+pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
             }
             _ => bail_not_implemented!("table constraint \"{}\"", constraint),
         }

From b9c3302ea8c062b863cfab5840cfa733eaef6539 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Thu, 7 Nov 2024 16:45:24 +0800
Subject: [PATCH 10/11] fmt

---
 src/frontend/src/handler/create_table.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 89aca1eeff80a..8b7732e7b425b 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -374,7 +374,7 @@ pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
                 if !pk_column_names.is_empty() {
                     return Err(multiple_pk_definition_err());
                 }
-pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
+                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
             }
             _ => bail_not_implemented!("table constraint \"{}\"", constraint),
         }

From 0ed4550557fb97e554634246b21b308e03c70031 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Fri, 8 Nov 2024 15:35:43 +0800
Subject: [PATCH 11/11] resolve comment

---
 src/frontend/src/handler/alter_table_column.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index d078a41e258cd..69aacb11a2df0 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -76,7 +76,7 @@ pub async fn get_new_table_definition_for_cdc_table(
         "source schema should be None for CDC table"
     );
 
-    if bind_table_constraints(constraints).unwrap().is_empty() {
+    if bind_table_constraints(constraints)?.is_empty() {
         // For tables created by `create table t (*)` the constraint is empty; we need to
         // retrieve primary key names from the original table catalog if available
         let pk_names: Vec<_> = original_catalog
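
The net effect of patches 06 and 09 on primary-key binding: `ensure_table_constraints_supported` is folded into a new `bind_table_constraints`, which both validates the table constraints and extracts the PRIMARY KEY column names, and `bind_sql_pk_names` now takes those names as input and merges them with column-level `PRIMARY KEY` options instead of re-scanning the constraints itself. Below is a minimal, self-contained sketch of that flow. The `TableConstraint` and `ColumnDef` definitions and the plain `String` errors are stand-ins for the real `risingwave_sqlparser` AST and error helpers (`bail_not_implemented!`, `multiple_pk_definition_err`), not the actual types.

```rust
// Simplified model of the PK-binding flow after patches 06 and 09.
// All types here are hypothetical stand-ins, not RisingWave's real definitions.

#[derive(Debug)]
enum TableConstraint {
    Unique { columns: Vec<String>, is_primary: bool },
}

struct ColumnDef {
    name: String,
    // Stands in for a `ColumnOption::Unique { is_primary: true }` column option.
    is_primary_key: bool,
}

/// Validate table constraints and extract the PRIMARY KEY column names, if any.
fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>, String> {
    let mut pk_column_names = vec![];
    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique { columns, is_primary: true } => {
                // Patch 09: reject a second PRIMARY KEY table constraint.
                if !pk_column_names.is_empty() {
                    return Err("multiple primary keys are not allowed".into());
                }
                pk_column_names = columns.clone();
            }
            other => return Err(format!("table constraint {:?} not implemented", other)),
        }
    }
    Ok(pk_column_names)
}

/// Merge constraint-level PK names (computed by the caller) with column-level
/// PRIMARY KEY options; patch 06 removed the constraint re-scan from this function.
fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>, String> {
    let mut pk_column_names = pk_names_from_table_constraints;
    for column in columns_defs {
        if column.is_primary_key {
            if !pk_column_names.is_empty() {
                return Err("multiple primary keys are not allowed".into());
            }
            pk_column_names = vec![column.name.clone()];
        }
    }
    Ok(pk_column_names)
}

fn main() {
    // `create table t (id int, primary key (id))`: PK comes from the table constraint.
    let cols = [ColumnDef { name: "id".into(), is_primary_key: false }];
    let constraints =
        [TableConstraint::Unique { columns: vec!["id".into()], is_primary: true }];
    let pk = bind_table_constraints(&constraints)
        .and_then(|names| bind_sql_pk_names(&cols, names));
    assert_eq!(pk, Ok(vec!["id".to_string()]));
}
```

This split is also what makes patch 06's change to `get_new_table_definition_for_cdc_table` possible: when a `create table t (*)` definition carries no PK constraint, the handler can now "unparse" the catalog's primary key back into a `TableConstraint::Unique { is_primary: true }` on the definition, so `bind_cdc_table_schema` no longer needs the original catalog at all.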