Skip to content

Commit

Permalink
refactor: move json schema to codec crate
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 10, 2024
1 parent 3b46db9 commit 3730c60
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 26 deletions.
1 change: 0 additions & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ itertools = { workspace = true }
jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
mysql_async = { version = "0.34", default-features = false, features = [
Expand Down
9 changes: 3 additions & 6 deletions src/connector/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,19 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "5349b0c7b35940d117397edbd314ca9087cdb892", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
apache-avro = { workspace = true }
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
itertools = { workspace = true }
jsonbb = { workspace = true }
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
num-bigint = "0.4"
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
rust_decimal = "1"
serde_json = "1.0"
thiserror = "1"
thiserror-ext = { workspace = true }
time = "0.3.30"
Expand Down
33 changes: 33 additions & 0 deletions src/connector/codec/src/decoder/json/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::{avro_schema_to_column_descs, MapHandling};

/// Derives column descriptors from a JSON schema.
///
/// The JSON schema is converted to an Avro schema with `jst::convert_avro`, the result is
/// parsed with `apache_avro`, and the columns are produced by [`avro_schema_to_column_descs`]
/// using [`MapHandling::Jsonb`].
///
/// FIXME: this panics when the JSON schema is invalid.
///
/// ## Notes on type conversion
/// An object that has `additionalProperties` but no `properties` is turned into a map.
/// If an object has both `properties` and `additionalProperties`, the latter is ignored.
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
pub fn json_schema_to_columns(json_schema: &serde_json::Value) -> anyhow::Result<Vec<ColumnDesc>> {
    // Step 1: JSON schema -> Avro schema (as a JSON value).
    let avro_json = jst::convert_avro(json_schema, jst::Context::default());

    // Step 2: serialize and re-parse it into a typed Avro schema.
    let parsed = apache_avro::Schema::parse_str(&avro_json.to_string())
        .context("failed to parse avro schema")?;

    // Step 3: Avro schema -> column descriptors.
    let columns = avro_schema_to_column_descs(&parsed, Some(MapHandling::Jsonb))?;
    Ok(columns)
}
1 change: 1 addition & 0 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod avro;
pub mod json;
pub mod utils;

use risingwave_common::types::{DataType, Datum};
Expand Down
13 changes: 13 additions & 0 deletions src/connector/codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,16 @@
/// Converts JSON/AVRO/Protobuf data to RisingWave datum.
/// The core API is [`decoder::Access`].
pub mod decoder;

/// Re-export of the column descriptor type from `risingwave_pb::plan_common`.
pub use risingwave_pb::plan_common::ColumnDesc;
/// A RisingWave schema, represented as a list of [`ColumnDesc`]s.
pub type RisingWaveSchema = Vec<ColumnDesc>;
/// Re-export of `apache_avro`'s `Schema` type under the name `AvroSchema`.
pub use apache_avro::schema::Schema as AvroSchema;
/// A JSON schema document, held as an already-parsed [`serde_json::Value`].
///
/// The wrapped value is public, so a `JsonSchema` can also be built directly from an
/// existing `serde_json::Value`.
#[derive(Debug, Clone)]
pub struct JsonSchema(pub serde_json::Value);

impl JsonSchema {
    /// Parses `schema` as JSON text and wraps the resulting value.
    ///
    /// # Errors
    ///
    /// Returns an error carrying the context `"failed to parse json schema"` when
    /// `serde_json` cannot deserialize `schema`.
    pub fn parse_str(schema: &str) -> anyhow::Result<Self> {
        use anyhow::Context;

        serde_json::from_str(schema)
            .map(Self)
            .context("failed to parse json schema")
    }
}
21 changes: 2 additions & 19 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@
use std::collections::HashMap;

use anyhow::Context as _;
use apache_avro::Schema;
use jst::{convert_avro, Context};
use risingwave_connector_codec::decoder::avro::MapHandling;
use risingwave_connector_codec::decoder::json::json_schema_to_columns;
use risingwave_pb::plan_common::ColumnDesc;

use super::util::{bytes_from_url, get_kafka_topic};
use super::{JsonProperties, SchemaRegistryAuth};
use crate::error::ConnectorResult;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::AccessBuilder;
Expand Down Expand Up @@ -99,21 +96,7 @@ pub async fn fetch_json_schema_and_map_to_columns(
let bytes = bytes_from_url(url, None).await?;
serde_json::from_slice(&bytes)?
};
json_schema_to_columns(&json_schema)
}

/// FIXME: when the JSON schema is invalid, it will panic.
///
/// ## Notes on type conversion
/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult<Vec<ColumnDesc>> {
let avro_schema = convert_avro(json_schema, Context::default()).to_string();
let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
json_schema_to_columns(&json_schema).map_err(Into::into)
}

#[cfg(test)]
Expand Down

0 comments on commit 3730c60

Please sign in to comment.