Skip to content

Commit

Permalink
DatagenEventGenerator with SpecificParserConfig rather than SourceStruct
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Oct 2, 2023
1 parent 4dff44f commit 1c82fd7
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 37 deletions.
27 changes: 2 additions & 25 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use crate::aws_auth::AwsAuthProps;
use crate::parser::maxwell::MaxwellParser;
use crate::source::{
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream,
SplitId, StreamChunkWithState,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
StreamChunkWithState,
};

mod avro;
Expand Down Expand Up @@ -798,29 +798,6 @@ pub enum ProtocolProperties {
}

impl SpecificParserConfig {
pub fn get_source_struct(&self) -> SourceStruct {
let format = match self.protocol_config {
ProtocolProperties::Debezium => SourceFormat::Debezium,
ProtocolProperties::DebeziumMongo => SourceFormat::DebeziumMongo,
ProtocolProperties::Maxwell => SourceFormat::Maxwell,
ProtocolProperties::Canal => SourceFormat::Canal,
ProtocolProperties::Plain => SourceFormat::Plain,
ProtocolProperties::Upsert => SourceFormat::Upsert,
ProtocolProperties::Native => SourceFormat::Native,
ProtocolProperties::Unspecified => unreachable!(),
};
let encode = match self.encoding_config {
EncodingProperties::Avro(_) => SourceEncode::Avro,
EncodingProperties::Protobuf(_) => SourceEncode::Protobuf,
EncodingProperties::Csv(_) => SourceEncode::Csv,
EncodingProperties::Json(_) => SourceEncode::Json,
EncodingProperties::Bytes(_) => SourceEncode::Bytes,
EncodingProperties::Native => SourceEncode::Native,
EncodingProperties::Unspecified => unreachable!(),
};
SourceStruct { format, encode }
}

// The validity of (format, encode) is ensured by `extract_format_encode`
pub fn new(info: &StreamSourceInfo, props: &HashMap<String, String>) -> Result<Self> {
let source_struct = extract_source_struct(info)?;
Expand Down
28 changes: 17 additions & 11 deletions src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;

use crate::source::{
SourceEncode, SourceFormat, SourceMessage, SourceMeta, SourceStruct, SplitId,
StreamChunkWithState,
};
use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
use crate::source::{SourceMessage, SourceMeta, SplitId, StreamChunkWithState};

pub enum FieldDesc {
// field is invisible, generate None
Expand All @@ -38,7 +36,7 @@ pub struct DatagenEventGenerator {
// fields_map: HashMap<String, FieldGeneratorImpl>,
field_names: Vec<String>,
fields_vec: Vec<FieldDesc>,
source_struct: SourceStruct,
source_format: SpecificParserConfig,
data_types: Vec<DataType>,
offset: u64,
split_id: SplitId,
Expand All @@ -56,7 +54,7 @@ impl DatagenEventGenerator {
pub fn new(
fields_vec: Vec<FieldDesc>,
field_names: Vec<String>,
source_struct: SourceStruct,
source_format: SpecificParserConfig,
data_types: Vec<DataType>,
rows_per_second: u64,
offset: u64,
Expand All @@ -72,7 +70,7 @@ impl DatagenEventGenerator {
Ok(Self {
field_names,
fields_vec,
source_struct,
source_format,
data_types,
offset,
split_id,
Expand All @@ -96,8 +94,11 @@ impl DatagenEventGenerator {
);
let mut msgs = Vec::with_capacity(num_rows_to_generate as usize);
'outer: for _ in 0..num_rows_to_generate {
let payload = match (self.source_struct.format, self.source_struct.encode) {
(SourceFormat::Plain, SourceEncode::Json) => {
let payload = match (
&self.source_format.protocol_config,
&self.source_format.encoding_config,
) {
(ProtocolProperties::Plain, EncodingProperties::Json(_)) => {
let mut map = serde_json::Map::with_capacity(self.fields_vec.len());
for (name, field_generator) in self
.field_names
Expand Down Expand Up @@ -225,7 +226,6 @@ mod tests {
use futures::stream::StreamExt;

use super::*;
use crate::source::SourceEncode;

async fn check_sequence_partition_result(
split_num: u64,
Expand Down Expand Up @@ -266,7 +266,13 @@ mod tests {
let generator = DatagenEventGenerator::new(
fields_vec,
vec!["c1".to_owned(), "c2".to_owned()],
SourceStruct::new(SourceFormat::Plain, SourceEncode::Json),
SpecificParserConfig {
protocol_config: ProtocolProperties::Plain,
encoding_config: EncodingProperties::Json(crate::parser::JsonProperties {
use_schema_registry: false,
}),
key_encoding_config: None,
},
data_types,
rows_per_second,
0,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl SplitReader for DatagenSplitReader {
let generator = DatagenEventGenerator::new(
fields_vec,
field_names,
parser_config.specific.get_source_struct(),
parser_config.specific.clone(),
data_types,
rows_per_second,
events_so_far,
Expand Down

0 comments on commit 1c82fd7

Please sign in to comment.