Skip to content

Commit

Permalink
extract merge_columns_defs
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Oct 24, 2024
1 parent 20a5c22 commit 3def29a
Showing 1 changed file with 38 additions and 24 deletions.
62 changes: 38 additions & 24 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,38 @@ pub fn handle_addition_columns(
Ok(())
}

/// Merge the column defs from external and user's column definition in `CREATE` SQL.
pub(crate) fn merge_columns_defs(
cols_from_source: Vec<ColumnCatalog>,
cols_from_sql: Vec<ColumnCatalog>,
col_defs_from_sql: &[ColumnDef],
wildcard_idx: Option<usize>,
) -> Result<Vec<ColumnCatalog>> {
if cols_from_sql.is_empty() {
Ok(cols_from_source)
} else if let Some(wildcard_idx) = wildcard_idx {
if col_defs_from_sql.iter().any(|c| !c.is_generated()) {
Err(RwError::from(NotSupported(
"Only generated columns are allowed in user-defined schema from SQL".to_string(),
"Remove the non-generated columns".to_string(),
)))
} else {
// Replace `*` with `cols_from_source`
let mut cols_from_sql = cols_from_sql;
let mut cols_from_source = cols_from_source;
let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx);
cols_from_sql.append(&mut cols_from_source);
cols_from_sql.append(&mut cols_from_sql_r);
Ok(cols_from_sql)
}
} else {
Err(RwError::from(ProtocolError(
"User-defined schema from SQL is not allowed here, since RisingWave has got the column definitions from the external system. \
You can try to use `CREATE TABLE/SOURCE obj_name (*, [generated column defs])`\
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.".to_string())))
}
}

/// Bind columns from both source and sql defined.
pub(crate) fn bind_all_columns(
source_schema: &ConnectorSchema,
Expand All @@ -685,30 +717,12 @@ pub(crate) fn bind_all_columns(
wildcard_idx: Option<usize>,
) -> Result<Vec<ColumnCatalog>> {
if let Some(cols_from_source) = cols_from_source {
if cols_from_sql.is_empty() {
Ok(cols_from_source)
} else if let Some(wildcard_idx) = wildcard_idx {
if col_defs_from_sql.iter().any(|c| !c.is_generated()) {
Err(RwError::from(NotSupported(
"Only generated columns are allowed in user-defined schema from SQL"
.to_string(),
"Remove the non-generated columns".to_string(),
)))
} else {
// Replace `*` with `cols_from_source`
let mut cols_from_sql = cols_from_sql;
let mut cols_from_source = cols_from_source;
let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx);
cols_from_sql.append(&mut cols_from_source);
cols_from_sql.append(&mut cols_from_sql_r);
Ok(cols_from_sql)
}
} else {
// TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209
Err(RwError::from(ProtocolError(
format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode))))
}
merge_columns_defs(
cols_from_source,
cols_from_sql,
col_defs_from_sql,
wildcard_idx,
)
} else {
if wildcard_idx.is_some() {
return Err(RwError::from(NotSupported(
Expand Down

0 comments on commit 3def29a

Please sign in to comment.