diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index 332e15bd54b03..2258204313b0a 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -186,21 +186,9 @@ rpk topic create 'test_additional_columns' system ok bash -c 'for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done' -statement ok -create table additional_columns (a int) -include key as key_col -include partition as partition_col -include offset as offset_col -include timestamp as timestamp_col -include header as header_col -WITH ( - ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, - topic = 'test_additional_columns') -FORMAT PLAIN ENCODE JSON -# header with varchar type & non-exist header key statement error -create table additional_columns_1 (a int) +create table additional_columns (a int) include key as key_col include partition as partition_col include offset as offset_col @@ -220,13 +208,13 @@ Caused by: Protocol error: Only header column can have inner field, but got "timestamp" -# header with varchar type & non-exist header key statement ok -create table additional_columns_1 (a int) +create table additional_columns (a int) include key as key_col include partition as partition_col include offset as offset_col include timestamp as timestamp_col +include header as header_col_combined include header 'header1' as header_col_1 include header 'header2' as header_col_2 include header 'header2' varchar as header_col_3 @@ -236,7 +224,6 @@ WITH ( topic = 'test_additional_columns') FORMAT PLAIN ENCODE JSON - # Wait enough time to ensure SourceExecutor consumes all Kafka data. sleep 3s @@ -284,19 +271,20 @@ WHERE key_col IS NOT NULL AND partition_col IS NOT NULL AND offset_col IS NOT NULL AND timestamp_col IS NOT NULL - AND header_col IS NOT NULL + AND header_col_combined IS NOT NULL ---- 101 query ?? -SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value -FROM additional_columns limit 1; +WITH arr AS (SELECT header_col_combined FROM additional_columns limit 1) +SELECT unnest(header_col_combined) FROM arr ORDER BY 1; ---- -header1 \x7631 +(header1,"\\x7631") +(header2,"\\x7632") query ???? -select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns_1 limit 1 +select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns limit 1 ---- \x7631 \x7632 v2 NULL @@ -305,3 +293,6 @@ drop table upsert_students statement ok drop table plain_students + +statement ok +drop table additional_columns