diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index 4d4b9f9cb5b80..652935ff60789 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use risingwave_common::catalog::ColumnDesc; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use risingwave_connector::parser::SpecificParserConfig; +use risingwave_connector::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig}; use risingwave_connector::source::monitor::SourceMetrics; use risingwave_connector::source::{ SourceColumnDesc, SourceColumnType, SourceEncode, SourceFormat, SourceStruct, @@ -36,7 +36,6 @@ pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16; #[derive(Debug)] pub struct SourceDesc { pub source: ConnectorSource, - pub source_struct: SourceStruct, pub columns: Vec, pub metrics: Arc, } @@ -45,7 +44,6 @@ pub struct SourceDesc { #[derive(Debug)] pub struct FsSourceDesc { pub source: FsConnectorSource, - pub source_struct: SourceStruct, pub columns: Vec, pub metrics: Arc, } @@ -117,7 +115,6 @@ impl SourceDescBuilder { Ok(SourceDesc { source, - source_struct, columns, metrics: self.metrics, }) @@ -129,8 +126,17 @@ impl SourceDescBuilder { pub fn build_fs_source_desc(&self) -> Result { let source_struct = extract_source_struct(&self.source_info)?; - match (source_struct.format, source_struct.encode) { - (SourceFormat::Plain, SourceEncode::Csv | SourceEncode::Json) => {} + let parser_config = + SpecificParserConfig::new(source_struct, &self.source_info, &self.properties)?; + + match ( + &parser_config.protocol_config, + &parser_config.encoding_config, + ) { + ( + ProtocolProperties::Plain, + EncodingProperties::Csv(_) | EncodingProperties::Json(_), + ) => {} (format, encode) => { return Err(RwError::from(ProtocolError(format!( "Unsupported combination of format {:?} and encode {:?}", @@ -141,9 +147,6 @@ impl SourceDescBuilder { let columns = self.column_catalogs_to_source_column_descs(); - let parser_config = - SpecificParserConfig::new(source_struct, &self.source_info, &self.properties)?; - let source = FsConnectorSource::new( self.properties.clone(), columns.clone(), @@ -156,7 +159,6 @@ impl SourceDescBuilder { Ok(FsSourceDesc { source, - source_struct, columns, metrics: self.metrics.clone(), })