diff --git a/e2e_test/source_legacy/basic/kafka.slt b/e2e_test/source_legacy/basic/kafka.slt index 227c0aa46bac1..8dccb9f5bdb1b 100644 --- a/e2e_test/source_legacy/basic/kafka.slt +++ b/e2e_test/source_legacy/basic/kafka.slt @@ -496,6 +496,16 @@ WITH ( scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE JSON +statement ok +CREATE TABLE test_include_payload_only +INCLUDE payload +WITH ( + connector = 'kafka', + topic = 'kafka_1_partition_topic', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON + statement ok flush; @@ -908,5 +918,8 @@ drop table source_with_rdkafka_props; statement ok drop table debezium_ignore_key; +statement ok +drop table test_include_payload_only; + statement ok drop table test_include_payload; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f168f7d0de3f6..2e8b88b0a6490 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -646,12 +646,6 @@ pub fn handle_addition_columns( )))); } - let latest_col_id: ColumnId = columns - .iter() - .map(|col| col.column_desc.column_id) - .max() - .unwrap(); // there must be at least one column in the column catalog - while let Some(item) = additional_columns.pop() { check_additional_column_compatibility(&item, source_schema)?; @@ -667,7 +661,7 @@ pub fn handle_addition_columns( ); } let col = build_additional_column_desc( - latest_col_id.next(), + ColumnId::placeholder(), connector_name.as_str(), item.column_type.real_value().as_str(), item.column_alias.map(|alias| alias.real_value()), @@ -722,12 +716,6 @@ pub(crate) fn bind_all_columns( "Remove the wildcard or use a source with external schema".to_string(), ))); } - // FIXME(yuhao): cols_from_sql should be None is no `()` is given. - if cols_from_sql.is_empty() { - return Err(RwError::from(ProtocolError( - "Schema definition is required, either from SQL or schema registry.".to_string(), - ))); - } let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql); match (&source_schema.format, &source_schema.row_encode) { (Format::DebeziumMongo, Encode::Json) => { @@ -1547,6 +1535,13 @@ pub async fn bind_create_source_or_table_with_connector( &mut columns, false, )?; + + if columns.is_empty() { + return Err(RwError::from(ProtocolError( + "Schema definition is required, either from SQL or schema registry.".to_string(), + ))); + } + // compatible with the behavior that add a hidden column `_rw_kafka_timestamp` to each message from Kafka source if is_create_source { // must behind `handle_addition_columns`