Skip to content

Commit

Permalink
hardcoded EncodingType::Value
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Jul 19, 2024
1 parent 2d44fbc commit 6c58cb2
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 31 deletions.
9 changes: 3 additions & 6 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::error::ConnectorResult;
use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
use crate::parser::{
AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling, SchemaLocation,
AccessBuilder, AvroProperties, EncodingProperties, MapHandling, SchemaLocation,
};
use crate::schema::schema_registry::{
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
Expand All @@ -55,17 +55,14 @@ impl AccessBuilder for AvroAccessBuilder {
}

impl AvroAccessBuilder {
pub fn new(config: AvroParserConfig, encoding_type: EncodingType) -> ConnectorResult<Self> {
pub fn new(config: AvroParserConfig) -> ConnectorResult<Self> {
let AvroParserConfig {
schema,
writer_schema_cache,
..
} = config;
Ok(Self {
schema: match encoding_type {
EncodingType::Key => bail!("Avro with empty key schema"),
EncodingType::Value => schema,
},
schema,
writer_schema_cache,
value: None,
})
Expand Down
3 changes: 0 additions & 3 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ async fn build_accessor_builder(
.unwrap_or(TimestamptzHandling::GuessNumberUnit),
)?,
)),
EncodingProperties::Protobuf(_) => {
Ok(AccessBuilderImpl::new_default(config, encoding_type).await?)
}
_ => bail!("unsupported encoding for Debezium"),
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/connector/src/parser/maxwell/maxwell_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::only_parse_payload;
use crate::parser::unified::maxwell::MaxwellChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParserFormat,
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, ParserFormat,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
Expand All @@ -39,9 +39,7 @@ impl MaxwellParser {
) -> ConnectorResult<Self> {
match props.encoding_config {
EncodingProperties::Json(_) => {
let payload_builder =
AccessBuilderImpl::new_default(props.encoding_config, EncodingType::Value)
.await?;
let payload_builder = AccessBuilderImpl::new_default(props.encoding_config).await?;
Ok(Self {
payload_builder,
rw_columns,
Expand Down
7 changes: 2 additions & 5 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,14 +869,11 @@ pub enum AccessBuilderImpl {
}

impl AccessBuilderImpl {
pub async fn new_default(
config: EncodingProperties,
kv: EncodingType,
) -> ConnectorResult<Self> {
pub async fn new_default(config: EncodingProperties) -> ConnectorResult<Self> {
let accessor = match config {
EncodingProperties::Avro(_) => {
let config = AvroParserConfig::new(config).await?;
AccessBuilderImpl::Avro(AvroAccessBuilder::new(config, kv)?)
AccessBuilderImpl::Avro(AvroAccessBuilder::new(config)?)
}
EncodingProperties::Protobuf(_) => {
let config = ProtobufParserConfig::new(config).await?;
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use risingwave_common::bail;
use super::unified::json::TimestamptzHandling;
use super::unified::kv_event::KvEvent;
use super::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig,
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
SpecificParserConfig,
};
use crate::error::ConnectorResult;
use crate::parser::bytes_parser::BytesAccessBuilder;
Expand Down Expand Up @@ -61,7 +61,7 @@ impl PlainParser {
| EncodingProperties::Protobuf(_)
| EncodingProperties::Avro(_)
| EncodingProperties::Bytes(_) => {
AccessBuilderImpl::new_default(props.encoding_config, EncodingType::Value).await?
AccessBuilderImpl::new_default(props.encoding_config).await?
}
_ => bail!("Unsupported encoding for Plain"),
};
Expand Down
14 changes: 4 additions & 10 deletions src/connector/src/parser/upsert_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColum
use super::bytes_parser::BytesAccessBuilder;
use super::unified::{AccessImpl, ChangeEventOperation};
use super::{
AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType,
AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::error::ConnectorResult;
Expand All @@ -34,16 +34,11 @@ pub struct UpsertParser {
source_ctx: SourceContextRef,
}

async fn build_accessor_builder(
config: EncodingProperties,
encoding_type: EncodingType,
) -> ConnectorResult<AccessBuilderImpl> {
async fn build_accessor_builder(config: EncodingProperties) -> ConnectorResult<AccessBuilderImpl> {
match config {
EncodingProperties::Json(_)
| EncodingProperties::Protobuf(_)
| EncodingProperties::Avro(_) => {
Ok(AccessBuilderImpl::new_default(config, encoding_type).await?)
}
| EncodingProperties::Avro(_) => Ok(AccessBuilderImpl::new_default(config).await?),
_ => bail!("unsupported encoding for Upsert"),
}
}
Expand Down Expand Up @@ -80,8 +75,7 @@ impl UpsertParser {
} else {
unreachable!("format upsert must have key column")
};
let payload_builder =
build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
let payload_builder = build_accessor_builder(props.encoding_config).await?;
Ok(Self {
key_builder,
payload_builder,
Expand Down

0 comments on commit 6c58cb2

Please sign in to comment.