From c65834074c8334d835a03333362c0f2eb67a0802 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:09:46 +0800 Subject: [PATCH] refactor: some CDC table's code (#19255) --- .../src/handler/alter_table_column.rs | 88 ++++---- .../src/handler/alter_table_with_sr.rs | 64 +++--- src/frontend/src/handler/create_source.rs | 5 +- src/frontend/src/handler/create_table.rs | 188 +++++++----------- src/frontend/src/rpc/mod.rs | 1 - 5 files changed, 154 insertions(+), 192 deletions(-) diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index ce90fa94253f..69aacb11a2df 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -30,7 +30,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph}; use risingwave_sqlparser::ast::{ AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode, - FormatEncodeOptions, ObjectName, Statement, StructField, + FormatEncodeOptions, Ident, ObjectName, Statement, StructField, TableConstraint, }; use risingwave_sqlparser::parser::Parser; @@ -43,34 +43,10 @@ use crate::catalog::table_catalog::TableType; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, InputRef, Literal}; use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project}; +use crate::handler::create_table::bind_table_constraints; use crate::session::SessionImpl; use crate::{Binder, TableCatalog, WithOptions}; -pub async fn replace_table_with_definition( - session: &Arc, - table_name: ObjectName, - definition: Statement, - original_catalog: &Arc, - format_encode: Option, -) -> Result<()> { - let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan( - session, - table_name, - definition, - original_catalog, - format_encode, - None, - ) - .await?; - - let catalog_writer = session.catalog_writer()?; - - catalog_writer - .replace_table(source, table, graph, col_index_mapping, job_type) - .await?; - Ok(()) -} - /// Used in auto schema change process pub async fn get_new_table_definition_for_cdc_table( session: &Arc, @@ -84,9 +60,11 @@ pub async fn get_new_table_definition_for_cdc_table( .context("unable to parse original table definition")? .try_into() .unwrap(); + let Statement::CreateTable { columns: original_columns, format_encode, + constraints, .. } = &mut definition else { @@ -98,6 +76,22 @@ pub async fn get_new_table_definition_for_cdc_table( "source schema should be None for CDC table" ); + if bind_table_constraints(constraints)?.is_empty() { + // For table created by `create table t (*)` the constraint is empty, we need to + // retrieve primary key names from original table catalog if available + let pk_names: Vec<_> = original_catalog + .pk + .iter() + .map(|x| original_catalog.columns[x.column_index].name().to_string()) + .collect(); + + constraints.push(TableConstraint::Unique { + name: None, + columns: pk_names.iter().map(Ident::new_unchecked).collect(), + is_primary: true, + }); + } + let orig_column_catalog: HashMap = HashMap::from_iter( original_catalog .columns() @@ -163,9 +157,8 @@ fn to_ast_data_type(ty: &DataType) -> Result { pub async fn get_replace_table_plan( session: &Arc, table_name: ObjectName, - definition: Statement, - original_catalog: &Arc, - format_encode: Option, + new_definition: Statement, + old_catalog: &Arc, new_version_columns: Option>, // only provided in auto schema change ) -> Result<( Option, @@ -175,8 +168,8 @@ pub async fn get_replace_table_plan( TableJobType, )> { // Create handler args as if we're creating a new table with the altered definition. - let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?; - let col_id_gen = ColumnIdGenerator::new_alter(original_catalog); + let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?; + let col_id_gen = ColumnIdGenerator::new_alter(old_catalog); let Statement::CreateTable { columns, constraints, @@ -186,16 +179,21 @@ pub async fn get_replace_table_plan( with_version_column, wildcard_idx, cdc_table_info, + format_encode, .. - } = definition + } = new_definition else { - panic!("unexpected statement type: {:?}", definition); + panic!("unexpected statement type: {:?}", new_definition); }; + let format_encode = format_encode + .clone() + .map(|format_encode| format_encode.into_v2_with_warning()); + let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table( session, table_name, - original_catalog, + old_catalog, format_encode, handler_args.clone(), col_id_gen, @@ -213,7 +211,7 @@ pub async fn get_replace_table_plan( // Calculate the mapping from the original columns to the new columns. let col_index_mapping = ColIndexMapping::new( - original_catalog + old_catalog .columns() .iter() .map(|old_c| { @@ -225,7 +223,7 @@ pub async fn get_replace_table_plan( table.columns.len(), ); - let incoming_sink_ids: HashSet<_> = original_catalog.incoming_sinks.iter().copied().collect(); + let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect(); let target_columns = table .columns @@ -245,7 +243,7 @@ pub async fn get_replace_table_plan( // Set some fields ourselves so that the meta service does not need to maintain them. let mut table = table; table.incoming_sinks = incoming_sink_ids.iter().copied().collect(); - table.maybe_vnode_count = VnodeCount::set(original_catalog.vnode_count()).to_protobuf(); + table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf(); Ok((source, table, graph, col_index_mapping, job_type)) } @@ -332,6 +330,7 @@ pub async fn handle_alter_table_column( else { panic!("unexpected statement: {:?}", definition); }; + let format_encode = format_encode .clone() .map(|format_encode| format_encode.into_v2_with_warning()); @@ -455,15 +454,14 @@ pub async fn handle_alter_table_column( _ => unreachable!(), }; - replace_table_with_definition( - &session, - table_name, - definition, - &original_catalog, - format_encode, - ) - .await?; + let (source, table, graph, col_index_mapping, job_type) = + get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?; + let catalog_writer = session.catalog_writer()?; + + catalog_writer + .replace_table(source, table, graph, col_index_mapping, job_type) + .await?; Ok(PgResponse::empty_result(StatementType::ALTER_TABLE)) } diff --git a/src/frontend/src/handler/alter_table_with_sr.rs b/src/frontend/src/handler/alter_table_with_sr.rs index b5489b28b58f..5fcafa68f80e 100644 --- a/src/frontend/src/handler/alter_table_with_sr.rs +++ b/src/frontend/src/handler/alter_table_with_sr.rs @@ -14,18 +14,16 @@ use anyhow::{anyhow, Context}; use fancy_regex::Regex; -use pgwire::pg_response::StatementType; +use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement}; use risingwave_sqlparser::parser::Parser; use thiserror_ext::AsReport; use super::alter_source_with_sr::alter_definition_format_encode; -use super::alter_table_column::{ - fetch_table_catalog_for_alter, replace_table_with_definition, schema_has_schema_registry, -}; +use super::alter_table_column::{fetch_table_catalog_for_alter, schema_has_schema_registry}; use super::util::SourceSchemaCompatExt; -use super::{HandlerArgs, RwPgResponse}; +use super::{get_replace_table_plan, HandlerArgs, RwPgResponse}; use crate::error::{ErrorCode, Result}; use crate::TableCatalog; @@ -66,6 +64,7 @@ pub async fn handle_refresh_schema( format_encode.unwrap() }; + // NOTE(st1page): since we have not implemented alter format encode for table, it is actually no use. let definition = alter_definition_format_encode( &original_table.definition, format_encode.row_options.clone(), @@ -76,31 +75,36 @@ pub async fn handle_refresh_schema( .try_into() .unwrap(); - let result = replace_table_with_definition( - &session, - table_name, - definition, - &original_table, - Some(format_encode), - ) - .await; - - match result { - Ok(_) => Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE)), - Err(e) => { - let report = e.to_report_string(); - // This is a workaround for reporting errors when columns to drop is referenced by generated column. - // Finding the actual columns to drop requires generating `PbSource` from the sql definition - // and fetching schema from schema registry, which will cause a lot of unnecessary refactor. - // Here we match the error message to yield when failing to bind generated column exprs. - let re = Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap(); - let captures = re.captures(&report).map_err(anyhow::Error::from)?; - if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) { - Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"", - gen_col_name.as_str())).into()) - } else { - Err(e) + let (source, table, graph, col_index_mapping, job_type) = { + let result = + get_replace_table_plan(&session, table_name, definition, &original_table, None).await; + match result { + Ok((source, table, graph, col_index_mapping, job_type)) => { + Ok((source, table, graph, col_index_mapping, job_type)) + } + Err(e) => { + let report = e.to_report_string(); + // NOTE(yuhao): This is a workaround for reporting errors when columns to drop is referenced by generated column. + // Finding the actual columns to drop requires generating `PbSource` from the sql definition + // and fetching schema from schema registry, which will cause a lot of unnecessary refactor. + // Here we match the error message to yield when failing to bind generated column exprs. + let re = + Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap(); + let captures = re.captures(&report).map_err(anyhow::Error::from)?; + if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) { + Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"", + gen_col_name.as_str())).into()) + } else { + Err(e) + } } } - } + }?; + let catalog_writer = session.catalog_writer()?; + + catalog_writer + .replace_table(source, table, graph, col_index_mapping, job_type) + .await?; + + Ok(PgResponse::empty_result(StatementType::ALTER_TABLE)) } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 9c37799422a5..438e3cea1a73 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -78,7 +78,7 @@ use crate::error::{Result, RwError}; use crate::expr::Expr; use crate::handler::create_table::{ bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns, - bind_sql_pk_names, ensure_table_constraints_supported, ColumnIdGenerator, + bind_sql_pk_names, bind_table_constraints, ColumnIdGenerator, }; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; @@ -1523,8 +1523,7 @@ pub async fn bind_create_source_or_table_with_connector( } } - ensure_table_constraints_supported(&constraints)?; - let sql_pk_names = bind_sql_pk_names(sql_columns_defs, &constraints)?; + let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?; let columns_from_sql = bind_sql_columns(sql_columns_defs)?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1e5dc489c1a0..8b7732e7b425 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -360,25 +360,33 @@ pub fn bind_sql_column_constraints( Ok(()) } -pub fn ensure_table_constraints_supported(table_constraints: &[TableConstraint]) -> Result<()> { +/// Currently we only support Primary key table constraint, so just return pk names if it exists +pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result> { + let mut pk_column_names = vec![]; + for constraint in table_constraints { match constraint { TableConstraint::Unique { name: _, - columns: _, + columns, is_primary: true, - } => {} + } => { + if !pk_column_names.is_empty() { + return Err(multiple_pk_definition_err()); + } + pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec(); + } _ => bail_not_implemented!("table constraint \"{}\"", constraint), } } - Ok(()) + Ok(pk_column_names) } pub fn bind_sql_pk_names( columns_defs: &[ColumnDef], - table_constraints: &[TableConstraint], + pk_names_from_table_constraints: Vec, ) -> Result> { - let mut pk_column_names = vec![]; + let mut pk_column_names = pk_names_from_table_constraints; for column in columns_defs { for option_def in &column.options { @@ -391,19 +399,6 @@ pub fn bind_sql_pk_names( } } - for constraint in table_constraints { - if let TableConstraint::Unique { - name: _, - columns, - is_primary: true, - } = constraint - { - if !pk_column_names.is_empty() { - return Err(multiple_pk_definition_err()); - } - pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec(); - } - } Ok(pk_column_names) } @@ -585,8 +580,7 @@ pub(crate) fn gen_create_table_plan_without_source( with_version_column: Option, version: Option, ) -> Result<(PlanRef, PbTable)> { - ensure_table_constraints_supported(&constraints)?; - let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; + let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?; let (mut columns, pk_column_ids, row_id_index) = bind_pk_and_row_id_on_relation(columns, pk_names, true)?; @@ -1031,19 +1025,15 @@ pub(super) async fn handle_create_table_plan( )?; source.clone() }; - let cdc_with_options = derive_with_options_for_cdc_table( + let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table( &source.with_properties, cdc_table.external_table_name.clone(), )?; - let (columns, pk_names) = derive_schema_for_cdc_table( - &column_defs, - &constraints, - cdc_with_options.clone(), - wildcard_idx.is_some(), - None, - ) - .await?; + let (columns, pk_names) = match wildcard_idx { + Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?, + None => bind_cdc_table_schema(&column_defs, &constraints, None)?, + }; let context: OptimizerContextRef = OptimizerContext::new(handler_args, explain_options).into(); @@ -1148,84 +1138,64 @@ fn sanity_check_for_cdc_table( Ok(()) } -struct CdcSchemaChangeArgs { - /// original table catalog - original_catalog: Arc, - /// new version table columns, only provided in auto schema change - new_version_columns: Option>, +/// Derive schema for cdc table when create a new Table or alter an existing Table +async fn bind_cdc_table_schema_externally( + cdc_with_options: WithOptionsSecResolved, +) -> Result<(Vec, Vec)> { + // read cdc table schema from external db or parsing the schema from SQL definitions + Feature::CdcTableSchemaMap.check_available().map_err( + |err: risingwave_common::license::FeatureNotAvailable| { + ErrorCode::NotSupported( + err.to_report_string(), + "Please define the schema manually".to_owned(), + ) + }, + )?; + let (options, secret_refs) = cdc_with_options.into_parts(); + let config = ExternalTableConfig::try_from_btreemap(options, secret_refs) + .context("failed to extract external table config")?; + + let table = ExternalTableImpl::connect(config) + .await + .context("failed to auto derive table schema")?; + Ok(( + table + .column_descs() + .iter() + .cloned() + .map(|column_desc| ColumnCatalog { + column_desc, + is_hidden: false, + }) + .collect(), + table.pk_names().clone(), + )) } /// Derive schema for cdc table when create a new Table or alter an existing Table -async fn derive_schema_for_cdc_table( +fn bind_cdc_table_schema( column_defs: &Vec, constraints: &Vec, - cdc_with_options: WithOptionsSecResolved, - need_auto_schema_map: bool, - schema_change_args: Option, + new_version_columns: Option>, ) -> Result<(Vec, Vec)> { - // read cdc table schema from external db or parsing the schema from SQL definitions - if need_auto_schema_map { - Feature::CdcTableSchemaMap - .check_available() - .map_err(|err| { - ErrorCode::NotSupported( - err.to_report_string(), - "Please define the schema manually".to_owned(), - ) - })?; - let (options, secret_refs) = cdc_with_options.into_parts(); - let config = ExternalTableConfig::try_from_btreemap(options, secret_refs) - .context("failed to extract external table config")?; - - let table = ExternalTableImpl::connect(config) - .await - .context("failed to auto derive table schema")?; - Ok(( - table - .column_descs() - .iter() - .cloned() - .map(|column_desc| ColumnCatalog { - column_desc, - is_hidden: false, - }) - .collect(), - table.pk_names().clone(), - )) - } else { - let mut columns = bind_sql_columns(column_defs)?; - let pk_names = if let Some(args) = schema_change_args { - // If new_version_columns is provided, we are in the process of auto schema change. - // update the default value column since the default value column is not set in the - // column sql definition. - if let Some(new_version_columns) = args.new_version_columns { - for (col, new_version_col) in columns - .iter_mut() - .zip_eq_fast(new_version_columns.into_iter()) - { - assert_eq!(col.name(), new_version_col.name()); - col.column_desc.generated_or_default_column = - new_version_col.column_desc.generated_or_default_column; - } - } + let mut columns = bind_sql_columns(column_defs)?; + // If new_version_columns is provided, we are in the process of auto schema change. + // update the default value column since the default value column is not set in the + // column sql definition. + if let Some(new_version_columns) = new_version_columns { + for (col, new_version_col) in columns + .iter_mut() + .zip_eq_fast(new_version_columns.into_iter()) + { + assert_eq!(col.name(), new_version_col.name()); + col.column_desc.generated_or_default_column = + new_version_col.column_desc.generated_or_default_column; + } + } - // For table created by `create table t (*)` the constraint is empty, we need to - // retrieve primary key names from original table catalog if available - args.original_catalog - .pk - .iter() - .map(|x| { - args.original_catalog.columns[x.column_index] - .name() - .to_string() - }) - .collect() - } else { - bind_sql_pk_names(column_defs, constraints)? - }; + let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?; - Ok((columns, pk_names)) - } + Ok((columns, pk_names)) } #[allow(clippy::too_many_arguments)] @@ -1387,17 +1357,8 @@ pub async fn generate_stream_graph_for_replace_table( cdc_table.external_table_name.clone(), )?; - let (columns, pk_names) = derive_schema_for_cdc_table( - &column_defs, - &constraints, - cdc_with_options.clone(), - false, - Some(CdcSchemaChangeArgs { - original_catalog: original_catalog.clone(), - new_version_columns, - }), - ) - .await?; + let (columns, pk_names) = + bind_cdc_table_schema(&column_defs, &constraints, new_version_columns)?; let context: OptimizerContextRef = OptimizerContext::new(handler_args, ExplainOptions::default()).into(); @@ -1619,8 +1580,9 @@ mod tests { for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) } - ensure_table_constraints_supported(&constraints)?; - let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; + + let pk_names = + bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?; let (_, pk_column_ids, _) = bind_pk_and_row_id_on_relation(columns, pk_names, true)?; Ok(pk_column_ids) diff --git a/src/frontend/src/rpc/mod.rs b/src/frontend/src/rpc/mod.rs index b0472a431c2d..2f5207ca6e92 100644 --- a/src/frontend/src/rpc/mod.rs +++ b/src/frontend/src/rpc/mod.rs @@ -105,7 +105,6 @@ async fn get_new_table_plan( table_name, new_table_definition, &original_catalog, - None, Some(new_version_columns), ) .await?;