Skip to content

Commit

Permalink
feat(source): support JSON schema additionalProperties (map) (#17110)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Jun 13, 2024
1 parent 32a1129 commit ef6d6c6
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 19 deletions.
20 changes: 18 additions & 2 deletions e2e_test/source/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ CREATE TABLE kafka_json_schema_plain with (
kafka.scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');


query
describe kafka_json_schema_plain;
----
dimensions (empty) false NULL
map jsonb false NULL
notMap (empty) false NULL
price double precision false NULL
productId bigint false NULL
productName character varying false NULL
tags character varying[] false NULL
_row_id serial true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description kafka_json_schema_plain NULL NULL

statement ok
CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(rw_key))
INCLUDE KEY AS rw_key
Expand Down Expand Up @@ -83,10 +99,10 @@ select count(*) from debezium_compact;

query TFITT
select
"dimensions", "price", "productId", "productName", "tags"
*
from kafka_json_schema_plain
----
(9.5,7,12) 12.5 1 An ice sculpture {cold,ice}
(9.5,7,12) {"foo": "bar"} (b) 12.5 1 An ice sculpture {cold,ice}

query TFITT
select
Expand Down
4 changes: 2 additions & 2 deletions scripts/source/test_data/kafka_json_schema.1
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"$schema":"https://json-schema.org/draft/2020-12/schema","$id":"https://example.com/product.schema.json","title":"Product","description":"A product from Acme's catalog","type":"object","properties":{"productId":{"description":"The unique identifier for a product","type":"integer"},"productName":{"description":"Name of the product","type":"string"},"price":{"description":"The price of the product","type":"number","exclusiveMinimum":0},"tags":{"description":"Tags for the product","type":"array","items":{"type":"string"},"minItems":1,"uniqueItems":true},"dimensions":{"type":"object","properties":{"length":{"type":"number"},"width":{"type":"number"},"height":{"type":"number"}},"required":["length","width","height"]}},"required":["productId","productName","price"]}
{"productId":1,"productName":"An ice sculpture","price":12.5,"tags":["cold","ice"],"dimensions":{"length":7,"width":12,"height":9.5}}
{"$schema":"https://json-schema.org/draft/2020-12/schema","$id":"https://example.com/product.schema.json","title":"Product","description":"A product from Acme's catalog","type":"object","properties":{"productId":{"description":"The unique identifier for a product","type":"integer"},"productName":{"description":"Name of the product","type":"string"},"price":{"description":"The price of the product","type":"number","exclusiveMinimum":0},"tags":{"description":"Tags for the product","type":"array","items":{"type":"string"},"minItems":1,"uniqueItems":true},"dimensions":{"type":"object","properties":{"length":{"type":"number"},"width":{"type":"number"},"height":{"type":"number"}},"required":["length","width","height"]},"map":{"type":"object","additionalProperties":{"type":"string"}},"notMap":{"type":"object","additionalProperties":{"type":"string"},"properties":{"a":{"type":"string"}}}},"required":["productId","productName","price"]}
{"productId":1,"productName":"An ice sculpture","price":12.5,"tags":["cold","ice"],"dimensions":{"length":7,"width":12,"height":9.5},"map":{"foo":"bar"},"notMap":{"a":"b","ignored":"c"}}
21 changes: 16 additions & 5 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::collections::BTreeMap;
use anyhow::Context as _;
use apache_avro::Schema;
use jst::{convert_avro, Context};
use risingwave_connector_codec::decoder::avro::MapHandling;
use risingwave_pb::plan_common::ColumnDesc;

use super::util::{bytes_from_url, get_kafka_topic};
Expand Down Expand Up @@ -80,7 +81,7 @@ impl JsonAccessBuilder {
}
}

pub async fn schema_to_columns(
pub async fn fetch_json_schema_and_map_to_columns(
schema_location: &str,
schema_registry_auth: Option<SchemaRegistryAuth>,
props: &BTreeMap<String, String>,
Expand All @@ -98,11 +99,21 @@ pub async fn schema_to_columns(
let bytes = bytes_from_url(url, None).await?;
serde_json::from_slice(&bytes)?
};
let context = Context::default();
let avro_schema = convert_avro(&json_schema, context).to_string();
json_schema_to_columns(&json_schema)
}

/// FIXME: when the JSON schema is invalid, it will panic.
///
/// ## Notes on type conversion
/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult<Vec<ColumnDesc>> {
let avro_schema = convert_avro(json_schema, Context::default()).to_string();
let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
// TODO: do we need to support map type here?
avro_schema_to_column_descs(&schema, None).map_err(Into::into)
avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
}

#[cfg(test)]
Expand Down
24 changes: 14 additions & 10 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use risingwave_connector::parser::additional_columns::{
build_additional_column_catalog, get_supported_additional_columns,
};
use risingwave_connector::parser::{
schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig,
SpecificParserConfig, TimestamptzHandling, DEBEZIUM_IGNORE_KEY,
fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig,
ProtobufParserConfig, SpecificParserConfig, TimestamptzHandling, DEBEZIUM_IGNORE_KEY,
};
use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
Expand Down Expand Up @@ -102,14 +102,18 @@ async fn extract_json_table_schema(
auth
});
Ok(Some(
schema_to_columns(&schema_location.0, schema_registry_auth, with_properties)
.await?
.into_iter()
.map(|col| ColumnCatalog {
column_desc: col.into(),
is_hidden: false,
})
.collect_vec(),
fetch_json_schema_and_map_to_columns(
&schema_location.0,
schema_registry_auth,
with_properties,
)
.await?
.into_iter()
.map(|col| ColumnCatalog {
column_desc: col.into(),
is_hidden: false,
})
.collect_vec(),
))
}
}
Expand Down

0 comments on commit ef6d6c6

Please sign in to comment.