Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): support Avro Union type #17485

Merged
merged 12 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions e2e_test/commands/sr_register
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash

set -euo pipefail

# Register a schema to schema registry
#
# Usage: sr_register <subject> <schema>
#
# https://docs.confluent.io/platform/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions

# Validate arguments
if [[ $# -ne 2 ]]; then
echo "Usage: sr_register <subject> <schema>"
exit 1
fi

subject="$1"
schema="$2"


if [[ -z $subject || -z $schema ]]; then
echo "Error: Arguments cannot be empty"
exit 1
fi

echo "$schema" | jq '{"schema": tojson}' \
| curl -X POST -H 'content-type:application/vnd.schemaregistry.v1+json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/${subject}/versions"
6 changes: 2 additions & 4 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ system ok
rpk topic create 'avro_alter_source_test'

system ok
echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \
| curl -X POST -H 'content-type:application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions"
sr_register avro_alter_source_test '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}'

statement ok
create source s
Expand All @@ -27,8 +26,7 @@ FORMAT PLAIN ENCODE AVRO (

# create a new version of schema and produce a message
system ok
echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \
| curl -X POST -H 'content-type:application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions"
sr_register avro_alter_source_test '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}'

system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test
Expand Down
100 changes: 100 additions & 0 deletions e2e_test/source_inline/kafka/avro/union.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
control substitution on

system ok
rpk topic delete 'avro-union' || true; \
(rpk sr subject delete 'avro-union-value' && rpk sr subject delete 'avro-union-value' --permanent) || true;
rpk topic create avro-union

system ok
sr_register avro-union-value '
{
"type": "record",
"name": "Root",
"fields": [
{
"name": "unionType",
"type": ["int", "string"]
},
{
"name": "unionTypeComplex",
"type": [
"null",
{"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]},
{"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]},
{"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]}
]
},
{
"name": "enumField",
"type": ["null", "int", {
"type": "enum",
"name": "myEnum",
"namespace": "my.namespace",
"symbols": ["A", "B", "C", "D"]
}],
"default": null
}
]
}
'

system ok
cat<<EOF | rpk topic produce avro-union --schema-id=topic
{"unionType": {"int":1}, "unionTypeComplex": {"Sms": {"inner":6}}, "enumField": {"my.namespace.myEnum": "A"}}
{"unionType": {"string":"2"}, "unionTypeComplex": {"Fax": {"inner":6}}}
{"unionType": {"int":3}, "unionTypeComplex": {"Email": {"inner":"[email protected]"}}, "enumField": {"int":66}}
EOF

statement error
create source avro_union
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'avro-union'
)
FORMAT PLAIN ENCODE AVRO (
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'
);
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: connector error
2: failed to convert Avro union to struct
3: Feature is not yet implemented: Avro named type used in Union type: Record(RecordSchema { name: Name { name: "Email", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "inner", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }], lookup: {"inner": 0}, attributes: {} })
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17616


# FIXME: The following is the current buggy result.


# query ? rowsort
# select * from avro_union
# ----
# ("([email protected])",,)
# (,"(6)",)
# (,"(6)",)

# # Demonstrate how to access union variants (struct fields) below:
# # Note that we need to use quotes.
Comment on lines +77 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Demonstrate with a simpler example that is working now?


# query ? rowsort
# select ("enumField")."my.namespace.myEnum" from avro_union;
# ----
# A
# NULL
# NULL

# # To output the union’s tag (i.e. case in protobuf), a case-when can be used.
# query ? rowsort
# select
# case
# when ("unionTypeComplex")."Sms" is not null then 'Sms'
# when ("unionTypeComplex")."Fax" is not null then 'Fax'
# when ("unionTypeComplex")."Email" is not null then 'Email'
# else null -- optional
# end
# from avro_union;
# ----
# Email
# Fax
# Fax
2 changes: 2 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ pub trait ScalarRef<'a>: ScalarBounds<ScalarRefImpl<'a>> + 'a + Copy {
macro_rules! scalar_impl_enum {
($( { $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty } ),*) => {
/// `ScalarImpl` embeds all possible scalars in the evaluation framework.
///
/// See `for_all_variants` for the definition.
#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
pub enum ScalarImpl {
$( $variant_name($scalar) ),*
Expand Down
Loading
Loading