From 9c73c3067ec34950f2630e0a9a24dadbb6cfc3cb Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 24 Jun 2024 12:14:50 +0800 Subject: [PATCH] refactor(source): minor refactors on source parser (#17184) Signed-off-by: xxchan --- Cargo.lock | 3 +- Cargo.toml | 6 ++++ src/connector/Cargo.toml | 14 ++++---- src/connector/codec/Cargo.toml | 9 ++--- src/connector/codec/src/decoder/json/mod.rs | 35 +++++++++++++++++++ src/connector/codec/src/decoder/mod.rs | 1 + src/connector/codec/src/lib.rs | 19 ++++++++++ src/connector/src/parser/avro/mod.rs | 6 ++-- src/connector/src/parser/avro/parser.rs | 7 ++-- src/connector/src/parser/avro/util.rs | 17 --------- .../src/parser/debezium/avro_parser.rs | 10 +++--- src/connector/src/parser/json_parser.rs | 25 +++---------- src/connector/src/parser/unified/avro.rs | 15 -------- src/connector/src/parser/unified/mod.rs | 3 +- 14 files changed, 89 insertions(+), 81 deletions(-) create mode 100644 src/connector/codec/src/decoder/json/mod.rs delete mode 100644 src/connector/src/parser/avro/util.rs delete mode 100644 src/connector/src/parser/unified/avro.rs diff --git a/Cargo.lock b/Cargo.lock index 5d800c50dd951..cbbb3db4da6a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11010,7 +11010,6 @@ dependencies = [ "itertools 0.12.1", "jni", "jsonbb", - "jsonschema-transpiler", "jsonwebtoken", "madsim-rdkafka", "madsim-tokio", @@ -11098,10 +11097,12 @@ dependencies = [ "easy-ext", "itertools 0.12.1", "jsonbb", + "jsonschema-transpiler", "num-bigint", "risingwave_common", "risingwave_pb", "rust_decimal", + "serde_json", "thiserror", "thiserror-ext", "time", diff --git a/Cargo.toml b/Cargo.toml index 817f86f1cb198..249a0f2258863 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,12 @@ repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] foyer = { version = "0.9.4", features = ["nightly"] } +apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [ + "snappy", + "zstandard", + "bzip", + "xz", +] } auto_enums = { version = "0.8", features = ["futures03", "tokio1"] } await-tree = "0.2.1" aws-config = { version = "1", default-features = false, features = [ diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 933e77c6945c2..2740a3a8754a4 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -15,12 +15,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [ - "snappy", - "zstandard", - "bzip", - "xz", -] } +apache-avro = { workspace = true } arrow-array = { workspace = true } arrow-array-iceberg = { workspace = true } arrow-row = { workspace = true } @@ -77,7 +72,6 @@ itertools = { workspace = true } jni = { version = "0.21.1", features = ["invocation"] } jsonbb = { workspace = true } jsonwebtoken = "9.2.0" -jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } maplit = "1.0.2" moka = { version = "0.12", features = ["future"] } mongodb = { version = "2.8.2", features = ["tokio-runtime"] } @@ -129,7 +123,11 @@ rustls-native-certs = "0.7" rustls-pemfile = "2" rustls-pki-types = "1" rw_futures_util = { workspace = true } -sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] } +sea-schema = { version = "0.14", default-features = false, features = [ + "discovery", + "sqlx-postgres", + "sqlx-mysql", +] } serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" diff --git a/src/connector/codec/Cargo.toml b/src/connector/codec/Cargo.toml index 172aacb1c53f3..d9e6e274f0fef 100644 --- a/src/connector/codec/Cargo.toml +++ b/src/connector/codec/Cargo.toml @@ -16,12 +16,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [ - "snappy", - "zstandard", - "bzip", - "xz", -] } +apache-avro = { workspace = true } chrono = { version = "0.4", default-features = false, features = [ "clock", "std", @@ -29,10 +24,12 @@ chrono = { version = "0.4", default-features = false, features = [ easy-ext = "1" itertools = { workspace = true } jsonbb = { workspace = true } +jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } num-bigint = "0.4" risingwave_common = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = "1" +serde_json = "1.0" thiserror = "1" thiserror-ext = { workspace = true } time = "0.3.30" diff --git a/src/connector/codec/src/decoder/json/mod.rs b/src/connector/codec/src/decoder/json/mod.rs new file mode 100644 index 0000000000000..b56ce3abbee25 --- /dev/null +++ b/src/connector/codec/src/decoder/json/mod.rs @@ -0,0 +1,35 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context; +use risingwave_pb::plan_common::ColumnDesc; + +use super::avro::{avro_schema_to_column_descs, MapHandling}; + +impl crate::JsonSchema { + /// FIXME: when the JSON schema is invalid, it will panic. + /// + /// ## Notes on type conversion + /// Map will be used when an object doesn't have `properties` but has `additionalProperties`. + /// When an object has `properties` and `additionalProperties`, the latter will be ignored. + /// + /// + /// TODO: examine other stuff like `oneOf`, `patternProperties`, etc. + pub fn json_schema_to_columns(&self) -> anyhow::Result> { + let avro_schema = jst::convert_avro(&self.0, jst::Context::default()).to_string(); + let schema = + apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; + avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into) + } +} diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index d71186815697e..61d761410fae6 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod avro; +pub mod json; pub mod utils; use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum}; diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs index 67198c78516a0..6b1335ab32e26 100644 --- a/src/connector/codec/src/lib.rs +++ b/src/connector/codec/src/lib.rs @@ -42,3 +42,22 @@ /// Converts JSON/AVRO/Protobuf data to RisingWave datum. /// The core API is [`decoder::Access`]. pub mod decoder; + +pub use apache_avro::schema::Schema as AvroSchema; +pub use risingwave_pb::plan_common::ColumnDesc; +pub struct JsonSchema(pub serde_json::Value); +impl JsonSchema { + pub fn parse_str(schema: &str) -> anyhow::Result { + use anyhow::Context; + + let value = serde_json::from_str(schema).context("failed to parse json schema")?; + Ok(Self(value)) + } + + pub fn parse_bytes(schema: &[u8]) -> anyhow::Result { + use anyhow::Context; + + let value = serde_json::from_slice(schema).context("failed to parse json schema")?; + Ok(Self(value)) + } +} diff --git a/src/connector/src/parser/avro/mod.rs b/src/connector/src/parser/avro/mod.rs index 1c22b326770c4..19193035bd567 100644 --- a/src/connector/src/parser/avro/mod.rs +++ b/src/connector/src/parser/avro/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. mod parser; -pub mod schema_resolver; -pub mod util; +mod schema_resolver; -pub use parser::*; +pub use parser::{AvroAccessBuilder, AvroParserConfig}; +pub use schema_resolver::ConfluentSchemaCache; diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 6d69460ee1198..f6abe86a17f0d 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -19,12 +19,13 @@ use anyhow::Context; use apache_avro::types::Value; use apache_avro::{from_avro_datum, Reader, Schema}; use risingwave_common::{bail, try_match_expand}; +use risingwave_connector_codec::decoder::avro::{ + avro_schema_to_column_descs, AvroAccess, AvroParseOptions, ResolvedAvroSchema, +}; use risingwave_pb::plan_common::ColumnDesc; -use super::schema_resolver::ConfluentSchemaCache; -use super::util::{avro_schema_to_column_descs, ResolvedAvroSchema}; +use super::ConfluentSchemaCache; use crate::error::ConnectorResult; -use crate::parser::unified::avro::{AvroAccess, AvroParseOptions}; use crate::parser::unified::AccessImpl; use crate::parser::util::bytes_from_url; use crate::parser::{AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling}; diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs deleted file mode 100644 index 58043daa08b0f..0000000000000 --- a/src/connector/src/parser/avro/util.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub use risingwave_connector_codec::decoder::avro::{ - avro_schema_to_column_descs, ResolvedAvroSchema, -}; diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 430d5072a88db..1a40b87c9d498 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -18,15 +18,15 @@ use std::sync::Arc; use apache_avro::types::Value; use apache_avro::{from_avro_datum, Schema}; use risingwave_common::try_match_expand; +use risingwave_connector_codec::decoder::avro::{ + avro_extract_field_schema, avro_schema_skip_union, avro_schema_to_column_descs, AvroAccess, + AvroParseOptions, ResolvedAvroSchema, +}; use risingwave_pb::catalog::PbSchemaRegistryNameStrategy; use risingwave_pb::plan_common::ColumnDesc; use crate::error::ConnectorResult; -use crate::parser::avro::schema_resolver::ConfluentSchemaCache; -use crate::parser::avro::util::{avro_schema_to_column_descs, ResolvedAvroSchema}; -use crate::parser::unified::avro::{ - avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions, -}; +use crate::parser::avro::ConfluentSchemaCache; use crate::parser::unified::AccessImpl; use crate::parser::{AccessBuilder, EncodingProperties, EncodingType}; use crate::schema::schema_registry::{ diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 89f118eb1022f..4ed9af1d30dfc 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -24,15 +24,12 @@ use std::collections::BTreeMap; use anyhow::Context as _; -use apache_avro::Schema; -use jst::{convert_avro, Context}; -use risingwave_connector_codec::decoder::avro::MapHandling; +use risingwave_connector_codec::JsonSchema; use risingwave_pb::plan_common::ColumnDesc; use super::util::{bytes_from_url, get_kafka_topic}; use super::{JsonProperties, SchemaRegistryAuth}; use crate::error::ConnectorResult; -use crate::parser::avro::util::avro_schema_to_column_descs; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::AccessImpl; use crate::parser::AccessBuilder; @@ -93,27 +90,13 @@ pub async fn fetch_json_schema_and_map_to_columns( let schema = client .get_schema_by_subject(&format!("{}-value", topic)) .await?; - serde_json::from_str(&schema.content)? + JsonSchema::parse_str(&schema.content)? } else { let url = url.first().unwrap(); let bytes = bytes_from_url(url, None).await?; - serde_json::from_slice(&bytes)? + JsonSchema::parse_bytes(&bytes)? }; - json_schema_to_columns(&json_schema) -} - -/// FIXME: when the JSON schema is invalid, it will panic. -/// -/// ## Notes on type conversion -/// Map will be used when an object doesn't have `properties` but has `additionalProperties`. -/// When an object has `properties` and `additionalProperties`, the latter will be ignored. -/// -/// -/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc. -fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult> { - let avro_schema = convert_avro(json_schema, Context::default()).to_string(); - let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; - avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into) + json_schema.json_schema_to_columns().map_err(Into::into) } #[cfg(test)] diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs deleted file mode 100644 index 68e95d6f78b9a..0000000000000 --- a/src/connector/src/parser/unified/avro.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub use risingwave_connector_codec::decoder::avro::*; diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index 6c8316042026f..8045ce0132401 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -16,18 +16,17 @@ use auto_impl::auto_impl; use risingwave_common::types::{DataType, DatumCow}; +use risingwave_connector_codec::decoder::avro::AvroAccess; pub use risingwave_connector_codec::decoder::{ bail_uncategorized, uncategorized, Access, AccessError, AccessResult, }; -use self::avro::AvroAccess; use self::bytes::BytesAccess; use self::json::JsonAccess; use self::protobuf::ProtobufAccess; use crate::parser::unified::debezium::MongoJsonAccess; use crate::source::SourceColumnDesc; -pub mod avro; pub mod bytes; pub mod debezium; pub mod json;