From 0c2f5b03513d1c3a653b32616c91aa8ed84d0ae4 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 4 Dec 2023 14:51:53 +0800 Subject: [PATCH 1/6] Revert "fix(conn): fix throw error msg of connector when encode Json or Csv (#13390)" This reverts commit 950ed58b --- src/connector/src/parser/plain_parser.rs | 2 +- .../testdata/output/emit_on_window_close.yaml | 20 +++++++++---------- .../testdata/output/generated_columns.yaml | 2 +- .../tests/testdata/output/watermark.yaml | 14 ++++++------- src/frontend/src/handler/create_source.rs | 4 +--- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index cd805af4a1893..7448efb6f19d2 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -70,7 +70,7 @@ impl PlainParser { payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result<()> { - // if key is empty, set it as vec![] + // if key is empty, set it as vec![]su let key_data = key.unwrap_or_default(); // if payload is empty, report error let payload_data = payload.ok_or_else(|| { diff --git a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml index cd3019382bd66..cb7155cfa0a4f 100644 --- a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml +++ b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml @@ -19,17 +19,17 @@ └─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] } └─StreamHashAgg [append_only] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] } └─StreamExchange { dist: HashShard(v1) } - └─StreamRowIdGen { row_id_index: 3 } + └─StreamRowIdGen { row_id_index: 4 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] } - └─StreamSource { source: t, columns: [v1, v2, v3, _row_id] } + └─StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] } eowc_stream_plan: |- StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] } └─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] } └─StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] } └─StreamExchange { dist: HashShard(v1) } - └─StreamRowIdGen { row_id_index: 3 } + └─StreamRowIdGen { row_id_index: 4 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] } - └─StreamSource { source: t, columns: [v1, v2, v3, _row_id] } + └─StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] } eowc_stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] } @@ -42,10 +42,10 @@ └── StreamExchange Hash([0]) from 1 Fragment 1 - StreamRowIdGen { row_id_index: 3 } + StreamRowIdGen { row_id_index: 4 } └── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] } ├── state tables: [ 2 ] - └── StreamSource { source: t, columns: [v1, v2, v3, _row_id] } { source state table: 3 } + └── StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] } { source state table: 3 } Table 0 ├── columns: [ v1, min(v2), count(distinct v3), count ] @@ -142,9 +142,9 @@ └─StreamEowcSort { sort_column: tm } └─StreamExchange { dist: HashShard(b) } └─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] } - └─StreamRowIdGen { row_id_index: 3 } + └─StreamRowIdGen { row_id_index: 4 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] } - └─StreamSource { source: t, columns: [a, b, tm, _row_id] } + └─StreamSource { source: t, columns: [a, b, tm, _rw_key, _row_id] } eowc_stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [lag, _row_id(hidden), b(hidden)], stream_key: [_row_id, b], pk_columns: [_row_id, b], pk_conflict: NoCheck } @@ -157,10 +157,10 @@ Fragment 1 StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] } - └── StreamRowIdGen { row_id_index: 3 } + └── StreamRowIdGen { row_id_index: 4 } └── StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] } ├── state tables: [ 2 ] - └── StreamSource { source: t, columns: [a, b, tm, _row_id] } { source state table: 3 } + └── StreamSource { source: t, columns: [a, b, tm, _rw_key, _row_id] } { source state table: 3 } Table 0 ├── columns: [ a, b, tm, _row_id ] diff --git a/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml index e88df797de0c8..445609c3f2bb0 100644 --- a/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml +++ b/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml @@ -17,7 +17,7 @@ batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] } - └─BatchSource { source: s1, columns: [v2, _row_id], filter: (None, None) } + └─BatchSource { source: s1, columns: [v2, _rw_key, _row_id], filter: (None, None) } - name: select proctime() sql: | select proctime(); diff --git a/src/frontend/planner_test/tests/testdata/output/watermark.yaml b/src/frontend/planner_test/tests/testdata/output/watermark.yaml index cc54951beec4a..18c0cc558c3ec 100644 --- a/src/frontend/planner_test/tests/testdata/output/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/watermark.yaml @@ -5,22 +5,22 @@ select t.v1 - INTERVAL '2' SECOND as v1 from t; logical_plan: |- LogicalProject { exprs: [(v1 - '00:00:02':Interval) as $expr1] } - └─LogicalSource { source: t, columns: [v1, _row_id], time_range: (Unbounded, Unbounded) } + └─LogicalSource { source: t, columns: [v1, _rw_key, _row_id], time_range: (Unbounded, Unbounded) } stream_plan: |- StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } └─StreamProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] } - └─StreamRowIdGen { row_id_index: 1 } + └─StreamRowIdGen { row_id_index: 2 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] } - └─StreamSource { source: t, columns: [v1, _row_id] } + └─StreamSource { source: t, columns: [v1, _rw_key, _row_id] } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } ├── materialized table: 4294967294 └── StreamProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] } - └── StreamRowIdGen { row_id_index: 1 } + └── StreamRowIdGen { row_id_index: 2 } └── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] } ├── state tables: [ 0 ] - └── StreamSource { source: t, columns: [v1, _row_id] } { source state table: 1 } + └── StreamSource { source: t, columns: [v1, _rw_key, _row_id] } { source state table: 1 } Table 0 ├── columns: [ vnode, offset ] @@ -48,8 +48,8 @@ sql: | explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest') FORMAT PLAIN ENCODE JSON; explain_output: | - StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } - └─StreamRowIdGen { row_id_index: 1 } + StreamMaterialize { columns: [v1, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } + └─StreamRowIdGen { row_id_index: 2 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] } └─StreamUnion { all: true } ├─StreamExchange [no_shuffle] { dist: SomeShard } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 7c8e597bad3db..85a560f532885 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -694,9 +694,7 @@ pub(crate) async fn bind_source_pk( let sql_defined_pk = !sql_defined_pk_names.is_empty(); let res = match (&source_schema.format, &source_schema.row_encode) { - (Format::Native, Encode::Native) | (Format::Plain, Encode::Json | Encode::Csv) => { - sql_defined_pk_names - } + (Format::Native, Encode::Native) => sql_defined_pk_names, (Format::Plain, _) => { if is_key_mq_connector(with_properties) { add_default_key_column(columns); From cf781e8ea3c557b3fb9825a98db4a888b67052ea Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 4 Dec 2023 15:14:57 +0800 Subject: [PATCH 2/6] Revert "feat: set default message queue key to format plain (#13278)" This reverts commit 4c78170f --- e2e_test/source/basic/kafka.slt | 18 +-- src/connector/src/parser/avro/parser.rs | 140 ++++++++---------- src/connector/src/parser/bytes_parser.rs | 19 +-- src/connector/src/parser/plain_parser.rs | 40 +---- src/connector/src/parser/unified/util.rs | 18 +-- .../tests/testdata/output/batch_source.yaml | 4 +- .../testdata/output/emit_on_window_close.yaml | 20 +-- .../testdata/output/generated_columns.yaml | 2 +- .../tests/testdata/output/struct_query.yaml | 2 +- src/frontend/src/handler/create_source.rs | 13 +- src/frontend/src/handler/show.rs | 1 - src/frontend/src/handler/util.rs | 14 +- 12 files changed, 105 insertions(+), 186 deletions(-) diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 21b6db8a08550..e51c51cbba906 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -368,13 +368,13 @@ create table s27 with ( ) FORMAT PLAIN ENCODE JSON (schema.location = 'file:///risingwave/json-complex-schema') # currently _rw_key can be set as primary key -statement ok -create table s28 (id bytea, PRIMARY KEY(_rw_key)) with ( - connector = 'kafka', - topic = 'kafka_source_format_bytes', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest' -) FORMAT PLAIN ENCODE BYTES +# statement ok +# create table s28 (id bytea, PRIMARY KEY(_rw_key)) with ( +# connector = 'kafka', +# topic = 'kafka_source_format_bytes', +# properties.bootstrap.server = 'message_queue:29092', +# scan.startup.mode = 'earliest' +# ) FORMAT PLAIN ENCODE BYTES # throttle option statement ok @@ -858,8 +858,8 @@ drop source s24 statement ok drop table s27 -statement ok -drop table s28 +# statement ok +# drop table s28 statement ok drop table s29 diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 366a353b570e8..10473a031e89c 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -208,7 +208,7 @@ mod test { use apache_avro::{Codec, Days, Duration, Millis, Months, Reader, Schema, Writer}; use itertools::Itertools; use risingwave_common::array::Op; - use risingwave_common::catalog::{ColumnId, DEFAULT_KEY_COLUMN_NAME}; + use risingwave_common::catalog::ColumnId; use risingwave_common::row::Row; use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz}; use risingwave_common::{error, try_match_expand}; @@ -221,12 +221,10 @@ mod test { AvroParserConfig, }; use crate::common::AwsAuthProps; - use crate::parser::bytes_parser::BytesAccessBuilder; use crate::parser::plain_parser::PlainParser; use crate::parser::unified::avro::unix_epoch_days; use crate::parser::{ - AccessBuilderImpl, BytesProperties, EncodingProperties, EncodingType, - SourceStreamChunkBuilder, SpecificParserConfig, + AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig, }; use crate::source::SourceColumnDesc; @@ -307,11 +305,6 @@ mod test { let conf = new_avro_conf_from_local(file_name).await?; Ok(PlainParser { - key_builder: AccessBuilderImpl::Bytes(BytesAccessBuilder::new( - EncodingProperties::Bytes(BytesProperties { - column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()), - }), - )?), payload_builder: AccessBuilderImpl::Avro(AvroAccessBuilder::new( conf, EncodingType::Value, @@ -335,75 +328,68 @@ mod test { let flush = writer.flush().unwrap(); assert!(flush > 0); let input_data = writer.into_inner().unwrap(); - // _rw_key test cases - let key_testcases = vec![Some(br#"r"#.to_vec()), Some(vec![]), None]; let columns = build_rw_columns(); - for key_data in key_testcases { - let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 1); - { - let writer = builder.row_writer(); - parser - .parse_inner(key_data, Some(input_data.clone()), writer) - .await - .unwrap(); - } - let chunk = builder.finish(); - let (op, row) = chunk.rows().next().unwrap(); - assert_eq!(op, Op::Insert); - let row = row.into_owned_row(); - for (i, field) in record.fields.iter().enumerate() { - let value = field.clone().1; - match value { - Value::String(str) | Value::Union(_, box Value::String(str)) => { - assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str()))); - } - Value::Boolean(bool_val) => { - assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val))); - } - Value::Int(int_val) => { - assert_eq!(row[i], Some(ScalarImpl::Int32(int_val))); - } - Value::Long(i64_val) => { - assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val))); - } - Value::Float(f32_val) => { - assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into()))); - } - Value::Double(f64_val) => { - assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into()))); - } - Value::Date(days) => { - assert_eq!( - row[i], - Some(ScalarImpl::Date( - Date::with_days(days + unix_epoch_days()).unwrap(), - )) - ); - } - Value::TimestampMillis(millis) => { - assert_eq!( - row[i], - Some(Timestamptz::from_millis(millis).unwrap().into()) - ); - } - Value::TimestampMicros(micros) => { - assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into())); - } - Value::Bytes(bytes) => { - assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice()))); - } - Value::Duration(duration) => { - let months = u32::from(duration.months()) as i32; - let days = u32::from(duration.days()) as i32; - let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows - assert_eq!( - row[i], - Some(Interval::from_month_day_usec(months, days, usecs).into()) - ); - } - _ => { - unreachable!() - } + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1); + { + let writer = builder.row_writer(); + parser.parse_inner(input_data, writer).await.unwrap(); + } + let chunk = builder.finish(); + let (op, row) = chunk.rows().next().unwrap(); + assert_eq!(op, Op::Insert); + let row = row.into_owned_row(); + for (i, field) in record.fields.iter().enumerate() { + let value = field.clone().1; + match value { + Value::String(str) | Value::Union(_, box Value::String(str)) => { + assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str()))); + } + Value::Boolean(bool_val) => { + assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val))); + } + Value::Int(int_val) => { + assert_eq!(row[i], Some(ScalarImpl::Int32(int_val))); + } + Value::Long(i64_val) => { + assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val))); + } + Value::Float(f32_val) => { + assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into()))); + } + Value::Double(f64_val) => { + assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into()))); + } + Value::Date(days) => { + assert_eq!( + row[i], + Some(ScalarImpl::Date( + Date::with_days(days + unix_epoch_days()).unwrap(), + )) + ); + } + Value::TimestampMillis(millis) => { + assert_eq!( + row[i], + Some(Timestamptz::from_millis(millis).unwrap().into()) + ); + } + Value::TimestampMicros(micros) => { + assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into())); + } + Value::Bytes(bytes) => { + assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice()))); + } + Value::Duration(duration) => { + let months = u32::from(duration.months()) as i32; + let days = u32::from(duration.days()) as i32; + let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows + assert_eq!( + row[i], + Some(Interval::from_month_day_usec(months, days, usecs).into()) + ); + } + _ => { + unreachable!() } } } diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 3001c837882d1..2a0b2f1b90f2a 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -55,15 +55,11 @@ mod tests { SourceStreamChunkBuilder, SpecificParserConfig, }; - type Item = (Vec, Vec); - fn get_item() -> Vec { - vec![ - (br#"a"#.to_vec(), br#"t"#.to_vec()), - (br#"r"#.to_vec(), br#"random"#.to_vec()), - ] + fn get_payload() -> Vec> { + vec![br#"t"#.to_vec(), br#"random"#.to_vec()] } - async fn test_bytes_parser(get_item: fn() -> Vec) { + async fn test_bytes_parser(get_payload: fn() -> Vec>) { let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())]; let props = SpecificParserConfig { key_encoding_config: None, @@ -76,12 +72,9 @@ mod tests { let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2); - for item in get_item() { + for payload in get_payload() { let writer = builder.row_writer(); - parser - .parse_inner(Some(item.0), Some(item.1), writer) - .await - .unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); @@ -107,6 +100,6 @@ mod tests { #[tokio::test] async fn test_bytes_parse_object_top_level() { - test_bytes_parser(get_item).await; + test_bytes_parser(get_payload).await; } } diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 7448efb6f19d2..defb7ef54a1e6 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -12,22 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::catalog::DEFAULT_KEY_COLUMN_NAME; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{ErrorCode, Result, RwError}; -use super::bytes_parser::BytesAccessBuilder; -use super::unified::util::apply_key_val_accessor_on_stream_chunk_writer; +use super::unified::util::apply_row_accessor_on_stream_chunk_writer; use super::{ - AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType, + AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, SourceStreamChunkRowWriter, SpecificParserConfig, }; +use crate::only_parse_payload; use crate::parser::ParserFormat; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; #[derive(Debug)] pub struct PlainParser { - pub key_builder: AccessBuilderImpl, pub payload_builder: AccessBuilderImpl, pub(crate) rw_columns: Vec, pub source_ctx: SourceContextRef, @@ -39,11 +37,6 @@ impl PlainParser { rw_columns: Vec, source_ctx: SourceContextRef, ) -> Result { - let key_builder = AccessBuilderImpl::Bytes(BytesAccessBuilder::new( - EncodingProperties::Bytes(BytesProperties { - column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()), - }), - )?); let payload_builder = match props.encoding_config { EncodingProperties::Protobuf(_) | EncodingProperties::Avro(_) @@ -57,7 +50,6 @@ impl PlainParser { } }; Ok(Self { - key_builder, payload_builder, rw_columns, source_ctx, @@ -66,28 +58,12 @@ impl PlainParser { pub async fn parse_inner( &mut self, - key: Option>, - payload: Option>, + payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result<()> { - // if key is empty, set it as vec![]su - let key_data = key.unwrap_or_default(); - // if payload is empty, report error - let payload_data = payload.ok_or_else(|| { - RwError::from(ErrorCode::InternalError( - "Empty payload with nonempty key".into(), - )) - })?; + let accessor = self.payload_builder.generate_accessor(payload).await?; - let key_accessor = self.key_builder.generate_accessor(key_data).await?; - let payload_accessor = self.payload_builder.generate_accessor(payload_data).await?; - apply_key_val_accessor_on_stream_chunk_writer( - DEFAULT_KEY_COLUMN_NAME, - key_accessor, - payload_accessor, - &mut writer, - ) - .map_err(Into::into) + apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer).map_err(Into::into) } } @@ -106,10 +82,10 @@ impl ByteStreamSourceParser for PlainParser { async fn parse_one<'a>( &'a mut self, - key: Option>, + _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result<()> { - self.parse_inner(key, payload, writer).await + only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs index 4117591aa9000..92cf5da3ac81c 100644 --- a/src/connector/src/parser/unified/util.rs +++ b/src/connector/src/parser/unified/util.rs @@ -14,7 +14,7 @@ use risingwave_common::error::{ErrorCode, RwError}; -use super::{Access, AccessError, AccessImpl, AccessResult, ChangeEvent}; +use super::{Access, AccessError, AccessResult, ChangeEvent}; use crate::parser::unified::ChangeEventOperation; use crate::parser::SourceStreamChunkRowWriter; use crate::source::SourceColumnDesc; @@ -46,22 +46,6 @@ pub fn apply_row_accessor_on_stream_chunk_writer( writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type))) } -pub fn apply_key_val_accessor_on_stream_chunk_writer( - key_column_name: &str, - key_accessor: AccessImpl<'_, '_>, - val_accessor: AccessImpl<'_, '_>, - writer: &mut SourceStreamChunkRowWriter<'_>, -) -> AccessResult<()> { - let f = |column: &SourceColumnDesc| { - if column.name == key_column_name { - key_accessor.access(&[&column.name], Some(&column.data_type)) - } else { - val_accessor.access(&[&column.name], Some(&column.data_type)) - } - }; - writer.insert(f) -} - impl From for RwError { fn from(val: AccessError) -> Self { ErrorCode::InternalError(format!("AccessError: {:?}", val)).into() diff --git a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml index 04bc2ad98acfe..a802774eabf5b 100644 --- a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml @@ -3,11 +3,11 @@ select * from s logical_plan: |- LogicalProject { exprs: [id, value, _rw_key] } - └─LogicalSource { source: s, columns: [id, value, _rw_key, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) } + └─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [id, value, _rw_key] } - └─BatchSource { source: s, columns: [id, value, _rw_key, _rw_kafka_timestamp, _row_id], filter: (None, None) } + └─BatchSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) } create_source: format: plain encode: protobuf diff --git a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml index cb7155cfa0a4f..cd3019382bd66 100644 --- a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml +++ b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml @@ -19,17 +19,17 @@ └─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] } └─StreamHashAgg [append_only] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] } └─StreamExchange { dist: HashShard(v1) } - └─StreamRowIdGen { row_id_index: 4 } + └─StreamRowIdGen { row_id_index: 3 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] } - └─StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] } + └─StreamSource { source: t, columns: [v1, v2, v3, _row_id] } eowc_stream_plan: |- StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] } └─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] } └─StreamHashAgg [append_only, eowc] { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] } └─StreamExchange { dist: HashShard(v1) } - └─StreamRowIdGen { row_id_index: 4 } + └─StreamRowIdGen { row_id_index: 3 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] } - └─StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] } + └─StreamSource { source: t, columns: [v1, v2, v3, _row_id] } eowc_stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck, watermark_columns: [v1] } @@ -42,10 +42,10 @@ └── StreamExchange Hash([0]) from 1 Fragment 1 - StreamRowIdGen { row_id_index: 4 } + StreamRowIdGen { row_id_index: 3 } └── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - 10:Int32) }], output_watermarks: [v1] } ├── state tables: [ 2 ] - └── StreamSource { source: t, columns: [v1, v2, v3, _rw_key, _row_id] } { source state table: 3 } + └── StreamSource { source: t, columns: [v1, v2, v3, _row_id] } { source state table: 3 } Table 0 ├── columns: [ v1, min(v2), count(distinct v3), count ] @@ -142,9 +142,9 @@ └─StreamEowcSort { sort_column: tm } └─StreamExchange { dist: HashShard(b) } └─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] } - └─StreamRowIdGen { row_id_index: 4 } + └─StreamRowIdGen { row_id_index: 3 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] } - └─StreamSource { source: t, columns: [a, b, tm, _rw_key, _row_id] } + └─StreamSource { source: t, columns: [a, b, tm, _row_id] } eowc_stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [lag, _row_id(hidden), b(hidden)], stream_key: [_row_id, b], pk_columns: [_row_id, b], pk_conflict: NoCheck } @@ -157,10 +157,10 @@ Fragment 1 StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] } - └── StreamRowIdGen { row_id_index: 4 } + └── StreamRowIdGen { row_id_index: 3 } └── StreamWatermarkFilter { watermark_descs: [Desc { column: tm, expr: (tm - '00:05:00':Interval) }], output_watermarks: [tm] } ├── state tables: [ 2 ] - └── StreamSource { source: t, columns: [a, b, tm, _rw_key, _row_id] } { source state table: 3 } + └── StreamSource { source: t, columns: [a, b, tm, _row_id] } { source state table: 3 } Table 0 ├── columns: [ a, b, tm, _row_id ] diff --git a/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml b/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml index 445609c3f2bb0..e88df797de0c8 100644 --- a/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml +++ b/src/frontend/planner_test/tests/testdata/output/generated_columns.yaml @@ -17,7 +17,7 @@ batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] } - └─BatchSource { source: s1, columns: [v2, _rw_key, _row_id], filter: (None, None) } + └─BatchSource { source: s1, columns: [v2, _row_id], filter: (None, None) } - name: select proctime() sql: | select proctime(); diff --git a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml index f55c30f4436a8..297064dc0b97c 100644 --- a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml +++ b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml @@ -395,7 +395,7 @@ logical_plan: |- LogicalProject { exprs: [s.v1, s.v2, s.v3, s._rw_key] } └─LogicalFilter { predicate: (s.v3 = Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, 3:Int32))) } - └─LogicalScan { table: s, columns: [s.v1, s.v2, s.v3, s._rw_key, s._row_id] } + └─LogicalScan { table: s, columns: [s.v1, s.v2, s.v3, s._row_id] } create_table_with_connector: format: plain encode: protobuf diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 85a560f532885..0e6dc219ac5b8 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -582,7 +582,6 @@ pub(crate) async fn bind_columns_from_source( ); session.notice_to_user(err_string); } - Ok(res) } @@ -694,13 +693,7 @@ pub(crate) async fn bind_source_pk( let sql_defined_pk = !sql_defined_pk_names.is_empty(); let res = match (&source_schema.format, &source_schema.row_encode) { - (Format::Native, Encode::Native) => sql_defined_pk_names, - (Format::Plain, _) => { - if is_key_mq_connector(with_properties) { - add_default_key_column(columns); - } - sql_defined_pk_names - } + (Format::Native, Encode::Native) | (Format::Plain, _) => sql_defined_pk_names, (Format::Upsert, Encode::Json) => { if sql_defined_pk { sql_defined_pk_names @@ -1248,8 +1241,8 @@ pub mod tests { use std::collections::HashMap; use risingwave_common::catalog::{ - CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME, - OFFSET_COLUMN_NAME, ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, + CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME, + ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, }; use risingwave_common::types::DataType; diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 42ba3b6ad968d..8a7dacfd8a5eb 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -499,7 +499,6 @@ mod tests { "country".into() => "test.Country".into(), "_rw_kafka_timestamp".into() => "timestamp with time zone".into(), "_row_id".into() => "serial".into(), - "_rw_key".into() => "bytea".into() }; assert_eq!(columns, expected_columns); diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 583af954bb2c5..b2a9031027251 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -32,7 +32,7 @@ use risingwave_common::error::{ErrorCode, Result as RwResult}; use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, Timestamptz}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_connector::source::{KAFKA_CONNECTOR, KINESIS_CONNECTOR, PULSAR_CONNECTOR}; +use risingwave_connector::source::KAFKA_CONNECTOR; use risingwave_sqlparser::ast::{display_comma_separated, CompatibleSourceSchema, ConnectorSchema}; use crate::catalog::IndexCatalog; @@ -257,18 +257,6 @@ pub fn is_kafka_connector(with_properties: &HashMap) -> bool { connector == KAFKA_CONNECTOR } -#[inline(always)] -pub fn is_key_mq_connector(with_properties: &HashMap) -> bool { - let Some(connector) = get_connector(with_properties) else { - return false; - }; - - matches!( - connector.as_str(), - KINESIS_CONNECTOR | PULSAR_CONNECTOR | KAFKA_CONNECTOR - ) -} - #[inline(always)] pub fn is_cdc_connector(with_properties: &HashMap) -> bool { let Some(connector) = get_connector(with_properties) else { From f0c3ff3ba5ec5bb974793f82d239914535e35339 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 4 Dec 2023 16:10:50 +0800 Subject: [PATCH 3/6] update planner test --- .../tests/testdata/input/batch_source.yaml | 16 ----------- .../tests/testdata/output/batch_source.yaml | 28 +++---------------- .../tests/testdata/output/struct_query.yaml | 22 +++++++-------- .../tests/testdata/output/watermark.yaml | 14 +++++----- src/frontend/src/handler/create_source.rs | 6 ++-- 5 files changed, 25 insertions(+), 61 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/batch_source.yaml b/src/frontend/planner_test/tests/testdata/input/batch_source.yaml index 486ce8bc26d20..5890cc6866b9f 100644 --- a/src/frontend/planner_test/tests/testdata/input/batch_source.yaml +++ b/src/frontend/planner_test/tests/testdata/input/batch_source.yaml @@ -30,19 +30,3 @@ expected_outputs: - batch_plan - logical_plan -- sql: | - insert into s values (1,2, E'\\xDEADBEEF'::bytea); - create_table_with_connector: - format: plain - encode: protobuf - name: s - file: | - syntax = "proto3"; - package test; - message TestRecord { - int32 id = 1; - int32 value = 2; - } - expected_outputs: - - batch_plan - - logical_plan diff --git a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml index a802774eabf5b..8613f28305667 100644 --- a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml @@ -2,11 +2,11 @@ - sql: | select * from s logical_plan: |- - LogicalProject { exprs: [id, value, _rw_key] } + LogicalProject { exprs: [id, value] } └─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) } batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchProject { exprs: [id, value, _rw_key] } + └─BatchProject { exprs: [id, value] } └─BatchSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) } create_source: format: plain @@ -22,11 +22,11 @@ - sql: | insert into s values (1,2); logical_plan: |- - LogicalInsert { table: s, mapping: [0:0, 1:1], default: [2<-null:Bytea] } + LogicalInsert { table: s, mapping: [0:0, 1:1] } └─LogicalValues { rows: [[1:Int32, 2:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32] } } batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchInsert { table: s, mapping: [0:0, 1:1], default: [2<-null:Bytea] } + └─BatchInsert { table: s, mapping: [0:0, 1:1] } └─BatchValues { rows: [[1:Int32, 2:Int32]] } create_table_with_connector: format: plain @@ -39,23 +39,3 @@ int32 id = 1; int32 value = 2; } -- sql: | - insert into s values (1,2, E'\\xDEADBEEF'::bytea); - logical_plan: |- - LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2] } - └─LogicalValues { rows: [[1:Int32, 2:Int32, '\xdeadbeef':Bytea]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32, *VALUES*_0.column_2:Bytea] } } - batch_plan: |- - BatchExchange { order: [], dist: Single } - └─BatchInsert { table: s, mapping: [0:0, 1:1, 2:2] } - └─BatchValues { rows: [[1:Int32, 2:Int32, '\xdeadbeef':Bytea]] } - create_table_with_connector: - format: plain - encode: protobuf - name: s - file: | - syntax = "proto3"; - package test; - message TestRecord { - int32 id = 1; - int32 value = 2; - } diff --git a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml index 297064dc0b97c..fc80a61cfcf78 100644 --- a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml +++ b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml @@ -36,7 +36,7 @@ select (t.country).city,t.country,(country).city.address from t; logical_plan: |- LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1, t.country, Field(Field(t.country, 1:Int32), 0:Int32) as $expr2] } - └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] } + └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] } create_source: format: plain encode: protobuf @@ -80,7 +80,7 @@ select (t.country1).city.*,(t.country2).*,(country3).city.* from t; logical_plan: |- LogicalProject { exprs: [Field(Field(t.country1, 1:Int32), 0:Int32) as $expr1, Field(Field(t.country1, 1:Int32), 1:Int32) as $expr2, Field(t.country2, 0:Int32) as $expr3, Field(t.country2, 1:Int32) as $expr4, Field(t.country2, 2:Int32) as $expr5, Field(Field(t.country3, 1:Int32), 0:Int32) as $expr6, Field(Field(t.country3, 1:Int32), 1:Int32) as $expr7] } - └─LogicalScan { table: t, columns: [t.id, t.country1, t.country2, t.country3, t.zipcode, t.rate, t._rw_key, t._row_id] } + └─LogicalScan { table: t, columns: [t.id, t.country1, t.country2, t.country3, t.zipcode, t.rate, t._row_id] } create_source: format: plain encode: protobuf @@ -112,7 +112,7 @@ logical_plan: |- LogicalProject { exprs: [Field($expr1, 1:Int32) as $expr2] } └─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] } - └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] } + └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] } create_source: format: plain encode: protobuf @@ -144,7 +144,7 @@ └─LogicalProject { exprs: [min($expr1)] } └─LogicalAgg { aggs: [min($expr1)] } └─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] } - └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] } + └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] } create_source: format: plain encode: protobuf @@ -171,11 +171,11 @@ create materialized view t as select * from s; select * from (select (country).city as c from t) as vv join t on (c).zipcode=(t.country).zipcode; logical_plan: |- - LogicalProject { exprs: [$expr1, t.id, t.country, t.zipcode, t.rate, t._rw_key] } + LogicalProject { exprs: [$expr1, t.id, t.country, t.zipcode, t.rate] } └─LogicalJoin { type: Inner, on: (Field($expr1, 1:Int32) = Field(t.country, 2:Int32)), output: all } ├─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] } - │ └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] } - └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] } + │ └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] } + └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] } create_source: format: plain encode: protobuf @@ -205,7 +205,7 @@ LogicalProject { exprs: [(min($expr1) + (max($expr1) * count(t.zipcode))) as $expr2] } └─LogicalAgg { aggs: [min($expr1), max($expr1), count(t.zipcode)] } └─LogicalProject { exprs: [Field(Field(t.country, 1:Int32), 0:Int32) as $expr1, t.zipcode] } - └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] } + └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] } create_source: format: plain encode: protobuf @@ -236,7 +236,7 @@ └─LogicalAgg { aggs: [count(1:Int32), count($expr1)] } └─LogicalProject { exprs: [1:Int32, Field(Field(t.country, 1:Int32), 1:Int32) as $expr1] } └─LogicalFilter { predicate: (Field(Field(t.country, 1:Int32), 0:Int32) > 1:Int32) } - └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] } + └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] } create_source: format: plain encode: protobuf @@ -366,7 +366,7 @@ - sql: | insert into s values (1,2,(1,2,(1,2,null))); logical_plan: |- - LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2], default: [3<-null:Bytea] } + LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2] } └─LogicalValues { rows: [[1:Int32, 2:Int32, Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, null:Int32))]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32, *VALUES*_0.column_2:Struct(StructType { field_names: ["v1", "v2", "v3"], field_types: [Int32, Int32, Struct(StructType { field_names: ["v1", "v2", "v3"], field_types: [Int32, Int32, Int32] })] })] } } create_table_with_connector: format: plain @@ -393,7 +393,7 @@ - sql: | select * from s where s.v3 = (1,2,(1,2,3)); logical_plan: |- - LogicalProject { exprs: [s.v1, s.v2, s.v3, s._rw_key] } + LogicalProject { exprs: [s.v1, s.v2, s.v3] } └─LogicalFilter { predicate: (s.v3 = Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, 3:Int32))) } └─LogicalScan { table: s, columns: [s.v1, s.v2, s.v3, s._row_id] } create_table_with_connector: diff --git a/src/frontend/planner_test/tests/testdata/output/watermark.yaml b/src/frontend/planner_test/tests/testdata/output/watermark.yaml index 18c0cc558c3ec..cc54951beec4a 100644 --- a/src/frontend/planner_test/tests/testdata/output/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/watermark.yaml @@ -5,22 +5,22 @@ select t.v1 - INTERVAL '2' SECOND as v1 from t; logical_plan: |- LogicalProject { exprs: [(v1 - '00:00:02':Interval) as $expr1] } - └─LogicalSource { source: t, columns: [v1, _rw_key, _row_id], time_range: (Unbounded, Unbounded) } + └─LogicalSource { source: t, columns: [v1, _row_id], time_range: (Unbounded, Unbounded) } stream_plan: |- StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } └─StreamProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] } - └─StreamRowIdGen { row_id_index: 2 } + └─StreamRowIdGen { row_id_index: 1 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] } - └─StreamSource { source: t, columns: [v1, _rw_key, _row_id] } + └─StreamSource { source: t, columns: [v1, _row_id] } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } ├── materialized table: 4294967294 └── StreamProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1, _row_id], output_watermarks: [$expr1] } - └── StreamRowIdGen { row_id_index: 2 } + └── StreamRowIdGen { row_id_index: 1 } └── StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] } ├── state tables: [ 0 ] - └── StreamSource { source: t, columns: [v1, _rw_key, _row_id] } { source state table: 1 } + └── StreamSource { source: t, columns: [v1, _row_id] } { source state table: 1 } Table 0 ├── columns: [ vnode, offset ] @@ -48,8 +48,8 @@ sql: | explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest') FORMAT PLAIN ENCODE JSON; explain_output: | - StreamMaterialize { columns: [v1, _rw_key(hidden), _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } - └─StreamRowIdGen { row_id_index: 2 } + StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } + └─StreamRowIdGen { row_id_index: 1 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: v1, expr: (v1 - '00:00:01':Interval) }], output_watermarks: [v1] } └─StreamUnion { all: true } ├─StreamExchange [no_shuffle] { dist: SomeShard } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 0e6dc219ac5b8..a6ead36426a33 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -65,7 +65,7 @@ use crate::handler::create_table::{ ensure_table_constraints_supported, ColumnIdGenerator, }; use crate::handler::util::{ - get_connector, is_cdc_connector, is_kafka_connector, is_key_mq_connector, SourceSchemaCompatExt, + get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; @@ -1241,8 +1241,8 @@ pub mod tests { use std::collections::HashMap; use risingwave_common::catalog::{ - CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME, - ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, + CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME, + OFFSET_COLUMN_NAME, ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, }; use risingwave_common::types::DataType; From fd99cef4374531fbdf473931b9468eeb2e4f17e3 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 4 Dec 2023 16:42:48 +0800 Subject: [PATCH 4/6] fix ut --- src/frontend/src/handler/create_source.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a6ead36426a33..b1a7402e06990 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1284,7 +1284,6 @@ pub mod tests { ); let expected_columns = maplit::hashmap! { ROWID_PREFIX => DataType::Serial, - DEFAULT_KEY_COLUMN_NAME => DataType::Bytea, "id" => DataType::Int32, "zipcode" => DataType::Int64, "rate" => DataType::Float32, From 0d85577916d62cf0505b10e33a387b616ad56b05 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 4 Dec 2023 16:44:44 +0800 Subject: [PATCH 5/6] fix s2s --- e2e_test/source/basic/kafka.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index e51c51cbba906..b4782d36ced80 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -378,7 +378,7 @@ create table s27 with ( # throttle option statement ok -create table s29 (id bytea, PRIMARY KEY(_rw_key)) with ( +create table s29 (id bytea) with ( connector = 'kafka', topic = 'kafka_source_format_bytes', properties.bootstrap.server = 'message_queue:29092', From c1becaed83ddc6ea680c9b44babb96feb0e23fdb Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 4 Dec 2023 16:51:51 +0800 Subject: [PATCH 6/6] format --- src/frontend/src/handler/create_source.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index b1a7402e06990..7050d0b10ccfb 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1241,8 +1241,8 @@ pub mod tests { use std::collections::HashMap; use risingwave_common::catalog::{ - CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME, - OFFSET_COLUMN_NAME, ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, + CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME, + ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, }; use risingwave_common::types::DataType;