diff --git a/Cargo.lock b/Cargo.lock index 3ccd821d4caa8..20ee05110f01e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,7 +241,7 @@ dependencies = [ [[package]] name = "apache-avro" version = "0.16.0" -source = "git+https://github.com/risingwavelabs/avro?rev=25113ba88234a9ae23296e981d8302c290fdaa4b#25113ba88234a9ae23296e981d8302c290fdaa4b" +source = "git+https://github.com/risingwavelabs/avro?rev=5349b0c7b35940d117397edbd314ca9087cdb892#5349b0c7b35940d117397edbd314ca9087cdb892" dependencies = [ "bzip2", "crc32fast", @@ -2991,9 +2991,9 @@ checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" [[package]] name = "crc32c" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0227b9f93e535d49bc7ce914c066243424ce85ed90864cebd0874b184e9b6947" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" dependencies = [ "rustc_version 0.4.0", ] @@ -3236,15 +3236,16 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.3" +version = "4.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348" dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", "digest", "fiat-crypto", + "platforms", "rustc_version 0.4.0", "subtle", "zeroize", @@ -8949,6 +8950,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "platforms" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8" + [[package]] name = "plotters" version = "0.3.5" @@ -10828,7 +10835,6 @@ dependencies = [ "itertools 0.12.1", "jni", "jsonbb", - "jsonschema-transpiler", "jsonwebtoken", 
"madsim-rdkafka", "madsim-tokio", @@ -10915,10 +10921,12 @@ dependencies = [ "easy-ext", "itertools 0.12.1", "jsonbb", + "jsonschema-transpiler", "num-bigint", "risingwave_common", "risingwave_pb", "rust_decimal", + "serde_json", "thiserror", "thiserror-ext", "time", diff --git a/Cargo.toml b/Cargo.toml index 457e260cb3b1d..d5462dd40cd7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,12 @@ repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] foyer = { version = "0.9.4", features = ["nightly"] } +apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "5349b0c7b35940d117397edbd314ca9087cdb892", features = [ + "snappy", + "zstandard", + "bzip", + "xz", +] } auto_enums = { version = "0.8", features = ["futures03", "tokio1"] } await-tree = "0.2.1" aws-config = { version = "1", default-features = false, features = [ diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index bfc283fa6c195..029b68c9c619c 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -15,12 +15,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [ - "snappy", - "zstandard", - "bzip", - "xz", -] } +apache-avro = { workspace = true } arrow-array = { workspace = true } arrow-row = { workspace = true } arrow-schema = { workspace = true } @@ -64,7 +59,10 @@ gcp-bigquery-client = "0.18.0" glob = "0.3" google-cloud-bigquery = { version = "0.9.0", features = ["auth"] } google-cloud-gax = "0.17.0" -google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] } +google-cloud-googleapis = { version = "0.13", features = [ + "pubsub", + "bigquery", +] } google-cloud-pubsub = "0.25" http = "0.2" icelake = { workspace = true } @@ -73,7 +71,6 @@ itertools = { workspace = true } jni = { version = "0.21.1", features = ["invocation"] } jsonbb = { workspace = true } jsonwebtoken = 
"9.2.0" -jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } maplit = "1.0.2" moka = { version = "0.12", features = ["future"] } mongodb = { version = "2.8.2", features = ["tokio-runtime"] } @@ -109,7 +106,12 @@ rdkafka = { workspace = true, features = [ "gssapi", "zstd", ] } -redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] } +redis = { version = "0.25", features = [ + "aio", + "tokio-comp", + "async-std-comp", + "cluster-async", +] } regex = "1.4" reqwest = { version = "0.12.2", features = ["json", "stream"] } risingwave_common = { workspace = true } @@ -125,7 +127,11 @@ rustls-native-certs = "0.7" rustls-pemfile = "2" rustls-pki-types = "1" rw_futures_util = { workspace = true } -sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] } +sea-schema = { version = "0.14", features = [ + "default", + "sqlx-postgres", + "sqlx-mysql", +] } serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" @@ -137,7 +143,14 @@ strum_macros = "0.26" tempfile = "3" thiserror = "1" thiserror-ext = { workspace = true } -tiberius = { version = "0.12", default-features = false, features = ["chrono", "time", "tds73", "rust_decimal", "bigdecimal", "rustls"] } +tiberius = { version = "0.12", default-features = false, features = [ + "chrono", + "time", + "tds73", + "rust_decimal", + "bigdecimal", + "rustls", +] } time = "0.3.30" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", diff --git a/src/connector/codec/Cargo.toml b/src/connector/codec/Cargo.toml index 172aacb1c53f3..d9e6e274f0fef 100644 --- a/src/connector/codec/Cargo.toml +++ b/src/connector/codec/Cargo.toml @@ -16,12 +16,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", 
features = [ - "snappy", - "zstandard", - "bzip", - "xz", -] } +apache-avro = { workspace = true } chrono = { version = "0.4", default-features = false, features = [ "clock", "std", @@ -29,10 +24,12 @@ chrono = { version = "0.4", default-features = false, features = [ easy-ext = "1" itertools = { workspace = true } jsonbb = { workspace = true } +jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } num-bigint = "0.4" risingwave_common = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = "1" +serde_json = "1.0" thiserror = "1" thiserror-ext = { workspace = true } time = "0.3.30" diff --git a/src/connector/codec/src/decoder/json/mod.rs b/src/connector/codec/src/decoder/json/mod.rs new file mode 100644 index 0000000000000..651ac21f70e2f --- /dev/null +++ b/src/connector/codec/src/decoder/json/mod.rs @@ -0,0 +1,33 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context; +use risingwave_pb::plan_common::ColumnDesc; + +use super::avro::{avro_schema_to_column_descs, MapHandling}; + +/// FIXME: when the JSON schema is invalid, it will panic. +/// +/// ## Notes on type conversion +/// Map will be used when an object doesn't have `properties` but has `additionalProperties`. +/// When an object has `properties` and `additionalProperties`, the latter will be ignored. 
+/// +/// +/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc. +pub fn json_schema_to_columns(json_schema: &serde_json::Value) -> anyhow::Result<Vec<ColumnDesc>> { + let avro_schema = jst::convert_avro(json_schema, jst::Context::default()).to_string(); + let schema = + apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; + avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into) +} diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index d71186815697e..61d761410fae6 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod avro; +pub mod json; pub mod utils; use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum}; diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs index 67198c78516a0..c0483087eb78a 100644 --- a/src/connector/codec/src/lib.rs +++ b/src/connector/codec/src/lib.rs @@ -42,3 +42,16 @@ /// Converts JSON/AVRO/Protobuf data to RisingWave datum. /// The core API is [`decoder::Access`].
pub mod decoder; + +pub use risingwave_pb::plan_common::ColumnDesc; +pub type RisingWaveSchema = Vec<ColumnDesc>; +pub use apache_avro::schema::Schema as AvroSchema; +pub struct JsonSchema(pub serde_json::Value); +impl JsonSchema { + pub fn parse_str(schema: &str) -> anyhow::Result<Self> { + use anyhow::Context; + + let value = serde_json::from_str(schema).context("failed to parse json schema")?; + Ok(Self(value)) + } +} diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 89f118eb1022f..081df749d3b38 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -24,15 +24,12 @@ use std::collections::BTreeMap; use anyhow::Context as _; -use apache_avro::Schema; -use jst::{convert_avro, Context}; -use risingwave_connector_codec::decoder::avro::MapHandling; +use risingwave_connector_codec::decoder::json::json_schema_to_columns; use risingwave_pb::plan_common::ColumnDesc; use super::util::{bytes_from_url, get_kafka_topic}; use super::{JsonProperties, SchemaRegistryAuth}; use crate::error::ConnectorResult; -use crate::parser::avro::util::avro_schema_to_column_descs; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::AccessImpl; use crate::parser::AccessBuilder; @@ -99,21 +96,7 @@ pub async fn fetch_json_schema_and_map_to_columns( let bytes = bytes_from_url(url, None).await?; serde_json::from_slice(&bytes)? }; - json_schema_to_columns(&json_schema) -} - -/// FIXME: when the JSON schema is invalid, it will panic. -/// -/// ## Notes on type conversion -/// Map will be used when an object doesn't have `properties` but has `additionalProperties`. -/// When an object has `properties` and `additionalProperties`, the latter will be ignored. -/// -/// -/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
-fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult<Vec<ColumnDesc>> { - let avro_schema = convert_avro(json_schema, Context::default()).to_string(); - let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; - avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into) + json_schema_to_columns(&json_schema).map_err(Into::into) } #[cfg(test)]