From d97e80703b1c2c2b7c50de2c90f335ad424c9bc6 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 30 May 2024 15:26:15 +0800 Subject: [PATCH 1/2] minor refactor on avro --- .typos.toml | 1 + scripts/source/schema_registry_producer.py | 7 +- src/connector/src/parser/avro/parser.rs | 2 +- .../src/parser/debezium/avro_parser.rs | 2 +- src/connector/src/parser/unified/avro.rs | 69 ++++++++++--------- 5 files changed, 45 insertions(+), 36 deletions(-) diff --git a/.typos.toml b/.typos.toml index c062e9de44d2d..7dcf4af6257d4 100644 --- a/.typos.toml +++ b/.typos.toml @@ -6,6 +6,7 @@ inout = "inout" # This is a SQL keyword! numer = "numer" # numerator nd = "nd" # N-dimentional / 2nd steam = "stream" # You played with Steam games too much. +ser = "ser" # Serialization # Some weird short variable names ot = "ot" bui = "bui" # BackwardUserIterator diff --git a/scripts/source/schema_registry_producer.py b/scripts/source/schema_registry_producer.py index 79a3d4db1b40f..a88861b65bd26 100644 --- a/scripts/source/schema_registry_producer.py +++ b/scripts/source/schema_registry_producer.py @@ -39,8 +39,11 @@ def load_avro_json(encoded, schema): if __name__ == '__main__': - if len(sys.argv) < 5: - print("datagen.py ") + if len(sys.argv) <= 5: + print( + "usage: schema_registry_producer.py " + ) + exit(1) broker_list = sys.argv[1] schema_registry_url = sys.argv[2] file = sys.argv[3] diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 7f498a055ac7e..8565ab9dd20ce 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -45,7 +45,7 @@ impl AccessBuilder for AvroAccessBuilder { self.value = self.parse_avro_value(&payload, Some(&*self.schema)).await?; Ok(AccessImpl::Avro(AvroAccess::new( self.value.as_ref().unwrap(), - AvroParseOptions::default().with_schema(&self.schema), + AvroParseOptions::create(&self.schema), ))) } } diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 8d73a789b2669..656904c5d50c9 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -59,7 +59,7 @@ impl AccessBuilder for DebeziumAvroAccessBuilder { }; Ok(AccessImpl::Avro(AvroAccess::new( self.value.as_mut().unwrap(), - AvroParseOptions::default().with_schema(match self.encoding_type { + AvroParseOptions::create(match self.encoding_type { EncodingType::Key => self.key_schema.as_mut().unwrap(), EncodingType::Value => &self.schema, }), diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index bbab918f5be1d..2c94eb47ccfd1 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -34,27 +34,23 @@ use crate::parser::avro::util::avro_to_jsonb; #[derive(Clone)] /// Options for parsing an `AvroValue` into Datum, with an optional avro schema. pub struct AvroParseOptions<'a> { + /// Currently, this schema is only used for decimal pub schema: Option<&'a Schema>, /// Strict Mode /// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value. pub relax_numeric: bool, } -impl<'a> Default for AvroParseOptions<'a> { - fn default() -> Self { +impl<'a> AvroParseOptions<'a> { + pub fn create(schema: &'a Schema) -> Self { Self { - schema: None, + schema: Some(schema), relax_numeric: true, } } } impl<'a> AvroParseOptions<'a> { - pub fn with_schema(mut self, schema: &'a Schema) -> Self { - self.schema = Some(schema); - self - } - fn extract_inner_schema(&self, key: Option<&'a str>) -> Option<&'a Schema> { self.schema .map(|schema| avro_extract_field_schema(schema, key)) @@ -71,15 +67,23 @@ impl<'a> AvroParseOptions<'a> { } /// Parse an avro value into expected type. - /// 3 kinds of type info are used to parsing things. - /// - `type_expected`. The type that we expect the value is. - /// - value type. The type info together with the value argument. - /// - schema. The `AvroSchema` provided in option. - /// If both `type_expected` and schema are provided, it will check both strictly. - /// If only `type_expected` is provided, it will try to match the value type and the - /// `type_expected`, converting the value if possible. If only value is provided (without - /// schema and `type_expected`), the `DateType` will be inferred. - pub fn parse<'b>(&self, value: &'b Value, type_expected: Option<&'b DataType>) -> AccessResult + /// + /// 3 kinds of type info are used to parsing: + /// - `type_expected`. The type that we expect the value is. + /// - value type. The type info together with the value argument. + /// - schema. The `AvroSchema` provided in option. + /// + /// Cases: (FIXME: Is this precise?) + /// - If both `type_expected` and schema are provided, it will check both strictly. + /// - If only `type_expected` is provided, it will try to match the value type and the + /// `type_expected`, converting the value if possible. + /// - If only value is provided (without schema and `type_expected`), + /// the `DataType` will be inferred. + pub fn convert_to_datum<'b>( + &self, + value: &'b Value, + type_expected: Option<&'b DataType>, + ) -> AccessResult where 'b: 'a, { @@ -97,7 +101,7 @@ impl<'a> AvroParseOptions<'a> { schema, relax_numeric: self.relax_numeric, } - .parse(v, type_expected); + .convert_to_datum(v, type_expected); } // ---- Boolean ----- (Some(DataType::Boolean) | None, Value::Boolean(b)) => (*b).into(), @@ -224,7 +228,7 @@ impl<'a> AvroParseOptions<'a> { schema, relax_numeric: self.relax_numeric, } - .parse(value, Some(field_type))?) + .convert_to_datum(value, Some(field_type))?) } else { Ok(None) } @@ -241,7 +245,7 @@ impl<'a> AvroParseOptions<'a> { schema, relax_numeric: self.relax_numeric, } - .parse(field_value, None) + .convert_to_datum(field_value, None) }) .collect::, AccessError>>()?; ScalarImpl::Struct(StructValue::new(rw_values)) @@ -255,7 +259,7 @@ impl<'a> AvroParseOptions<'a> { schema, relax_numeric: self.relax_numeric, } - .parse(v, Some(item_type))?; + .convert_to_datum(v, Some(item_type))?; builder.append(value); } builder.finish() @@ -325,7 +329,7 @@ where Err(create_error())?; } - options.parse(value, type_expected) + options.convert_to_datum(value, type_expected) } } @@ -484,12 +488,9 @@ mod tests { value_schema: &Schema, shape: &DataType, ) -> crate::error::ConnectorResult { - AvroParseOptions { - schema: Some(value_schema), - relax_numeric: true, - } - .parse(&value, Some(shape)) - .map_err(Into::into) + AvroParseOptions::create(value_schema) + .convert_to_datum(&value, Some(shape)) + .map_err(Into::into) } #[test] @@ -529,8 +530,10 @@ mod tests { .unwrap(); let bytes = vec![0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f]; let value = Value::Decimal(AvroDecimal::from(bytes)); - let options = AvroParseOptions::default().with_schema(&schema); - let resp = options.parse(&value, Some(&DataType::Decimal)).unwrap(); + let options = AvroParseOptions::create(&schema); + let resp = options + .convert_to_datum(&value, Some(&DataType::Decimal)) + .unwrap(); assert_eq!( resp, Some(ScalarImpl::Decimal(Decimal::Normalized( @@ -566,8 +569,10 @@ mod tests { ("value".to_string(), Value::Bytes(vec![0x01, 0x02, 0x03])), ]); - let options = AvroParseOptions::default().with_schema(&schema); - let resp = options.parse(&value, Some(&DataType::Decimal)).unwrap(); + let options = AvroParseOptions::create(&schema); + let resp = options + .convert_to_datum(&value, Some(&DataType::Decimal)) + .unwrap(); assert_eq!(resp, Some(ScalarImpl::Decimal(Decimal::from(66051)))); } } From d9e32af98e07d92b8e31f426617d29f44b782b16 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 30 May 2024 17:15:27 +0800 Subject: [PATCH 2/2] rename ConfluentSchemaResolver to Cache Signed-off-by: xxchan --- src/connector/src/parser/avro/parser.rs | 8 ++++---- src/connector/src/parser/avro/schema_resolver.rs | 7 ++++--- src/connector/src/parser/debezium/avro_parser.rs | 8 ++++---- src/connector/src/parser/json_parser.rs | 4 ++-- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 8565ab9dd20ce..b37417c41ee40 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -21,7 +21,7 @@ use apache_avro::{from_avro_datum, Reader, Schema}; use risingwave_common::{bail, try_match_expand}; use risingwave_pb::plan_common::ColumnDesc; -use super::schema_resolver::ConfluentSchemaResolver; +use super::schema_resolver::ConfluentSchemaCache; use super::util::avro_schema_to_column_descs; use crate::error::ConnectorResult; use crate::parser::unified::avro::{AvroAccess, AvroParseOptions}; @@ -36,7 +36,7 @@ use crate::schema::schema_registry::{ #[derive(Debug)] pub struct AvroAccessBuilder { schema: Arc, - pub schema_resolver: Option>, + pub schema_resolver: Option>, value: Option, } @@ -100,7 +100,7 @@ impl AvroAccessBuilder { pub struct AvroParserConfig { pub schema: Arc, pub key_schema: Option>, - pub schema_resolver: Option>, + pub schema_resolver: Option>, pub map_handling: Option, } @@ -122,7 +122,7 @@ impl AvroParserConfig { let url = handle_sr_list(schema_location.as_str())?; if use_schema_registry { let client = Client::new(url, &client_config)?; - let resolver = ConfluentSchemaResolver::new(client); + let resolver = ConfluentSchemaCache::new(client); let subject_key = if enable_upsert { Some(get_subject_by_strategy( diff --git a/src/connector/src/parser/avro/schema_resolver.rs b/src/connector/src/parser/avro/schema_resolver.rs index cdc52de7accee..72410e51ab162 100644 --- a/src/connector/src/parser/avro/schema_resolver.rs +++ b/src/connector/src/parser/avro/schema_resolver.rs @@ -21,13 +21,14 @@ use moka::future::Cache; use crate::error::ConnectorResult; use crate::schema::schema_registry::{Client, ConfluentSchema}; +/// TODO: support protobuf #[derive(Debug)] -pub struct ConfluentSchemaResolver { +pub struct ConfluentSchemaCache { writer_schemas: Cache>, confluent_client: Client, } -impl ConfluentSchemaResolver { +impl ConfluentSchemaCache { async fn parse_and_cache_schema( &self, raw_schema: ConfluentSchema, @@ -43,7 +44,7 @@ impl ConfluentSchemaResolver { /// Create a new `ConfluentSchemaResolver` pub fn new(client: Client) -> Self { - ConfluentSchemaResolver { + ConfluentSchemaCache { writer_schemas: Cache::new(u64::MAX), confluent_client: client, } diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 656904c5d50c9..50762171106fc 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -22,7 +22,7 @@ use risingwave_pb::catalog::PbSchemaRegistryNameStrategy; use risingwave_pb::plan_common::ColumnDesc; use crate::error::ConnectorResult; -use crate::parser::avro::schema_resolver::ConfluentSchemaResolver; +use crate::parser::avro::schema_resolver::ConfluentSchemaCache; use crate::parser::avro::util::avro_schema_to_column_descs; use crate::parser::unified::avro::{ avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions, @@ -41,7 +41,7 @@ const PAYLOAD: &str = "payload"; #[derive(Debug)] pub struct DebeziumAvroAccessBuilder { schema: Schema, - schema_resolver: Arc, + schema_resolver: Arc, key_schema: Option>, value: Option, encoding_type: EncodingType, @@ -96,7 +96,7 @@ impl DebeziumAvroAccessBuilder { pub struct DebeziumAvroParserConfig { pub key_schema: Arc, pub outer_schema: Arc, - pub schema_resolver: Arc, + pub schema_resolver: Arc, } impl DebeziumAvroParserConfig { @@ -107,7 +107,7 @@ impl DebeziumAvroParserConfig { let kafka_topic = &avro_config.topic; let url = handle_sr_list(schema_location)?; let client = Client::new(url, client_config)?; - let resolver = ConfluentSchemaResolver::new(client); + let resolver = ConfluentSchemaCache::new(client); let name_strategy = &PbSchemaRegistryNameStrategy::Unspecified; let key_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, true)?; diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 3621fbc2724b3..f9f5b1c848c46 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -21,7 +21,7 @@ use jst::{convert_avro, Context}; use risingwave_common::{bail, try_match_expand}; use risingwave_pb::plan_common::ColumnDesc; -use super::avro::schema_resolver::ConfluentSchemaResolver; +use super::avro::schema_resolver::ConfluentSchemaCache; use super::unified::Access; use super::util::{bytes_from_url, get_kafka_topic}; use super::{EncodingProperties, JsonProperties, SchemaRegistryAuth, SpecificParserConfig}; @@ -161,7 +161,7 @@ pub async fn schema_to_columns( let json_schema = if let Some(schema_registry_auth) = schema_registry_auth { let client = Client::new(url, &schema_registry_auth)?; let topic = get_kafka_topic(props)?; - let resolver = ConfluentSchemaResolver::new(client); + let resolver = ConfluentSchemaCache::new(client); let content = resolver .get_raw_schema_by_subject_name(&format!("{}-value", topic)) .await?