diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs index 71bb430059d87..a001531019759 100644 --- a/src/connector/codec/src/decoder/avro/mod.rs +++ b/src/connector/codec/src/decoder/avro/mod.rs @@ -108,9 +108,17 @@ impl<'a> AvroParseOptions<'a> { let v: ScalarImpl = match (type_expected, value) { (_, Value::Null) => return Ok(DatumCow::NULL), - // ---- Union ----- + // ---- Union (with >=2 non null variants) ----- (DataType::Struct(struct_type_info), Value::Union(variant, v)) => match self.schema { Some(Schema::Union(u)) => { + if let Some(inner) = get_nullable_union_inner(u) { + // nullable Union ([null, record]) + return Self { + schema: Some(inner), + relax_numeric: self.relax_numeric, + } + .convert_to_datum(v, type_expected); + } let variant_schema = &u.variants()[*variant as usize]; if matches!(variant_schema, &Schema::Null) { @@ -144,7 +152,7 @@ impl<'a> AvroParseOptions<'a> { } _ => Err(create_error())?, }, - // nullable Union + // nullable Union ([null, T]) (_, Value::Union(_, v)) => { let schema = self.extract_inner_schema(None); return Self {