diff --git a/e2e_test/source_inline/kafka/avro/name_strategy.slt b/e2e_test/source_inline/kafka/avro/name_strategy.slt
index e48be738886f..212c79e6f31a 100644
--- a/e2e_test/source_inline/kafka/avro/name_strategy.slt
+++ b/e2e_test/source_inline/kafka/avro/name_strategy.slt
@@ -84,17 +84,6 @@ python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_S
-statement error SCHEMA_REGISTRY_NAME_STRATEGY_TOPIC_RECORD_NAME_STRATEGY expect non-empty field key\.message
-create table t_topic_record () with (
-    connector = 'kafka',
-    topic = 'upsert_avro_json-topic-record',
-    properties.bootstrap.server = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}'
-) format upsert encode avro (
-    schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
-    schema.registry.name.strategy = 'topic_record_name_strategy',
-    message = 'CPLM.OBJ_ATTRIBUTE_VALUE'
-);
-
 statement ok
 create table t_topic_record (primary key(rw_key))
 INCLUDE KEY AS rw_key
@@ -105,8 +94,7 @@ with (
 ) format upsert encode avro (
     schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
     schema.registry.name.strategy = 'topic_record_name_strategy',
-    message = 'CPLM.OBJ_ATTRIBUTE_VALUE',
-    key.message = 'string'
+    message = 'CPLM.OBJ_ATTRIBUTE_VALUE'
 );
diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs
index 68ac7d446846..dde74a999ac9 100644
--- a/src/connector/src/parser/avro/parser.rs
+++ b/src/connector/src/parser/avro/parser.rs
@@ -29,7 +29,7 @@ use crate::error::ConnectorResult;
 use crate::parser::unified::AccessImpl;
 use crate::parser::util::bytes_from_url;
 use crate::parser::{
-    AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling, SchemaLocation,
+    AccessBuilder, AvroProperties, EncodingProperties, MapHandling, SchemaLocation,
 };
 use crate::schema::schema_registry::{
     extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
@@ -55,18 +55,14 @@ impl AccessBuilder for AvroAccessBuilder {
 }
 
 impl AvroAccessBuilder {
-    pub fn new(config: AvroParserConfig, encoding_type: EncodingType) -> ConnectorResult<Self> {
+    pub fn new(config: AvroParserConfig) -> ConnectorResult<Self> {
         let AvroParserConfig {
             schema,
-            key_schema,
             writer_schema_cache,
             ..
         } = config;
         Ok(Self {
-            schema: match encoding_type {
-                EncodingType::Key => key_schema.context("Avro with empty key schema")?,
-                EncodingType::Value => schema,
-            },
+            schema,
             writer_schema_cache,
             value: None,
         })
@@ -148,7 +144,6 @@ impl AvroAccessBuilder {
 #[derive(Debug, Clone)]
 pub struct AvroParserConfig {
     schema: Arc<ResolvedAvroSchema>,
-    key_schema: Option<Arc<ResolvedAvroSchema>>,
     /// Writer schema is the schema used to write the data. When parsing Avro data, the exactly same schema
     /// must be used to decode the message, and then convert it with the reader schema.
     writer_schema_cache: WriterSchemaCache,
@@ -167,7 +162,6 @@ impl AvroParserConfig {
     pub async fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
         let AvroProperties {
             schema_location,
-            enable_upsert,
             record_name,
             key_record_name,
             map_handling,
@@ -183,38 +177,21 @@ impl AvroParserConfig {
                 let client = Client::new(url, &client_config)?;
                 let resolver = ConfluentSchemaCache::new(client);
 
-                let subject_key = if enable_upsert {
-                    Some(get_subject_by_strategy(
-                        &name_strategy,
-                        topic.as_str(),
-                        key_record_name.as_deref(),
-                        true,
-                    )?)
-                } else {
-                    if let Some(name) = &key_record_name {
-                        bail!("unused FORMAT ENCODE option: key.message='{name}'");
-                    }
-                    None
-                };
+                if let Some(name) = &key_record_name {
+                    bail!("unused FORMAT ENCODE option: key.message='{name}'");
+                }
                 let subject_value = get_subject_by_strategy(
                     &name_strategy,
                     topic.as_str(),
                     record_name.as_deref(),
                     false,
                 )?;
-                tracing::debug!("infer key subject {subject_key:?}, value subject {subject_value}");
+                tracing::debug!("value subject {subject_value}");
                 Ok(Self {
                     schema: Arc::new(ResolvedAvroSchema::create(
                         resolver.get_by_subject(&subject_value).await?,
                     )?),
-                    key_schema: if let Some(subject_key) = subject_key {
-                        Some(Arc::new(ResolvedAvroSchema::create(
-                            resolver.get_by_subject(&subject_key).await?,
-                        )?))
-                    } else {
-                        None
-                    },
                     writer_schema_cache: WriterSchemaCache::Confluent(Arc::new(resolver)),
                     map_handling,
                 })
@@ -224,16 +201,12 @@ impl AvroParserConfig {
                 aws_auth_props,
             } => {
                 let url = handle_sr_list(schema_location.as_str())?;
-                if enable_upsert {
-                    bail!("avro upsert without schema registry is not supported");
-                }
                 let url = url.first().unwrap();
                 let schema_content = bytes_from_url(url, aws_auth_props.as_ref()).await?;
                 let schema = Schema::parse_reader(&mut schema_content.as_slice())
                     .context("failed to parse avro schema")?;
                 Ok(Self {
                     schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
-                    key_schema: None,
                     writer_schema_cache: WriterSchemaCache::File,
                     map_handling,
                 })
@@ -248,7 +221,6 @@ impl AvroParserConfig {
                 let schema = resolver.get_by_name(&schema_arn).await?;
                 Ok(Self {
                     schema: Arc::new(ResolvedAvroSchema::create(schema)?),
-                    key_schema: None,
                     writer_schema_cache: WriterSchemaCache::Glue(Arc::new(resolver)),
                     map_handling,
                 })
diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs
index bc43e556cb4f..398659392077 100644
--- a/src/connector/src/parser/debezium/debezium_parser.rs
+++ b/src/connector/src/parser/debezium/debezium_parser.rs
@@ -75,9 +75,6 @@ async fn build_accessor_builder(
                     .unwrap_or(TimestamptzHandling::GuessNumberUnit),
             )?,
         )),
-        EncodingProperties::Protobuf(_) => {
-            Ok(AccessBuilderImpl::new_default(config, encoding_type).await?)
-        }
         _ => bail!("unsupported encoding for Debezium"),
     }
 }
diff --git a/src/connector/src/parser/maxwell/maxwell_parser.rs b/src/connector/src/parser/maxwell/maxwell_parser.rs
index 8ba95ad21213..1f13c8a8bc30 100644
--- a/src/connector/src/parser/maxwell/maxwell_parser.rs
+++ b/src/connector/src/parser/maxwell/maxwell_parser.rs
@@ -19,7 +19,7 @@ use crate::only_parse_payload;
 use crate::parser::unified::maxwell::MaxwellChangeEvent;
 use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
 use crate::parser::{
-    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParserFormat,
+    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, ParserFormat,
     SourceStreamChunkRowWriter, SpecificParserConfig,
 };
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
@@ -39,9 +39,7 @@ impl MaxwellParser {
     ) -> ConnectorResult<Self> {
         match props.encoding_config {
             EncodingProperties::Json(_) => {
-                let payload_builder =
-                    AccessBuilderImpl::new_default(props.encoding_config, EncodingType::Value)
-                        .await?;
+                let payload_builder = AccessBuilderImpl::new_default(props.encoding_config).await?;
                 Ok(Self {
                     payload_builder,
                     rw_columns,
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index fcf945184458..5029986fee30 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -869,14 +869,11 @@ pub enum AccessBuilderImpl {
 }
 
 impl AccessBuilderImpl {
-    pub async fn new_default(
-        config: EncodingProperties,
-        kv: EncodingType,
-    ) -> ConnectorResult<Self> {
+    pub async fn new_default(config: EncodingProperties) -> ConnectorResult<Self> {
         let accessor = match config {
             EncodingProperties::Avro(_) => {
                 let config = AvroParserConfig::new(config).await?;
-                AccessBuilderImpl::Avro(AvroAccessBuilder::new(config, kv)?)
+                AccessBuilderImpl::Avro(AvroAccessBuilder::new(config)?)
             }
             EncodingProperties::Protobuf(_) => {
                 let config = ProtobufParserConfig::new(config).await?;
@@ -1072,7 +1069,6 @@ impl SpecificParserConfig {
 #[derive(Debug, Default, Clone)]
 pub struct AvroProperties {
     pub schema_location: SchemaLocation,
-    pub enable_upsert: bool,
     pub record_name: Option<String>,
     pub key_record_name: Option<String>,
     pub map_handling: Option<MapHandling>,
@@ -1224,9 +1220,6 @@ impl SpecificParserConfig {
                     map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
                     ..Default::default()
                 };
-                if format == SourceFormat::Upsert {
-                    config.enable_upsert = true;
-                }
                 config.schema_location = if let Some(schema_arn) =
                     format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
                 {
diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs
index f34fd2983717..1454ca8ade1f 100644
--- a/src/connector/src/parser/plain_parser.rs
+++ b/src/connector/src/parser/plain_parser.rs
@@ -17,8 +17,8 @@ use risingwave_common::bail;
 use super::unified::json::TimestamptzHandling;
 use super::unified::kv_event::KvEvent;
 use super::{
-    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
-    SourceStreamChunkRowWriter, SpecificParserConfig,
+    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
+    SpecificParserConfig,
 };
 use crate::error::ConnectorResult;
 use crate::parser::bytes_parser::BytesAccessBuilder;
@@ -61,7 +61,7 @@ impl PlainParser {
             | EncodingProperties::Protobuf(_)
             | EncodingProperties::Avro(_)
             | EncodingProperties::Bytes(_) => {
-                AccessBuilderImpl::new_default(props.encoding_config, EncodingType::Value).await?
+                AccessBuilderImpl::new_default(props.encoding_config).await?
             }
             _ => bail!("Unsupported encoding for Plain"),
         };
diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs
index df5e3b66e313..c7bcce9f86a8 100644
--- a/src/connector/src/parser/upsert_parser.rs
+++ b/src/connector/src/parser/upsert_parser.rs
@@ -18,7 +18,7 @@ use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColum
 use super::bytes_parser::BytesAccessBuilder;
 use super::unified::{AccessImpl, ChangeEventOperation};
 use super::{
-    AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType,
+    AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties,
     SourceStreamChunkRowWriter, SpecificParserConfig,
 };
 use crate::error::ConnectorResult;
@@ -34,16 +34,11 @@ pub struct UpsertParser {
     source_ctx: SourceContextRef,
 }
 
-async fn build_accessor_builder(
-    config: EncodingProperties,
-    encoding_type: EncodingType,
-) -> ConnectorResult<AccessBuilderImpl> {
+async fn build_accessor_builder(config: EncodingProperties) -> ConnectorResult<AccessBuilderImpl> {
     match config {
         EncodingProperties::Json(_)
         | EncodingProperties::Protobuf(_)
-        | EncodingProperties::Avro(_) => {
-            Ok(AccessBuilderImpl::new_default(config, encoding_type).await?)
-        }
+        | EncodingProperties::Avro(_) => Ok(AccessBuilderImpl::new_default(config).await?),
         _ => bail!("unsupported encoding for Upsert"),
     }
 }
@@ -80,8 +75,7 @@ impl UpsertParser {
         } else {
             unreachable!("format upsert must have key column")
         };
-        let payload_builder =
-            build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
+        let payload_builder = build_accessor_builder(props.encoding_config).await?;
         Ok(Self {
             key_builder,
             payload_builder,