From 950ed58b5fcab88ca00a08c7683120a2d042b54d Mon Sep 17 00:00:00 2001
From: Phoenix7Rise <81153420+keven-huang@users.noreply.github.com>
Date: Mon, 13 Nov 2023 17:49:08 +0800
Subject: [PATCH] fix(conn): fix error message thrown by connector when
 encoding Json or Csv (#13390)

Co-authored-by: jiamin.huang
---
 src/connector/src/parser/plain_parser.rs      |  2 +-
 .../tests/testdata/output/create_source.yaml  | 16 +++++++--------
 .../testdata/output/emit_on_window_close.yaml | 20 +++++++++----------
 .../tests/testdata/output/explain.yaml        |  8 ++++----
 .../testdata/output/generated_columns.yaml    |  2 +-
 .../tests/testdata/output/watermark.yaml      | 18 ++++++++---------
 src/frontend/src/handler/create_source.rs     |  4 +++-
 7 files changed, 36 insertions(+), 34 deletions(-)

diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs
index 7448efb6f19d2..cd805af4a1893 100644
--- a/src/connector/src/parser/plain_parser.rs
+++ b/src/connector/src/parser/plain_parser.rs
@@ -70,7 +70,7 @@ impl PlainParser {
         payload: Option<Vec<u8>>,
         mut writer: SourceStreamChunkRowWriter<'_>,
     ) -> Result<()> {
-        // if key is empty, set it as vec![]su
+        // if key is empty, set it as vec![]
        let key_data = key.unwrap_or_default();
        // if payload is empty, report error
        let payload_data = payload.ok_or_else(|| {
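
The context around this hunk shows the parser's contract: a missing key
degrades to an empty byte vector, while a missing payload is reported as an
error. A minimal sketch of that pattern in isolation; the function name and
the plain String error are illustrative stand-ins, not the actual PlainParser
API or the connector's real error type:

    // Sketch only: mirrors the unwrap_or_default / ok_or_else pattern above.
    fn split_key_payload(
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
    ) -> Result<(Vec<u8>, Vec<u8>), String> {
        // An absent key falls back to vec![] rather than failing.
        let key_data = key.unwrap_or_default();
        // An absent payload is a hard error, constructed lazily via ok_or_else.
        let payload_data = payload.ok_or_else(|| "empty payload".to_string())?;
        Ok((key_data, payload_data))
    }
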
diff --git a/src/frontend/planner_test/tests/testdata/output/create_source.yaml b/src/frontend/planner_test/tests/testdata/output/create_source.yaml
index c20af237eadc1..fef47aa736aff 100644
--- a/src/frontend/planner_test/tests/testdata/output/create_source.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/create_source.yaml
@@ -20,11 +20,11 @@
       scan.startup.mode = 'earliest'
     ) FORMAT PLAIN ENCODE CSV (delimiter = ',', without_header = true);
   explain_output: |
-    StreamMaterialize { columns: [v1, v2, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
+    StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
     └─StreamExchange { dist: HashShard(_row_id) }
-      └─StreamRowIdGen { row_id_index: 3 }
-        └─StreamDml { columns: [v1, v2, _rw_key, _row_id] }
-          └─StreamSource { source: s0, columns: [v1, v2, _rw_key, _row_id] }
+      └─StreamRowIdGen { row_id_index: 2 }
+        └─StreamDml { columns: [v1, v2, _row_id] }
+          └─StreamSource { source: s0, columns: [v1, v2, _row_id] }
 - id: csv_delimiter_tab
   sql: |
     explain create table s0 (v1 int, v2 varchar) with (
@@ -34,11 +34,11 @@
       scan.startup.mode = 'earliest'
     ) FORMAT PLAIN ENCODE CSV (delimiter = E'\t', without_header = true);
   explain_output: |
-    StreamMaterialize { columns: [v1, v2, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
+    StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
     └─StreamExchange { dist: HashShard(_row_id) }
-      └─StreamRowIdGen { row_id_index: 3 }
-        └─StreamDml { columns: [v1, v2, _rw_key, _row_id] }
-          └─StreamSource { source: s0, columns: [v1, v2, _rw_key, _row_id] }
+      └─StreamRowIdGen { row_id_index: 2 }
+        └─StreamDml { columns: [v1, v2, _row_id] }
+          └─StreamSource { source: s0, columns: [v1, v2, _row_id] }
 - id: create_source_with_cdc_backfill
   sql: |
     create source mysql_mydb with (
diff --git a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml
index cb7155cfa0a4f..cd3019382bd66 100644
--- a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml
@@ -19,17 +19,17 @@
     └─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
       └─StreamHashAgg [append_only] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
         └─StreamExchange { dist: HashShard(v1) }
-          └─StreamRowIdGen { row_id_index: 4 }
+          └─StreamRowIdGen { row_id_index: 3 }
             └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
-              └─StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] }
+              └─StreamSource { source: t, columns: [v1, v2, v3, _row_id] }
   eowc_stream_plan: |-
     StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
     └─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
       └─StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
         └─StreamExchange { dist: HashShard(v1) }
-          └─StreamRowIdGen { row_id_index: 4 }
+          └─StreamRowIdGen { row_id_index: 3 }
             └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
-              └─StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] }
+              └─StreamSource { source: t, columns: [v1, v2, v3, _row_id] }
   eowc_stream_dist_plan: |+
     Fragment 0
     StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
@@ -42,10 +42,10 @@
     └── StreamExchange Hash([0]) from 1

     Fragment 1
-    StreamRowIdGen { row_id_index: 4 }
+    StreamRowIdGen { row_id_index: 3 }
     └── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
         ├── state tables: [ 2 ]
-        └── StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] } { source state table: 3 }
+        └── StreamSource { source: t, columns: [v1, v2, v3, _row_id] } { source state table: 3 }

     Table 0
     ├── columns: [ v1, min(v2), count(distinct v3), count ]
@@ -142,9 +142,9 @@
     └─StreamEowcSort { sort_column: tm }
       └─StreamExchange { dist: HashShard(b) }
         └─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] }
-          └─StreamRowIdGen { row_id_index: 4 }
+          └─StreamRowIdGen { row_id_index: 3 }
             └─StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] }
-              └─StreamSource { source: t, columns: [a, b, tm, _rw_key, _row_id] }
+              └─StreamSource { source: t, columns: [a, b, tm, _row_id] }
   eowc_stream_dist_plan: |+
     Fragment 0
     StreamMaterialize { columns: [lag, _row_id(hidden), b(hidden)], stream_key: [_row_id, b], pk_columns: [_row_id, b], pk_conflict: NoCheck }
@@ -157,10 +157,10 @@

     Fragment 1
     StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] }
-    └── StreamRowIdGen { row_id_index: 4 }
+    └── StreamRowIdGen { row_id_index: 3 }
         └── StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] }
             ├── state tables: [ 2 ]
-            └── StreamSource { source: t, columns: [a, b, tm, _rw_key, _row_id] } { source state table: 3 }
+            └── StreamSource { source: t, columns: [a, b, tm, _row_id] } { source state table: 3 }

     Table 0
     ├── columns: [ a, b, tm, _row_id ]
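
Every snapshot change in these planner tests follows from one schema change:
the hidden _rw_key column is no longer injected for PLAIN JSON/CSV sources,
so _row_id shifts one position left and each row_id_index drops by exactly
one. A hedged sketch of that relationship; the helper below is illustrative,
not the planner's actual code:

    // row_id_index is simply the position of "_row_id" in the source's
    // column list; dropping "_rw_key" shifts it down by one.
    fn row_id_index(columns: &[&str]) -> Option<usize> {
        columns.iter().position(|&c| c == "_row_id")
    }

    fn main() {
        // Before: the hidden "_rw_key" precedes "_row_id".
        assert_eq!(row_id_index(&["v1", "v2", "v3", "_rw_key", "_row_id"]), Some(4));
        // After: "_rw_key" is gone, matching the 4 -> 3 updates above.
        assert_eq!(row_id_index(&["v1", "v2", "v3", "_row_id"]), Some(3));
    }
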
diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml
index d3ac80df9b574..63899bc26c0e4 100644
--- a/src/frontend/planner_test/tests/testdata/output/explain.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml
@@ -198,8 +198,8 @@
 - sql: |
     explain create table t (v1 int, v2 varchar) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
   explain_output: |
-    StreamMaterialize { columns: [v1, v2, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
+    StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
     └─StreamExchange { dist: HashShard(_row_id) }
-      └─StreamRowIdGen { row_id_index: 3 }
-        └─StreamDml { columns: [v1, v2, _rw_key, _row_id] }
-          └─StreamSource { source: t, columns: [v1, v2, _rw_key, _row_id] }
+      └─StreamRowIdGen { row_id_index: 2 }
+        └─StreamDml { columns: [v1, v2, _row_id] }
+          └─StreamSource { source: t, columns: [v1, v2, _row_id] }
diff --git a/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml
index bb91f726670a9..a3acaf174d237 100644
--- a/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml
@@ -16,7 +16,7 @@
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
     └─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] }
-      └─BatchSource { source: s1, columns: [v2, _rw_key, _row_id], filter: (None, None) }
+      └─BatchSource { source: s1, columns: [v2, _row_id], filter: (None, None) }
 - name: select proctime()
   sql: |
     select proctime();
diff --git a/src/frontend/planner_test/tests/testdata/output/watermark.yaml b/src/frontend/planner_test/tests/testdata/output/watermark.yaml
index e56f4eff7496f..d57d41fa76bc3 100644
--- a/src/frontend/planner_test/tests/testdata/output/watermark.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/watermark.yaml
@@ -5,22 +5,22 @@
     select t.v1 - INTERVAL '2' SECOND as v1 from t;
   logical_plan: |-
     LogicalProject { exprs: [(v1 - '00:00:02':Interval) as $expr1] }
-    └─LogicalSource { source: t, columns: [v1, _rw_key, _row_id], time_range: (Unbounded, Unbounded) }
+    └─LogicalSource { source: t, columns: [v1, _row_id], time_range: (Unbounded, Unbounded) }
   stream_plan: |-
     StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
     └─StreamProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] }
-      └─StreamRowIdGen { row_id_index: 2 }
+      └─StreamRowIdGen { row_id_index: 1 }
        └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] }
-          └─StreamSource { source: t, columns: [v1, _rw_key, _row_id] }
+          └─StreamSource { source: t, columns: [v1, _row_id] }
   stream_dist_plan: |+
     Fragment 0
     StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
     ├── materialized table: 4294967294
     └── StreamProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] }
-        └── StreamRowIdGen { row_id_index: 2 }
+        └── StreamRowIdGen { row_id_index: 1 }
            └── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] }
                ├── state tables: [ 0 ]
-                └── StreamSource { source: t, columns: [v1, _rw_key, _row_id] } { source state table: 1 }
+                └── StreamSource { source: t, columns: [v1, _row_id] } { source state table: 1 }

     Table 0
     ├── columns: [ vnode, offset ]
@@ -48,11 +48,11 @@
   sql: |
     explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest') FORMAT PLAIN ENCODE JSON;
   explain_output: |
-    StreamMaterialize { columns: [v1, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
-    └─StreamRowIdGen { row_id_index: 2 }
+    StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
+    └─StreamRowIdGen { row_id_index: 1 }
       └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] }
-        └─StreamDml { columns: [v1, _rw_key, _row_id] }
-          └─StreamSource { source: t, columns: [v1, _rw_key, _row_id] }
+        └─StreamDml { columns: [v1, _row_id] }
+          └─StreamSource { source: t, columns: [v1, _row_id] }
 - name: watermark on append only table without source
   sql: |
     explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only;
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index d9ff525b5d85e..e93d703671446 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -718,7 +718,9 @@ pub(crate) async fn bind_source_pk(
     let sql_defined_pk = !sql_defined_pk_names.is_empty();

     let res = match (&source_schema.format, &source_schema.row_encode) {
-        (Format::Native, Encode::Native) => sql_defined_pk_names,
+        (Format::Native, Encode::Native) | (Format::Plain, Encode::Json | Encode::Csv) => {
+            sql_defined_pk_names
+        }
         (Format::Plain, _) => {
             if is_key_mq_connector(with_properties) {
                 add_default_key_column(columns);
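
This last hunk is the behavioral fix: for FORMAT PLAIN with ENCODE JSON or
CSV, bind_source_pk now takes the SQL-defined primary-key columns directly
instead of falling through to the message-queue branch that appended the
hidden _rw_key column, which is what the planner-test updates above reflect.
A self-contained sketch of the new dispatch shape; the enums and helper are
simplified stand-ins, not the real Format/Encode types or function:

    // Stand-in enums; only the match shape mirrors the patched bind_source_pk.
    enum Format { Native, Plain }
    enum Encode { Native, Json, Csv, Avro }

    // Returns true when the user-declared primary key is used as-is,
    // i.e. no default key column is appended to the source schema.
    fn uses_sql_defined_pk(format: &Format, encode: &Encode) -> bool {
        matches!(
            (format, encode),
            (Format::Native, Encode::Native) | (Format::Plain, Encode::Json | Encode::Csv)
        )
    }

    fn main() {
        assert!(uses_sql_defined_pk(&Format::Plain, &Encode::Json));
        assert!(uses_sql_defined_pk(&Format::Plain, &Encode::Csv));
        // Other plain encodings still go through the key-handling branch.
        assert!(!uses_sql_defined_pk(&Format::Plain, &Encode::Avro));
    }
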