From 3def29a180fa063d37fc78f567fe7622f0dc9c5c Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Thu, 24 Oct 2024 19:17:09 +0800 Subject: [PATCH] extract merge_columns_defs --- src/frontend/src/handler/create_source.rs | 62 ++++++++++++++--------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 3616997d384cb..b62eab3dd55f8 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -676,6 +676,38 @@ pub fn handle_addition_columns( Ok(()) } +/// Merge the column defs from external and user's column definition in `CREATE` SQL. +pub(crate) fn merge_columns_defs( + cols_from_source: Vec, + cols_from_sql: Vec, + col_defs_from_sql: &[ColumnDef], + wildcard_idx: Option, +) -> Result> { + if cols_from_sql.is_empty() { + Ok(cols_from_source) + } else if let Some(wildcard_idx) = wildcard_idx { + if col_defs_from_sql.iter().any(|c| !c.is_generated()) { + Err(RwError::from(NotSupported( + "Only generated columns are allowed in user-defined schema from SQL".to_string(), + "Remove the non-generated columns".to_string(), + ))) + } else { + // Replace `*` with `cols_from_source` + let mut cols_from_sql = cols_from_sql; + let mut cols_from_source = cols_from_source; + let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx); + cols_from_sql.append(&mut cols_from_source); + cols_from_sql.append(&mut cols_from_sql_r); + Ok(cols_from_sql) + } + } else { + Err(RwError::from(ProtocolError( + "User-defined schema from SQL is not allowed here, since RisingWave has got the column definitions from the external system. \ + You can try to use `CREATE TABLE/SOURCE obj_name (*, [generated column defs])`\ + Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.".to_string()))) + } +} + /// Bind columns from both source and sql defined. pub(crate) fn bind_all_columns( source_schema: &ConnectorSchema, @@ -685,30 +717,12 @@ pub(crate) fn bind_all_columns( wildcard_idx: Option, ) -> Result> { if let Some(cols_from_source) = cols_from_source { - if cols_from_sql.is_empty() { - Ok(cols_from_source) - } else if let Some(wildcard_idx) = wildcard_idx { - if col_defs_from_sql.iter().any(|c| !c.is_generated()) { - Err(RwError::from(NotSupported( - "Only generated columns are allowed in user-defined schema from SQL" - .to_string(), - "Remove the non-generated columns".to_string(), - ))) - } else { - // Replace `*` with `cols_from_source` - let mut cols_from_sql = cols_from_sql; - let mut cols_from_source = cols_from_source; - let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx); - cols_from_sql.append(&mut cols_from_source); - cols_from_sql.append(&mut cols_from_sql_r); - Ok(cols_from_sql) - } - } else { - // TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209 - Err(RwError::from(ProtocolError( - format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \ - Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode)))) - } + merge_columns_defs( + cols_from_source, + cols_from_sql, + col_defs_from_sql, + wildcard_idx, + ) } else { if wildcard_idx.is_some() { return Err(RwError::from(NotSupported(