diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 25807f8bf65cf..b7a5e37e0846c 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -47,8 +47,8 @@ use crate::aws_auth::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; use crate::source::{ extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, - SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream, - SplitId, StreamChunkWithState, + SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId, + StreamChunkWithState, }; mod avro; @@ -798,29 +798,6 @@ pub enum ProtocolProperties { } impl SpecificParserConfig { - pub fn get_source_struct(&self) -> SourceStruct { - let format = match self.protocol_config { - ProtocolProperties::Debezium => SourceFormat::Debezium, - ProtocolProperties::DebeziumMongo => SourceFormat::DebeziumMongo, - ProtocolProperties::Maxwell => SourceFormat::Maxwell, - ProtocolProperties::Canal => SourceFormat::Canal, - ProtocolProperties::Plain => SourceFormat::Plain, - ProtocolProperties::Upsert => SourceFormat::Upsert, - ProtocolProperties::Native => SourceFormat::Native, - ProtocolProperties::Unspecified => unreachable!(), - }; - let encode = match self.encoding_config { - EncodingProperties::Avro(_) => SourceEncode::Avro, - EncodingProperties::Protobuf(_) => SourceEncode::Protobuf, - EncodingProperties::Csv(_) => SourceEncode::Csv, - EncodingProperties::Json(_) => SourceEncode::Json, - EncodingProperties::Bytes(_) => SourceEncode::Bytes, - EncodingProperties::Native => SourceEncode::Native, - EncodingProperties::Unspecified => unreachable!(), - }; - SourceStruct { format, encode } - } - // The validity of (format, encode) is ensured by `extract_format_encode` pub fn new(info: &StreamSourceInfo, props: &HashMap) -> Result { let source_struct = extract_source_struct(info)?; diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index c073403a7565d..3e42b07ce4422 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -23,10 +23,8 @@ use risingwave_common::row::OwnedRow; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; -use crate::source::{ - SourceEncode, SourceFormat, SourceMessage, SourceMeta, SourceStruct, SplitId, - StreamChunkWithState, -}; +use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig}; +use crate::source::{SourceMessage, SourceMeta, SplitId, StreamChunkWithState}; pub enum FieldDesc { // field is invisible, generate None @@ -38,7 +36,7 @@ pub struct DatagenEventGenerator { // fields_map: HashMap, field_names: Vec, fields_vec: Vec, - source_struct: SourceStruct, + source_format: SpecificParserConfig, data_types: Vec, offset: u64, split_id: SplitId, @@ -56,7 +54,7 @@ impl DatagenEventGenerator { pub fn new( fields_vec: Vec, field_names: Vec, - source_struct: SourceStruct, + source_format: SpecificParserConfig, data_types: Vec, rows_per_second: u64, offset: u64, @@ -72,7 +70,7 @@ impl DatagenEventGenerator { Ok(Self { field_names, fields_vec, - source_struct, + source_format, data_types, offset, split_id, @@ -96,8 +94,11 @@ impl DatagenEventGenerator { ); let mut msgs = Vec::with_capacity(num_rows_to_generate as usize); 'outer: for _ in 0..num_rows_to_generate { - let payload = match (self.source_struct.format, self.source_struct.encode) { - (SourceFormat::Plain, SourceEncode::Json) => { + let payload = match ( + &self.source_format.protocol_config, + &self.source_format.encoding_config, + ) { + (ProtocolProperties::Plain, EncodingProperties::Json(_)) => { let mut map = serde_json::Map::with_capacity(self.fields_vec.len()); for (name, field_generator) in self .field_names @@ -225,7 +226,6 @@ mod tests { use futures::stream::StreamExt; use super::*; - use crate::source::SourceEncode; async fn check_sequence_partition_result( split_num: u64, @@ -266,7 +266,13 @@ mod tests { let generator = DatagenEventGenerator::new( fields_vec, vec!["c1".to_owned(), "c2".to_owned()], - SourceStruct::new(SourceFormat::Plain, SourceEncode::Json), + SpecificParserConfig { + protocol_config: ProtocolProperties::Plain, + encoding_config: EncodingProperties::Json(crate::parser::JsonProperties { + use_schema_registry: false, + }), + key_encoding_config: None, + }, data_types, rows_per_second, 0, diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 3840b0e1f5dbc..bd9f74ee3aa9a 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -120,7 +120,7 @@ impl SplitReader for DatagenSplitReader { let generator = DatagenEventGenerator::new( fields_vec, field_names, - parser_config.specific.get_source_struct(), + parser_config.specific.clone(), data_types, rows_per_second, events_so_far,