refactor: move json schema to codec crate
xxchan committed Jun 20, 2024
1 parent 0815905 commit 7f5056c
Showing 8 changed files with 96 additions and 42 deletions.
20 changes: 14 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -77,6 +77,12 @@ repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.9.4", features = ["nightly"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "5349b0c7b35940d117397edbd314ca9087cdb892", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
auto_enums = { version = "0.8", features = ["futures03", "tokio1"] }
await-tree = "0.2.1"
aws-config = { version = "1", default-features = false, features = [
35 changes: 24 additions & 11 deletions src/connector/Cargo.toml
@@ -15,12 +15,7 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
apache-avro = { workspace = true }
arrow-array = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
@@ -64,7 +59,10 @@ gcp-bigquery-client = "0.18.0"
glob = "0.3"
google-cloud-bigquery = { version = "0.9.0", features = ["auth"] }
google-cloud-gax = "0.17.0"
google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] }
google-cloud-googleapis = { version = "0.13", features = [
"pubsub",
"bigquery",
] }
google-cloud-pubsub = "0.25"
http = "0.2"
icelake = { workspace = true }
@@ -73,7 +71,6 @@ itertools = { workspace = true }
jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
@@ -109,7 +106,12 @@ rdkafka = { workspace = true, features = [
"gssapi",
"zstd",
] }
redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
redis = { version = "0.25", features = [
"aio",
"tokio-comp",
"async-std-comp",
"cluster-async",
] }
regex = "1.4"
reqwest = { version = "0.12.2", features = ["json", "stream"] }
risingwave_common = { workspace = true }
@@ -125,7 +127,11 @@ rustls-native-certs = "0.7"
rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] }
sea-schema = { version = "0.14", features = [
"default",
"sqlx-postgres",
"sqlx-mysql",
] }
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
@@ -137,7 +143,14 @@ strum_macros = "0.26"
tempfile = "3"
thiserror = "1"
thiserror-ext = { workspace = true }
tiberius = { version = "0.12", default-features = false, features = ["chrono", "time", "tds73", "rust_decimal", "bigdecimal", "rustls"] }
tiberius = { version = "0.12", default-features = false, features = [
"chrono",
"time",
"tds73",
"rust_decimal",
"bigdecimal",
"rustls",
] }
time = "0.3.30"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
9 changes: 3 additions & 6 deletions src/connector/codec/Cargo.toml
@@ -16,23 +16,20 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
apache-avro = { workspace = true }
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
easy-ext = "1"
itertools = { workspace = true }
jsonbb = { workspace = true }
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
num-bigint = "0.4"
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
rust_decimal = "1"
serde_json = "1.0"
thiserror = "1"
thiserror-ext = { workspace = true }
time = "0.3.30"
33 changes: 33 additions & 0 deletions src/connector/codec/src/decoder/json/mod.rs
@@ -0,0 +1,33 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::{avro_schema_to_column_descs, MapHandling};

/// FIXME: when the JSON schema is invalid, it will panic.
///
/// ## Notes on type conversion
/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
pub fn json_schema_to_columns(json_schema: &serde_json::Value) -> anyhow::Result<Vec<ColumnDesc>> {
    let avro_schema = jst::convert_avro(json_schema, jst::Context::default()).to_string();
    let schema =
        apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
    avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
}
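
For illustration only (not part of this commit): a minimal sketch of calling the relocated helper from a caller or test. The inline schema, the function name `columns_from_inline_schema`, and the printed output are hypothetical.

use risingwave_connector_codec::decoder::json::json_schema_to_columns;

fn columns_from_inline_schema() -> anyhow::Result<()> {
    // An object with `properties` yields one column per property; an object with
    // only `additionalProperties` would instead be mapped to JSONB (MapHandling::Jsonb).
    let json_schema = serde_json::json!({
        "type": "object",
        "properties": {
            "id": { "type": "integer" },
            "name": { "type": "string" }
        }
    });
    let columns = json_schema_to_columns(&json_schema)?;
    for col in &columns {
        // `name` and `column_type` are fields of the protobuf ColumnDesc.
        println!("{}: {:?}", col.name, col.column_type);
    }
    Ok(())
}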
1 change: 1 addition & 0 deletions src/connector/codec/src/decoder/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.

pub mod avro;
pub mod json;
pub mod utils;

use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum};
13 changes: 13 additions & 0 deletions src/connector/codec/src/lib.rs
@@ -42,3 +42,16 @@
/// Converts JSON/AVRO/Protobuf data to RisingWave datum.
/// The core API is [`decoder::Access`].
pub mod decoder;

pub use risingwave_pb::plan_common::ColumnDesc;
pub type RisingWaveSchema = Vec<ColumnDesc>;
pub use apache_avro::schema::Schema as AvroSchema;
pub struct JsonSchema(pub serde_json::Value);
impl JsonSchema {
    pub fn parse_str(schema: &str) -> anyhow::Result<Self> {
        use anyhow::Context;

        let value = serde_json::from_str(schema).context("failed to parse json schema")?;
        Ok(Self(value))
    }
}
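
For illustration only (not part of this commit): a hedged sketch of how the new `JsonSchema` wrapper, the `RisingWaveSchema` alias, and the relocated `json_schema_to_columns` helper might be combined; the wrapper function `schema_from_str` is hypothetical.

use risingwave_connector_codec::decoder::json::json_schema_to_columns;
use risingwave_connector_codec::{JsonSchema, RisingWaveSchema};

// Parse a raw JSON schema string, then map it to RisingWave column descriptors.
fn schema_from_str(raw: &str) -> anyhow::Result<RisingWaveSchema> {
    let JsonSchema(value) = JsonSchema::parse_str(raw)?;
    json_schema_to_columns(&value)
}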
21 changes: 2 additions & 19 deletions src/connector/src/parser/json_parser.rs
@@ -24,15 +24,12 @@
use std::collections::BTreeMap;

use anyhow::Context as _;
use apache_avro::Schema;
use jst::{convert_avro, Context};
use risingwave_connector_codec::decoder::avro::MapHandling;
use risingwave_connector_codec::decoder::json::json_schema_to_columns;
use risingwave_pb::plan_common::ColumnDesc;

use super::util::{bytes_from_url, get_kafka_topic};
use super::{JsonProperties, SchemaRegistryAuth};
use crate::error::ConnectorResult;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::AccessBuilder;
@@ -99,21 +96,7 @@ pub async fn fetch_json_schema_and_map_to_columns(
        let bytes = bytes_from_url(url, None).await?;
        serde_json::from_slice(&bytes)?
    };
    json_schema_to_columns(&json_schema)
}

/// FIXME: when the JSON schema is invalid, it will panic.
///
/// ## Notes on type conversion
/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult<Vec<ColumnDesc>> {
    let avro_schema = convert_avro(json_schema, Context::default()).to_string();
    let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
    avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
    json_schema_to_columns(&json_schema).map_err(Into::into)
}

#[cfg(test)]
