diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 71cac6e917dc..8bf9fc5b7e61 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -30,7 +30,6 @@ use risingwave_connector::source::{ }; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_source::connector_source::ConnectorSource; -use risingwave_source::source_desc::extract_source_struct; use super::Executor; use crate::error::BatchError; @@ -71,9 +70,7 @@ impl BoxedExecutorBuilder for SourceExecutor { .map_err(|e| RwError::from(ConnectorError(e.into())))?; let info = source_node.get_info().unwrap(); - let source_struct = extract_source_struct(info)?; - let parser_config = - SpecificParserConfig::new(source_struct, info, &source_node.properties)?; + let parser_config = SpecificParserConfig::new(info, &source_node.properties)?; let columns: Vec<_> = source_node .columns diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index d583ef83a822..3fcfc01cd958 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -220,6 +220,7 @@ mod test { use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz}; use risingwave_common::{error, try_match_expand}; use risingwave_pb::catalog::StreamSourceInfo; + use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; use url::Url; use super::{ @@ -232,7 +233,7 @@ mod test { use crate::parser::{ AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, SourceStruct}; + use crate::source::SourceColumnDesc; fn test_data_path(file_name: &str) -> String { let curr_dir = env::current_dir().unwrap().into_os_string(); @@ -304,13 +305,11 @@ mod test { let info = StreamSourceInfo { row_schema_location: schema_path.clone(), use_schema_registry: false, + format: PbFormatType::Plain.into(), + row_encode: PbEncodeType::Avro.into(), ..Default::default() }; - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Plain, SourceEncode::Avro), - &info, - &HashMap::new(), - )?; + let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?; AvroParserConfig::new(parser_config.encoding_config).await } diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 23161ba635b4..bd573ad3d052 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -151,12 +151,13 @@ mod tests { use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_pb::catalog::StreamSourceInfo; + use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; use super::*; use crate::parser::{ DebeziumAvroParserConfig, DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, SourceStruct}; + use crate::source::SourceColumnDesc; const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00"; @@ -298,13 +299,11 @@ mod tests { )); let info = StreamSourceInfo { row_schema_location: "http://127.0.0.1:8081".into(), + format: PbFormatType::Debezium.into(), + row_encode: PbEncodeType::Avro.into(), ..Default::default() }; - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Debezium, SourceEncode::Avro), - &info, - &props, - )?; + let parser_config = SpecificParserConfig::new(&info, &props)?; let config = DebeziumAvroParserConfig::new(parser_config.clone().encoding_config).await?; let columns = config .map_to_columns()? diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 2ff661447d1b..c4f955492cc9 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -47,8 +47,8 @@ use self::util::get_kafka_topic; use crate::aws_auth::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; use crate::source::{ - BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, SourceContextRef, - SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream, SplitId, + extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, + SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId, StreamChunkWithState, }; @@ -870,35 +870,9 @@ pub enum ProtocolProperties { } impl SpecificParserConfig { - pub fn get_source_struct(&self) -> SourceStruct { - let format = match self.protocol_config { - ProtocolProperties::Debezium => SourceFormat::Debezium, - ProtocolProperties::DebeziumMongo => SourceFormat::DebeziumMongo, - ProtocolProperties::Maxwell => SourceFormat::Maxwell, - ProtocolProperties::Canal => SourceFormat::Canal, - ProtocolProperties::Plain => SourceFormat::Plain, - ProtocolProperties::Upsert => SourceFormat::Upsert, - ProtocolProperties::Native => SourceFormat::Native, - ProtocolProperties::Unspecified => unreachable!(), - }; - let encode = match self.encoding_config { - EncodingProperties::Avro(_) => SourceEncode::Avro, - EncodingProperties::Protobuf(_) => SourceEncode::Protobuf, - EncodingProperties::Csv(_) => SourceEncode::Csv, - EncodingProperties::Json(_) => SourceEncode::Json, - EncodingProperties::Bytes(_) => SourceEncode::Bytes, - EncodingProperties::Native => SourceEncode::Native, - EncodingProperties::Unspecified => unreachable!(), - }; - SourceStruct { format, encode } - } - // The validity of (format, encode) is ensured by `extract_format_encode` - pub fn new( - source_struct: SourceStruct, - info: &StreamSourceInfo, - props: &HashMap, - ) -> Result { + pub fn new(info: &StreamSourceInfo, props: &HashMap) -> Result { + let source_struct = extract_source_struct(info)?; let format = source_struct.format; let encode = source_struct.encode; // this transformation is needed since there may be config for the protocol @@ -1026,19 +1000,3 @@ impl SpecificParserConfig { }) } } - -impl ParserConfig { - pub fn new( - source_struct: SourceStruct, - info: &StreamSourceInfo, - props: &HashMap, - rw_columns: &Vec, - ) -> Result { - let common = CommonParserConfig { - rw_columns: rw_columns.to_owned(), - }; - let specific = SpecificParserConfig::new(source_struct, info, props)?; - - Ok(Self { common, specific }) - } -} diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index cc8dc0734bdf..b2ee1c23bca0 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -358,13 +358,13 @@ mod test { use risingwave_common::types::{DataType, StructType}; use risingwave_pb::catalog::StreamSourceInfo; use risingwave_pb::data::data_type::PbTypeName; + use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; use super::*; use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage}; use crate::parser::protobuf::recursive::AllTypes; use crate::parser::unified::Access; use crate::parser::SpecificParserConfig; - use crate::source::{SourceEncode, SourceFormat, SourceStruct}; fn schema_dir() -> String { let dir = PathBuf::from("src/test_data"); @@ -391,13 +391,11 @@ mod test { proto_message_name: message_name.to_string(), row_schema_location: location.to_string(), use_schema_registry: false, + format: PbFormatType::Plain.into(), + row_encode: PbEncodeType::Protobuf.into(), ..Default::default() }; - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf), - &info, - &HashMap::new(), - )?; + let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?; let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; let value = DynamicMessage::decode(conf.message_descriptor, PRE_GEN_PROTO_DATA).unwrap(); @@ -438,13 +436,11 @@ mod test { proto_message_name: message_name.to_string(), row_schema_location: location.to_string(), use_schema_registry: false, + format: PbFormatType::Plain.into(), + row_encode: PbEncodeType::Protobuf.into(), ..Default::default() }; - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf), - &info, - &HashMap::new(), - )?; + let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?; let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; let columns = conf.map_to_columns().unwrap(); @@ -489,14 +485,11 @@ mod test { proto_message_name: message_name.to_string(), row_schema_location: location.to_string(), use_schema_registry: false, + format: PbFormatType::Plain.into(), + row_encode: PbEncodeType::Protobuf.into(), ..Default::default() }; - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf), - &info, - &HashMap::new(), - ) - .unwrap(); + let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap(); let conf = ProtobufParserConfig::new(parser_config.encoding_config) .await .unwrap(); @@ -518,14 +511,11 @@ mod test { proto_message_name: message_name.to_string(), row_schema_location: location.to_string(), use_schema_registry: false, + format: PbFormatType::Plain.into(), + row_encode: PbEncodeType::Protobuf.into(), ..Default::default() }; - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf), - &info, - &HashMap::new(), - ) - .unwrap(); + let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap(); ProtobufParserConfig::new(parser_config.encoding_config) .await diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index a0c0aee86592..6a8cd12ce9fa 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -28,7 +28,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::TableId; use risingwave_common::error::{ErrorSuppressor, RwError}; use risingwave_common::types::{JsonbVal, Scalar}; -use risingwave_pb::catalog::PbSource; +use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo}; use risingwave_pb::source::ConnectorSplit; use risingwave_rpc_client::ConnectorClient; use serde::de::DeserializeOwned; @@ -253,6 +253,67 @@ impl SourceStruct { } } +// Only return valid (format, encode) +pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result { + use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType}; + + // old version meta. + if let Ok(format) = info.get_row_format() { + let (format, encode) = match format { + RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json), + RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf), + RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json), + RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro), + RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json), + RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json), + RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv), + RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native), + RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro), + RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json), + RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro), + RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json), + RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes), + RowFormatType::RowUnspecified => unreachable!(), + }; + return Ok(SourceStruct::new(format, encode)); + } + let source_format = info.get_format().map_err(|e| anyhow!("{e:?}"))?; + let source_encode = info.get_row_encode().map_err(|e| anyhow!("{e:?}"))?; + let (format, encode) = match (source_format, source_encode) { + (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json), + (PbFormatType::Plain, PbEncodeType::Protobuf) => { + (SourceFormat::Plain, SourceEncode::Protobuf) + } + (PbFormatType::Debezium, PbEncodeType::Json) => { + (SourceFormat::Debezium, SourceEncode::Json) + } + (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro), + (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json), + (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json), + (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv), + (PbFormatType::Native, PbEncodeType::Native) => { + (SourceFormat::Native, SourceEncode::Native) + } + (PbFormatType::Debezium, PbEncodeType::Avro) => { + (SourceFormat::Debezium, SourceEncode::Avro) + } + (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json), + (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro), + (PbFormatType::DebeziumMongo, PbEncodeType::Json) => { + (SourceFormat::DebeziumMongo, SourceEncode::Json) + } + (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes), + (format, encode) => { + return Err(anyhow!( + "Unsupported combination of format {:?} and encode {:?}", + format, + encode + )); + } + }; + Ok(SourceStruct::new(format, encode)) +} + pub type BoxSourceStream = BoxStream<'static, Result>>; pub trait SourceWithStateStream = diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index c073403a7565..3e42b07ce442 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -23,10 +23,8 @@ use risingwave_common::row::OwnedRow; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; -use crate::source::{ - SourceEncode, SourceFormat, SourceMessage, SourceMeta, SourceStruct, SplitId, - StreamChunkWithState, -}; +use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig}; +use crate::source::{SourceMessage, SourceMeta, SplitId, StreamChunkWithState}; pub enum FieldDesc { // field is invisible, generate None @@ -38,7 +36,7 @@ pub struct DatagenEventGenerator { // fields_map: HashMap, field_names: Vec, fields_vec: Vec, - source_struct: SourceStruct, + source_format: SpecificParserConfig, data_types: Vec, offset: u64, split_id: SplitId, @@ -56,7 +54,7 @@ impl DatagenEventGenerator { pub fn new( fields_vec: Vec, field_names: Vec, - source_struct: SourceStruct, + source_format: SpecificParserConfig, data_types: Vec, rows_per_second: u64, offset: u64, @@ -72,7 +70,7 @@ impl DatagenEventGenerator { Ok(Self { field_names, fields_vec, - source_struct, + source_format, data_types, offset, split_id, @@ -96,8 +94,11 @@ impl DatagenEventGenerator { ); let mut msgs = Vec::with_capacity(num_rows_to_generate as usize); 'outer: for _ in 0..num_rows_to_generate { - let payload = match (self.source_struct.format, self.source_struct.encode) { - (SourceFormat::Plain, SourceEncode::Json) => { + let payload = match ( + &self.source_format.protocol_config, + &self.source_format.encoding_config, + ) { + (ProtocolProperties::Plain, EncodingProperties::Json(_)) => { let mut map = serde_json::Map::with_capacity(self.fields_vec.len()); for (name, field_generator) in self .field_names @@ -225,7 +226,6 @@ mod tests { use futures::stream::StreamExt; use super::*; - use crate::source::SourceEncode; async fn check_sequence_partition_result( split_num: u64, @@ -266,7 +266,13 @@ mod tests { let generator = DatagenEventGenerator::new( fields_vec, vec!["c1".to_owned(), "c2".to_owned()], - SourceStruct::new(SourceFormat::Plain, SourceEncode::Json), + SpecificParserConfig { + protocol_config: ProtocolProperties::Plain, + encoding_config: EncodingProperties::Json(crate::parser::JsonProperties { + use_schema_registry: false, + }), + key_encoding_config: None, + }, data_types, rows_per_second, 0, diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 3840b0e1f5db..bd9f74ee3aa9 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -120,7 +120,7 @@ impl SplitReader for DatagenSplitReader { let generator = DatagenEventGenerator::new( fields_vec, field_names, - parser_config.specific.get_source_struct(), + parser_config.specific.clone(), data_types, rows_per_second, events_so_far, diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 6e13a16185bf..385a1010b50c 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -16,8 +16,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::ColumnId; use risingwave_common::error::{ErrorCode, Result, RwError}; -use risingwave_connector::source::{SourceEncode, SourceStruct}; -use risingwave_source::source_desc::extract_source_struct; +use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; use risingwave_sqlparser::ast::{ AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement, }; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index d585caf70a92..bb02553e2cd9 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -37,9 +37,8 @@ use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ - SourceEncode, SourceFormat, SourceStruct, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, - KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, - S3_V2_CONNECTOR, + GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, + PULSAR_CONNECTOR, S3_CONNECTOR, S3_V2_CONNECTOR, }; use risingwave_pb::catalog::{ PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc, @@ -99,11 +98,7 @@ async fn extract_avro_table_schema( info: &StreamSourceInfo, with_properties: &HashMap, ) -> Result> { - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Plain, SourceEncode::Avro), - info, - with_properties, - )?; + let parser_config = SpecificParserConfig::new(info, with_properties)?; let conf = AvroParserConfig::new(parser_config.encoding_config).await?; let vec_column_desc = conf.map_to_columns()?; Ok(vec_column_desc @@ -120,11 +115,7 @@ async fn extract_upsert_avro_table_schema( info: &StreamSourceInfo, with_properties: &HashMap, ) -> Result<(Vec, Vec)> { - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Upsert, SourceEncode::Avro), - info, - with_properties, - )?; + let parser_config = SpecificParserConfig::new(info, with_properties)?; let conf = AvroParserConfig::new(parser_config.encoding_config).await?; let vec_column_desc = conf.map_to_columns()?; let mut vec_column_catalog = vec_column_desc @@ -164,11 +155,7 @@ async fn extract_debezium_avro_table_pk_columns( info: &StreamSourceInfo, with_properties: &HashMap, ) -> Result> { - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Debezium, SourceEncode::Avro), - info, - with_properties, - )?; + let parser_config = SpecificParserConfig::new(info, with_properties)?; let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; Ok(conf.extract_pks()?.drain(..).map(|c| c.name).collect()) } @@ -178,11 +165,7 @@ async fn extract_debezium_avro_table_schema( info: &StreamSourceInfo, with_properties: &HashMap, ) -> Result> { - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Debezium, SourceEncode::Avro), - info, - with_properties, - )?; + let parser_config = SpecificParserConfig::new(info, with_properties)?; let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?; let vec_column_desc = conf.map_to_columns()?; let column_catalog = vec_column_desc @@ -204,13 +187,11 @@ async fn extract_protobuf_table_schema( proto_message_name: schema.message_name.0.clone(), row_schema_location: schema.row_schema_location.0.clone(), use_schema_registry: schema.use_schema_registry, + format: FormatType::Plain.into(), + row_encode: EncodeType::Protobuf.into(), ..Default::default() }; - let parser_config = SpecificParserConfig::new( - SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf), - &info, - &with_properties, - )?; + let parser_config = SpecificParserConfig::new(&info, &with_properties)?; let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; let column_descs = conf.map_to_columns()?; diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index bf28c00e42f1..161bbc41ceb6 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -18,15 +18,12 @@ use std::sync::Arc; use risingwave_common::catalog::ColumnDesc; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use risingwave_connector::parser::SpecificParserConfig; +use risingwave_connector::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig}; use risingwave_connector::source::monitor::SourceMetrics; -use risingwave_connector::source::{ - ConnectorProperties, SourceColumnDesc, SourceColumnType, SourceEncode, SourceFormat, - SourceStruct, -}; +use risingwave_connector::source::{ConnectorProperties, SourceColumnDesc, SourceColumnType}; use risingwave_connector::ConnectorParams; use risingwave_pb::catalog::PbStreamSourceInfo; -use risingwave_pb::plan_common::{PbColumnCatalog, PbEncodeType, PbFormatType, RowFormatType}; +use risingwave_pb::plan_common::PbColumnCatalog; use crate::connector_source::ConnectorSource; use crate::fs_connector_source::FsConnectorSource; @@ -37,7 +34,6 @@ pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16; #[derive(Debug, Clone)] pub struct SourceDesc { pub source: ConnectorSource, - pub source_struct: SourceStruct, pub columns: Vec, pub metrics: Arc, @@ -48,7 +44,6 @@ pub struct SourceDesc { #[derive(Debug)] pub struct FsSourceDesc { pub source: FsConnectorSource, - pub source_struct: SourceStruct, pub columns: Vec, pub metrics: Arc, } @@ -107,9 +102,7 @@ impl SourceDescBuilder { pub fn build(mut self) -> Result { let columns = self.column_catalogs_to_source_column_descs(); - let source_struct = extract_source_struct(&self.source_info)?; - let psrser_config = - SpecificParserConfig::new(source_struct, &self.source_info, &self.properties)?; + let psrser_config = SpecificParserConfig::new(&self.source_info, &self.properties)?; let is_new_fs_source = ConnectorProperties::is_new_fs_connector_hash_map(&self.properties); if is_new_fs_source { @@ -126,7 +119,6 @@ impl SourceDescBuilder { Ok(SourceDesc { source, - source_struct, columns, metrics: self.metrics, is_new_fs_source, @@ -138,9 +130,16 @@ impl SourceDescBuilder { } pub fn build_fs_source_desc(&self) -> Result { - let source_struct = extract_source_struct(&self.source_info)?; - match (source_struct.format, source_struct.encode) { - (SourceFormat::Plain, SourceEncode::Csv | SourceEncode::Json) => {} + let parser_config = SpecificParserConfig::new(&self.source_info, &self.properties)?; + + match ( + &parser_config.protocol_config, + &parser_config.encoding_config, + ) { + ( + ProtocolProperties::Plain, + EncodingProperties::Csv(_) | EncodingProperties::Json(_), + ) => {} (format, encode) => { return Err(RwError::from(ProtocolError(format!( "Unsupported combination of format {:?} and encode {:?}", @@ -151,9 +150,6 @@ impl SourceDescBuilder { let columns = self.column_catalogs_to_source_column_descs(); - let parser_config = - SpecificParserConfig::new(source_struct, &self.source_info, &self.properties)?; - let source = FsConnectorSource::new( self.properties.clone(), columns.clone(), @@ -166,71 +162,12 @@ impl SourceDescBuilder { Ok(FsSourceDesc { source, - source_struct, columns, metrics: self.metrics.clone(), }) } } -// Only return valid (format, encode) -pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result { - // old version meta. - if let Ok(format) = info.get_row_format() { - let (format, encode) = match format { - RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json), - RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf), - RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json), - RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro), - RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json), - RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json), - RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv), - RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native), - RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro), - RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json), - RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro), - RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json), - RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes), - RowFormatType::RowUnspecified => unreachable!(), - }; - return Ok(SourceStruct::new(format, encode)); - } - let source_format = info.get_format()?; - let source_encode = info.get_row_encode()?; - let (format, encode) = match (source_format, source_encode) { - (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json), - (PbFormatType::Plain, PbEncodeType::Protobuf) => { - (SourceFormat::Plain, SourceEncode::Protobuf) - } - (PbFormatType::Debezium, PbEncodeType::Json) => { - (SourceFormat::Debezium, SourceEncode::Json) - } - (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro), - (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json), - (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json), - (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv), - (PbFormatType::Native, PbEncodeType::Native) => { - (SourceFormat::Native, SourceEncode::Native) - } - (PbFormatType::Debezium, PbEncodeType::Avro) => { - (SourceFormat::Debezium, SourceEncode::Avro) - } - (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json), - (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro), - (PbFormatType::DebeziumMongo, PbEncodeType::Json) => { - (SourceFormat::DebeziumMongo, SourceEncode::Json) - } - (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes), - (format, encode) => { - return Err(RwError::from(ProtocolError(format!( - "Unsupported combination of format {:?} and encode {:?}", - format, encode - )))); - } - }; - Ok(SourceStruct::new(format, encode)) -} - pub mod test_utils { use std::collections::HashMap;