Skip to content

Commit

Permalink
fix(frontend): allow INCLUDE payload without other columns (#19003) (
Browse files Browse the repository at this point in the history
…#19006)

Co-authored-by: xiangjinwu <[email protected]>
  • Loading branch information
github-actions[bot] and xiangjinwu authored Oct 21, 2024
1 parent 58e12ff commit a54d508
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
13 changes: 13 additions & 0 deletions e2e_test/source_legacy/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,16 @@ WITH (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

statement ok
CREATE TABLE test_include_payload_only
INCLUDE payload
WITH (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

Expand Down Expand Up @@ -911,6 +921,9 @@ drop table source_with_rdkafka_props;
statement ok
drop table debezium_ignore_key;

statement ok
drop table test_include_payload_only;

statement ok
drop table test_include_payload;

Expand Down
21 changes: 8 additions & 13 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,20 +646,14 @@ pub fn handle_addition_columns(
))));
}

let latest_col_id: ColumnId = columns
.iter()
.map(|col| col.column_desc.column_id)
.max()
.unwrap(); // there must be at least one column in the column catalog

while let Some(item) = additional_columns.pop() {
check_additional_column_compatibility(&item, source_schema)?;

let data_type_name: Option<String> = item
.header_inner_expect_type
.map(|dt| format!("{:?}", dt).to_lowercase());
let col = build_additional_column_desc(
latest_col_id.next(),
ColumnId::placeholder(),
connector_name.as_str(),
item.column_type.real_value().as_str(),
item.column_alias.map(|alias| alias.real_value()),
Expand Down Expand Up @@ -714,12 +708,6 @@ pub(crate) fn bind_all_columns(
"Remove the wildcard or use a source with external schema".to_string(),
)));
}
// FIXME(yuhao): cols_from_sql should be None is no `()` is given.
if cols_from_sql.is_empty() {
return Err(RwError::from(ProtocolError(
"Schema definition is required, either from SQL or schema registry.".to_string(),
)));
}
let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
match (&source_schema.format, &source_schema.row_encode) {
(Format::DebeziumMongo, Encode::Json) => {
Expand Down Expand Up @@ -1539,6 +1527,13 @@ pub async fn bind_create_source_or_table_with_connector(
&mut columns,
false,
)?;

if columns.is_empty() {
return Err(RwError::from(ProtocolError(
"Schema definition is required, either from SQL or schema registry.".to_string(),
)));
}

// compatible with the behavior that add a hidden column `_rw_kafka_timestamp` to each message from Kafka source
if is_create_source {
// must behind `handle_addition_columns`
Expand Down

0 comments on commit a54d508

Please sign in to comment.