Skip to content

Commit

Permalink
refactor(source): minor refactors on source parser (#17184)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan authored Jun 24, 2024
1 parent 4d03c48 commit 9c73c30
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 81 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.9.4", features = ["nightly"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
auto_enums = { version = "0.8", features = ["futures03", "tokio1"] }
await-tree = "0.2.1"
aws-config = { version = "1", default-features = false, features = [
Expand Down
14 changes: 6 additions & 8 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
apache-avro = { workspace = true }
arrow-array = { workspace = true }
arrow-array-iceberg = { workspace = true }
arrow-row = { workspace = true }
Expand Down Expand Up @@ -77,7 +72,6 @@ itertools = { workspace = true }
jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
Expand Down Expand Up @@ -129,7 +123,11 @@ rustls-native-certs = "0.7"
rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] }
sea-schema = { version = "0.14", default-features = false, features = [
"discovery",
"sqlx-postgres",
"sqlx-mysql",
] }
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
Expand Down
9 changes: 3 additions & 6 deletions src/connector/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
apache-avro = { workspace = true }
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
easy-ext = "1"
itertools = { workspace = true }
jsonbb = { workspace = true }
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
num-bigint = "0.4"
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
rust_decimal = "1"
serde_json = "1.0"
thiserror = "1"
thiserror-ext = { workspace = true }
time = "0.3.30"
Expand Down
35 changes: 35 additions & 0 deletions src/connector/codec/src/decoder/json/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::{avro_schema_to_column_descs, MapHandling};

impl crate::JsonSchema {
    /// Converts this JSON schema into RisingWave column descriptors by first
    /// transpiling it to an Avro schema and then reusing the Avro conversion path.
    ///
    /// FIXME: when the JSON schema is invalid, it will panic.
    ///
    /// ## Notes on type conversion
    /// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
    /// When an object has `properties` and `additionalProperties`, the latter will be ignored.
    /// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
    ///
    /// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
    pub fn json_schema_to_columns(&self) -> anyhow::Result<Vec<ColumnDesc>> {
        // JSON schema -> Avro schema (textual form) via the transpiler.
        let transpiled = jst::convert_avro(&self.0, jst::Context::default()).to_string();
        // Parse the transpiled text back into a structured Avro schema.
        let avro = apache_avro::Schema::parse_str(&transpiled)
            .context("failed to parse avro schema")?;
        // Delegate to the shared Avro-to-column conversion; `?` lifts the
        // codec error into `anyhow::Error` just as `map_err(Into::into)` did.
        Ok(avro_schema_to_column_descs(&avro, Some(MapHandling::Jsonb))?)
    }
}
1 change: 1 addition & 0 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod avro;
pub mod json;
pub mod utils;

use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum};
Expand Down
19 changes: 19 additions & 0 deletions src/connector/codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,22 @@
/// Converts JSON/AVRO/Protobuf data to RisingWave datum.
/// The core API is [`decoder::Access`].
pub mod decoder;

pub use apache_avro::schema::Schema as AvroSchema;
pub use risingwave_pb::plan_common::ColumnDesc;
/// A raw JSON schema document, stored as an untyped `serde_json` value.
pub struct JsonSchema(pub serde_json::Value);

impl JsonSchema {
    /// Parses a JSON schema from a UTF-8 string.
    pub fn parse_str(schema: &str) -> anyhow::Result<Self> {
        use anyhow::Context;

        serde_json::from_str(schema)
            .context("failed to parse json schema")
            .map(Self)
    }

    /// Parses a JSON schema from raw bytes.
    pub fn parse_bytes(schema: &[u8]) -> anyhow::Result<Self> {
        use anyhow::Context;

        serde_json::from_slice(schema)
            .context("failed to parse json schema")
            .map(Self)
    }
}
6 changes: 3 additions & 3 deletions src/connector/src/parser/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

mod parser;
pub mod schema_resolver;
pub mod util;
mod schema_resolver;

pub use parser::*;
pub use parser::{AvroAccessBuilder, AvroParserConfig};
pub use schema_resolver::ConfluentSchemaCache;
7 changes: 4 additions & 3 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ use anyhow::Context;
use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Reader, Schema};
use risingwave_common::{bail, try_match_expand};
use risingwave_connector_codec::decoder::avro::{
avro_schema_to_column_descs, AvroAccess, AvroParseOptions, ResolvedAvroSchema,
};
use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::ConfluentSchemaCache;
use super::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use super::ConfluentSchemaCache;
use crate::error::ConnectorResult;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
use crate::parser::{AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling};
Expand Down
17 changes: 0 additions & 17 deletions src/connector/src/parser/avro/util.rs

This file was deleted.

10 changes: 5 additions & 5 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use std::sync::Arc;
use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Schema};
use risingwave_common::try_match_expand;
use risingwave_connector_codec::decoder::avro::{
avro_extract_field_schema, avro_schema_skip_union, avro_schema_to_column_descs, AvroAccess,
AvroParseOptions, ResolvedAvroSchema,
};
use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
use risingwave_pb::plan_common::ColumnDesc;

use crate::error::ConnectorResult;
use crate::parser::avro::schema_resolver::ConfluentSchemaCache;
use crate::parser::avro::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use crate::parser::unified::avro::{
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
};
use crate::parser::avro::ConfluentSchemaCache;
use crate::parser::unified::AccessImpl;
use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
use crate::schema::schema_registry::{
Expand Down
25 changes: 4 additions & 21 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@
use std::collections::BTreeMap;

use anyhow::Context as _;
use apache_avro::Schema;
use jst::{convert_avro, Context};
use risingwave_connector_codec::decoder::avro::MapHandling;
use risingwave_connector_codec::JsonSchema;
use risingwave_pb::plan_common::ColumnDesc;

use super::util::{bytes_from_url, get_kafka_topic};
use super::{JsonProperties, SchemaRegistryAuth};
use crate::error::ConnectorResult;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::AccessBuilder;
Expand Down Expand Up @@ -93,27 +90,13 @@ pub async fn fetch_json_schema_and_map_to_columns(
let schema = client
.get_schema_by_subject(&format!("{}-value", topic))
.await?;
serde_json::from_str(&schema.content)?
JsonSchema::parse_str(&schema.content)?
} else {
let url = url.first().unwrap();
let bytes = bytes_from_url(url, None).await?;
serde_json::from_slice(&bytes)?
JsonSchema::parse_bytes(&bytes)?
};
json_schema_to_columns(&json_schema)
}

/// FIXME: when the JSON schema is invalid, it will panic.
///
/// ## Notes on type conversion
/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult<Vec<ColumnDesc>> {
let avro_schema = convert_avro(json_schema, Context::default()).to_string();
let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
json_schema.json_schema_to_columns().map_err(Into::into)
}

#[cfg(test)]
Expand Down
15 changes: 0 additions & 15 deletions src/connector/src/parser/unified/avro.rs

This file was deleted.

3 changes: 1 addition & 2 deletions src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
use auto_impl::auto_impl;
use risingwave_common::types::{DataType, DatumCow};
use risingwave_connector_codec::decoder::avro::AvroAccess;
pub use risingwave_connector_codec::decoder::{
bail_uncategorized, uncategorized, Access, AccessError, AccessResult,
};

use self::avro::AvroAccess;
use self::bytes::BytesAccess;
use self::json::JsonAccess;
use self::protobuf::ProtobufAccess;
use crate::parser::unified::debezium::MongoJsonAccess;
use crate::source::SourceColumnDesc;

pub mod avro;
pub mod bytes;
pub mod debezium;
pub mod json;
Expand Down

0 comments on commit 9c73c30

Please sign in to comment.