diff --git a/e2e_test/source_inline/kafka/avro/ref.slt b/e2e_test/source_inline/kafka/avro/ref.slt index f9a09a0e88994..e3659953f6775 100644 --- a/e2e_test/source_inline/kafka/avro/ref.slt +++ b/e2e_test/source_inline/kafka/avro/ref.slt @@ -109,11 +109,102 @@ select (bar).b.y from s; ---- -3 4 5 6 NULL NULL NULL NULL +3 4 5 6 6 5 4 3 -# Parsing of column `bar` fails even with ints because now `schema` is required. -# This will be fully supported in the next PR -# 3 4 5 6 6 5 4 3 + +statement ok +drop source s; + + +system ok +curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro-ref-value" + + +system ok +curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro-ref-value?permanent=true" + + +system ok +rpk topic delete 'avro-ref' + + +system ok +rpk topic create avro-ref + + +system ok +sr_register avro-ref-value AVRO < { - /// Currently, this schema is only used for decimal. - /// - /// FIXME: In theory we should use resolved schema. - /// e.g., it's possible that a field is a reference to a decimal or a record containing a decimal field. - schema: &'a Schema, + /// The avro schema at root level + root_schema: &'a Schema, + /// The immutable "global" context during recursive parsing + inner: AvroParseOptionsInner<'a>, +} + +#[derive(Clone)] +/// Options for parsing an `AvroValue` into Datum, with names resolved from root schema. +struct AvroParseOptionsInner<'a> { + /// Mapping from type names to actual schema + refs: NamesRef<'a>, /// Strict Mode /// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value. relax_numeric: bool, } impl<'a> AvroParseOptions<'a> { - pub fn create(schema: &'a Schema) -> Self { + pub fn create(root_schema: &'a Schema) -> Self { + let resolved = apache_avro::schema::ResolvedSchema::try_from(root_schema) + .expect("avro schema is self contained"); Self { - schema, - relax_numeric: true, + root_schema, + inner: AvroParseOptionsInner { + refs: resolved.get_names().clone(), + relax_numeric: true, + }, } } } -impl<'a> AvroParseOptions<'a> { +impl<'a> AvroParseOptionsInner<'a> { + fn lookup_ref(&self, schema: &'a Schema) -> &'a Schema { + match schema { + Schema::Ref { name } => self.refs[name], + _ => schema, + } + } + /// Parse an avro value into expected type. /// /// 3 kinds of type info are used to parsing: @@ -70,6 +88,7 @@ impl<'a> AvroParseOptions<'a> { /// the `DataType` will be inferred. fn convert_to_datum<'b>( &self, + unresolved_schema: &'a Schema, value: &'b Value, type_expected: &DataType, ) -> AccessResult> @@ -92,18 +111,14 @@ impl<'a> AvroParseOptions<'a> { (_, Value::Null) => return Ok(DatumCow::NULL), // ---- Union (with >=2 non null variants), and nullable Union ([null, record]) ----- (DataType::Struct(struct_type_info), Value::Union(variant, v)) => { - let Schema::Union(u) = self.schema else { + let Schema::Union(u) = self.lookup_ref(unresolved_schema) else { // XXX: Is this branch actually unreachable? (if self.schema is correctly used) return Err(create_error()); }; if let Some(inner) = get_nullable_union_inner(u) { // nullable Union ([null, record]) - return Self { - schema: inner, - relax_numeric: self.relax_numeric, - } - .convert_to_datum(v, type_expected); + return self.convert_to_datum(inner, v, type_expected); } let variant_schema = &u.variants()[*variant as usize]; @@ -120,12 +135,9 @@ impl<'a> AvroParseOptions<'a> { let mut fields = Vec::with_capacity(struct_type_info.len()); for (field_name, field_type) in struct_type_info.iter() { if field_name == expected_field_name { - let datum = Self { - schema: variant_schema, - relax_numeric: self.relax_numeric, - } - .convert_to_datum(v, field_type)? - .to_owned_datum(); + let datum = self + .convert_to_datum(variant_schema, v, field_type)? + .to_owned_datum(); fields.push(datum) } else { @@ -136,17 +148,13 @@ impl<'a> AvroParseOptions<'a> { } // nullable Union ([null, T]) (_, Value::Union(_, v)) => { - let Schema::Union(u) = self.schema else { + let Schema::Union(u) = self.lookup_ref(unresolved_schema) else { return Err(create_error()); }; let Some(schema) = get_nullable_union_inner(u) else { return Err(create_error()); }; - return Self { - schema, - relax_numeric: self.relax_numeric, - } - .convert_to_datum(v, type_expected); + return self.convert_to_datum(schema, v, type_expected); } // ---- Boolean ----- (DataType::Boolean, Value::Boolean(b)) => (*b).into(), @@ -168,7 +176,7 @@ impl<'a> AvroParseOptions<'a> { (DataType::Float64, Value::Float(i)) => (*i as f64).into(), // ---- Decimal ----- (DataType::Decimal, Value::Decimal(avro_decimal)) => { - let (precision, scale) = match self.schema { + let (precision, scale) = match self.lookup_ref(unresolved_schema) { Schema::Decimal(DecimalSchema { precision, scale, .. }) => (*precision, *scale), @@ -254,7 +262,7 @@ impl<'a> AvroParseOptions<'a> { } // ---- Struct ----- (DataType::Struct(struct_type_info), Value::Record(descs)) => StructValue::new({ - let Schema::Record(record_schema) = &self.schema else { + let Schema::Record(record_schema) = self.lookup_ref(unresolved_schema) else { return Err(create_error()); }; struct_type_info @@ -264,12 +272,9 @@ impl<'a> AvroParseOptions<'a> { if let Some(idx) = record_schema.lookup.get(field_name) { let value = &descs[*idx].1; let schema = &record_schema.fields[*idx].schema; - Ok(Self { - schema, - relax_numeric: self.relax_numeric, - } - .convert_to_datum(value, field_type)? - .to_owned_datum()) + Ok(self + .convert_to_datum(schema, value, field_type)? + .to_owned_datum()) } else { Ok(None) } @@ -279,17 +284,13 @@ impl<'a> AvroParseOptions<'a> { .into(), // ---- List ----- (DataType::List(item_type), Value::Array(array)) => ListValue::new({ - let Schema::Array(element_schema) = &self.schema else { + let Schema::Array(element_schema) = self.lookup_ref(unresolved_schema) else { return Err(create_error()); }; let schema = element_schema; let mut builder = item_type.create_array_builder(array.len()); for v in array { - let value = Self { - schema, - relax_numeric: self.relax_numeric, - } - .convert_to_datum(v, item_type)?; + let value = self.convert_to_datum(schema, v, item_type)?; builder.append(value); } builder.finish() @@ -309,7 +310,7 @@ impl<'a> AvroParseOptions<'a> { uuid.as_hyphenated().to_string().into_boxed_str().into() } (DataType::Map(map_type), Value::Map(map)) => { - let Schema::Map(value_schema) = &self.schema else { + let Schema::Map(value_schema) = self.lookup_ref(unresolved_schema) else { return Err(create_error()); }; let schema = value_schema; @@ -325,12 +326,9 @@ impl<'a> AvroParseOptions<'a> { // in tests. We might consider removing this, or make all MapValue sorted // in the future. for (k, v) in map.iter().sorted_by_key(|(k, _v)| *k) { - let value_datum = Self { - schema, - relax_numeric: self.relax_numeric, - } - .convert_to_datum(v, map_type.value())? - .to_owned_datum(); + let value_datum = self + .convert_to_datum(schema, v, map_type.value())? + .to_owned_datum(); builder.append( StructValue::new(vec![Some(k.as_str().into()), value_datum]) .to_owned_datum(), @@ -352,15 +350,18 @@ pub struct AvroAccess<'a> { } impl<'a> AvroAccess<'a> { - pub fn new(value: &'a Value, options: AvroParseOptions<'a>) -> Self { - Self { value, options } + pub fn new(root_value: &'a Value, options: AvroParseOptions<'a>) -> Self { + Self { + value: root_value, + options, + } } } impl Access for AvroAccess<'_> { fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { let mut value = self.value; - let mut options: AvroParseOptions<'_> = self.options.clone(); + let mut unresolved_schema = self.options.root_schema; debug_assert!( path.len() == 1 @@ -401,22 +402,24 @@ impl Access for AvroAccess<'_> { // }, // ...] value = v; - let Schema::Union(u) = options.schema else { + let Schema::Union(u) = self.options.inner.lookup_ref(unresolved_schema) else { return Err(create_error()); }; let Some(schema) = get_nullable_union_inner(u) else { return Err(create_error()); }; - options.schema = schema; + unresolved_schema = schema; continue; } Value::Record(fields) => { - let Schema::Record(record_schema) = &options.schema else { + let Schema::Record(record_schema) = + self.options.inner.lookup_ref(unresolved_schema) + else { return Err(create_error()); }; if let Some(idx) = record_schema.lookup.get(key) { value = &fields[*idx].1; - options.schema = &record_schema.fields[*idx].schema; + unresolved_schema = &record_schema.fields[*idx].schema; i += 1; continue; } @@ -426,7 +429,9 @@ impl Access for AvroAccess<'_> { Err(create_error())?; } - options.convert_to_datum(value, type_expected) + self.options + .inner + .convert_to_datum(unresolved_schema, value, type_expected) } } @@ -941,7 +946,8 @@ mod tests { shape: &DataType, ) -> anyhow::Result { Ok(AvroParseOptions::create(value_schema) - .convert_to_datum(&value, shape)? + .inner + .convert_to_datum(value_schema, &value, shape)? .to_owned_datum()) } @@ -982,11 +988,7 @@ mod tests { .unwrap(); let bytes = vec![0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f]; let value = Value::Decimal(AvroDecimal::from(bytes)); - let options = AvroParseOptions::create(&schema); - let resp = options - .convert_to_datum(&value, &DataType::Decimal) - .unwrap() - .to_owned_datum(); + let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap(); assert_eq!( resp, Some(ScalarImpl::Decimal(Decimal::Normalized( @@ -1022,11 +1024,7 @@ mod tests { ("value".to_string(), Value::Bytes(vec![0x01, 0x02, 0x03])), ]); - let options = AvroParseOptions::create(&schema); - let resp = options - .convert_to_datum(&value, &DataType::Decimal) - .unwrap() - .to_owned_datum(); + let resp = from_avro_value(value, &schema, &DataType::Decimal).unwrap(); assert_eq!(resp, Some(ScalarImpl::Decimal(Decimal::from(66051)))); } } diff --git a/src/connector/codec/src/decoder/avro/schema.rs b/src/connector/codec/src/decoder/avro/schema.rs index d226098f63d47..c6a43decc78f0 100644 --- a/src/connector/codec/src/decoder/avro/schema.rs +++ b/src/connector/codec/src/decoder/avro/schema.rs @@ -35,18 +35,12 @@ use super::get_nullable_union_inner; pub struct ResolvedAvroSchema { /// Should be used for parsing bytes into Avro value pub original_schema: Arc, - /// Should be used for type mapping from Avro value to RisingWave datum - pub resolved_schema: Schema, } impl ResolvedAvroSchema { pub fn create(schema: Arc) -> AvroResult { - let resolver = ResolvedSchema::try_from(schema.as_ref())?; - // todo: to_resolved may cause stackoverflow if there's a loop in the schema - let resolved_schema = resolver.to_resolved(schema.as_ref())?; Ok(Self { original_schema: schema, - resolved_schema, }) } } diff --git a/src/connector/codec/tests/integration_tests/avro.rs b/src/connector/codec/tests/integration_tests/avro.rs index 2221917cc2b05..c0a0b1d49e2c2 100644 --- a/src/connector/codec/tests/integration_tests/avro.rs +++ b/src/connector/codec/tests/integration_tests/avro.rs @@ -94,7 +94,7 @@ fn check( // manually implement some logic in AvroAccessBuilder, and some in PlainParser::parse_inner let mut data_str = vec![]; for data in avro_data { - let parser = AvroParseOptions::create(&resolved_schema.resolved_schema); + let parser = AvroParseOptions::create(&resolved_schema.original_schema); match config.data_encoding { TestDataEncoding::Json => todo!(), diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index d4040a8420dea..472c416def8bd 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -49,7 +49,7 @@ impl AccessBuilder for AvroAccessBuilder { self.value = self.parse_avro_value(&payload).await?; Ok(AccessImpl::Avro(AvroAccess::new( self.value.as_ref().unwrap(), - AvroParseOptions::create(&self.schema.resolved_schema), + AvroParseOptions::create(&self.schema.original_schema), ))) } } diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index fd174a9dfe6e1..4058f1d331c42 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -57,7 +57,7 @@ impl AccessBuilder for DebeziumAvroAccessBuilder { // Assumption: Key will not contain reference, so unresolved schema can work here. AvroParseOptions::create(match self.encoding_type { EncodingType::Key => self.key_schema.as_mut().unwrap(), - EncodingType::Value => &self.schema.resolved_schema, + EncodingType::Value => &self.schema.original_schema, }), ))) }