-
Notifications
You must be signed in to change notification settings - Fork 590
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(source): support Avro Union type (#17485)
Signed-off-by: xxchan <[email protected]>
- Loading branch information
Showing
10 changed files
with
1,161 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
#!/usr/bin/env bash | ||
|
||
set -euo pipefail | ||
|
||
# Register a schema to schema registry | ||
# | ||
# Usage: sr_register <subject> <schema> | ||
# | ||
# https://docs.confluent.io/platform/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions | ||
|
||
# Validate arguments | ||
if [[ $# -ne 2 ]]; then | ||
echo "Usage: sr_register <subject> <schema>" | ||
exit 1 | ||
fi | ||
|
||
subject="$1" | ||
schema="$2" | ||
|
||
|
||
if [[ -z $subject || -z $schema ]]; then | ||
echo "Error: Arguments cannot be empty" | ||
exit 1 | ||
fi | ||
|
||
echo "$schema" | jq '{"schema": tojson}' \ | ||
| curl -X POST -H 'content-type:application/vnd.schemaregistry.v1+json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/${subject}/versions" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
control substitution on | ||
|
||
system ok | ||
rpk topic delete 'avro-union' || true; \ | ||
(rpk sr subject delete 'avro-union-value' && rpk sr subject delete 'avro-union-value' --permanent) || true; | ||
rpk topic create avro-union | ||
|
||
system ok | ||
sr_register avro-union-value ' | ||
{ | ||
"type": "record", | ||
"name": "Root", | ||
"fields": [ | ||
{ | ||
"name": "unionType", | ||
"type": ["int", "string"] | ||
}, | ||
{ | ||
"name": "unionTypeComplex", | ||
"type": [ | ||
"null", | ||
{"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]}, | ||
{"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]}, | ||
{"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]} | ||
] | ||
}, | ||
{ | ||
"name": "enumField", | ||
"type": ["null", "int", { | ||
"type": "enum", | ||
"name": "myEnum", | ||
"namespace": "my.namespace", | ||
"symbols": ["A", "B", "C", "D"] | ||
}], | ||
"default": null | ||
} | ||
] | ||
} | ||
' | ||
|
||
system ok | ||
cat<<EOF | rpk topic produce avro-union --schema-id=topic | ||
{"unionType": {"int":1}, "unionTypeComplex": {"Sms": {"inner":6}}, "enumField": {"my.namespace.myEnum": "A"}} | ||
{"unionType": {"string":"2"}, "unionTypeComplex": {"Fax": {"inner":6}}} | ||
{"unionType": {"int":3}, "unionTypeComplex": {"Email": {"inner":"[email protected]"}}, "enumField": {"int":66}} | ||
EOF | ||
|
||
statement error | ||
create source avro_union | ||
WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'avro-union' | ||
) | ||
FORMAT PLAIN ENCODE AVRO ( | ||
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}' | ||
); | ||
---- | ||
db error: ERROR: Failed to run the query | ||
|
||
Caused by these errors (recent errors listed first): | ||
1: connector error | ||
2: failed to convert Avro union to struct | ||
3: Feature is not yet implemented: Avro named type used in Union type: Record(RecordSchema { name: Name { name: "Email", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "inner", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }], lookup: {"inner": 0}, attributes: {} }) | ||
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17632 | ||
|
||
|
||
# FIXME: The following is the current buggy result. | ||
|
||
|
||
# query ? rowsort | ||
# select * from avro_union | ||
# ---- | ||
# ("([email protected])",,) | ||
# (,"(6)",) | ||
# (,"(6)",) | ||
|
||
# # Demonstrate how to access union variants (struct fields) below: | ||
# # Note that we need to use quotes. | ||
|
||
# query ? rowsort | ||
# select ("enumField")."my.namespace.myEnum" from avro_union; | ||
# ---- | ||
# A | ||
# NULL | ||
# NULL | ||
|
||
# # To output the union’s tag (i.e. case in protobuf), a case-when can be used. | ||
# query ? rowsort | ||
# select | ||
# case | ||
# when ("unionTypeComplex")."Sms" is not null then 'Sms' | ||
# when ("unionTypeComplex")."Fax" is not null then 'Fax' | ||
# when ("unionTypeComplex")."Email" is not null then 'Email' | ||
# else null -- optional | ||
# end | ||
# from avro_union; | ||
# ---- | ||
# Fax | ||
# Fax | ||
|
||
|
||
|
||
system ok | ||
rpk topic delete 'avro-union-simple' || true; \ | ||
(rpk sr subject delete 'avro-union-simple-value' && rpk sr subject delete 'avro-union-simple-value' --permanent) || true; | ||
rpk topic create avro-union-simple | ||
|
||
system ok | ||
sr_register avro-union-simple-value ' | ||
{ | ||
"type": "record", | ||
"name": "Root", | ||
"fields": [ | ||
{ | ||
"name": "unionType", | ||
"type": ["int", "string", "null", "boolean"] | ||
} | ||
] | ||
} | ||
' | ||
|
||
system ok | ||
cat<<EOF | rpk topic produce avro-union-simple --schema-id=topic | ||
{"unionType": {"int":1}} | ||
{"unionType": {"string":"2"}} | ||
{"unionType": {"boolean": true}} | ||
{"unionType": null} | ||
EOF | ||
|
||
statement ok | ||
create source avro_union | ||
WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'avro-union-simple' | ||
) | ||
FORMAT PLAIN ENCODE AVRO ( | ||
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}' | ||
); | ||
|
||
|
||
query ? rowsort | ||
select * from avro_union | ||
---- | ||
(,,t) | ||
(,2,) | ||
(1,,) | ||
NULL | ||
|
||
# Demonstrate how to access union variants (struct fields) below: | ||
# Note that we need to use quotes. | ||
|
||
query ? rowsort | ||
select ("unionType")."string" from avro_union; | ||
---- | ||
2 | ||
NULL | ||
NULL | ||
NULL | ||
|
||
# To output the union’s tag (i.e. case in protobuf), a case-when can be used. | ||
query ? rowsort | ||
select | ||
case | ||
when ("unionType")."int" is not null then 'int' | ||
when ("unionType")."string" is not null then 'string' | ||
when ("unionType")."boolean" is not null then 'boolean' | ||
else null -- optional | ||
end | ||
from avro_union; | ||
---- | ||
NULL | ||
boolean | ||
int | ||
string |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.