Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): minor refactors on source parser #17184

Merged
merged 7 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.9.4", features = ["nightly"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
auto_enums = { version = "0.8", features = ["futures03", "tokio1"] }
await-tree = "0.2.1"
aws-config = { version = "1", default-features = false, features = [
Expand Down
14 changes: 6 additions & 8 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
apache-avro = { workspace = true }
arrow-array = { workspace = true }
arrow-array-iceberg = { workspace = true }
arrow-row = { workspace = true }
Expand Down Expand Up @@ -77,7 +72,6 @@ itertools = { workspace = true }
jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
Expand Down Expand Up @@ -129,7 +123,11 @@ rustls-native-certs = "0.7"
rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] }
sea-schema = { version = "0.14", default-features = false, features = [
"discovery",
"sqlx-postgres",
"sqlx-mysql",
] }
Comment on lines -132 to +130
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed because I found that `cargo check -p risingwave_connector` failed to compile.

serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
Expand Down
9 changes: 3 additions & 6 deletions src/connector/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
"bzip",
"xz",
] }
apache-avro = { workspace = true }
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
easy-ext = "1"
itertools = { workspace = true }
jsonbb = { workspace = true }
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
num-bigint = "0.4"
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
rust_decimal = "1"
serde_json = "1.0"
thiserror = "1"
thiserror-ext = { workspace = true }
time = "0.3.30"
Expand Down
35 changes: 35 additions & 0 deletions src/connector/codec/src/decoder/json/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use risingwave_pb::plan_common::ColumnDesc;

use super::avro::{avro_schema_to_column_descs, MapHandling};

impl crate::JsonSchema {
    /// Maps this JSON schema to a list of column descriptors.
    ///
    /// The wrapped JSON value is transpiled to an Avro schema with `jsonschema-transpiler`,
    /// the generated Avro schema text is parsed again with `apache_avro`, and the columns are
    /// derived from the parsed schema, passing `MapHandling::Jsonb` as the map handling option.
    ///
    /// FIXME: when the JSON schema is invalid, it will panic.
    ///
    /// ## Notes on type conversion
    /// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
    /// When an object has `properties` and `additionalProperties`, the latter will be ignored.
    /// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
    ///
    /// ## Errors
    /// Returns an error if the transpiled Avro schema text cannot be parsed, or if the Avro
    /// schema cannot be converted to column descriptors.
    ///
    /// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
    pub fn json_schema_to_columns(&self) -> anyhow::Result<Vec<ColumnDesc>> {
        // Step 1: JSON schema -> Avro schema, rendered as text.
        let transpiled = jst::convert_avro(&self.0, jst::Context::default());
        let avro_text = transpiled.to_string();

        // Step 2: parse the rendered text into a typed Avro schema.
        let parsed = apache_avro::Schema::parse_str(&avro_text)
            .context("failed to parse avro schema")?;

        // Step 3: reuse the Avro -> column conversion.
        avro_schema_to_column_descs(&parsed, Some(MapHandling::Jsonb)).map_err(Into::into)
    }
}
1 change: 1 addition & 0 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod avro;
pub mod json;
pub mod utils;

use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum};
Expand Down
19 changes: 19 additions & 0 deletions src/connector/codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,22 @@
/// Converts JSON/AVRO/Protobuf data to RisingWave datum.
/// The core API is [`decoder::Access`].
pub mod decoder;

pub use apache_avro::schema::Schema as AvroSchema;
pub use risingwave_pb::plan_common::ColumnDesc;
/// A JSON schema document, stored as a parsed [`serde_json::Value`].
pub struct JsonSchema(pub serde_json::Value);

impl JsonSchema {
    /// Parses a JSON schema from its textual representation.
    ///
    /// The input is deserialized into a generic [`serde_json::Value`], so only JSON
    /// well-formedness is checked here.
    ///
    /// # Errors
    /// Returns an error with the context `failed to parse json schema` when `schema` cannot
    /// be deserialized.
    pub fn parse_str(schema: &str) -> anyhow::Result<Self> {
        use anyhow::Context as _;

        serde_json::from_str::<serde_json::Value>(schema)
            .context("failed to parse json schema")
            .map(Self)
    }

    /// Parses a JSON schema from raw bytes.
    ///
    /// The input is deserialized into a generic [`serde_json::Value`], so only JSON
    /// well-formedness is checked here.
    ///
    /// # Errors
    /// Returns an error with the context `failed to parse json schema` when `schema` cannot
    /// be deserialized.
    pub fn parse_bytes(schema: &[u8]) -> anyhow::Result<Self> {
        use anyhow::Context as _;

        serde_json::from_slice::<serde_json::Value>(schema)
            .context("failed to parse json schema")
            .map(Self)
    }
}
6 changes: 3 additions & 3 deletions src/connector/src/parser/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

mod parser;
pub mod schema_resolver;
pub mod util;
mod schema_resolver;

pub use parser::*;
pub use parser::{AvroAccessBuilder, AvroParserConfig};
pub use schema_resolver::ConfluentSchemaCache;
7 changes: 4 additions & 3 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ use anyhow::Context;
use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Reader, Schema};
use risingwave_common::{bail, try_match_expand};
use risingwave_connector_codec::decoder::avro::{
avro_schema_to_column_descs, AvroAccess, AvroParseOptions, ResolvedAvroSchema,
};
use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::ConfluentSchemaCache;
use super::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use super::ConfluentSchemaCache;
use crate::error::ConnectorResult;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::util::bytes_from_url;
use crate::parser::{AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling};
Expand Down
17 changes: 0 additions & 17 deletions src/connector/src/parser/avro/util.rs

This file was deleted.

10 changes: 5 additions & 5 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use std::sync::Arc;
use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Schema};
use risingwave_common::try_match_expand;
use risingwave_connector_codec::decoder::avro::{
avro_extract_field_schema, avro_schema_skip_union, avro_schema_to_column_descs, AvroAccess,
AvroParseOptions, ResolvedAvroSchema,
};
use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
use risingwave_pb::plan_common::ColumnDesc;

use crate::error::ConnectorResult;
use crate::parser::avro::schema_resolver::ConfluentSchemaCache;
use crate::parser::avro::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use crate::parser::unified::avro::{
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
};
use crate::parser::avro::ConfluentSchemaCache;
use crate::parser::unified::AccessImpl;
use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
use crate::schema::schema_registry::{
Expand Down
25 changes: 4 additions & 21 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@
use std::collections::BTreeMap;

use anyhow::Context as _;
use apache_avro::Schema;
use jst::{convert_avro, Context};
use risingwave_connector_codec::decoder::avro::MapHandling;
use risingwave_connector_codec::JsonSchema;
use risingwave_pb::plan_common::ColumnDesc;

use super::util::{bytes_from_url, get_kafka_topic};
use super::{JsonProperties, SchemaRegistryAuth};
use crate::error::ConnectorResult;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::AccessImpl;
use crate::parser::AccessBuilder;
Expand Down Expand Up @@ -93,27 +90,13 @@ pub async fn fetch_json_schema_and_map_to_columns(
let schema = client
.get_schema_by_subject(&format!("{}-value", topic))
.await?;
serde_json::from_str(&schema.content)?
JsonSchema::parse_str(&schema.content)?
} else {
let url = url.first().unwrap();
let bytes = bytes_from_url(url, None).await?;
serde_json::from_slice(&bytes)?
JsonSchema::parse_bytes(&bytes)?
};
json_schema_to_columns(&json_schema)
}

/// FIXME: when the JSON schema is invalid, it will panic.
///
/// ## Notes on type conversion
/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult<Vec<ColumnDesc>> {
let avro_schema = convert_avro(json_schema, Context::default()).to_string();
let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
json_schema.json_schema_to_columns().map_err(Into::into)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the `json_schema_` prefix in the method name feels redundant.

}

#[cfg(test)]
Expand Down
15 changes: 0 additions & 15 deletions src/connector/src/parser/unified/avro.rs

This file was deleted.

3 changes: 1 addition & 2 deletions src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@

use auto_impl::auto_impl;
use risingwave_common::types::{DataType, DatumCow};
use risingwave_connector_codec::decoder::avro::AvroAccess;
pub use risingwave_connector_codec::decoder::{
bail_uncategorized, uncategorized, Access, AccessError, AccessResult,
};

use self::avro::AvroAccess;
use self::bytes::BytesAccess;
use self::json::JsonAccess;
use self::protobuf::ProtobufAccess;
use crate::parser::unified::debezium::MongoJsonAccess;
use crate::source::SourceColumnDesc;

pub mod avro;
pub mod bytes;
pub mod debezium;
pub mod json;
Expand Down
Loading