From 3730c6047f82fd48f7e9509f69dc35204379358c Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 10 Jun 2024 14:38:40 +0800 Subject: [PATCH] refactor: move json schema to codec crate --- src/connector/Cargo.toml | 1 - src/connector/codec/Cargo.toml | 9 ++---- src/connector/codec/src/decoder/json/mod.rs | 33 +++++++++++++++++++++ src/connector/codec/src/decoder/mod.rs | 1 + src/connector/codec/src/lib.rs | 13 ++++++++ src/connector/src/parser/json_parser.rs | 21 ++----------- 6 files changed, 52 insertions(+), 26 deletions(-) create mode 100644 src/connector/codec/src/decoder/json/mod.rs diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 1cd0362151b83..938e7ed03d9ac 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -73,7 +73,6 @@ itertools = { workspace = true } jni = { version = "0.21.1", features = ["invocation"] } jsonbb = { workspace = true } jsonwebtoken = "9.2.0" -jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } maplit = "1.0.2" moka = { version = "0.12", features = ["future"] } mysql_async = { version = "0.34", default-features = false, features = [ diff --git a/src/connector/codec/Cargo.toml b/src/connector/codec/Cargo.toml index a7b45aaa640bc..f79fa964cf211 100644 --- a/src/connector/codec/Cargo.toml +++ b/src/connector/codec/Cargo.toml @@ -16,22 +16,19 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "5349b0c7b35940d117397edbd314ca9087cdb892", features = [ - "snappy", - "zstandard", - "bzip", - "xz", -] } +apache-avro = { workspace = true } chrono = { version = "0.4", default-features = false, features = [ "clock", "std", ] } itertools = { workspace = true } jsonbb = { workspace = true } +jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } num-bigint = "0.4" risingwave_common = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = "1" +serde_json = "1.0" thiserror = "1" thiserror-ext = { workspace = true } time = "0.3.30" diff --git a/src/connector/codec/src/decoder/json/mod.rs b/src/connector/codec/src/decoder/json/mod.rs new file mode 100644 index 0000000000000..651ac21f70e2f --- /dev/null +++ b/src/connector/codec/src/decoder/json/mod.rs @@ -0,0 +1,33 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context; +use risingwave_pb::plan_common::ColumnDesc; + +use super::avro::{avro_schema_to_column_descs, MapHandling}; + +/// FIXME: when the JSON schema is invalid, it will panic. +/// +/// ## Notes on type conversion +/// Map will be used when an object doesn't have `properties` but has `additionalProperties`. +/// When an object has `properties` and `additionalProperties`, the latter will be ignored. +/// +/// +/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc. +pub fn json_schema_to_columns(json_schema: &serde_json::Value) -> anyhow::Result> { + let avro_schema = jst::convert_avro(json_schema, jst::Context::default()).to_string(); + let schema = + apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; + avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into) +} diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index b38d1db0a89db..90279fcba7a23 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod avro; +pub mod json; pub mod utils; use risingwave_common::types::{DataType, Datum}; diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs index 67198c78516a0..c0483087eb78a 100644 --- a/src/connector/codec/src/lib.rs +++ b/src/connector/codec/src/lib.rs @@ -42,3 +42,16 @@ /// Converts JSON/AVRO/Protobuf data to RisingWave datum. /// The core API is [`decoder::Access`]. pub mod decoder; + +pub use risingwave_pb::plan_common::ColumnDesc; +pub type RisingWaveSchema = Vec; +pub use apache_avro::schema::Schema as AvroSchema; +pub struct JsonSchema(pub serde_json::Value); +impl JsonSchema { + pub fn parse_str(schema: &str) -> anyhow::Result { + use anyhow::Context; + + let value = serde_json::from_str(schema).context("failed to parse json schema")?; + Ok(Self(value)) + } +} diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index cf4cee597627b..d39fd856238a6 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -24,15 +24,12 @@ use std::collections::HashMap; use anyhow::Context as _; -use apache_avro::Schema; -use jst::{convert_avro, Context}; -use risingwave_connector_codec::decoder::avro::MapHandling; +use risingwave_connector_codec::decoder::json::json_schema_to_columns; use risingwave_pb::plan_common::ColumnDesc; use super::util::{bytes_from_url, get_kafka_topic}; use super::{JsonProperties, SchemaRegistryAuth}; use crate::error::ConnectorResult; -use crate::parser::avro::util::avro_schema_to_column_descs; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::AccessImpl; use crate::parser::AccessBuilder; @@ -99,21 +96,7 @@ pub async fn fetch_json_schema_and_map_to_columns( let bytes = bytes_from_url(url, None).await?; serde_json::from_slice(&bytes)? }; - json_schema_to_columns(&json_schema) -} - -/// FIXME: when the JSON schema is invalid, it will panic. -/// -/// ## Notes on type conversion -/// Map will be used when an object doesn't have `properties` but has `additionalProperties`. -/// When an object has `properties` and `additionalProperties`, the latter will be ignored. -/// -/// -/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc. -fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult> { - let avro_schema = convert_avro(json_schema, Context::default()).to_string(); - let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; - avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into) + json_schema_to_columns(&json_schema).map_err(Into::into) } #[cfg(test)]