Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): define generated column on table/source with external schema #14644

Merged
merged 6 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
create table from_kafka ( primary key (some_key) )
create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) )
include key as some_key
with (
connector = 'kafka',
Expand Down Expand Up @@ -52,6 +52,7 @@ select
float_field,
double_field,
int32_field,
gen_i32_field,
int64_field,
record_field,
array_field,
Expand All @@ -61,8 +62,8 @@ select
time_micros_field,
time_millis_field from from_kafka order by string_field;
----
t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL
f Wave \x5a4446 1.5 NULL 11 12 (,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654
t Rising \x6130 3.5 4.25 22 24 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL
f Wave \x5a4446 1.5 NULL 11 13 12 (,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654

statement error SchemaFetchError
create sink sink_err from into_kafka with (
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ impl TestCase {
append_only,
cdc_table_info,
include_column_options,
wildcard_idx,
..
} => {
let source_schema = source_schema.map(|schema| schema.into_v2_with_warning());
Expand All @@ -438,6 +439,7 @@ impl TestCase {
handler_args,
name,
columns,
wildcard_idx,
constraints,
if_not_exists,
source_schema,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ pub async fn handle_alter_table_column(
constraints,
source_watermarks,
append_only,
wildcard_idx,
..
} = definition
else {
Expand All @@ -167,6 +168,7 @@ pub async fn handle_alter_table_column(
handler_args,
col_id_gen,
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ pub(crate) async fn reparse_table_for_sink(
let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);
let Statement::CreateTable {
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand All @@ -488,6 +489,7 @@ pub(crate) async fn reparse_table_for_sink(
handler_args,
col_id_gen,
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand Down
30 changes: 27 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::catalog::{
is_column_ids_dedup, ColumnCatalog, ColumnDesc, TableId, INITIAL_SOURCE_VERSION_ID,
KAFKA_TIMESTAMP_COLUMN_NAME,
};
use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolError};
use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, NotSupported, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_connector::parser::{
Expand Down Expand Up @@ -556,17 +556,40 @@ pub(crate) fn bind_all_columns(
cols_from_source: Option<Vec<ColumnCatalog>>,
cols_from_sql: Vec<ColumnCatalog>,
col_defs_from_sql: &[ColumnDef],
wildcard_idx: Option<usize>,
) -> Result<Vec<ColumnCatalog>> {
if let Some(cols_from_source) = cols_from_source {
if cols_from_sql.is_empty() {
Ok(cols_from_source)
} else if let Some(wildcard_idx) = wildcard_idx {
if col_defs_from_sql.iter().any(|c| !c.is_generated()) {
Err(RwError::from(NotSupported(
"Only generated columns are allowed in user-defined schema from SQL"
.to_string(),
"Remove the non-generated columns".to_string(),
)))
} else {
// Replace `*` with `cols_from_source`
let mut cols_from_sql = cols_from_sql;
let mut cols_from_source = cols_from_source;
let mut cols_from_sql_r = cols_from_sql.split_off(wildcard_idx);
cols_from_sql.append(&mut cols_from_source);
cols_from_sql.append(&mut cols_from_sql_r);
Ok(cols_from_sql)
}
} else {
// TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This TODO should be removed

Err(RwError::from(ProtocolError(
format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode))))
format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \
Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode))))
}
} else {
if wildcard_idx.is_some() {
return Err(RwError::from(NotSupported(
"Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_string(),
"Remove the wildcard or use a source with external schema".to_string(),
)));
}
// FIXME(yuhao): cols_from_sql should be None is no `()` is given.
if cols_from_sql.is_empty() {
return Err(RwError::from(ProtocolError(
Expand Down Expand Up @@ -1147,6 +1170,7 @@ pub async fn handle_create_source(
columns_from_resolve_source,
columns_from_sql,
&stmt.columns,
stmt.wildcard_idx,
)?;
// add additional columns before bind pk, because `format upsert` requires the key column
handle_addition_columns(&with_properties, stmt.include_column_options, &mut columns)?;
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
context: OptimizerContext,
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
source_schema: ConnectorSchema,
source_watermarks: Vec<SourceWatermark>,
Expand Down Expand Up @@ -487,6 +488,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
columns_from_resolve_source,
columns_from_sql,
&column_defs,
wildcard_idx,
)?;

// add additional columns before bind pk, because `format upsert` requires the key column
Expand Down Expand Up @@ -882,13 +884,15 @@ fn derive_connect_properties(
Ok(connect_properties.into_iter().collect())
}

#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
context: OptimizerContext,
col_id_gen: ColumnIdGenerator,
source_schema: Option<ConnectorSchema>,
cdc_table_info: Option<CdcTableInfo>,
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
Expand All @@ -907,6 +911,7 @@ pub(super) async fn handle_create_table_plan(
context,
table_name.clone(),
column_defs,
wildcard_idx,
constraints,
source_schema,
source_watermarks,
Expand Down Expand Up @@ -958,6 +963,7 @@ pub async fn handle_create_table(
handler_args: HandlerArgs,
table_name: ObjectName,
column_defs: Vec<ColumnDef>,
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
if_not_exists: bool,
source_schema: Option<ConnectorSchema>,
Expand Down Expand Up @@ -990,6 +996,7 @@ pub async fn handle_create_table(
cdc_table_info,
table_name.clone(),
column_defs,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand Down Expand Up @@ -1051,6 +1058,7 @@ pub async fn generate_stream_graph_for_table(
handler_args: HandlerArgs,
col_id_gen: ColumnIdGenerator,
columns: Vec<ColumnDef>,
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
Expand All @@ -1064,6 +1072,7 @@ pub async fn generate_stream_graph_for_table(
context,
table_name,
columns,
wildcard_idx,
constraints,
source_schema,
source_watermarks,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async fn do_handle_explain(
append_only,
cdc_table_info,
include_column_options,
wildcard_idx,
..
} => {
let col_id_gen = ColumnIdGenerator::new_initial();
Expand All @@ -77,6 +78,7 @@ async fn do_handle_explain(
cdc_table_info,
name.clone(),
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ pub async fn handle(
Statement::CreateTable {
name,
columns,
wildcard_idx,
constraints,
query,
with_options: _, // It is put in OptimizerContext
Expand Down Expand Up @@ -279,6 +280,7 @@ pub async fn handle(
handler_args,
name,
columns,
wildcard_idx,
constraints,
if_not_exists,
source_schema,
Expand Down
5 changes: 4 additions & 1 deletion src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,8 @@ pub enum Statement {
name: ObjectName,
/// Optional schema
columns: Vec<ColumnDef>,
// The wildchar position in columns defined in sql. Only exist when using external schema.
wildcard_idx: Option<usize>,
constraints: Vec<TableConstraint>,
with_options: Vec<SqlOption>,
/// Optional schema of the external source with which the table is created
Expand Down Expand Up @@ -1606,6 +1608,7 @@ impl fmt::Display for Statement {
Statement::CreateTable {
name,
columns,
wildcard_idx,
constraints,
with_options,
or_replace,
Expand Down Expand Up @@ -1634,7 +1637,7 @@ impl fmt::Display for Statement {
name = name,
)?;
if !columns.is_empty() || !constraints.is_empty() {
write!(f, " {}", fmt_create_items(columns, constraints, source_watermarks)?)?;
write!(f, " {}", fmt_create_items(columns, constraints, source_watermarks, *wildcard_idx)?)?;
} else if query.is_none() {
// PostgreSQL allows `CREATE TABLE t ();`, but requires empty parens
write!(f, " ()")?;
Expand Down
34 changes: 30 additions & 4 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ macro_rules! impl_fmt_display {
pub struct CreateSourceStatement {
pub if_not_exists: bool,
pub columns: Vec<ColumnDef>,
// The wildchar position in columns defined in sql. Only exist when using external schema.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what doesexternal schema refer to

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schema from external connector

pub wildcard_idx: Option<usize>,
pub constraints: Vec<TableConstraint>,
pub source_name: ObjectName,
pub with_properties: WithProperties,
Expand Down Expand Up @@ -325,7 +327,8 @@ impl ParseTo for CreateSourceStatement {
impl_parse_to!(source_name: ObjectName, p);

// parse columns
let (columns, constraints, source_watermarks) = p.parse_columns_with_watermark()?;
let (columns, constraints, source_watermarks, wildcard_idx) =
p.parse_columns_with_watermark()?;
let include_options = p.parse_include_options()?;

let with_options = p.parse_with_properties()?;
Expand All @@ -343,6 +346,7 @@ impl ParseTo for CreateSourceStatement {
Ok(Self {
if_not_exists,
columns,
wildcard_idx,
constraints,
source_name,
with_properties: WithProperties(with_options),
Expand All @@ -357,11 +361,28 @@ pub(super) fn fmt_create_items(
columns: &[ColumnDef],
constraints: &[TableConstraint],
watermarks: &[SourceWatermark],
wildcard_idx: Option<usize>,
) -> std::result::Result<String, fmt::Error> {
let mut items = String::new();
let has_items = !columns.is_empty() || !constraints.is_empty() || !watermarks.is_empty();
let has_items = !columns.is_empty()
|| !constraints.is_empty()
|| !watermarks.is_empty()
|| wildcard_idx.is_some();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we store the *'s index in catalog?
Is it true that the columns on the left of * means defined schema and on the right means generated columns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be in the sql string in catalog

Is it true that the columns on the left of * means defined schema and on the right means generated columns
No, * means all columns from external source

has_items.then(|| write!(&mut items, "("));
write!(&mut items, "{}", display_comma_separated(columns))?;
if let Some(wildcard_idx) = wildcard_idx {
let (columns_l, columns_r) = columns.split_at(wildcard_idx);
write!(&mut items, "{}", display_comma_separated(columns_l))?;
if !columns_l.is_empty() {
write!(&mut items, ", ")?;
}
write!(&mut items, "{}", Token::Mul)?;
if !columns_r.is_empty() {
write!(&mut items, ", ")?;
}
write!(&mut items, "{}", display_comma_separated(columns_r))?;
} else {
write!(&mut items, "{}", display_comma_separated(columns))?;
}
if !columns.is_empty() && (!constraints.is_empty() || !watermarks.is_empty()) {
write!(&mut items, ", ")?;
}
Expand All @@ -380,7 +401,12 @@ impl fmt::Display for CreateSourceStatement {
impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
impl_fmt_display!(source_name, v, self);

let items = fmt_create_items(&self.columns, &self.constraints, &self.source_watermarks)?;
let items = fmt_create_items(
&self.columns,
&self.constraints,
&self.source_watermarks,
self.wildcard_idx,
)?;
if !items.is_empty() {
v.push(items);
}
Expand Down
Loading
Loading