From b19f423d8230dabc1f893b9649913822cb762ad5 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Mon, 20 Nov 2023 17:49:34 +0800
Subject: [PATCH 1/6] add test

---
 .../tests/testdata/input/batch_source.yaml  | 16 +++++++++++++++
 .../tests/testdata/output/batch_source.yaml | 20 +++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/src/frontend/planner_test/tests/testdata/input/batch_source.yaml b/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
index e4fa78287f3d3..5890cc6866b9f 100644
--- a/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
@@ -14,3 +14,19 @@
   expected_outputs:
   - batch_plan
   - logical_plan
+- sql: |
+    insert into s values (1,2);
+  create_table_with_connector:
+    format: plain
+    encode: protobuf
+    name: s
+    file: |
+      syntax = "proto3";
+      package test;
+      message TestRecord {
+        int32 id = 1;
+        int32 value = 2;
+      }
+  expected_outputs:
+  - batch_plan
+  - logical_plan
diff --git a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
index 7b4d42cba1350..ac3c767343b04 100644
--- a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
@@ -19,3 +19,23 @@
         int32 id = 1;
         int32 value = 2;
       }
+- sql: |
+    insert into s values (1,2);
+  logical_plan: |-
+    LogicalInsert { table: s, mapping: [0:0, 1:1] }
+    └─LogicalValues { rows: [[1:Int32, 2:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32] } }
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchInsert { table: s, mapping: [0:0, 1:1] }
+      └─BatchValues { rows: [[1:Int32, 2:Int32]] }
+  create_table_with_connector:
+    format: plain
+    encode: protobuf
+    name: s
+    file: |
+      syntax = "proto3";
+      package test;
+      message TestRecord {
+        int32 id = 1;
+        int32 value = 2;
+      }

From 35d78fbd11e3e2dd3e5797bbb9cd348ea24a2046 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Mon, 20 Nov 2023 17:51:23 +0800
Subject: [PATCH 2/6] fix(source): make _rw_key not hidden column

---
 .../tests/testdata/input/batch_source.yaml  | 16 +++++++++++
 .../tests/testdata/output/batch_source.yaml | 28 ++++++++++++++++---
 .../tests/testdata/output/struct_query.yaml | 22 +++++++--------
 src/frontend/src/handler/create_source.rs   |  2 +-
 4 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/src/frontend/planner_test/tests/testdata/input/batch_source.yaml b/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
index 5890cc6866b9f..486ce8bc26d20 100644
--- a/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
@@ -30,3 +30,19 @@
   expected_outputs:
   - batch_plan
   - logical_plan
+- sql: |
+    insert into s values (1,2, E'\\xDEADBEEF'::bytea);
+  create_table_with_connector:
+    format: plain
+    encode: protobuf
+    name: s
+    file: |
+      syntax = "proto3";
+      package test;
+      message TestRecord {
+        int32 id = 1;
+        int32 value = 2;
+      }
+  expected_outputs:
+  - batch_plan
+  - logical_plan
diff --git a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
index ac3c767343b04..04bc2ad98acfe 100644
--- a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
@@ -2,11 +2,11 @@
 - sql: |
     select * from s
   logical_plan: |-
-    LogicalProject { exprs: [id, value] }
+    LogicalProject { exprs: [id, value, _rw_key] }
     └─LogicalSource { source: s, columns: [id, value, _rw_key, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchProject { exprs: [id, value] }
+    └─BatchProject { exprs: [id, value, _rw_key] }
       └─BatchSource { source: s, columns: [id, value, _rw_key, _rw_kafka_timestamp, _row_id], filter: (None, None) }
   create_source:
     format: plain
@@ -22,11 +22,11 @@
 - sql: |
     insert into s values (1,2);
   logical_plan: |-
-    LogicalInsert { table: s, mapping: [0:0, 1:1] }
+    LogicalInsert { table: s, mapping: [0:0, 1:1], default: [2<-null:Bytea] }
     └─LogicalValues { rows: [[1:Int32, 2:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32] } }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchInsert { table: s, mapping: [0:0, 1:1] }
+    └─BatchInsert { table: s, mapping: [0:0, 1:1], default: [2<-null:Bytea] }
       └─BatchValues { rows: [[1:Int32, 2:Int32]] }
   create_table_with_connector:
     format: plain
@@ -39,3 +39,23 @@
         int32 id = 1;
         int32 value = 2;
       }
+- sql: |
+    insert into s values (1,2, E'\\xDEADBEEF'::bytea);
+  logical_plan: |-
+    LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2] }
+    └─LogicalValues { rows: [[1:Int32, 2:Int32, '\xdeadbeef':Bytea]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32, *VALUES*_0.column_2:Bytea] } }
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchInsert { table: s, mapping: [0:0, 1:1, 2:2] }
+      └─BatchValues { rows: [[1:Int32, 2:Int32, '\xdeadbeef':Bytea]] }
+  create_table_with_connector:
+    format: plain
+    encode: protobuf
+    name: s
+    file: |
+      syntax = "proto3";
+      package test;
+      message TestRecord {
+        int32 id = 1;
+        int32 value = 2;
+      }
diff --git a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml
index 525ca3880ad47..fb6c498321471 100644
--- a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml
@@ -36,7 +36,7 @@
     select (t.country).city,t.country,(country).city.address from t;
   logical_plan: |-
     LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1, t.country, Field(Field(t.country, 1:Int32), 0:Int32) as $expr2] }
-    └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
+    └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -80,7 +80,7 @@
     select (t.country1).city.*,(t.country2).*,(country3).city.* from t;
   logical_plan: |-
     LogicalProject { exprs: [Field(Field(t.country1, 1:Int32), 0:Int32) as $expr1, Field(Field(t.country1, 1:Int32), 1:Int32) as $expr2, Field(t.country2, 0:Int32) as $expr3, Field(t.country2, 1:Int32) as $expr4, Field(t.country2, 2:Int32) as $expr5, Field(Field(t.country3, 1:Int32), 0:Int32) as $expr6, Field(Field(t.country3, 1:Int32), 1:Int32) as $expr7] }
-    └─LogicalScan { table: t, columns: [t.id, t.country1, t.country2, t.country3, t.zipcode, t.rate, t._row_id] }
+    └─LogicalScan { table: t, columns: [t.id, t.country1, t.country2, t.country3, t.zipcode, t.rate, t._rw_key, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -112,7 +112,7 @@
   logical_plan: |-
     LogicalProject { exprs: [Field($expr1, 1:Int32) as $expr2] }
     └─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] }
-      └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
+      └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -144,7 +144,7 @@
     └─LogicalProject { exprs: [min($expr1)] }
      └─LogicalAgg { aggs: [min($expr1)] }
        └─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] }
-         └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
+         └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -171,11 +171,11 @@
     create materialized view t as select * from s;
     select * from (select (country).city as c from t) as vv join t on (c).zipcode=(t.country).zipcode;
   logical_plan: |-
-    LogicalProject { exprs: [$expr1, t.id, t.country, t.zipcode, t.rate] }
+    LogicalProject { exprs: [$expr1, t.id, t.country, t.zipcode, t.rate, t._rw_key] }
     └─LogicalJoin { type: Inner, on: (Field($expr1, 1:Int32) = Field(t.country, 2:Int32)), output: all }
       ├─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] }
-      │ └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
-      └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
+      │ └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
+      └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -205,7 +205,7 @@
     LogicalProject { exprs: [(min($expr1) + (max($expr1) * count(t.zipcode))) as $expr2] }
     └─LogicalAgg { aggs: [min($expr1), max($expr1), count(t.zipcode)] }
      └─LogicalProject { exprs: [Field(Field(t.country, 1:Int32), 0:Int32) as $expr1, t.zipcode] }
-       └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
+       └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -236,7 +236,7 @@
     └─LogicalAgg { aggs: [count(1:Int32), count($expr1)] }
      └─LogicalProject { exprs: [1:Int32, Field(Field(t.country, 1:Int32), 1:Int32) as $expr1] }
       └─LogicalFilter { predicate: (Field(Field(t.country, 1:Int32), 0:Int32) > 1:Int32) }
-        └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
+        └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -367,7 +367,7 @@
 - sql: |
     insert into s values (1,2,(1,2,(1,2,null)));
   logical_plan: |-
-    LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2] }
+    LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2], default: [3<-null:Bytea] }
     └─LogicalValues { rows: [[1:Int32, 2:Int32, Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, null:Int32))]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32, *VALUES*_0.column_2:Struct(StructType { field_names: ["v1", "v2", "v3"], field_types: [Int32, Int32, Struct(StructType { field_names: ["v1", "v2", "v3"], field_types: [Int32, Int32, Int32] })] })] } }
   create_table_with_connector:
     format: plain
@@ -394,7 +394,7 @@
 - sql: |
     select * from s where s.v3 = (1,2,(1,2,3));
   logical_plan: |-
-    LogicalProject { exprs: [s.v1, s.v2, s.v3] }
+    LogicalProject { exprs: [s.v1, s.v2, s.v3, s._rw_key] }
     └─LogicalFilter { predicate: (s.v3 = Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, 3:Int32))) }
       └─LogicalScan { table: s, columns: [s.v1, s.v2, s.v3, s._rw_key, s._row_id] }
   create_table_with_connector:
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index e93d703671446..de6196c38d293 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -855,7 +855,7 @@ fn add_default_key_column(columns: &mut Vec<ColumnCatalog>) {
             generated_or_default_column: None,
             description: None,
         },
-        is_hidden: true,
+        is_hidden: false,
     };
     columns.push(column);
 }

From fb8d1d0b2aaecfd841a1dbd68173a9557bca4b2f Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Mon, 20 Nov 2023 19:23:45 +0800
Subject: [PATCH 3/6] try fix tests

---
 e2e_test/source/basic/kafka.slt                  |  2 +-
 e2e_test/source/basic/kafka_batch.slt            |  2 +-
 e2e_test/source/basic/nosim_kafka.slt            | 12 ++++++++----
 .../source/basic/old_row_format_syntax/kafka.slt |  2 +-
 4 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt
index b1ad66bda7f99..14380c3fd8302 100644
--- a/e2e_test/source/basic/kafka.slt
+++ b/e2e_test/source/basic/kafka.slt
@@ -775,7 +775,7 @@ order by
 56166 2
 
 query I
-SELECT * FROM source_mv3 ORDER BY id;
+SELECT id FROM source_mv3 ORDER BY id;
 ----
 \x6b6b
 \x776561776566776566
diff --git a/e2e_test/source/basic/kafka_batch.slt b/e2e_test/source/basic/kafka_batch.slt
index 525031684166c..cc9a50e1d4108 100644
--- a/e2e_test/source/basic/kafka_batch.slt
+++ b/e2e_test/source/basic/kafka_batch.slt
@@ -214,7 +214,7 @@ select count(*) from s8
 0
 
 query I
-select * from s9 order by id
+select id from s9 order by id
 ----
 \x6b6b
 \x776561776566776566
diff --git a/e2e_test/source/basic/nosim_kafka.slt b/e2e_test/source/basic/nosim_kafka.slt
index beafb02cb96ba..9e648015689b4 100644
--- a/e2e_test/source/basic/nosim_kafka.slt
+++ b/e2e_test/source/basic/nosim_kafka.slt
@@ -84,7 +84,7 @@ sleep 8s
 
 query II
 SELECT
- *
+ op_type, "ID", "CLASS_ID", "ITEM_ID", "ATTR_ID", "ATTR_VALUE", "ORG_ID", "UNIT_INFO", "UPD_TIME"
 FROM
  upsert_avro_json_default_key
 ORDER BY
@@ -110,7 +110,7 @@ delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
 
 query II
 SELECT
- *
+ "ID", "firstName", "lastName", "age", "height", "weight"
 FROM
  upsert_student_avro_json
 ORDER BY
@@ -135,12 +135,16 @@ select count(*) from debezium_compact;
 2
 
 query TFITT
-select * from kafka_json_schema_plain
+select 
+  "dimensions", "price", "productId", "productName", "tags"
+from kafka_json_schema_plain
 ----
 (9.5,7,12) 12.5 1 An ice sculpture {cold,ice}
 
 query TFITT
-select * from kafka_json_schema_upsert order by id
+select
+  "dimensions", "id", "price", "productName", "tags"
+from kafka_json_schema_upsert order by id
 ----
 (9.5,7,12) 1 23 An ice sculpture {cold,ice}
 (9.5,7,12) 2 12.5 An ice sculpture {cold,ice}
diff --git a/e2e_test/source/basic/old_row_format_syntax/kafka.slt b/e2e_test/source/basic/old_row_format_syntax/kafka.slt
index 05e0d55c28c48..87e4afa2819ac 100644
--- a/e2e_test/source/basic/old_row_format_syntax/kafka.slt
+++ b/e2e_test/source/basic/old_row_format_syntax/kafka.slt
@@ -707,7 +707,7 @@ order by
 56166 2
 
 query I
-SELECT * FROM source_mv3 ORDER BY id;
+SELECT id FROM source_mv3 ORDER BY id;
 ----
 \x6b6b
 \x776561776566776566

From d27e8dd29195967f7c61f2ebfbab7a74c6504c84 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Mon, 20 Nov 2023 19:25:44 +0800
Subject: [PATCH 4/6] fix tailing space

---
 e2e_test/source/basic/nosim_kafka.slt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e_test/source/basic/nosim_kafka.slt b/e2e_test/source/basic/nosim_kafka.slt
index 9e648015689b4..333d9b5909ee3 100644
--- a/e2e_test/source/basic/nosim_kafka.slt
+++ b/e2e_test/source/basic/nosim_kafka.slt
@@ -135,7 +135,7 @@ select count(*) from debezium_compact;
 2
 
 query TFITT
-select 
+select
   "dimensions", "price", "productId", "productName", "tags"
 from kafka_json_schema_plain
 ----

From e0e63847d3c55d4d14bc1b060c9646c77f297a45 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Mon, 20 Nov 2023 19:48:59 +0800
Subject: [PATCH 5/6] fix test

---
 e2e_test/source/basic/kafka.slt                  |  9 +++++++--
 .../source/basic/old_row_format_syntax/kafka.slt | 14 ++++++++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt
index 14380c3fd8302..21b6db8a08550 100644
--- a/e2e_test/source/basic/kafka.slt
+++ b/e2e_test/source/basic/kafka.slt
@@ -731,7 +731,7 @@ ORDER BY
 
 query II
 SELECT
- *
+ "ID", "firstName", "lastName", "age", "height", "weight"
 FROM
  upsert_students
 ORDER BY
@@ -747,7 +747,12 @@ ORDER BY
 
 query II
 SELECT
- *
+ "ID",
+ "firstName",
+ "lastName",
+ "age",
+ "height",
+ "weight"
 FROM
  upsert_students_default_key
 ORDER BY
diff --git a/e2e_test/source/basic/old_row_format_syntax/kafka.slt b/e2e_test/source/basic/old_row_format_syntax/kafka.slt
index 87e4afa2819ac..77b8b759b8624 100644
--- a/e2e_test/source/basic/old_row_format_syntax/kafka.slt
+++ b/e2e_test/source/basic/old_row_format_syntax/kafka.slt
@@ -663,7 +663,12 @@ ORDER BY
 
 query II
 SELECT
- *
+ "ID",
+ "firstName",
+ "lastName",
+ "age",
+ "height",
+ "weight"
 FROM
  upsert_students
 ORDER BY
@@ -679,7 +679,12 @@ ORDER BY
 
 query II
 SELECT
- *
+ "ID",
+ "firstName",
+ "lastName",
+ "age",
+ "height",
+ "weight"
 FROM
  upsert_students_default_key
 ORDER BY

From ab21e306afa41a30ef73fe85bd3fda33212e4b70 Mon Sep 17 00:00:00 2001
From: st1page <1245835950@qq.com>
Date: Mon, 20 Nov 2023 20:08:06 +0800
Subject: [PATCH 6/6] try fix

---
 e2e_test/source/basic/old_row_format_syntax/kafka_batch.slt | 2 +-
 e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/e2e_test/source/basic/old_row_format_syntax/kafka_batch.slt b/e2e_test/source/basic/old_row_format_syntax/kafka_batch.slt
index 7a1495470f253..44f40b90c08c4 100644
--- a/e2e_test/source/basic/old_row_format_syntax/kafka_batch.slt
+++ b/e2e_test/source/basic/old_row_format_syntax/kafka_batch.slt
@@ -192,7 +192,7 @@ select count(*) from s8
 0
 
 query I
-select * from s9 order by id
+select id from s9 order by id
 ----
 \x6b6b
 \x776561776566776566
diff --git a/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt b/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt
index 37e2ef2266ff0..eaeb70ae5bad9 100644
--- a/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt
+++ b/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt
@@ -70,7 +70,7 @@ sleep 10s
 
 query II
 SELECT
- *
+ op_type, "ID", "CLASS_ID", "ITEM_ID", "ATTR_ID", "ATTR_VALUE", "ORG_ID", "UNIT_INFO", "UPD_TIME"
 FROM
  upsert_avro_json_default_key
 ORDER BY
@@ -97,7 +97,7 @@ delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
 
 query II
 SELECT
- *
+ "ID", "firstName", "lastName", "age", "height", "weight"
 FROM
  upsert_student_avro_json
 ORDER BY