Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: minor refactor on avro #17024

Merged
merged 2 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ inout = "inout" # This is a SQL keyword!
numer = "numer" # numerator
nd = "nd" # N-dimensional / 2nd
steam = "stream" # You played with Steam games too much.
ser = "ser" # Serialization
# Some weird short variable names
ot = "ot"
bui = "bui" # BackwardUserIterator
Expand Down
7 changes: 5 additions & 2 deletions scripts/source/schema_registry_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ def load_avro_json(encoded, schema):


if __name__ == '__main__':
if len(sys.argv) < 5:
print("datagen.py <brokerlist> <schema-registry-url> <file> <name-strategy> <json/avro>")
if len(sys.argv) <= 5:
print(
"usage: schema_registry_producer.py <brokerlist> <schema-registry-url> <file> <name-strategy> <json/avro>"
)
exit(1)
broker_list = sys.argv[1]
schema_registry_url = sys.argv[2]
file = sys.argv[3]
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use apache_avro::{from_avro_datum, Reader, Schema};
use risingwave_common::{bail, try_match_expand};
use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::ConfluentSchemaResolver;
use super::schema_resolver::ConfluentSchemaCache;
use super::util::avro_schema_to_column_descs;
use crate::error::ConnectorResult;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
Expand All @@ -36,7 +36,7 @@ use crate::schema::schema_registry::{
#[derive(Debug)]
pub struct AvroAccessBuilder {
schema: Arc<Schema>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub schema_resolver: Option<Arc<ConfluentSchemaCache>>,
value: Option<Value>,
}

Expand All @@ -45,7 +45,7 @@ impl AccessBuilder for AvroAccessBuilder {
self.value = self.parse_avro_value(&payload, Some(&*self.schema)).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
AvroParseOptions::default().with_schema(&self.schema),
AvroParseOptions::create(&self.schema),
)))
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ impl AvroAccessBuilder {
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub schema_resolver: Option<Arc<ConfluentSchemaCache>>,

pub map_handling: Option<MapHandling>,
}
Expand All @@ -122,7 +122,7 @@ impl AvroParserConfig {
let url = handle_sr_list(schema_location.as_str())?;
if use_schema_registry {
let client = Client::new(url, &client_config)?;
let resolver = ConfluentSchemaResolver::new(client);
let resolver = ConfluentSchemaCache::new(client);

let subject_key = if enable_upsert {
Some(get_subject_by_strategy(
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/parser/avro/schema_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use moka::future::Cache;
use crate::error::ConnectorResult;
use crate::schema::schema_registry::{Client, ConfluentSchema};

/// TODO: support protobuf
#[derive(Debug)]
pub struct ConfluentSchemaResolver {
pub struct ConfluentSchemaCache {
writer_schemas: Cache<i32, Arc<Schema>>,
confluent_client: Client,
}

impl ConfluentSchemaResolver {
impl ConfluentSchemaCache {
async fn parse_and_cache_schema(
&self,
raw_schema: ConfluentSchema,
Expand All @@ -43,7 +44,7 @@ impl ConfluentSchemaResolver {

/// Create a new `ConfluentSchemaCache`
pub fn new(client: Client) -> Self {
ConfluentSchemaResolver {
ConfluentSchemaCache {
writer_schemas: Cache::new(u64::MAX),
confluent_client: client,
}
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
use risingwave_pb::plan_common::ColumnDesc;

use crate::error::ConnectorResult;
use crate::parser::avro::schema_resolver::ConfluentSchemaResolver;
use crate::parser::avro::schema_resolver::ConfluentSchemaCache;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::unified::avro::{
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
Expand All @@ -41,7 +41,7 @@ const PAYLOAD: &str = "payload";
#[derive(Debug)]
pub struct DebeziumAvroAccessBuilder {
schema: Schema,
schema_resolver: Arc<ConfluentSchemaResolver>,
schema_resolver: Arc<ConfluentSchemaCache>,
key_schema: Option<Arc<Schema>>,
value: Option<Value>,
encoding_type: EncodingType,
Expand All @@ -59,7 +59,7 @@ impl AccessBuilder for DebeziumAvroAccessBuilder {
};
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_mut().unwrap(),
AvroParseOptions::default().with_schema(match self.encoding_type {
AvroParseOptions::create(match self.encoding_type {
EncodingType::Key => self.key_schema.as_mut().unwrap(),
EncodingType::Value => &self.schema,
}),
Expand Down Expand Up @@ -96,7 +96,7 @@ impl DebeziumAvroAccessBuilder {
pub struct DebeziumAvroParserConfig {
pub key_schema: Arc<Schema>,
pub outer_schema: Arc<Schema>,
pub schema_resolver: Arc<ConfluentSchemaResolver>,
pub schema_resolver: Arc<ConfluentSchemaCache>,
}

impl DebeziumAvroParserConfig {
Expand All @@ -107,7 +107,7 @@ impl DebeziumAvroParserConfig {
let kafka_topic = &avro_config.topic;
let url = handle_sr_list(schema_location)?;
let client = Client::new(url, client_config)?;
let resolver = ConfluentSchemaResolver::new(client);
let resolver = ConfluentSchemaCache::new(client);

let name_strategy = &PbSchemaRegistryNameStrategy::Unspecified;
let key_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, true)?;
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use jst::{convert_avro, Context};
use risingwave_common::{bail, try_match_expand};
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::schema_resolver::ConfluentSchemaResolver;
use super::avro::schema_resolver::ConfluentSchemaCache;
use super::unified::Access;
use super::util::{bytes_from_url, get_kafka_topic};
use super::{EncodingProperties, JsonProperties, SchemaRegistryAuth, SpecificParserConfig};
Expand Down Expand Up @@ -161,7 +161,7 @@ pub async fn schema_to_columns(
let json_schema = if let Some(schema_registry_auth) = schema_registry_auth {
let client = Client::new(url, &schema_registry_auth)?;
let topic = get_kafka_topic(props)?;
let resolver = ConfluentSchemaResolver::new(client);
let resolver = ConfluentSchemaCache::new(client);
let content = resolver
.get_raw_schema_by_subject_name(&format!("{}-value", topic))
.await?
Expand Down
69 changes: 37 additions & 32 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,23 @@ use crate::parser::avro::util::avro_to_jsonb;
#[derive(Clone)]
/// Options for parsing an `AvroValue` into Datum, with an optional avro schema.
pub struct AvroParseOptions<'a> {
/// Currently, this schema is only used for decimal
pub schema: Option<&'a Schema>,
/// Strict Mode
/// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value.
pub relax_numeric: bool,
}

impl<'a> Default for AvroParseOptions<'a> {
fn default() -> Self {
impl<'a> AvroParseOptions<'a> {
pub fn create(schema: &'a Schema) -> Self {
Self {
schema: None,
schema: Some(schema),
relax_numeric: true,
}
}
}

impl<'a> AvroParseOptions<'a> {
pub fn with_schema(mut self, schema: &'a Schema) -> Self {
self.schema = Some(schema);
self
}

fn extract_inner_schema(&self, key: Option<&'a str>) -> Option<&'a Schema> {
self.schema
.map(|schema| avro_extract_field_schema(schema, key))
Expand All @@ -71,15 +67,23 @@ impl<'a> AvroParseOptions<'a> {
}

/// Parse an avro value into expected type.
/// 3 kinds of type info are used to parsing things.
/// - `type_expected`. The type that we expect the value is.
/// - value type. The type info together with the value argument.
/// - schema. The `AvroSchema` provided in option.
/// If both `type_expected` and schema are provided, it will check both strictly.
/// If only `type_expected` is provided, it will try to match the value type and the
/// `type_expected`, converting the value if possible. If only value is provided (without
/// schema and `type_expected`), the `DateType` will be inferred.
pub fn parse<'b>(&self, value: &'b Value, type_expected: Option<&'b DataType>) -> AccessResult
///
/// 3 kinds of type info are used for parsing:
/// - `type_expected`. The type that we expect the value is.
/// - value type. The type info together with the value argument.
/// - schema. The `AvroSchema` provided in option.
///
/// Cases: (FIXME: Is this precise?)
/// - If both `type_expected` and schema are provided, it will check both strictly.
/// - If only `type_expected` is provided, it will try to match the value type and the
/// `type_expected`, converting the value if possible.
/// - If only value is provided (without schema and `type_expected`),
/// the `DataType` will be inferred.
pub fn convert_to_datum<'b>(
&self,
value: &'b Value,
type_expected: Option<&'b DataType>,
) -> AccessResult
where
'b: 'a,
{
Expand All @@ -97,7 +101,7 @@ impl<'a> AvroParseOptions<'a> {
schema,
relax_numeric: self.relax_numeric,
}
.parse(v, type_expected);
.convert_to_datum(v, type_expected);
}
// ---- Boolean -----
(Some(DataType::Boolean) | None, Value::Boolean(b)) => (*b).into(),
Expand Down Expand Up @@ -224,7 +228,7 @@ impl<'a> AvroParseOptions<'a> {
schema,
relax_numeric: self.relax_numeric,
}
.parse(value, Some(field_type))?)
.convert_to_datum(value, Some(field_type))?)
} else {
Ok(None)
}
Expand All @@ -241,7 +245,7 @@ impl<'a> AvroParseOptions<'a> {
schema,
relax_numeric: self.relax_numeric,
}
.parse(field_value, None)
.convert_to_datum(field_value, None)
})
.collect::<Result<Vec<Datum>, AccessError>>()?;
ScalarImpl::Struct(StructValue::new(rw_values))
Expand All @@ -255,7 +259,7 @@ impl<'a> AvroParseOptions<'a> {
schema,
relax_numeric: self.relax_numeric,
}
.parse(v, Some(item_type))?;
.convert_to_datum(v, Some(item_type))?;
builder.append(value);
}
builder.finish()
Expand Down Expand Up @@ -325,7 +329,7 @@ where
Err(create_error())?;
}

options.parse(value, type_expected)
options.convert_to_datum(value, type_expected)
}
}

Expand Down Expand Up @@ -484,12 +488,9 @@ mod tests {
value_schema: &Schema,
shape: &DataType,
) -> crate::error::ConnectorResult<Datum> {
AvroParseOptions {
schema: Some(value_schema),
relax_numeric: true,
}
.parse(&value, Some(shape))
.map_err(Into::into)
AvroParseOptions::create(value_schema)
.convert_to_datum(&value, Some(shape))
.map_err(Into::into)
}

#[test]
Expand Down Expand Up @@ -529,8 +530,10 @@ mod tests {
.unwrap();
let bytes = vec![0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f];
let value = Value::Decimal(AvroDecimal::from(bytes));
let options = AvroParseOptions::default().with_schema(&schema);
let resp = options.parse(&value, Some(&DataType::Decimal)).unwrap();
let options = AvroParseOptions::create(&schema);
let resp = options
.convert_to_datum(&value, Some(&DataType::Decimal))
.unwrap();
assert_eq!(
resp,
Some(ScalarImpl::Decimal(Decimal::Normalized(
Expand Down Expand Up @@ -566,8 +569,10 @@ mod tests {
("value".to_string(), Value::Bytes(vec![0x01, 0x02, 0x03])),
]);

let options = AvroParseOptions::default().with_schema(&schema);
let resp = options.parse(&value, Some(&DataType::Decimal)).unwrap();
let options = AvroParseOptions::create(&schema);
let resp = options
.convert_to_datum(&value, Some(&DataType::Decimal))
.unwrap();
assert_eq!(resp, Some(ScalarImpl::Decimal(Decimal::from(66051))));
}
}
Loading