diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt
index 21b6db8a08550..b4782d36ced80 100644
--- a/e2e_test/source/basic/kafka.slt
+++ b/e2e_test/source/basic/kafka.slt
@@ -368,17 +368,17 @@ create table s27 with (
 ) FORMAT PLAIN ENCODE JSON (schema.location = 'file:///risingwave/json-complex-schema')

 # currently _rw_key can be set as primary key
-statement ok
-create table s28 (id bytea, PRIMARY KEY(_rw_key)) with (
-  connector = 'kafka',
-  topic = 'kafka_source_format_bytes',
-  properties.bootstrap.server = 'message_queue:29092',
-  scan.startup.mode = 'earliest'
-) FORMAT PLAIN ENCODE BYTES
+# statement ok
+# create table s28 (id bytea, PRIMARY KEY(_rw_key)) with (
+#   connector = 'kafka',
+#   topic = 'kafka_source_format_bytes',
+#   properties.bootstrap.server = 'message_queue:29092',
+#   scan.startup.mode = 'earliest'
+# ) FORMAT PLAIN ENCODE BYTES

 # throttle option
 statement ok
-create table s29 (id bytea, PRIMARY KEY(_rw_key)) with (
+create table s29 (id bytea) with (
   connector = 'kafka',
   topic = 'kafka_source_format_bytes',
   properties.bootstrap.server = 'message_queue:29092',
@@ -858,8 +858,8 @@ drop source s24
 statement ok
 drop table s27

-statement ok
-drop table s28
+# statement ok
+# drop table s28

 statement ok
 drop table s29
diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs
index 366a353b570e8..10473a031e89c 100644
--- a/src/connector/src/parser/avro/parser.rs
+++ b/src/connector/src/parser/avro/parser.rs
@@ -208,7 +208,7 @@ mod test {
     use apache_avro::{Codec, Days, Duration, Millis, Months, Reader, Schema, Writer};
     use itertools::Itertools;
     use risingwave_common::array::Op;
-    use risingwave_common::catalog::{ColumnId, DEFAULT_KEY_COLUMN_NAME};
+    use risingwave_common::catalog::ColumnId;
     use risingwave_common::row::Row;
     use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
     use risingwave_common::{error, try_match_expand};
@@ -221,12 +221,10 @@ mod test {
         AvroParserConfig,
     };
     use crate::common::AwsAuthProps;
-    use crate::parser::bytes_parser::BytesAccessBuilder;
     use crate::parser::plain_parser::PlainParser;
     use crate::parser::unified::avro::unix_epoch_days;
     use crate::parser::{
-        AccessBuilderImpl, BytesProperties, EncodingProperties, EncodingType,
-        SourceStreamChunkBuilder, SpecificParserConfig,
+        AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig,
     };
     use crate::source::SourceColumnDesc;

@@ -307,11 +305,6 @@ mod test {
         let conf = new_avro_conf_from_local(file_name).await?;

         Ok(PlainParser {
-            key_builder: AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
-                EncodingProperties::Bytes(BytesProperties {
-                    column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
-                }),
-            )?),
             payload_builder: AccessBuilderImpl::Avro(AvroAccessBuilder::new(
                 conf,
                 EncodingType::Value,
@@ -335,75 +328,68 @@ mod test {
         let flush = writer.flush().unwrap();
         assert!(flush > 0);
         let input_data = writer.into_inner().unwrap();
-        // _rw_key test cases
-        let key_testcases = vec![Some(br#"r"#.to_vec()), Some(vec![]), None];
         let columns = build_rw_columns();
-        for key_data in key_testcases {
-            let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 1);
-            {
-                let writer = builder.row_writer();
-                parser
-                    .parse_inner(key_data, Some(input_data.clone()), writer)
-                    .await
-                    .unwrap();
-            }
-            let chunk = builder.finish();
-            let (op, row) = chunk.rows().next().unwrap();
-            assert_eq!(op, Op::Insert);
-            let row = row.into_owned_row();
-            for (i, field) in record.fields.iter().enumerate() {
-                let value = field.clone().1;
-                match value {
-                    Value::String(str) | Value::Union(_, box Value::String(str)) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
-                    }
-                    Value::Boolean(bool_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
-                    }
-                    Value::Int(int_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
-                    }
-                    Value::Long(i64_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
-                    }
-                    Value::Float(f32_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
-                    }
-                    Value::Double(f64_val) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
-                    }
-                    Value::Date(days) => {
-                        assert_eq!(
-                            row[i],
-                            Some(ScalarImpl::Date(
-                                Date::with_days(days + unix_epoch_days()).unwrap(),
-                            ))
-                        );
-                    }
-                    Value::TimestampMillis(millis) => {
-                        assert_eq!(
-                            row[i],
-                            Some(Timestamptz::from_millis(millis).unwrap().into())
-                        );
-                    }
-                    Value::TimestampMicros(micros) => {
-                        assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
-                    }
-                    Value::Bytes(bytes) => {
-                        assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
-                    }
-                    Value::Duration(duration) => {
-                        let months = u32::from(duration.months()) as i32;
-                        let days = u32::from(duration.days()) as i32;
-                        let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
-                        assert_eq!(
-                            row[i],
-                            Some(Interval::from_month_day_usec(months, days, usecs).into())
-                        );
-                    }
-                    _ => {
-                        unreachable!()
-                    }
+        let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1);
+        {
+            let writer = builder.row_writer();
+            parser.parse_inner(input_data, writer).await.unwrap();
+        }
+        let chunk = builder.finish();
+        let (op, row) = chunk.rows().next().unwrap();
+        assert_eq!(op, Op::Insert);
+        let row = row.into_owned_row();
+        for (i, field) in record.fields.iter().enumerate() {
+            let value = field.clone().1;
+            match value {
+                Value::String(str) | Value::Union(_, box Value::String(str)) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Utf8(str.into_boxed_str())));
+                }
+                Value::Boolean(bool_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Bool(bool_val)));
+                }
+                Value::Int(int_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Int32(int_val)));
+                }
+                Value::Long(i64_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Int64(i64_val)));
+                }
+                Value::Float(f32_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Float32(f32_val.into())));
+                }
+                Value::Double(f64_val) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Float64(f64_val.into())));
+                }
+                Value::Date(days) => {
+                    assert_eq!(
+                        row[i],
+                        Some(ScalarImpl::Date(
+                            Date::with_days(days + unix_epoch_days()).unwrap(),
+                        ))
+                    );
+                }
+                Value::TimestampMillis(millis) => {
+                    assert_eq!(
+                        row[i],
+                        Some(Timestamptz::from_millis(millis).unwrap().into())
+                    );
+                }
+                Value::TimestampMicros(micros) => {
+                    assert_eq!(row[i], Some(Timestamptz::from_micros(micros).into()));
+                }
+                Value::Bytes(bytes) => {
+                    assert_eq!(row[i], Some(ScalarImpl::Bytea(bytes.into_boxed_slice())));
+                }
+                Value::Duration(duration) => {
+                    let months = u32::from(duration.months()) as i32;
+                    let days = u32::from(duration.days()) as i32;
+                    let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
+                    assert_eq!(
+                        row[i],
+                        Some(Interval::from_month_day_usec(months, days, usecs).into())
+                    );
+                }
+                _ => {
+                    unreachable!()
                 }
             }
         }
diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs
index 3001c837882d1..2a0b2f1b90f2a 100644
--- a/src/connector/src/parser/bytes_parser.rs
+++ b/src/connector/src/parser/bytes_parser.rs
@@ -55,15 +55,11 @@ mod tests {
         SourceStreamChunkBuilder, SpecificParserConfig,
     };

-    type Item = (Vec<u8>, Vec<u8>);
-    fn get_item() -> Vec<Item> {
-        vec![
-            (br#"a"#.to_vec(), br#"t"#.to_vec()),
-            (br#"r"#.to_vec(), br#"random"#.to_vec()),
-        ]
+    fn get_payload() -> Vec<Vec<u8>> {
+        vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
     }

-    async fn test_bytes_parser(get_item: fn() -> Vec<Item>) {
+    async fn test_bytes_parser(get_payload: fn() -> Vec<Vec<u8>>) {
         let descs = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
         let props = SpecificParserConfig {
             key_encoding_config: None,
@@ -76,12 +72,9 @@ mod tests {

         let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);

-        for item in get_item() {
+        for payload in get_payload() {
             let writer = builder.row_writer();
-            parser
-                .parse_inner(Some(item.0), Some(item.1), writer)
-                .await
-                .unwrap();
+            parser.parse_inner(payload, writer).await.unwrap();
         }

         let chunk = builder.finish();
@@ -107,6 +100,6 @@ mod tests {

     #[tokio::test]
     async fn test_bytes_parse_object_top_level() {
-        test_bytes_parser(get_item).await;
+        test_bytes_parser(get_payload).await;
     }
 }
diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs
index cd805af4a1893..defb7ef54a1e6 100644
--- a/src/connector/src/parser/plain_parser.rs
+++ b/src/connector/src/parser/plain_parser.rs
@@ -12,22 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use risingwave_common::catalog::DEFAULT_KEY_COLUMN_NAME;
 use risingwave_common::error::ErrorCode::ProtocolError;
 use risingwave_common::error::{ErrorCode, Result, RwError};

-use super::bytes_parser::BytesAccessBuilder;
-use super::unified::util::apply_key_val_accessor_on_stream_chunk_writer;
+use super::unified::util::apply_row_accessor_on_stream_chunk_writer;
 use super::{
-    AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType,
+    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
     SourceStreamChunkRowWriter, SpecificParserConfig,
 };
+use crate::only_parse_payload;
 use crate::parser::ParserFormat;
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

 #[derive(Debug)]
 pub struct PlainParser {
-    pub key_builder: AccessBuilderImpl,
     pub payload_builder: AccessBuilderImpl,
     pub(crate) rw_columns: Vec<SourceColumnDesc>,
     pub source_ctx: SourceContextRef,
@@ -39,11 +37,6 @@ impl PlainParser {
         rw_columns: Vec<SourceColumnDesc>,
         source_ctx: SourceContextRef,
     ) -> Result<Self> {
-        let key_builder = AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
-            EncodingProperties::Bytes(BytesProperties {
-                column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
-            }),
-        )?);
         let payload_builder = match props.encoding_config {
             EncodingProperties::Protobuf(_)
             | EncodingProperties::Avro(_)
@@ -57,7 +50,6 @@
             }
         };
         Ok(Self {
-            key_builder,
             payload_builder,
             rw_columns,
             source_ctx,
@@ -66,28 +58,12 @@

     pub async fn parse_inner(
         &mut self,
-        key: Option<Vec<u8>>,
-        payload: Option<Vec<u8>>,
+        payload: Vec<u8>,
         mut writer: SourceStreamChunkRowWriter<'_>,
     ) -> Result<()> {
-        // if key is empty, set it as vec![]
-        let key_data = key.unwrap_or_default();
-        // if payload is empty, report error
-        let payload_data = payload.ok_or_else(|| {
-            RwError::from(ErrorCode::InternalError(
-                "Empty payload with nonempty key".into(),
-            ))
-        })?;
+        let accessor = self.payload_builder.generate_accessor(payload).await?;

-        let key_accessor = self.key_builder.generate_accessor(key_data).await?;
-        let payload_accessor = self.payload_builder.generate_accessor(payload_data).await?;
-        apply_key_val_accessor_on_stream_chunk_writer(
-            DEFAULT_KEY_COLUMN_NAME,
-            key_accessor,
-            payload_accessor,
-            &mut writer,
-        )
-        .map_err(Into::into)
+        apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer).map_err(Into::into)
     }
 }

@@ -106,10 +82,10 @@ impl ByteStreamSourceParser for PlainParser {

     async fn parse_one<'a>(
         &'a mut self,
-        key: Option<Vec<u8>>,
+        _key: Option<Vec<u8>>,
         payload: Option<Vec<u8>>,
         writer: SourceStreamChunkRowWriter<'a>,
     ) -> Result<()> {
-        self.parse_inner(key, payload, writer).await
+        only_parse_payload!(self, payload, writer)
     }
 }
diff --git a/src/connector/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs
index 4117591aa9000..92cf5da3ac81c 100644
--- a/src/connector/src/parser/unified/util.rs
+++ b/src/connector/src/parser/unified/util.rs
@@ -14,7 +14,7 @@

 use risingwave_common::error::{ErrorCode, RwError};

-use super::{Access, AccessError, AccessImpl, AccessResult, ChangeEvent};
+use super::{Access, AccessError, AccessResult, ChangeEvent};
 use crate::parser::unified::ChangeEventOperation;
 use crate::parser::SourceStreamChunkRowWriter;
 use crate::source::SourceColumnDesc;
@@ -46,22 +46,6 @@ pub fn apply_row_accessor_on_stream_chunk_writer(
     writer.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
 }

-pub fn apply_key_val_accessor_on_stream_chunk_writer(
-    key_column_name: &str,
-    key_accessor: AccessImpl<'_, '_>,
-    val_accessor: AccessImpl<'_, '_>,
-    writer: &mut SourceStreamChunkRowWriter<'_>,
-) -> AccessResult<()> {
-    let f = |column: &SourceColumnDesc| {
-        if column.name == key_column_name {
-            key_accessor.access(&[&column.name], Some(&column.data_type))
-        } else {
-            val_accessor.access(&[&column.name], Some(&column.data_type))
-        }
-    };
-    writer.insert(f)
-}
-
 impl From<AccessError> for RwError {
     fn from(val: AccessError) -> Self {
         ErrorCode::InternalError(format!("AccessError: {:?}", val)).into()
diff --git a/src/frontend/planner_test/tests/testdata/input/batch_source.yaml b/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
index 486ce8bc26d20..5890cc6866b9f 100644
--- a/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/batch_source.yaml
@@ -30,19 +30,3 @@
   expected_outputs:
   - batch_plan
   - logical_plan
-- sql: |
-    insert into s values (1,2, E'\\xDEADBEEF'::bytea);
-  create_table_with_connector:
-    format: plain
-    encode: protobuf
-    name: s
-    file: |
-      syntax = "proto3";
-      package test;
-      message TestRecord {
-        int32 id = 1;
-        int32 value = 2;
-      }
-  expected_outputs:
-  - batch_plan
-  - logical_plan
diff --git a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
index 04bc2ad98acfe..8613f28305667 100644
--- a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml
@@ -2,12 +2,12 @@
 - sql: |
     select * from s
   logical_plan: |-
-    LogicalProject { exprs: [id, value, _rw_key] }
-    └─LogicalSource { source: s, columns: [id, value, _rw_key, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) }
+    LogicalProject { exprs: [id, value] }
+    └─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], time_range: (Unbounded, Unbounded) }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchProject { exprs: [id, value, _rw_key] }
-      └─BatchSource { source: s, columns: [id, value, _rw_key, _rw_kafka_timestamp, _row_id], filter: (None, None) }
+    └─BatchProject { exprs: [id, value] }
+      └─BatchSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) }
   create_source:
     format: plain
     encode: protobuf
@@ -22,11 +22,11 @@
 - sql: |
     insert into s values (1,2);
   logical_plan: |-
-    LogicalInsert { table: s, mapping: [0:0, 1:1], default: [2<-null:Bytea] }
+    LogicalInsert { table: s, mapping: [0:0, 1:1] }
     └─LogicalValues { rows: [[1:Int32, 2:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32] } }
   batch_plan: |-
     BatchExchange { order: [], dist: Single }
-    └─BatchInsert { table: s, mapping: [0:0, 1:1], default: [2<-null:Bytea] }
+    └─BatchInsert { table: s, mapping: [0:0, 1:1] }
       └─BatchValues { rows: [[1:Int32, 2:Int32]] }
   create_table_with_connector:
     format: plain
@@ -39,23 +39,3 @@
         int32 id = 1;
         int32 value = 2;
       }
-- sql: |
-    insert into s values (1,2, E'\\xDEADBEEF'::bytea);
-  logical_plan: |-
-    LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2] }
-    └─LogicalValues { rows: [[1:Int32, 2:Int32, '\xdeadbeef':Bytea]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32, *VALUES*_0.column_2:Bytea] } }
-  batch_plan: |-
-    BatchExchange { order: [], dist: Single }
-    └─BatchInsert { table: s, mapping: [0:0, 1:1, 2:2] }
-      └─BatchValues { rows: [[1:Int32, 2:Int32, '\xdeadbeef':Bytea]] }
-  create_table_with_connector:
-    format: plain
-    encode: protobuf
-    name: s
-    file: |
-      syntax = "proto3";
-      package test;
-      message TestRecord {
-        int32 id = 1;
-        int32 value = 2;
-      }
diff --git a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml
index f55c30f4436a8..fc80a61cfcf78 100644
--- a/src/frontend/planner_test/tests/testdata/output/struct_query.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/struct_query.yaml
@@ -36,7 +36,7 @@
     select (t.country).city,t.country,(country).city.address from t;
   logical_plan: |-
     LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1, t.country, Field(Field(t.country, 1:Int32), 0:Int32) as $expr2] }
-    └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
+    └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -80,7 +80,7 @@
     select (t.country1).city.*,(t.country2).*,(country3).city.* from t;
   logical_plan: |-
     LogicalProject { exprs: [Field(Field(t.country1, 1:Int32), 0:Int32) as $expr1, Field(Field(t.country1, 1:Int32), 1:Int32) as $expr2, Field(t.country2, 0:Int32) as $expr3, Field(t.country2, 1:Int32) as $expr4, Field(t.country2, 2:Int32) as $expr5, Field(Field(t.country3, 1:Int32), 0:Int32) as $expr6, Field(Field(t.country3, 1:Int32), 1:Int32) as $expr7] }
-    └─LogicalScan { table: t, columns: [t.id, t.country1, t.country2, t.country3, t.zipcode, t.rate, t._rw_key, t._row_id] }
+    └─LogicalScan { table: t, columns: [t.id, t.country1, t.country2, t.country3, t.zipcode, t.rate, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -112,7 +112,7 @@
   logical_plan: |-
     LogicalProject { exprs: [Field($expr1, 1:Int32) as $expr2] }
     └─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] }
-      └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
+      └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -144,7 +144,7 @@
     └─LogicalProject { exprs: [min($expr1)] }
       └─LogicalAgg { aggs: [min($expr1)] }
         └─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] }
-          └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
+          └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -171,11 +171,11 @@
     create materialized view t as select * from s;
     select * from (select (country).city as c from t) as vv join t on (c).zipcode=(t.country).zipcode;
   logical_plan: |-
-    LogicalProject { exprs: [$expr1, t.id, t.country, t.zipcode, t.rate, t._rw_key] }
+    LogicalProject { exprs: [$expr1, t.id, t.country, t.zipcode, t.rate] }
     └─LogicalJoin { type: Inner, on: (Field($expr1, 1:Int32) = Field(t.country, 2:Int32)), output: all }
       ├─LogicalProject { exprs: [Field(t.country, 1:Int32) as $expr1] }
-      │ └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
-      └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
+      │ └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
+      └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -205,7 +205,7 @@
     LogicalProject { exprs: [(min($expr1) + (max($expr1) * count(t.zipcode))) as $expr2] }
     └─LogicalAgg { aggs: [min($expr1), max($expr1), count(t.zipcode)] }
      └─LogicalProject { exprs: [Field(Field(t.country, 1:Int32), 0:Int32) as $expr1, t.zipcode] }
-        └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
+        └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -236,7 +236,7 @@
     └─LogicalAgg { aggs: [count(1:Int32), count($expr1)] }
       └─LogicalProject { exprs: [1:Int32, Field(Field(t.country, 1:Int32), 1:Int32) as $expr1] }
         └─LogicalFilter { predicate: (Field(Field(t.country, 1:Int32), 0:Int32) > 1:Int32) }
-          └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._rw_key, t._row_id] }
+          └─LogicalScan { table: t, columns: [t.id, t.country, t.zipcode, t.rate, t._row_id] }
   create_source:
     format: plain
     encode: protobuf
@@ -366,7 +366,7 @@
 - sql: |
     insert into s values (1,2,(1,2,(1,2,null)));
   logical_plan: |-
-    LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2], default: [3<-null:Bytea] }
+    LogicalInsert { table: s, mapping: [0:0, 1:1, 2:2] }
     └─LogicalValues { rows: [[1:Int32, 2:Int32, Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, null:Int32))]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32, *VALUES*_0.column_2:Struct(StructType { field_names: ["v1", "v2", "v3"], field_types: [Int32, Int32, Struct(StructType { field_names: ["v1", "v2", "v3"], field_types: [Int32, Int32, Int32] })] })] } }
   create_table_with_connector:
     format: plain
@@ -393,9 +393,9 @@
 - sql: |
     select * from s where s.v3 = (1,2,(1,2,3));
   logical_plan: |-
-    LogicalProject { exprs: [s.v1, s.v2, s.v3, s._rw_key] }
+    LogicalProject { exprs: [s.v1, s.v2, s.v3] }
     └─LogicalFilter { predicate: (s.v3 = Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, 3:Int32))) }
-      └─LogicalScan { table: s, columns: [s.v1, s.v2, s.v3, s._rw_key, s._row_id] }
+      └─LogicalScan { table: s, columns: [s.v1, s.v2, s.v3, s._row_id] }
   create_table_with_connector:
     format: plain
     encode: protobuf
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 7c8e597bad3db..7050d0b10ccfb 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -65,7 +65,7 @@ use crate::handler::create_table::{
     ensure_table_constraints_supported, ColumnIdGenerator,
 };
 use crate::handler::util::{
-    get_connector, is_cdc_connector, is_kafka_connector, is_key_mq_connector, SourceSchemaCompatExt,
+    get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt,
 };
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
@@ -582,7 +582,6 @@ pub(crate) async fn bind_columns_from_source(
         );
         session.notice_to_user(err_string);
     }
-
     Ok(res)
 }

@@ -694,15 +693,7 @@ pub(crate) async fn bind_source_pk(
     let sql_defined_pk = !sql_defined_pk_names.is_empty();

     let res = match (&source_schema.format, &source_schema.row_encode) {
-        (Format::Native, Encode::Native) | (Format::Plain, Encode::Json | Encode::Csv) => {
-            sql_defined_pk_names
-        }
-        (Format::Plain, _) => {
-            if is_key_mq_connector(with_properties) {
-                add_default_key_column(columns);
-            }
-            sql_defined_pk_names
-        }
+        (Format::Native, Encode::Native) | (Format::Plain, _) => sql_defined_pk_names,
         (Format::Upsert, Encode::Json) => {
             if sql_defined_pk {
                 sql_defined_pk_names
@@ -1250,8 +1241,8 @@ pub mod tests {
     use std::collections::HashMap;

     use risingwave_common::catalog::{
-        CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_KEY_COLUMN_NAME, DEFAULT_SCHEMA_NAME,
-        OFFSET_COLUMN_NAME, ROWID_PREFIX, TABLE_NAME_COLUMN_NAME,
+        CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME,
+        ROWID_PREFIX, TABLE_NAME_COLUMN_NAME,
     };
     use risingwave_common::types::DataType;

@@ -1293,7 +1284,6 @@ pub mod tests {
         );
         let expected_columns = maplit::hashmap! {
             ROWID_PREFIX => DataType::Serial,
-            DEFAULT_KEY_COLUMN_NAME => DataType::Bytea,
             "id" => DataType::Int32,
             "zipcode" => DataType::Int64,
             "rate" => DataType::Float32,
diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs
index 42ba3b6ad968d..8a7dacfd8a5eb 100644
--- a/src/frontend/src/handler/show.rs
+++ b/src/frontend/src/handler/show.rs
@@ -499,7 +499,6 @@ mod tests {
             "country".into() => "test.Country".into(),
             "_rw_kafka_timestamp".into() => "timestamp with time zone".into(),
             "_row_id".into() => "serial".into(),
-            "_rw_key".into() => "bytea".into()
         };

         assert_eq!(columns, expected_columns);
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 583af954bb2c5..b2a9031027251 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -32,7 +32,7 @@ use risingwave_common::error::{ErrorCode, Result as RwResult};
 use risingwave_common::row::Row as _;
 use risingwave_common::types::{DataType, ScalarRefImpl, Timestamptz};
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_connector::source::{KAFKA_CONNECTOR, KINESIS_CONNECTOR, PULSAR_CONNECTOR};
+use risingwave_connector::source::KAFKA_CONNECTOR;
 use risingwave_sqlparser::ast::{display_comma_separated, CompatibleSourceSchema, ConnectorSchema};

 use crate::catalog::IndexCatalog;
@@ -257,18 +257,6 @@ pub fn is_kafka_connector(with_properties: &HashMap<String, String>) -> bool {
     connector == KAFKA_CONNECTOR
 }

-#[inline(always)]
-pub fn is_key_mq_connector(with_properties: &HashMap<String, String>) -> bool {
-    let Some(connector) = get_connector(with_properties) else {
-        return false;
-    };
-
-    matches!(
-        connector.as_str(),
-        KINESIS_CONNECTOR | PULSAR_CONNECTOR | KAFKA_CONNECTOR
-    )
-}
-
 #[inline(always)]
 pub fn is_cdc_connector(with_properties: &HashMap<String, String>) -> bool {
     let Some(connector) = get_connector(with_properties) else {