Skip to content

Commit

Permalink
test named type
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jul 9, 2024
1 parent a305c1c commit bf445cf
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 13 deletions.
1 change: 0 additions & 1 deletion src/connector/codec/src/decoder/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> Result<String
Schema::Array(_) => "array".to_string(),
Schema::Map(_) => "map".to_string(),
// Named Complex types
// TODO: Verify is the namespace correct here
Schema::Enum(_) | Schema::Ref { name: _ } | Schema::Fixed(_) | Schema::Record(_) => {
schema.name().unwrap().fullname(None)
}
Expand Down
165 changes: 153 additions & 12 deletions src/connector/codec/tests/integration_tests/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use apache_avro::from_avro_datum;
use risingwave_connector_codec::decoder::avro::{
avro_schema_to_column_descs, AvroAccess, AvroParseOptions, MapHandling, ResolvedAvroSchema,
};
use risingwave_connector_codec::decoder::Access;
use risingwave_connector_codec::AvroSchema;
use thiserror_ext::AsReport;

use crate::utils::*;

Expand All @@ -44,6 +46,24 @@ struct Config {
data_encoding: TestDataEncoding,
}

/// Parses an Avro schema given as a JSON string, resolves it, and derives the
/// corresponding RisingWave column descriptors.
///
/// This mirrors part of the logic in `AvroParserConfig::map_to_columns`.
///
/// # Errors
///
/// Returns an error, annotated with the step that failed, if the schema
/// cannot be parsed, cannot be resolved, or cannot be converted into
/// RisingWave columns.
fn avro_schema_str_to_risingwave_schema(
    avro_schema: &str,
    config: &Config,
) -> anyhow::Result<(ResolvedAvroSchema, Vec<ColumnDesc>)> {
    // Step 1: raw JSON text -> parsed Avro schema.
    let parsed = AvroSchema::parse_str(avro_schema).context("failed to parse Avro schema")?;

    // Step 2: resolve the parsed schema.
    let resolved =
        ResolvedAvroSchema::create(parsed.into()).context("failed to resolve Avro schema")?;

    // Step 3: map the resolved schema to column descriptors, honoring the
    // map handling mode from the test configuration.
    let descs = avro_schema_to_column_descs(&resolved.resolved_schema, config.map_handling)
        .context("failed to convert Avro schema to RisingWave schema")?;
    let columns: Vec<ColumnDesc> = descs.iter().map(ColumnDesc::from).collect();

    Ok((resolved, columns))
}

/// Data-driven testing for converting an Avro schema to a RisingWave schema, and then converting Avro data into RisingWave data.
///
/// The expected results can be automatically updated. To run and update the tests:
Expand Down Expand Up @@ -79,17 +99,15 @@ fn check(
expected_risingwave_schema: expect_test::Expect,
expected_risingwave_data: expect_test::Expect,
) {
// manually implement some logic in AvroParserConfig::map_to_columns
let avro_schema = AvroSchema::parse_str(avro_schema).expect("failed to parse Avro schema");
let resolved_schema =
ResolvedAvroSchema::create(avro_schema.into()).expect("failed to resolve Avro schema");

let rw_schema =
avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling)
.expect("failed to convert Avro schema to RisingWave schema")
.iter()
.map(ColumnDesc::from)
.collect_vec();
let (resolved_schema, rw_schema) =
match avro_schema_str_to_risingwave_schema(avro_schema, &config) {
Ok(res) => res,
Err(e) => {
expected_risingwave_schema.assert_eq(&format!("{}", e.as_report()));
expected_risingwave_data.assert_eq("");
return;
}
};
expected_risingwave_schema.assert_eq(&format!(
"{:#?}",
rw_schema.iter().map(ColumnDescTestDisplay).collect_vec()
Expand Down Expand Up @@ -557,6 +575,7 @@ fn test_1() {

#[test]
fn test_union() {
// A basic test
check(
r#"
{
Expand Down Expand Up @@ -613,7 +632,7 @@ fn test_union() {
map_handling: None,
data_encoding: TestDataEncoding::HexBinary,
},
// FIXME: why the struct type doesn't have field_descs?
// FIXME: why the struct type doesn't have field_descs? https://github.com/risingwavelabs/risingwave/issues/17128
expect![[r#"
[
unionType(#1): Struct {
Expand Down Expand Up @@ -669,6 +688,128 @@ fn test_union() {
Owned(null)"#]],
);

// logicalType is currently rejected
// https://github.com/risingwavelabs/risingwave/issues/17616
check(
r#"
{
"type": "record",
"name": "Root",
"fields": [
{
"name": "unionLogical",
"type": ["int", {"type":"int", "logicalType": "date"}]
}
]
}
"#,
&[],
Config {
map_handling: None,
data_encoding: TestDataEncoding::HexBinary,
},
expect![[r#"
failed to convert Avro schema to RisingWave schema: failed to convert Avro union to struct: Feature is not yet implemented: Avro logicalType used in Union type: Date
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17616"#]],
expect![""],
);

// test named type. Consider namespace.
// https://avro.apache.org/docs/1.11.1/specification/_print/#names
// Things to take care of:
// - Record fields and enum symbols DO NOT have a namespace.
// - If the name specified contains a dot, then it is assumed to be a fullname, and any namespace also specified is IGNORED.
// - If a name has no namespace of its own, it inherits the namespace of its most tightly enclosing named schema.
check(
r#"
{
"type": "record",
"name": "Root",
"namespace": "RootNamespace",
"fields": [
{
"name": "littleFieldToMakeNestingLooksBetter",
"type": ["null","int"], "default": null
},
{
"name": "recordField",
"type": ["null", "int", {
"type": "record",
"name": "my.name.spaced.record",
"namespace": "when.name.contains.dot.namespace.is.ignored",
"fields": [
{"name": "hello", "type": {"type": "int", "default": 1}},
{"name": "world", "type": {"type": "double", "default": 1}}
]
}],
"default": null
},
{
"name": "enumField",
"type": ["null", "int", {
"type": "enum",
"name": "myEnum",
"namespace": "my.namespace",
"symbols": ["A", "B", "C", "D"]
}],
"default": null
},
{
"name": "anotherEnumFieldUsingRootNamespace",
"type": ["null", "int", {
"type": "enum",
"name": "myEnum",
"symbols": ["A", "B", "C", "D"]
}],
"default": null
}
]
}
"#,
&[
// {
// "enumField":{"my.namespace.myEnum":"A"},
// "anotherEnumFieldUsingRootNamespace":{"RootNamespace.myEnum": "D"}
// }
"000004000406",
],
Config {
map_handling: None,
data_encoding: TestDataEncoding::HexBinary,
},
expect![[r#"
[
littleFieldToMakeNestingLooksBetter(#1): Int32,
recordField(#2): Struct {
int: Int32,
my.name.spaced.record: Struct {
hello: Int32,
world: Float64,
},
},
enumField(#3): Struct {
int: Int32,
my.namespace.myEnum: Varchar,
},
anotherEnumFieldUsingRootNamespace(#4): Struct {
int: Int32,
RootNamespace.myEnum: Varchar,
},
]"#]],
expect![[r#"
Owned(null)
Owned(null)
Owned(StructValue(
null,
Utf8("A"),
))
Owned(StructValue(
null,
Utf8("D"),
))"#]],
);

// This is provided by a user https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2051480710
check(
r#"
{
Expand Down

0 comments on commit bf445cf

Please sign in to comment.