Skip to content

Commit

Permalink
fix(conn): fix throw error msg of connector when encode Json or Csv (#…
Browse files Browse the repository at this point in the history
…13390)

Co-authored-by: jiamin.huang <[email protected]>
  • Loading branch information
keven-huang and jiamin.huang authored Nov 13, 2023
1 parent df1a260 commit 950ed58
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl PlainParser {
payload: Option<Vec<u8>>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<()> {
// if key is empty, set it as vec![]su
// if key is empty, set it as vec![]
let key_data = key.unwrap_or_default();
// if payload is empty, report error
let payload_data = payload.ok_or_else(|| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE CSV (delimiter = ',', without_header = true);
explain_output: |
StreamMaterialize { columns: [v1, v2, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamDml { columns: [v1, v2, _rw_key, _row_id] }
└─StreamSource { source: s0, columns: [v1, v2, _rw_key, _row_id] }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource { source: s0, columns: [v1, v2, _row_id] }
- id: csv_delimiter_tab
sql: |
explain create table s0 (v1 int, v2 varchar) with (
Expand All @@ -34,11 +34,11 @@
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE CSV (delimiter = E'\t', without_header = true);
explain_output: |
StreamMaterialize { columns: [v1, v2, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamDml { columns: [v1, v2, _rw_key, _row_id] }
└─StreamSource { source: s0, columns: [v1, v2, _rw_key, _row_id] }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource { source: s0, columns: [v1, v2, _row_id] }
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamHashAgg [append_only] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 4 }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
└─StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] }
└─StreamSource { source: t, columns: [v1, v2, v3, _row_id] }
eowc_stream_plan: |-
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 4 }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
└─StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] }
└─StreamSource { source: t, columns: [v1, v2, v3, _row_id] }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] }
Expand All @@ -42,10 +42,10 @@
└── StreamExchange Hash([0]) from 1
Fragment 1
StreamRowIdGen { row_id_index: 4 }
StreamRowIdGen { row_id_index: 3 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] }
├── state tables: [ 2 ]
└── StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] } { source state table: 3 }
└── StreamSource { source: t, columns: [v1, v2, v3, _row_id] } { source state table: 3 }
Table 0
├── columns: [ v1, min(v2), count(distinct v3), count ]
Expand Down Expand Up @@ -142,9 +142,9 @@
└─StreamEowcSort { sort_column: tm }
└─StreamExchange { dist: HashShard(b) }
└─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] }
└─StreamRowIdGen { row_id_index: 4 }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] }
└─StreamSource { source: t, columns: [a, b, tm, _rw_key, _row_id] }
└─StreamSource { source: t, columns: [a, b, tm, _row_id] }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [lag, _row_id(hidden), b(hidden)], stream_key: [_row_id, b], pk_columns: [_row_id, b], pk_conflict: NoCheck }
Expand All @@ -157,10 +157,10 @@
Fragment 1
StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] }
└── StreamRowIdGen { row_id_index: 4 }
└── StreamRowIdGen { row_id_index: 3 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] }
├── state tables: [ 2 ]
└── StreamSource { source: t, columns: [a, b, tm, _rw_key, _row_id] } { source state table: 3 }
└── StreamSource { source: t, columns: [a, b, tm, _row_id] } { source state table: 3 }
Table 0
├── columns: [ a, b, tm, _row_id ]
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/explain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@
- sql: |
explain create table t (v1 int, v2 varchar) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON;
explain_output: |
StreamMaterialize { columns: [v1, v2, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamDml { columns: [v1, v2, _rw_key, _row_id] }
└─StreamSource { source: t, columns: [v1, v2, _rw_key, _row_id] }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource { source: t, columns: [v1, v2, _row_id] }
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] }
└─BatchSource { source: s1, columns: [v2, _rw_key, _row_id], filter: (None, None) }
└─BatchSource { source: s1, columns: [v2, _row_id], filter: (None, None) }
- name: select proctime()
sql: |
select proctime();
Expand Down
18 changes: 9 additions & 9 deletions src/frontend/planner_test/tests/testdata/output/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@
select t.v1 - INTERVAL '2' SECOND as v1 from t;
logical_plan: |-
LogicalProject { exprs: [(v1 - '00:00:02':Interval) as $expr1] }
└─LogicalSource { source: t, columns: [v1, _rw_key, _row_id], time_range: (Unbounded, Unbounded) }
└─LogicalSource { source: t, columns: [v1, _row_id], time_range: (Unbounded, Unbounded) }
stream_plan: |-
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] }
└─StreamRowIdGen { row_id_index: 2 }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] }
└─StreamSource { source: t, columns: [v1, _rw_key, _row_id] }
└─StreamSource { source: t, columns: [v1, _row_id] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
├── materialized table: 4294967294
└── StreamProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] }
└── StreamRowIdGen { row_id_index: 2 }
└── StreamRowIdGen { row_id_index: 1 }
└── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] }
├── state tables: [ 0 ]
└── StreamSource { source: t, columns: [v1, _rw_key, _row_id] } { source state table: 1 }
└── StreamSource { source: t, columns: [v1, _row_id] } { source state table: 1 }
Table 0
├── columns: [ vnode, offset ]
Expand Down Expand Up @@ -48,11 +48,11 @@
sql: |
explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest') FORMAT PLAIN ENCODE JSON;
explain_output: |
StreamMaterialize { columns: [v1, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamRowIdGen { row_id_index: 2 }
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] }
└─StreamDml { columns: [v1, _rw_key, _row_id] }
└─StreamSource { source: t, columns: [v1, _rw_key, _row_id] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource { source: t, columns: [v1, _row_id] }
- name: watermark on append only table without source
sql: |
explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only;
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,9 @@ pub(crate) async fn bind_source_pk(
let sql_defined_pk = !sql_defined_pk_names.is_empty();

let res = match (&source_schema.format, &source_schema.row_encode) {
(Format::Native, Encode::Native) => sql_defined_pk_names,
(Format::Native, Encode::Native) | (Format::Plain, Encode::Json | Encode::Csv) => {
sql_defined_pk_names
}
(Format::Plain, _) => {
if is_key_mq_connector(with_properties) {
add_default_key_column(columns);
Expand Down

0 comments on commit 950ed58

Please sign in to comment.