fix(frontend): allow INCLUDE payload without other columns (#19003)
xiangjinwu authored and committed on Oct 18, 2024
1 parent 55dc238 commit 557862d
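
The "Schema definition is required" check previously lived in `bind_all_columns`, where it fired before `INCLUDE` columns were handled, so a table defined with only `INCLUDE payload` was rejected. This commit moves the check into `bind_create_source_or_table_with_connector`, after `handle_addition_columns` has run, and switches additional-column ids to placeholders since the column catalog may now start out empty. A regression test covering the `INCLUDE payload`-only case is added to `kafka.slt`.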
Showing 2 changed files with 21 additions and 13 deletions.
13 changes: 13 additions & 0 deletions e2e_test/source_legacy/basic/kafka.slt
@@ -496,6 +496,16 @@ WITH (
   scan.startup.mode = 'earliest'
 ) FORMAT PLAIN ENCODE JSON
 
+statement ok
+CREATE TABLE test_include_payload_only
+INCLUDE payload
+WITH (
+  connector = 'kafka',
+  topic = 'kafka_1_partition_topic',
+  properties.bootstrap.server = 'message_queue:29092',
+  scan.startup.mode = 'earliest'
+) FORMAT PLAIN ENCODE JSON
+
 statement ok
 flush;

@@ -908,5 +918,8 @@ drop table source_with_rdkafka_props;
 statement ok
 drop table debezium_ignore_key;
 
+statement ok
+drop table test_include_payload_only;
+
 statement ok
 drop table test_include_payload;
21 changes: 8 additions & 13 deletions src/frontend/src/handler/create_source.rs
@@ -646,20 +646,14 @@ pub fn handle_addition_columns(
         ))));
     }
 
-    let latest_col_id: ColumnId = columns
-        .iter()
-        .map(|col| col.column_desc.column_id)
-        .max()
-        .unwrap(); // there must be at least one column in the column catalog
-
     while let Some(item) = additional_columns.pop() {
         check_additional_column_compatibility(&item, source_schema)?;
 
         let data_type_name: Option<String> = item
             .header_inner_expect_type
             .map(|dt| format!("{:?}", dt).to_lowercase());
         let col = build_additional_column_desc(
-            latest_col_id.next(),
+            ColumnId::placeholder(),
             connector_name.as_str(),
             item.column_type.real_value().as_str(),
             item.column_alias.map(|alias| alias.real_value()),
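
Why the placeholder: once the "schema required" check moves after this function (see the last hunk below), `columns` may legitimately be empty here, and the removed `max().unwrap()` would panic. Below is a minimal standalone sketch of that hazard, using a hypothetical `ColumnId` stand-in rather than RisingWave's real type; only the `max()/next()` versus `placeholder()` contrast is taken from the diff.

    // Hypothetical stand-in for ColumnId, for illustration only.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct ColumnId(i32);

    impl ColumnId {
        fn next(self) -> Self {
            ColumnId(self.0 + 1)
        }
        /// Sentinel id, assumed to be replaced by a real id when the
        /// catalog is finalized.
        fn placeholder() -> Self {
            ColumnId(-1)
        }
    }

    fn main() {
        // An `INCLUDE payload`-only table starts with no user-defined columns.
        let columns: Vec<ColumnId> = vec![];
        // Old pattern: `columns.iter().copied().max().unwrap().next()`
        // panics on this empty catalog.
        // New pattern: hand out a placeholder and defer id assignment.
        let id = columns
            .iter()
            .copied()
            .max()
            .map(ColumnId::next)
            .unwrap_or_else(ColumnId::placeholder);
        println!("additional column id: {:?}", id);
    }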
@@ -714,12 +708,6 @@ pub(crate) fn bind_all_columns(
             "Remove the wildcard or use a source with external schema".to_string(),
         )));
     }
-    // FIXME(yuhao): cols_from_sql should be None is no `()` is given.
-    if cols_from_sql.is_empty() {
-        return Err(RwError::from(ProtocolError(
-            "Schema definition is required, either from SQL or schema registry.".to_string(),
-        )));
-    }
     let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
     match (&source_schema.format, &source_schema.row_encode) {
         (Format::DebeziumMongo, Encode::Json) => {
@@ -1539,6 +1527,13 @@ pub async fn bind_create_source_or_table_with_connector(
         &mut columns,
         false,
     )?;
+
+    if columns.is_empty() {
+        return Err(RwError::from(ProtocolError(
+            "Schema definition is required, either from SQL or schema registry.".to_string(),
+        )));
+    }
+
     // compatible with the behavior that add a hidden column `_rw_kafka_timestamp` to each message from Kafka source
     if is_create_source {
         // must behind `handle_addition_columns`
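
A simplified sketch of the reordering, with assumed `Vec<String>`-based signatures rather than the real binder types: the emptiness check now runs only after `INCLUDE` columns have been appended, so `INCLUDE payload` alone satisfies it, while a definition with no columns at all is still rejected with the same error message.

    fn bind_columns(
        cols_from_sql: Vec<String>,
        include_columns: Vec<String>,
    ) -> Result<Vec<String>, String> {
        let mut columns = cols_from_sql; // may be empty for `INCLUDE payload`-only DDL
        columns.extend(include_columns); // e.g. the column added by `INCLUDE payload`
        if columns.is_empty() {
            // Same error as before, just raised after additional columns are handled.
            return Err(
                "Schema definition is required, either from SQL or schema registry.".to_string(),
            );
        }
        Ok(columns)
    }

    fn main() {
        // `INCLUDE payload` alone: no SQL columns, one additional column -> accepted.
        assert!(bind_columns(vec![], vec!["payload".to_string()]).is_ok());
        // Neither SQL columns nor INCLUDE columns -> still rejected.
        assert!(bind_columns(vec![], vec![]).is_err());
    }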
