Skip to content

Commit

Permalink
feat(frontend): define generated column on table/source with external…
Browse files Browse the repository at this point in the history
… schema (#14644)
  • Loading branch information
yuhao-su authored Jan 19, 2024
1 parent 9f14212 commit be7f0b6
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 19 deletions.
7 changes: 4 additions & 3 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
create table from_kafka ( primary key (some_key) )
create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) )
include key as some_key
with (
connector = 'kafka',
Expand Down Expand Up @@ -52,6 +52,7 @@ select
float_field,
double_field,
int32_field,
gen_i32_field,
int64_field,
record_field,
array_field,
Expand All @@ -61,8 +62,8 @@ select
time_micros_field,
time_millis_field from from_kafka order by string_field;
----
t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL
f Wave \x5a4446 1.5 NULL 11 12 (,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654
t Rising \x6130 3.5 4.25 22 24 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL
f Wave \x5a4446 1.5 NULL 11 13 12 (,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654

statement error SchemaFetchError
create sink sink_err from into_kafka with (
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ impl TestCase {
append_only,
cdc_table_info,
include_column_options,
wildcard_idx,
..
} => {
let source_schema = source_schema.map(|schema| schema.into_v2_with_warning());
Expand All @@ -438,6 +439,7 @@ impl TestCase {
handler_args,
name,
columns,
wildcard_idx,
constraints,
if_not_exists,
source_schema,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ pub async fn handle_alter_table_column(
constraints,
source_watermarks,
append_only,
wildcard_idx,
..
} = definition
else {
Expand All @@ -167,6 +168,7 @@ pub async fn handle_alter_table_column(
handler_args,
col_id_gen,
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ pub(crate) async fn reparse_table_for_sink(
let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);
let Statement::CreateTable {
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand All @@ -488,6 +489,7 @@ pub(crate) async fn reparse_table_for_sink(
handler_args,
col_id_gen,
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand Down
30 changes: 27 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::catalog::{
is_column_ids_dedup, ColumnCatalog, ColumnDesc, TableId, INITIAL_SOURCE_VERSION_ID,
KAFKA_TIMESTAMP_COLUMN_NAME,
};
use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolError};
use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, NotSupported, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_connector::parser::{
Expand Down Expand Up @@ -556,17 +556,40 @@ pub(crate) fn bind_all_columns(
cols_from_source: Option<Vec<ColumnCatalog>>,
cols_from_sql: Vec<ColumnCatalog>,
col_defs_from_sql: &[ColumnDef],
wildcard_idx: Option<usize>,
) -> Result<Vec<ColumnCatalog>> {
if let Some(cols_from_source) = cols_from_source {
if cols_from_sql.is_empty() {
Ok(cols_from_source)
} else if let Some(wildcard_idx) = wildcard_idx {
if col_defs_from_sql.iter().any(|c| !c.is_generated()) {
Err(RwError::from(NotSupported(
"Only generated columns are allowed in user-defined schema from SQL"
.to_string(),
"Remove the non-generated columns".to_string(),
)))
} else {
// Replace `*` with `cols_from_source`
let mut cols_from_sql = cols_from_sql;
let mut cols_from_source = cols_from_source;
let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx);
cols_from_sql.append(&mut cols_from_source);
cols_from_sql.append(&mut cols_from_sql_r);
Ok(cols_from_sql)
}
} else {
// TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209
Err(RwError::from(ProtocolError(
format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode))))
format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode))))
}
} else {
if wildcard_idx.is_some() {
return Err(RwError::from(NotSupported(
"Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_string(),
"Remove the wildcard or use a source with external schema".to_string(),
)));
}
// FIXME(yuhao): cols_from_sql should be None is no `()` is given.
if cols_from_sql.is_empty() {
return Err(RwError::from(ProtocolError(
Expand Down Expand Up @@ -1147,6 +1170,7 @@ pub async fn handle_create_source(
columns_from_resolve_source,
columns_from_sql,
&stmt.columns,
stmt.wildcard_idx,
)?;
// add additional columns before bind pk, because `format upsert` requires the key column
handle_addition_columns(&with_properties, stmt.include_column_options, &mut columns)?;
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
context: OptimizerContext,
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
source_schema: ConnectorSchema,
source_watermarks: Vec<SourceWatermark>,
Expand Down Expand Up @@ -487,6 +488,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
columns_from_resolve_source,
columns_from_sql,
&column_defs,
wildcard_idx,
)?;

// add additional columns before bind pk, because `format upsert` requires the key column
Expand Down Expand Up @@ -882,13 +884,15 @@ fn derive_connect_properties(
Ok(connect_properties.into_iter().collect())
}

#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
context: OptimizerContext,
col_id_gen: ColumnIdGenerator,
source_schema: Option<ConnectorSchema>,
cdc_table_info: Option<CdcTableInfo>,
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
Expand All @@ -907,6 +911,7 @@ pub(super) async fn handle_create_table_plan(
context,
table_name.clone(),
column_defs,
wildcard_idx,
constraints,
source_schema,
source_watermarks,
Expand Down Expand Up @@ -958,6 +963,7 @@ pub async fn handle_create_table(
handler_args: HandlerArgs,
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
if_not_exists: bool,
source_schema: Option<ConnectorSchema>,
Expand Down Expand Up @@ -990,6 +996,7 @@ pub async fn handle_create_table(
cdc_table_info,
table_name.clone(),
column_defs,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand Down Expand Up @@ -1051,6 +1058,7 @@ pub async fn generate_stream_graph_for_table(
handler_args: HandlerArgs,
col_id_gen: ColumnIdGenerator,
columns: Vec<ColumnDef>,
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
Expand All @@ -1064,6 +1072,7 @@ pub async fn generate_stream_graph_for_table(
context,
table_name,
columns,
wildcard_idx,
constraints,
source_schema,
source_watermarks,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async fn do_handle_explain(
append_only,
cdc_table_info,
include_column_options,
wildcard_idx,
..
} => {
let col_id_gen = ColumnIdGenerator::new_initial();
Expand All @@ -77,6 +78,7 @@ async fn do_handle_explain(
cdc_table_info,
name.clone(),
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ pub async fn handle(
Statement::CreateTable {
name,
columns,
wildcard_idx,
constraints,
query,
with_options: _, // It is put in OptimizerContext
Expand Down Expand Up @@ -279,6 +280,7 @@ pub async fn handle(
handler_args,
name,
columns,
wildcard_idx,
constraints,
if_not_exists,
source_schema,
Expand Down
5 changes: 4 additions & 1 deletion src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,8 @@ pub enum Statement {
name: ObjectName,
/// Optional schema
columns: Vec<ColumnDef>,
// The wildchar position in columns defined in sql. Only exist when using external schema.
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
with_options: Vec<SqlOption>,
/// Optional schema of the external source with which the table is created
Expand Down Expand Up @@ -1606,6 +1608,7 @@ impl fmt::Display for Statement {
Statement::CreateTable {
name,
columns,
wildcard_idx,
constraints,
with_options,
or_replace,
Expand Down Expand Up @@ -1634,7 +1637,7 @@ impl fmt::Display for Statement {
name = name,
)?;
if !columns.is_empty() || !constraints.is_empty() {
write!(f, " {}", fmt_create_items(columns, constraints, source_watermarks)?)?;
write!(f, " {}", fmt_create_items(columns, constraints, source_watermarks, *wildcard_idx)?)?;
} else if query.is_none() {
// PostgreSQL allows `CREATE TABLE t ();`, but requires empty parens
write!(f, " ()")?;
Expand Down
34 changes: 30 additions & 4 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ macro_rules! impl_fmt_display {
pub struct CreateSourceStatement {
pub if_not_exists: bool,
pub columns: Vec<ColumnDef>,
// The wildchar position in columns defined in sql. Only exist when using external schema.
pub wildcard_idx: Option<usize>,
pub constraints: Vec<TableConstraint>,
pub source_name: ObjectName,
pub with_properties: WithProperties,
Expand Down Expand Up @@ -325,7 +327,8 @@ impl ParseTo for CreateSourceStatement {
impl_parse_to!(source_name: ObjectName, p);

// parse columns
let (columns, constraints, source_watermarks) = p.parse_columns_with_watermark()?;
let (columns, constraints, source_watermarks, wildcard_idx) =
p.parse_columns_with_watermark()?;
let include_options = p.parse_include_options()?;

let with_options = p.parse_with_properties()?;
Expand All @@ -343,6 +346,7 @@ impl ParseTo for CreateSourceStatement {
Ok(Self {
if_not_exists,
columns,
wildcard_idx,
constraints,
source_name,
with_properties: WithProperties(with_options),
Expand All @@ -357,11 +361,28 @@ pub(super) fn fmt_create_items(
columns: &[ColumnDef],
constraints: &[TableConstraint],
watermarks: &[SourceWatermark],
wildcard_idx: Option<usize>,
) -> std::result::Result<String, fmt::Error> {
let mut items = String::new();
let has_items = !columns.is_empty() || !constraints.is_empty() || !watermarks.is_empty();
let has_items = !columns.is_empty()
|| !constraints.is_empty()
|| !watermarks.is_empty()
|| wildcard_idx.is_some();
has_items.then(|| write!(&mut items, "("));
write!(&mut items, "{}", display_comma_separated(columns))?;
if let Some(wildcard_idx) = wildcard_idx {
let (columns_l, columns_r) = columns.split_at(wildcard_idx);
write!(&mut items, "{}", display_comma_separated(columns_l))?;
if !columns_l.is_empty() {
write!(&mut items, ", ")?;
}
write!(&mut items, "{}", Token::Mul)?;
if !columns_r.is_empty() {
write!(&mut items, ", ")?;
}
write!(&mut items, "{}", display_comma_separated(columns_r))?;
} else {
write!(&mut items, "{}", display_comma_separated(columns))?;
}
if !columns.is_empty() && (!constraints.is_empty() || !watermarks.is_empty()) {
write!(&mut items, ", ")?;
}
Expand All @@ -380,7 +401,12 @@ impl fmt::Display for CreateSourceStatement {
impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
impl_fmt_display!(source_name, v, self);

let items = fmt_create_items(&self.columns, &self.constraints, &self.source_watermarks)?;
let items = fmt_create_items(
&self.columns,
&self.constraints,
&self.source_watermarks,
self.wildcard_idx,
)?;
if !items.is_empty() {
v.push(items);
}
Expand Down
Loading

0 comments on commit be7f0b6

Please sign in to comment.