Skip to content

Commit

Permalink
refactor: minor refactor on avro (#17024)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored May 30, 2024
1 parent 4bef086 commit 0c8b036
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 49 deletions.
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ inout = "inout" # This is a SQL keyword!
numer = "numer" # numerator
nd = "nd" # N-dimensional / 2nd
steam = "stream" # You played with Steam games too much.
ser = "ser" # Serialization
# Some weird short variable names
ot = "ot"
bui = "bui" # BackwardUserIterator
Expand Down
7 changes: 5 additions & 2 deletions scripts/source/schema_registry_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ def load_avro_json(encoded, schema):


if __name__ == '__main__':
if len(sys.argv) < 5:
print("datagen.py <brokerlist> <schema-registry-url> <file> <name-strategy> <json/avro>")
if len(sys.argv) <= 5:
print(
"usage: schema_registry_producer.py <brokerlist> <schema-registry-url> <file> <name-strategy> <json/avro>"
)
exit(1)
broker_list = sys.argv[1]
schema_registry_url = sys.argv[2]
file = sys.argv[3]
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use apache_avro::{from_avro_datum, Reader, Schema};
use risingwave_common::{bail, try_match_expand};
use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::ConfluentSchemaResolver;
use super::schema_resolver::ConfluentSchemaCache;
use super::util::avro_schema_to_column_descs;
use crate::error::ConnectorResult;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
Expand All @@ -36,7 +36,7 @@ use crate::schema::schema_registry::{
#[derive(Debug)]
pub struct AvroAccessBuilder {
schema: Arc<Schema>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub schema_resolver: Option<Arc<ConfluentSchemaCache>>,
value: Option<Value>,
}

Expand All @@ -45,7 +45,7 @@ impl AccessBuilder for AvroAccessBuilder {
self.value = self.parse_avro_value(&payload, Some(&*self.schema)).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
AvroParseOptions::default().with_schema(&self.schema),
AvroParseOptions::create(&self.schema),
)))
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ impl AvroAccessBuilder {
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub schema_resolver: Option<Arc<ConfluentSchemaCache>>,

pub map_handling: Option<MapHandling>,
}
Expand All @@ -122,7 +122,7 @@ impl AvroParserConfig {
let url = handle_sr_list(schema_location.as_str())?;
if use_schema_registry {
let client = Client::new(url, &client_config)?;
let resolver = ConfluentSchemaResolver::new(client);
let resolver = ConfluentSchemaCache::new(client);

let subject_key = if enable_upsert {
Some(get_subject_by_strategy(
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/parser/avro/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use moka::future::Cache;
use crate::error::ConnectorResult;
use crate::schema::schema_registry::{Client, ConfluentSchema};

/// TODO: support protobuf
#[derive(Debug)]
pub struct ConfluentSchemaResolver {
pub struct ConfluentSchemaCache {
writer_schemas: Cache<i32, Arc<Schema>>,
confluent_client: Client,
}

impl ConfluentSchemaResolver {
impl ConfluentSchemaCache {
async fn parse_and_cache_schema(
&self,
raw_schema: ConfluentSchema,
Expand All @@ -43,7 +44,7 @@ impl ConfluentSchemaResolver {

/// Create a new `ConfluentSchemaCache`
pub fn new(client: Client) -> Self {
ConfluentSchemaResolver {
ConfluentSchemaCache {
writer_schemas: Cache::new(u64::MAX),
confluent_client: client,
}
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
use risingwave_pb::plan_common::ColumnDesc;

use crate::error::ConnectorResult;
use crate::parser::avro::schema_resolver::ConfluentSchemaResolver;
use crate::parser::avro::schema_resolver::ConfluentSchemaCache;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::unified::avro::{
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
Expand All @@ -41,7 +41,7 @@ const PAYLOAD: &str = "payload";
#[derive(Debug)]
pub struct DebeziumAvroAccessBuilder {
schema: Schema,
schema_resolver: Arc<ConfluentSchemaResolver>,
schema_resolver: Arc<ConfluentSchemaCache>,
key_schema: Option<Arc<Schema>>,
value: Option<Value>,
encoding_type: EncodingType,
Expand All @@ -59,7 +59,7 @@ impl AccessBuilder for DebeziumAvroAccessBuilder {
};
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_mut().unwrap(),
AvroParseOptions::default().with_schema(match self.encoding_type {
AvroParseOptions::create(match self.encoding_type {
EncodingType::Key => self.key_schema.as_mut().unwrap(),
EncodingType::Value => &self.schema,
}),
Expand Down Expand Up @@ -96,7 +96,7 @@ impl DebeziumAvroAccessBuilder {
pub struct DebeziumAvroParserConfig {
pub key_schema: Arc<Schema>,
pub outer_schema: Arc<Schema>,
pub schema_resolver: Arc<ConfluentSchemaResolver>,
pub schema_resolver: Arc<ConfluentSchemaCache>,
}

impl DebeziumAvroParserConfig {
Expand All @@ -107,7 +107,7 @@ impl DebeziumAvroParserConfig {
let kafka_topic = &avro_config.topic;
let url = handle_sr_list(schema_location)?;
let client = Client::new(url, client_config)?;
let resolver = ConfluentSchemaResolver::new(client);
let resolver = ConfluentSchemaCache::new(client);

let name_strategy = &PbSchemaRegistryNameStrategy::Unspecified;
let key_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, true)?;
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use jst::{convert_avro, Context};
use risingwave_common::{bail, try_match_expand};
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::schema_resolver::ConfluentSchemaResolver;
use super::avro::schema_resolver::ConfluentSchemaCache;
use super::unified::Access;
use super::util::{bytes_from_url, get_kafka_topic};
use super::{EncodingProperties, JsonProperties, SchemaRegistryAuth, SpecificParserConfig};
Expand Down Expand Up @@ -161,7 +161,7 @@ pub async fn schema_to_columns(
let json_schema = if let Some(schema_registry_auth) = schema_registry_auth {
let client = Client::new(url, &schema_registry_auth)?;
let topic = get_kafka_topic(props)?;
let resolver = ConfluentSchemaResolver::new(client);
let resolver = ConfluentSchemaCache::new(client);
let content = resolver
.get_raw_schema_by_subject_name(&format!("{}-value", topic))
.await?
Expand Down
69 changes: 37 additions & 32 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,23 @@ use crate::parser::avro::util::avro_to_jsonb;
#[derive(Clone)]
/// Options for parsing an `AvroValue` into Datum, with an optional avro schema.
pub struct AvroParseOptions<'a> {
/// Currently, this schema is only used for decimal
pub schema: Option<&'a Schema>,
/// Strict Mode
/// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value.
pub relax_numeric: bool,
}

impl<'a> Default for AvroParseOptions<'a> {
fn default() -> Self {
impl<'a> AvroParseOptions<'a> {
pub fn create(schema: &'a Schema) -> Self {
Self {
schema: None,
schema: Some(schema),
relax_numeric: true,
}
}
}

impl<'a> AvroParseOptions<'a> {
pub fn with_schema(mut self, schema: &'a Schema) -> Self {
self.schema = Some(schema);
self
}

fn extract_inner_schema(&self, key: Option<&'a str>) -> Option<&'a Schema> {
self.schema
.map(|schema| avro_extract_field_schema(schema, key))
Expand All @@ -71,15 +67,23 @@ impl<'a> AvroParseOptions<'a> {
}

/// Parse an avro value into expected type.
/// 3 kinds of type info are used for parsing.
/// - `type_expected`. The type that we expect the value is.
/// - value type. The type info together with the value argument.
/// - schema. The `AvroSchema` provided in option.
/// If both `type_expected` and schema are provided, it will check both strictly.
/// If only `type_expected` is provided, it will try to match the value type and the
/// `type_expected`, converting the value if possible. If only value is provided (without
/// schema and `type_expected`), the `DataType` will be inferred.
pub fn parse<'b>(&self, value: &'b Value, type_expected: Option<&'b DataType>) -> AccessResult
///
/// 3 kinds of type info are used for parsing:
/// - `type_expected`. The type that we expect the value is.
/// - value type. The type info together with the value argument.
/// - schema. The `AvroSchema` provided in option.
///
/// Cases: (FIXME: Is this precise?)
/// - If both `type_expected` and schema are provided, it will check both strictly.
/// - If only `type_expected` is provided, it will try to match the value type and the
/// `type_expected`, converting the value if possible.
/// - If only value is provided (without schema and `type_expected`),
/// the `DataType` will be inferred.
pub fn convert_to_datum<'b>(
&self,
value: &'b Value,
type_expected: Option<&'b DataType>,
) -> AccessResult
where
'b: 'a,
{
Expand All @@ -97,7 +101,7 @@ impl<'a> AvroParseOptions<'a> {
schema,
relax_numeric: self.relax_numeric,
}
.parse(v, type_expected);
.convert_to_datum(v, type_expected);
}
// ---- Boolean -----
(Some(DataType::Boolean) | None, Value::Boolean(b)) => (*b).into(),
Expand Down Expand Up @@ -224,7 +228,7 @@ impl<'a> AvroParseOptions<'a> {
schema,
relax_numeric: self.relax_numeric,
}
.parse(value, Some(field_type))?)
.convert_to_datum(value, Some(field_type))?)
} else {
Ok(None)
}
Expand All @@ -241,7 +245,7 @@ impl<'a> AvroParseOptions<'a> {
schema,
relax_numeric: self.relax_numeric,
}
.parse(field_value, None)
.convert_to_datum(field_value, None)
})
.collect::<Result<Vec<Datum>, AccessError>>()?;
ScalarImpl::Struct(StructValue::new(rw_values))
Expand All @@ -255,7 +259,7 @@ impl<'a> AvroParseOptions<'a> {
schema,
relax_numeric: self.relax_numeric,
}
.parse(v, Some(item_type))?;
.convert_to_datum(v, Some(item_type))?;
builder.append(value);
}
builder.finish()
Expand Down Expand Up @@ -325,7 +329,7 @@ where
Err(create_error())?;
}

options.parse(value, type_expected)
options.convert_to_datum(value, type_expected)
}
}

Expand Down Expand Up @@ -484,12 +488,9 @@ mod tests {
value_schema: &Schema,
shape: &DataType,
) -> crate::error::ConnectorResult<Datum> {
AvroParseOptions {
schema: Some(value_schema),
relax_numeric: true,
}
.parse(&value, Some(shape))
.map_err(Into::into)
AvroParseOptions::create(value_schema)
.convert_to_datum(&value, Some(shape))
.map_err(Into::into)
}

#[test]
Expand Down Expand Up @@ -529,8 +530,10 @@ mod tests {
.unwrap();
let bytes = vec![0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f];
let value = Value::Decimal(AvroDecimal::from(bytes));
let options = AvroParseOptions::default().with_schema(&schema);
let resp = options.parse(&value, Some(&DataType::Decimal)).unwrap();
let options = AvroParseOptions::create(&schema);
let resp = options
.convert_to_datum(&value, Some(&DataType::Decimal))
.unwrap();
assert_eq!(
resp,
Some(ScalarImpl::Decimal(Decimal::Normalized(
Expand Down Expand Up @@ -566,8 +569,10 @@ mod tests {
("value".to_string(), Value::Bytes(vec![0x01, 0x02, 0x03])),
]);

let options = AvroParseOptions::default().with_schema(&schema);
let resp = options.parse(&value, Some(&DataType::Decimal)).unwrap();
let options = AvroParseOptions::create(&schema);
let resp = options
.convert_to_datum(&value, Some(&DataType::Decimal))
.unwrap();
assert_eq!(resp, Some(ScalarImpl::Decimal(Decimal::from(66051))));
}
}

0 comments on commit 0c8b036

Please sign in to comment.