Skip to content

Commit

Permalink
SpecificParserConfig::new without SourceStruct
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Oct 2, 2023
1 parent 3043976 commit 4dff44f
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 161 deletions.
5 changes: 1 addition & 4 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use risingwave_connector::source::{
};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::connector_source::ConnectorSource;
use risingwave_source::source_desc::extract_source_struct;

use super::Executor;
use crate::error::BatchError;
Expand Down Expand Up @@ -71,9 +70,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
.map_err(|e| RwError::from(ConnectorError(e.into())))?;

let info = source_node.get_info().unwrap();
let source_struct = extract_source_struct(info)?;
let parser_config =
SpecificParserConfig::new(source_struct, info, &source_node.properties)?;
let parser_config = SpecificParserConfig::new(info, &source_node.properties)?;

let columns: Vec<_> = source_node
.columns
Expand Down
11 changes: 5 additions & 6 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ mod test {
use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
use risingwave_common::{error, try_match_expand};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
use url::Url;

use super::{
Expand All @@ -222,7 +223,7 @@ mod test {
use crate::parser::{
AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, SourceStruct};
use crate::source::SourceColumnDesc;

fn test_data_path(file_name: &str) -> String {
let curr_dir = env::current_dir().unwrap().into_os_string();
Expand Down Expand Up @@ -294,13 +295,11 @@ mod test {
let info = StreamSourceInfo {
row_schema_location: schema_path.clone(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Avro.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Avro),
&info,
&HashMap::new(),
)?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
AvroParserConfig::new(parser_config.encoding_config).await
}

Expand Down
11 changes: 5 additions & 6 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,13 @@ mod tests {
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};

use super::*;
use crate::parser::{
DebeziumAvroParserConfig, DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, SourceStruct};
use crate::source::SourceColumnDesc;

const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00";

Expand Down Expand Up @@ -297,13 +298,11 @@ mod tests {
));
let info = StreamSourceInfo {
row_schema_location: "http://127.0.0.1:8081".into(),
format: PbFormatType::Debezium.into(),
row_encode: PbEncodeType::Avro.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Debezium, SourceEncode::Avro),
&info,
&props,
)?;
let parser_config = SpecificParserConfig::new(&info, &props)?;
let config = DebeziumAvroParserConfig::new(parser_config.clone().encoding_config).await?;
let columns = config
.map_to_columns()?
Expand Down
29 changes: 5 additions & 24 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ use self::util::get_kafka_topic;
use crate::aws_auth::AwsAuthProps;
use crate::parser::maxwell::MaxwellParser;
use crate::source::{
BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, SourceContextRef,
SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream, SplitId,
StreamChunkWithState,
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream,
SplitId, StreamChunkWithState,
};

mod avro;
Expand Down Expand Up @@ -822,11 +822,8 @@ impl SpecificParserConfig {
}

// The validity of (format, encode) is ensured by `extract_source_struct`
pub fn new(
source_struct: SourceStruct,
info: &StreamSourceInfo,
props: &HashMap<String, String>,
) -> Result<Self> {
pub fn new(info: &StreamSourceInfo, props: &HashMap<String, String>) -> Result<Self> {
let source_struct = extract_source_struct(info)?;
let format = source_struct.format;
let encode = source_struct.encode;
// this transformation is needed since there may be config for the protocol
Expand Down Expand Up @@ -954,19 +951,3 @@ impl SpecificParserConfig {
})
}
}

impl ParserConfig {
/// Builds a `ParserConfig` from a source description.
///
/// `rw_columns` is copied into the `CommonParserConfig` part, while `source_struct`,
/// `info` and `props` are forwarded unchanged to `SpecificParserConfig::new`.
///
/// # Errors
/// Returns whatever error `SpecificParserConfig::new` reports.
pub fn new(
source_struct: SourceStruct,
info: &StreamSourceInfo,
props: &HashMap<String, String>,
rw_columns: &Vec<SourceColumnDesc>,
) -> Result<Self> {
// The common part owns its own copy of the column descriptors.
let common = CommonParserConfig {
rw_columns: rw_columns.to_owned(),
};
// Format/encode specific settings; failure here aborts construction via `?`.
let specific = SpecificParserConfig::new(source_struct, info, props)?;

Ok(Self { common, specific })
}
}
36 changes: 13 additions & 23 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,13 @@ mod test {
use risingwave_common::types::{DataType, StructType};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::data::data_type::PbTypeName;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};

use super::*;
use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage};
use crate::parser::protobuf::recursive::AllTypes;
use crate::parser::unified::Access;
use crate::parser::SpecificParserConfig;
use crate::source::{SourceEncode, SourceFormat, SourceStruct};

fn schema_dir() -> String {
let dir = PathBuf::from("src/test_data");
Expand All @@ -389,13 +389,11 @@ mod test {
proto_message_name: message_name.to_string(),
row_schema_location: location.to_string(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&HashMap::new(),
)?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
let value = DynamicMessage::decode(conf.message_descriptor, PRE_GEN_PROTO_DATA).unwrap();

Expand Down Expand Up @@ -436,13 +434,11 @@ mod test {
proto_message_name: message_name.to_string(),
row_schema_location: location.to_string(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&HashMap::new(),
)?;
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
let columns = conf.map_to_columns().unwrap();

Expand Down Expand Up @@ -487,14 +483,11 @@ mod test {
proto_message_name: message_name.to_string(),
row_schema_location: location.to_string(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&HashMap::new(),
)
.unwrap();
let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap();
let conf = ProtobufParserConfig::new(parser_config.encoding_config)
.await
.unwrap();
Expand All @@ -516,14 +509,11 @@ mod test {
proto_message_name: message_name.to_string(),
row_schema_location: location.to_string(),
use_schema_registry: false,
format: PbFormatType::Plain.into(),
row_encode: PbEncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&HashMap::new(),
)
.unwrap();
let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap();

ProtobufParserConfig::new(parser_config.encoding_config)
.await
Expand Down
63 changes: 62 additions & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{ErrorSuppressor, RwError};
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::PbSource;
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::source::ConnectorSplit;
use risingwave_rpc_client::ConnectorClient;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -249,6 +249,67 @@ impl SourceStruct {
}
}

/// Derives the [`SourceStruct`] (a format / encode pair) described by `info`.
///
/// Only valid (format, encode) pairs are ever returned. Two metadata layouts are handled:
///
/// * legacy: a single `row_format` value, used whenever `get_row_format()` succeeds;
/// * current: separate `format` and `row_encode` values, used otherwise.
///
/// # Errors
/// Fails when `format` or `row_encode` cannot be read, or when the pair they form is not
/// one of the supported combinations.
///
/// # Panics
/// Panics if the legacy getter succeeds with `RowFormatType::RowUnspecified`.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    let (format, encode) = match info.get_row_format() {
        // Old version meta: one enum value carries both the format and the encode.
        Ok(legacy) => match legacy {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => {
                (SourceFormat::DebeziumMongo, SourceEncode::Json)
            }
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::RowUnspecified => unreachable!(),
        },
        // Current layout: format and encode are stored as two independent fields.
        Err(_) => {
            let pb_format = info.get_format().map_err(|e| anyhow!("{e:?}"))?;
            let pb_encode = info.get_row_encode().map_err(|e| anyhow!("{e:?}"))?;
            match (pb_format, pb_encode) {
                (PbFormatType::Plain, PbEncodeType::Json) => {
                    (SourceFormat::Plain, SourceEncode::Json)
                }
                (PbFormatType::Plain, PbEncodeType::Protobuf) => {
                    (SourceFormat::Plain, SourceEncode::Protobuf)
                }
                (PbFormatType::Plain, PbEncodeType::Avro) => {
                    (SourceFormat::Plain, SourceEncode::Avro)
                }
                (PbFormatType::Plain, PbEncodeType::Csv) => {
                    (SourceFormat::Plain, SourceEncode::Csv)
                }
                (PbFormatType::Plain, PbEncodeType::Bytes) => {
                    (SourceFormat::Plain, SourceEncode::Bytes)
                }
                (PbFormatType::Debezium, PbEncodeType::Json) => {
                    (SourceFormat::Debezium, SourceEncode::Json)
                }
                (PbFormatType::Debezium, PbEncodeType::Avro) => {
                    (SourceFormat::Debezium, SourceEncode::Avro)
                }
                (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
                    (SourceFormat::DebeziumMongo, SourceEncode::Json)
                }
                (PbFormatType::Upsert, PbEncodeType::Json) => {
                    (SourceFormat::Upsert, SourceEncode::Json)
                }
                (PbFormatType::Upsert, PbEncodeType::Avro) => {
                    (SourceFormat::Upsert, SourceEncode::Avro)
                }
                (PbFormatType::Maxwell, PbEncodeType::Json) => {
                    (SourceFormat::Maxwell, SourceEncode::Json)
                }
                (PbFormatType::Canal, PbEncodeType::Json) => {
                    (SourceFormat::Canal, SourceEncode::Json)
                }
                (PbFormatType::Native, PbEncodeType::Native) => {
                    (SourceFormat::Native, SourceEncode::Native)
                }
                // Anything else is a pairing no parser exists for.
                (other_format, other_encode) => {
                    return Err(anyhow!(
                        "Unsupported combination of format {:?} and encode {:?}",
                        other_format,
                        other_encode
                    ));
                }
            }
        }
    };

    Ok(SourceStruct::new(format, encode))
}

pub type BoxSourceStream = BoxStream<'static, Result<Vec<SourceMessage>>>;

pub trait SourceWithStateStream =
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::ColumnId;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_connector::source::{SourceEncode, SourceStruct};
use risingwave_source::source_desc::extract_source_struct;
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
use risingwave_sqlparser::ast::{
AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
};
Expand Down
36 changes: 9 additions & 27 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use risingwave_connector::source::filesystem::S3_CONNECTOR;
use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
SourceEncode, SourceFormat, SourceStruct, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, PULSAR_CONNECTOR,
GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
PULSAR_CONNECTOR,
};
use risingwave_pb::catalog::{
PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
Expand Down Expand Up @@ -99,11 +99,7 @@ async fn extract_avro_table_schema(
info: &StreamSourceInfo,
with_properties: &HashMap<String, String>,
) -> Result<Vec<ColumnCatalog>> {
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Avro),
info,
with_properties,
)?;
let parser_config = SpecificParserConfig::new(info, with_properties)?;
let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
let vec_column_desc = conf.map_to_columns()?;
Ok(vec_column_desc
Expand All @@ -120,11 +116,7 @@ async fn extract_upsert_avro_table_schema(
info: &StreamSourceInfo,
with_properties: &HashMap<String, String>,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Upsert, SourceEncode::Avro),
info,
with_properties,
)?;
let parser_config = SpecificParserConfig::new(info, with_properties)?;
let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
let vec_column_desc = conf.map_to_columns()?;
let mut vec_column_catalog = vec_column_desc
Expand Down Expand Up @@ -164,11 +156,7 @@ async fn extract_debezium_avro_table_pk_columns(
info: &StreamSourceInfo,
with_properties: &HashMap<String, String>,
) -> Result<Vec<String>> {
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Debezium, SourceEncode::Avro),
info,
with_properties,
)?;
let parser_config = SpecificParserConfig::new(info, with_properties)?;
let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
Ok(conf.extract_pks()?.drain(..).map(|c| c.name).collect())
}
Expand All @@ -178,11 +166,7 @@ async fn extract_debezium_avro_table_schema(
info: &StreamSourceInfo,
with_properties: &HashMap<String, String>,
) -> Result<Vec<ColumnCatalog>> {
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Debezium, SourceEncode::Avro),
info,
with_properties,
)?;
let parser_config = SpecificParserConfig::new(info, with_properties)?;
let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
let vec_column_desc = conf.map_to_columns()?;
let column_catalog = vec_column_desc
Expand All @@ -204,13 +188,11 @@ async fn extract_protobuf_table_schema(
proto_message_name: schema.message_name.0.clone(),
row_schema_location: schema.row_schema_location.0.clone(),
use_schema_registry: schema.use_schema_registry,
format: FormatType::Plain.into(),
row_encode: EncodeType::Protobuf.into(),
..Default::default()
};
let parser_config = SpecificParserConfig::new(
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
&info,
&with_properties,
)?;
let parser_config = SpecificParserConfig::new(&info, &with_properties)?;
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;

let column_descs = conf.map_to_columns()?;
Expand Down
Loading

0 comments on commit 4dff44f

Please sign in to comment.