refactor(source): prefer SpecificParserConfig over SourceStruct #12602

Merged · 4 commits · Oct 21, 2023
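In short, the refactor folds the (format, encode) extraction into `SpecificParserConfig::new`, so call sites no longer construct a `SourceStruct` themselves. A minimal before/after sketch of a typical call site, distilled from the diffs below (the surrounding variables are illustrative, not a complete compiling example):

// Before: the caller derived a SourceStruct from the catalog info first.
let source_struct = extract_source_struct(info)?;
let parser_config =
    SpecificParserConfig::new(source_struct, info, &source_node.properties)?;

// After: the constructor calls extract_source_struct(info) internally.
let parser_config = SpecificParserConfig::new(info, &source_node.properties)?;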
5 changes: 1 addition & 4 deletions src/batch/src/executor/source.rs
@@ -30,7 +30,6 @@ use risingwave_connector::source::{
};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::connector_source::ConnectorSource;
use risingwave_source::source_desc::extract_source_struct;

use super::Executor;
use crate::error::BatchError;
@@ -71,9 +70,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
.map_err(|e| RwError::from(ConnectorError(e.into())))?;

let info = source_node.get_info().unwrap();
let source_struct = extract_source_struct(info)?;
let parser_config =
SpecificParserConfig::new(source_struct, info, &source_node.properties)?;
let parser_config = SpecificParserConfig::new(info, &source_node.properties)?;

let columns: Vec<_> = source_node
.columns
11 changes: 5 additions & 6 deletions src/connector/src/parser/avro/parser.rs
@@ -220,6 +220,7 @@ mod test {
use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
use risingwave_common::{error, try_match_expand};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
use url::Url;

use super::{
@@ -232,7 +233,7 @@ mod test {
use crate::parser::{
AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, SourceStruct};
use crate::source::SourceColumnDesc;

fn test_data_path(file_name: &str) -> String {
let curr_dir = env::current_dir().unwrap().into_os_string();
@@ -304,13 +305,11 @@
let info = StreamSourceInfo {
row_schema_location: schema_path.clone(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Avro.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Avro),
&info,
&HashMap::new(),
)?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
AvroParserConfig::new(parser_config.encoding_config).await
}

11 changes: 5 additions & 6 deletions src/connector/src/parser/debezium/avro_parser.rs
@@ -151,12 +151,13 @@ mod tests {
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};

use super::*;
use crate::parser::{
DebeziumAvroParserConfig, DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, SourceStruct};
use crate::source::SourceColumnDesc;

const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00";

@@ -298,13 +299,11 @@
));
let info = StreamSourceInfo {
row_schema_location: "http://127.0.0.1:8081".into(),
format: PbFormatType::Debezium.into(),
row_encode: PbEncodeType::Avro.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Debezium, SourceEncode::Avro),
&info,
&props,
)?;
let parser_config = SpecificParserConfig::new(&info, &props)?;
let config = DebeziumAvroParserConfig::new(parser_config.clone().encoding_config).await?;
let columns = config
.map_to_columns()?
50 changes: 4 additions & 46 deletions src/connector/src/parser/mod.rs
@@ -47,8 +47,8 @@ use self::util::get_kafka_topic;
use crate::aws_auth::AwsAuthProps;
use crate::parser::maxwell::MaxwellParser;
use crate::source::{
BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, SourceContextRef,
SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream, SplitId,
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
StreamChunkWithState,
};

@@ -870,35 +870,9 @@ pub enum ProtocolProperties {
}

impl SpecificParserConfig {
pub fn get_source_struct(&self) -> SourceStruct {
let format = match self.protocol_config {
ProtocolProperties::Debezium => SourceFormat::Debezium,
ProtocolProperties::DebeziumMongo => SourceFormat::DebeziumMongo,
ProtocolProperties::Maxwell => SourceFormat::Maxwell,
ProtocolProperties::Canal => SourceFormat::Canal,
ProtocolProperties::Plain => SourceFormat::Plain,
ProtocolProperties::Upsert => SourceFormat::Upsert,
ProtocolProperties::Native => SourceFormat::Native,
ProtocolProperties::Unspecified => unreachable!(),
};
let encode = match self.encoding_config {
EncodingProperties::Avro(_) => SourceEncode::Avro,
EncodingProperties::Protobuf(_) => SourceEncode::Protobuf,
EncodingProperties::Csv(_) => SourceEncode::Csv,
EncodingProperties::Json(_) => SourceEncode::Json,
EncodingProperties::Bytes(_) => SourceEncode::Bytes,
EncodingProperties::Native => SourceEncode::Native,
EncodingProperties::Unspecified => unreachable!(),
};
SourceStruct { format, encode }
}

// The validity of (format, encode) is ensured by `extract_format_encode`
pub fn new(
source_struct: SourceStruct,
info: &StreamSourceInfo,
props: &HashMap<String, String>,
) -> Result<Self> {
pub fn new(info: &StreamSourceInfo, props: &HashMap<String, String>) -> Result<Self> {
let source_struct = extract_source_struct(info)?;
let format = source_struct.format;
let encode = source_struct.encode;
// this transformation is needed since there may be config for the protocol
@@ -1026,19 +1000,3 @@ impl SpecificParserConfig {
})
}
}

impl ParserConfig {
pub fn new(
source_struct: SourceStruct,
info: &StreamSourceInfo,
props: &HashMap<String, String>,
rw_columns: &Vec<SourceColumnDesc>,
) -> Result<Self> {
let common = CommonParserConfig {
rw_columns: rw_columns.to_owned(),
};
let specific = SpecificParserConfig::new(source_struct, info, props)?;

Ok(Self { common, specific })
}
}
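With `ParserConfig::new` removed here, callers presumably assemble the two halves of the config directly. A hypothetical sketch under that assumption (the helper name `build_parser_config` is invented for illustration; the field layout follows the deleted code above):

// Hypothetical stand-in for the removed ParserConfig::new.
fn build_parser_config(
    info: &StreamSourceInfo,
    props: &HashMap<String, String>,
    rw_columns: &[SourceColumnDesc],
) -> Result<ParserConfig> {
    Ok(ParserConfig {
        common: CommonParserConfig {
            rw_columns: rw_columns.to_owned(),
        },
        // format/encode are now derived inside the constructor via extract_source_struct.
        specific: SpecificParserConfig::new(info, props)?,
    })
}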
36 changes: 13 additions & 23 deletions src/connector/src/parser/protobuf/parser.rs
@@ -358,13 +358,13 @@ mod test {
use risingwave_common::types::{DataType, StructType};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::data::data_type::PbTypeName;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};

use super::*;
use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage};
use crate::parser::protobuf::recursive::AllTypes;
use crate::parser::unified::Access;
use crate::parser::SpecificParserConfig;
use crate::source::{SourceEncode, SourceFormat, SourceStruct};

fn schema_dir() -> String {
let dir = PathBuf::from("src/test_data");
@@ -391,13 +391,11 @@
proto_message_name: message_name.to_string(),
row_schema_location: location.to_string(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&HashMap::new(),
)?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
let value = DynamicMessage::decode(conf.message_descriptor, PRE_GEN_PROTO_DATA).unwrap();

@@ -438,13 +436,11 @@
proto_message_name: message_name.to_string(),
row_schema_location: location.to_string(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&HashMap::new(),
)?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
let columns = conf.map_to_columns().unwrap();

@@ -489,14 +485,11 @@
proto_message_name: message_name.to_string(),
row_schema_location: location.to_string(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&HashMap::new(),
)
.unwrap();
let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap();
let conf = ProtobufParserConfig::new(parser_config.encoding_config)
.await
.unwrap();
@@ -518,14 +511,11 @@
proto_message_name: message_name.to_string(),
row_schema_location: location.to_string(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&HashMap::new(),
)
.unwrap();
let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap();

ProtobufParserConfig::new(parser_config.encoding_config)
.await
63 changes: 62 additions & 1 deletion src/connector/src/source/base.rs
@@ -28,7 +28,7 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{ErrorSuppressor, RwError};
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::PbSource;
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::source::ConnectorSplit;
use risingwave_rpc_client::ConnectorClient;
use serde::de::DeserializeOwned;
@@ -253,6 +253,67 @@ impl SourceStruct {
}
}

// Only return valid (format, encode)
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

// old version meta.
if let Ok(format) = info.get_row_format() {
let (format, encode) = match format {
RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
RowFormatType::RowUnspecified => unreachable!(),
};
return Ok(SourceStruct::new(format, encode));
}
let source_format = info.get_format().map_err(|e| anyhow!("{e:?}"))?;
let source_encode = info.get_row_encode().map_err(|e| anyhow!("{e:?}"))?;
let (format, encode) = match (source_format, source_encode) {
(PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
(PbFormatType::Plain, PbEncodeType::Protobuf) => {
(SourceFormat::Plain, SourceEncode::Protobuf)
}
(PbFormatType::Debezium, PbEncodeType::Json) => {
(SourceFormat::Debezium, SourceEncode::Json)
}
(PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
(PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
(PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
(PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
(PbFormatType::Native, PbEncodeType::Native) => {
(SourceFormat::Native, SourceEncode::Native)
}
(PbFormatType::Debezium, PbEncodeType::Avro) => {
(SourceFormat::Debezium, SourceEncode::Avro)
}
(PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
(PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
(PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
(SourceFormat::DebeziumMongo, SourceEncode::Json)
}
(PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
(format, encode) => {
return Err(anyhow!(
"Unsupported combination of format {:?} and encode {:?}",
format,
encode
));
}
};
Ok(SourceStruct::new(format, encode))
}

pub type BoxSourceStream = BoxStream<'static, Result<Vec<SourceMessage>>>;

pub trait SourceWithStateStream =
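For context, a small usage sketch of the new helper: when the legacy `row_format` field is unset, the (`format`, `row_encode`) pair decides the result. This assumes the generated `get_row_format` getter errors for the unspecified default, as the code above implies, and that `extract_source_struct`, `SourceFormat`, and `SourceEncode` are in scope; the field values are illustrative, not taken from a real catalog:

// Illustrative only: resolve format/encode for a plain JSON source.
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};

fn resolve_example() {
    let info = StreamSourceInfo {
        format: PbFormatType::Plain.into(),
        row_encode: PbEncodeType::Json.into(),
        ..Default::default()
    };
    // No legacy row_format set, so the new (format, row_encode) branch applies.
    let source_struct = extract_source_struct(&info).unwrap();
    assert!(matches!(source_struct.format, SourceFormat::Plain));
    assert!(matches!(source_struct.encode, SourceEncode::Json));
}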
28 changes: 17 additions & 11 deletions src/connector/src/source/datagen/source/generator.rs
@@ -23,10 +23,8 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;

use crate::source::{
SourceEncode, SourceFormat, SourceMessage, SourceMeta, SourceStruct, SplitId,
StreamChunkWithState,
};
use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
use crate::source::{SourceMessage, SourceMeta, SplitId, StreamChunkWithState};

pub enum FieldDesc {
// field is invisible, generate None
@@ -38,7 +36,7 @@ pub struct DatagenEventGenerator {
// fields_map: HashMap<String, FieldGeneratorImpl>,
field_names: Vec<String>,
fields_vec: Vec<FieldDesc>,
source_struct: SourceStruct,
source_format: SpecificParserConfig,
data_types: Vec<DataType>,
offset: u64,
split_id: SplitId,
@@ -56,7 +54,7 @@ impl DatagenEventGenerator {
pub fn new(
fields_vec: Vec<FieldDesc>,
field_names: Vec<String>,
source_struct: SourceStruct,
source_format: SpecificParserConfig,
data_types: Vec<DataType>,
rows_per_second: u64,
offset: u64,
@@ -72,7 +70,7 @@
Ok(Self {
field_names,
fields_vec,
source_struct,
source_format,
data_types,
offset,
split_id,
@@ -96,8 +94,11 @@
);
let mut msgs = Vec::with_capacity(num_rows_to_generate as usize);
'outer: for _ in 0..num_rows_to_generate {
let payload = match (self.source_struct.format, self.source_struct.encode) {
(SourceFormat::Plain, SourceEncode::Json) => {
let payload = match (
&self.source_format.protocol_config,
&self.source_format.encoding_config,
) {
(ProtocolProperties::Plain, EncodingProperties::Json(_)) => {
let mut map = serde_json::Map::with_capacity(self.fields_vec.len());
for (name, field_generator) in self
.field_names
@@ -225,7 +226,6 @@ mod tests {
use futures::stream::StreamExt;

use super::*;
use crate::source::SourceEncode;

async fn check_sequence_partition_result(
split_num: u64,
@@ -266,7 +266,13 @@
let generator = DatagenEventGenerator::new(
fields_vec,
vec!["c1".to_owned(), "c2".to_owned()],
SourceStruct::new(SourceFormat::Plain, SourceEncode::Json),
SpecificParserConfig {
protocol_config: ProtocolProperties::Plain,
encoding_config: EncodingProperties::Json(crate::parser::JsonProperties {
use_schema_registry: false,
}),
key_encoding_config: None,
},
data_types,
rows_per_second,
0,