From e0680e9e968b039b5e059e96be495cdc20c5ea81 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Mon, 10 Jun 2024 14:38:40 +0800
Subject: [PATCH 1/7] refactor: move json schema to codec crate

---
 Cargo.lock                                  | 20 ++++++++----
 Cargo.toml                                  |  6 ++++
 src/connector/Cargo.toml                    | 35 ++++++++++++++-------
 src/connector/codec/Cargo.toml              |  9 ++----
 src/connector/codec/src/decoder/json/mod.rs | 33 +++++++++++++++++++
 src/connector/codec/src/decoder/mod.rs      |  1 +
 src/connector/codec/src/lib.rs              | 13 ++++++++
 src/connector/src/parser/json_parser.rs     | 21 ++-----------
 8 files changed, 96 insertions(+), 42 deletions(-)
 create mode 100644 src/connector/codec/src/decoder/json/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index 5d800c50dd951..99fbfdb81065f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -241,7 +241,7 @@ dependencies = [
 [[package]]
 name = "apache-avro"
 version = "0.16.0"
-source = "git+https://github.com/risingwavelabs/avro?rev=25113ba88234a9ae23296e981d8302c290fdaa4b#25113ba88234a9ae23296e981d8302c290fdaa4b"
+source = "git+https://github.com/risingwavelabs/avro?rev=5349b0c7b35940d117397edbd314ca9087cdb892#5349b0c7b35940d117397edbd314ca9087cdb892"
 dependencies = [
  "bzip2",
  "crc32fast",
@@ -3151,9 +3151,9 @@ checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
 
 [[package]]
 name = "crc32c"
-version = "0.6.7"
+version = "0.6.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0227b9f93e535d49bc7ce914c066243424ce85ed90864cebd0874b184e9b6947"
+checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
 dependencies = [
  "rustc_version 0.4.0",
 ]
@@ -3396,15 +3396,16 @@ dependencies = [
 [[package]]
 name = "curve25519-dalek"
-version = "4.1.3"
+version = "4.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
+checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348"
 dependencies = [
  "cfg-if",
  "cpufeatures",
  "curve25519-dalek-derive",
  "digest",
  "fiat-crypto",
+ "platforms",
  "rustc_version 0.4.0",
  "subtle",
  "zeroize",
 ]
@@ -9124,6 +9125,12 @@ version = "0.3.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
 
+[[package]]
+name = "platforms"
+version = "3.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8"
+
 [[package]]
 name = "plotters"
 version = "0.3.5"
@@ -11010,7 +11017,6 @@ dependencies = [
  "itertools 0.12.1",
  "jni",
  "jsonbb",
- "jsonschema-transpiler",
  "jsonwebtoken",
  "madsim-rdkafka",
  "madsim-tokio",
@@ -11098,10 +11104,12 @@ dependencies = [
  "easy-ext",
  "itertools 0.12.1",
  "jsonbb",
+ "jsonschema-transpiler",
  "num-bigint",
  "risingwave_common",
  "risingwave_pb",
  "rust_decimal",
+ "serde_json",
  "thiserror",
  "thiserror-ext",
  "time",
diff --git a/Cargo.toml b/Cargo.toml
index 817f86f1cb198..d8dc12f584917 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,6 +77,12 @@ repository = "https://github.com/risingwavelabs/risingwave"
 
 [workspace.dependencies]
 foyer = { version = "0.9.4", features = ["nightly"] }
+apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "5349b0c7b35940d117397edbd314ca9087cdb892", features = [
+    "snappy",
+    "zstandard",
+    "bzip",
+    "xz",
+] }
 auto_enums = { version = "0.8", features = ["futures03", "tokio1"] }
 await-tree = "0.2.1"
 aws-config = { version = "1", default-features = false, features = [
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 933e77c6945c2..30f4ce1d93e1e 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -15,12 +15,7 @@ normal = ["workspace-hack"]
 
 [dependencies]
 anyhow = "1"
-apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
-    "snappy",
-    "zstandard",
-    "bzip",
-    "xz",
-] }
+apache-avro = { workspace = true }
 arrow-array = { workspace = true }
 arrow-array-iceberg = { workspace = true }
 arrow-row = { workspace = true }
@@ -66,7 +61,10 @@
 gcp-bigquery-client = "0.18.0"
 glob = "0.3"
 google-cloud-bigquery = { version = "0.9.0", features = ["auth"] }
 google-cloud-gax = "0.17.0"
-google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] }
+google-cloud-googleapis = { version = "0.13", features = [
+    "pubsub",
+    "bigquery",
+] }
 google-cloud-pubsub = "0.25"
 http = "0.2" # TODO: remove this once the opendal version is updated to 0.47
@@ -77,7 +75,6 @@ itertools = { workspace = true }
 jni = { version = "0.21.1", features = ["invocation"] }
 jsonbb = { workspace = true }
 jsonwebtoken = "9.2.0"
-jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
 maplit = "1.0.2"
 moka = { version = "0.12", features = ["future"] }
 mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
@@ -113,7 +110,12 @@ rdkafka = { workspace = true, features = [
     "gssapi",
     "zstd",
 ] }
-redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
+redis = { version = "0.25", features = [
+    "aio",
+    "tokio-comp",
+    "async-std-comp",
+    "cluster-async",
+] }
 regex = "1.4"
 reqwest = { version = "0.12.2", features = ["json", "stream"] }
 risingwave_common = { workspace = true }
@@ -129,7 +131,11 @@ rustls-native-certs = "0.7"
 rustls-pemfile = "2"
 rustls-pki-types = "1"
 rw_futures_util = { workspace = true }
-sea-schema = { version = "0.14", features = ["default", "sqlx-postgres", "sqlx-mysql"] }
+sea-schema = { version = "0.14", features = [
+    "default",
+    "sqlx-postgres",
+    "sqlx-mysql",
+] }
 serde = { version = "1", features = ["derive", "rc"] }
 serde_derive = "1"
 serde_json = "1"
@@ -141,7 +147,14 @@ strum_macros = "0.26"
 tempfile = "3"
 thiserror = "1"
 thiserror-ext = { workspace = true }
-tiberius = { version = "0.12", default-features = false, features = ["chrono", "time", "tds73", "rust_decimal", "bigdecimal", "rustls"] }
+tiberius = { version = "0.12", default-features = false, features = [
+    "chrono",
+    "time",
+    "tds73",
+    "rust_decimal",
+    "bigdecimal",
+    "rustls",
+] }
 time = "0.3.30"
 tokio = { version = "0.2", package = "madsim-tokio", features = [
     "rt",
diff --git a/src/connector/codec/Cargo.toml b/src/connector/codec/Cargo.toml
index 172aacb1c53f3..d9e6e274f0fef 100644
--- a/src/connector/codec/Cargo.toml
+++ b/src/connector/codec/Cargo.toml
@@ -16,12 +16,7 @@ normal = ["workspace-hack"]
 
 [dependencies]
 anyhow = "1"
-apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
-    "snappy",
-    "zstandard",
-    "bzip",
-    "xz",
-] }
+apache-avro = { workspace = true }
 chrono = { version = "0.4", default-features = false, features = [
     "clock",
     "std",
@@ -29,10 +24,12 @@ chrono = { version = "0.4", default-features = false, features = [
 easy-ext = "1"
 itertools = { workspace = true }
 jsonbb = { workspace = true }
+jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
 num-bigint = "0.4"
 risingwave_common = { workspace = true }
 risingwave_pb = { workspace = true }
 rust_decimal = "1"
+serde_json = "1.0"
 thiserror = "1"
 thiserror-ext = { workspace = true }
 time = "0.3.30"
diff --git a/src/connector/codec/src/decoder/json/mod.rs b/src/connector/codec/src/decoder/json/mod.rs
new file mode 100644
index 0000000000000..651ac21f70e2f
--- /dev/null
+++ b/src/connector/codec/src/decoder/json/mod.rs
@@ -0,0 +1,33 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use anyhow::Context;
+use risingwave_pb::plan_common::ColumnDesc;
+
+use super::avro::{avro_schema_to_column_descs, MapHandling};
+
+/// FIXME: when the JSON schema is invalid, it will panic.
+///
+/// ## Notes on type conversion
+/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
+/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
+///
+///
+/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
+pub fn json_schema_to_columns(json_schema: &serde_json::Value) -> anyhow::Result<Vec<ColumnDesc>> {
+    let avro_schema = jst::convert_avro(json_schema, jst::Context::default()).to_string();
+    let schema =
+        apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
+    avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
+}
diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs
index d71186815697e..61d761410fae6 100644
--- a/src/connector/codec/src/decoder/mod.rs
+++ b/src/connector/codec/src/decoder/mod.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 pub mod avro;
+pub mod json;
 pub mod utils;
 
 use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum};
diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs
index 67198c78516a0..c0483087eb78a 100644
--- a/src/connector/codec/src/lib.rs
+++ b/src/connector/codec/src/lib.rs
@@ -42,3 +42,16 @@
 /// Converts JSON/AVRO/Protobuf data to RisingWave datum.
 /// The core API is [`decoder::Access`].
 pub mod decoder;
+
+pub use risingwave_pb::plan_common::ColumnDesc;
+pub type RisingWaveSchema = Vec<ColumnDesc>;
+pub use apache_avro::schema::Schema as AvroSchema;
+pub struct JsonSchema(pub serde_json::Value);
+impl JsonSchema {
+    pub fn parse_str(schema: &str) -> anyhow::Result<Self> {
+        use anyhow::Context;
+
+        let value = serde_json::from_str(schema).context("failed to parse json schema")?;
+        Ok(Self(value))
+    }
+}
diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs
index 89f118eb1022f..081df749d3b38 100644
--- a/src/connector/src/parser/json_parser.rs
+++ b/src/connector/src/parser/json_parser.rs
@@ -24,15 +24,12 @@
 use std::collections::BTreeMap;
 
 use anyhow::Context as _;
-use apache_avro::Schema;
-use jst::{convert_avro, Context};
-use risingwave_connector_codec::decoder::avro::MapHandling;
+use risingwave_connector_codec::decoder::json::json_schema_to_columns;
 use risingwave_pb::plan_common::ColumnDesc;
 
 use super::util::{bytes_from_url, get_kafka_topic};
 use super::{JsonProperties, SchemaRegistryAuth};
 use crate::error::ConnectorResult;
-use crate::parser::avro::util::avro_schema_to_column_descs;
 use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
 use crate::parser::unified::AccessImpl;
 use crate::parser::AccessBuilder;
@@ -99,21 +96,7 @@ pub async fn fetch_json_schema_and_map_to_columns(
         let bytes = bytes_from_url(url, None).await?;
         serde_json::from_slice(&bytes)?
     };
-    json_schema_to_columns(&json_schema)
-}
-
-/// FIXME: when the JSON schema is invalid, it will panic.
-///
-/// ## Notes on type conversion
-/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
-/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
-///
-///
-/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
-fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult<Vec<ColumnDesc>> {
-    let avro_schema = convert_avro(json_schema, Context::default()).to_string();
-    let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
-    avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
+    json_schema_to_columns(&json_schema).map_err(Into::into)
 }
 
 #[cfg(test)]

From 9fe97c492d6308d3561cf01a6ec7d37e2a8ca5a6 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Thu, 20 Jun 2024 16:50:40 +0800
Subject: [PATCH 2/7] remove dead code

---
 Cargo.lock                      |  2 +-
 Cargo.toml                      |  2 +-
 src/connector/Cargo.toml        |  4 +--
 src/connector/src/parser/mod.rs | 62 +++------------------------------
 4 files changed, 9 insertions(+), 61 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 99fbfdb81065f..94222a2ea252e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -241,7 +241,7 @@ dependencies = [
 [[package]]
 name = "apache-avro"
 version = "0.16.0"
-source = "git+https://github.com/risingwavelabs/avro?rev=5349b0c7b35940d117397edbd314ca9087cdb892#5349b0c7b35940d117397edbd314ca9087cdb892"
+source = "git+https://github.com/risingwavelabs/avro?rev=25113ba88234a9ae23296e981d8302c290fdaa4b#25113ba88234a9ae23296e981d8302c290fdaa4b"
 dependencies = [
  "bzip2",
  "crc32fast",
diff --git a/Cargo.toml b/Cargo.toml
index d8dc12f584917..249a0f2258863 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,7 @@ repository = "https://github.com/risingwavelabs/risingwave"
 
 [workspace.dependencies]
 foyer = { version = "0.9.4", features = ["nightly"] }
-apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "5349b0c7b35940d117397edbd314ca9087cdb892", features = [
+apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
     "snappy",
     "zstandard",
     "bzip",
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 30f4ce1d93e1e..41c97b0f8a64d 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -131,8 +131,8 @@ rustls-native-certs = "0.7"
 rustls-pemfile = "2"
 rustls-pki-types = "1"
 rw_futures_util = { workspace = true }
-sea-schema = { version = "0.14", features = [
-    "default",
+sea-schema = { version = "0.14", default-features = false, features = [
+    "discovery",
     "sqlx-postgres",
     "sqlx-mysql",
 ] }
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 26cf746b535dc..10858aefa92a3 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -245,11 +245,9 @@ impl<'a> MessageMeta<'a> {
 }
 
 trait OpAction {
-    type Output<'a>;
+    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a>;
 
-    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;
-
-    fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);
+    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>);
 
     fn rollback(builder: &mut ArrayBuilderImpl);
 
@@ -259,10 +257,8 @@ trait OpAction {
 struct OpActionInsert;
 
 impl OpAction for OpActionInsert {
-    type Output<'a> = DatumCow<'a>;
-
     #[inline(always)]
-    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
+    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a> {
         datum.into()
     }
 
@@ -285,10 +281,8 @@ impl OpAction for OpActionInsert {
 struct OpActionDelete;
 
 impl OpAction for OpActionDelete {
-    type Output<'a> = DatumCow<'a>;
-
     #[inline(always)]
-    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
+    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a> {
         datum.into()
     }
 
@@ -308,36 +302,6 @@ impl OpAction for OpActionDelete {
     }
 }
 
-struct OpActionUpdate;
-
-impl OpAction for OpActionUpdate {
-    type Output<'a> = (DatumCow<'a>, DatumCow<'a>);
-
-    #[inline(always)]
-    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
-        let datum = datum.into();
-        (datum.clone(), datum)
-    }
-
-    #[inline(always)]
-    fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
-        builder.append(output.0);
-        builder.append(output.1);
-    }
-
-    #[inline(always)]
-    fn rollback(builder: &mut ArrayBuilderImpl) {
-        builder.pop().unwrap();
-        builder.pop().unwrap();
-    }
-
-    #[inline(always)]
-    fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
-        writer.append_op(Op::UpdateDelete);
-        writer.append_op(Op::UpdateInsert);
-    }
-}
-
 impl SourceStreamChunkRowWriter<'_> {
     fn append_op(&mut self, op: Op) {
         self.op_builder.push(op);
@@ -346,7 +310,7 @@ impl SourceStreamChunkRowWriter<'_> {
 
     fn do_action<'a, A: OpAction>(
        &'a mut self,
-        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
+        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<DatumCow<'a>>,
     ) -> AccessResult<()> {
         let mut parse_field = |desc: &SourceColumnDesc| {
             match f(desc) {
@@ -543,22 +507,6 @@ impl SourceStreamChunkRowWriter<'_> {
     {
         self.do_action::<OpActionDelete>(|desc| f(desc).map(Into::into))
     }
-
-    /// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that
-    /// produces two [`Datum`]s as old and new value by corresponding [`SourceColumnDesc`].
-    ///
-    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
-    #[inline(always)]
-    pub fn do_update<'a, D1, D2>(
-        &mut self,
-        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
-    ) -> AccessResult<()>
-    where
-        D1: Into<DatumCow<'a>>,
-        D2: Into<DatumCow<'a>>,
-    {
-        self.do_action::<OpActionUpdate>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
-    }
 }
 
 /// Transaction control message. Currently only used by Debezium messages.

From 52874e8554459f8281028a60c434b5524e7d6dab Mon Sep 17 00:00:00 2001
From: xxchan
Date: Thu, 20 Jun 2024 17:00:05 +0800
Subject: [PATCH 3/7] cleanup avro

---
 src/connector/src/parser/avro/mod.rs     |  6 +++---
 src/connector/src/parser/avro/parser.rs  |  7 ++++---
 src/connector/src/parser/avro/util.rs    | 17 -----------------
 .../src/parser/debezium/avro_parser.rs   | 10 +++++-----
 src/connector/src/parser/unified/avro.rs | 15 ---------------
 src/connector/src/parser/unified/mod.rs  |  3 +--
 6 files changed, 13 insertions(+), 45 deletions(-)
 delete mode 100644 src/connector/src/parser/avro/util.rs
 delete mode 100644 src/connector/src/parser/unified/avro.rs

diff --git a/src/connector/src/parser/avro/mod.rs b/src/connector/src/parser/avro/mod.rs
index 1c22b326770c4..19193035bd567 100644
--- a/src/connector/src/parser/avro/mod.rs
+++ b/src/connector/src/parser/avro/mod.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 mod parser;
-pub mod schema_resolver;
-pub mod util;
+mod schema_resolver;
 
-pub use parser::*;
+pub use parser::{AvroAccessBuilder, AvroParserConfig};
+pub use schema_resolver::ConfluentSchemaCache;
diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs
index 6d69460ee1198..f6abe86a17f0d 100644
--- a/src/connector/src/parser/avro/parser.rs
+++ b/src/connector/src/parser/avro/parser.rs
@@ -19,12 +19,13 @@ use anyhow::Context;
 use apache_avro::types::Value;
 use apache_avro::{from_avro_datum, Reader, Schema};
 use risingwave_common::{bail, try_match_expand};
+use risingwave_connector_codec::decoder::avro::{
+    avro_schema_to_column_descs, AvroAccess, AvroParseOptions, ResolvedAvroSchema,
+};
 use risingwave_pb::plan_common::ColumnDesc;
 
-use super::schema_resolver::ConfluentSchemaCache;
-use super::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
+use super::ConfluentSchemaCache;
 use crate::error::ConnectorResult;
-use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
 use crate::parser::unified::AccessImpl;
 use crate::parser::util::bytes_from_url;
 use crate::parser::{AccessBuilder, AvroProperties, EncodingProperties, EncodingType, MapHandling};
diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs
deleted file mode 100644
index 58043daa08b0f..0000000000000
--- a/src/connector/src/parser/avro/util.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-pub use risingwave_connector_codec::decoder::avro::{
-    avro_schema_to_column_descs, ResolvedAvroSchema,
-};
diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs
index 430d5072a88db..1a40b87c9d498 100644
--- a/src/connector/src/parser/debezium/avro_parser.rs
+++ b/src/connector/src/parser/debezium/avro_parser.rs
@@ -18,15 +18,15 @@ use std::sync::Arc;
 use apache_avro::types::Value;
 use apache_avro::{from_avro_datum, Schema};
 use risingwave_common::try_match_expand;
+use risingwave_connector_codec::decoder::avro::{
+    avro_extract_field_schema, avro_schema_skip_union, avro_schema_to_column_descs, AvroAccess,
+    AvroParseOptions, ResolvedAvroSchema,
+};
 use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
 use risingwave_pb::plan_common::ColumnDesc;
 
 use crate::error::ConnectorResult;
-use crate::parser::avro::schema_resolver::ConfluentSchemaCache;
-use crate::parser::avro::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
-use crate::parser::unified::avro::{
-    avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
-};
+use crate::parser::avro::ConfluentSchemaCache;
 use crate::parser::unified::AccessImpl;
 use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
 use crate::schema::schema_registry::{
diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs
deleted file mode 100644
index 68e95d6f78b9a..0000000000000
--- a/src/connector/src/parser/unified/avro.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-pub use risingwave_connector_codec::decoder::avro::*;
diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs
index 6c8316042026f..8045ce0132401 100644
--- a/src/connector/src/parser/unified/mod.rs
+++ b/src/connector/src/parser/unified/mod.rs
@@ -16,18 +16,17 @@
 
 use auto_impl::auto_impl;
 use risingwave_common::types::{DataType, DatumCow};
+use risingwave_connector_codec::decoder::avro::AvroAccess;
 pub use risingwave_connector_codec::decoder::{
     bail_uncategorized, uncategorized, Access, AccessError, AccessResult,
 };
 
-use self::avro::AvroAccess;
 use self::bytes::BytesAccess;
 use self::json::JsonAccess;
 use self::protobuf::ProtobufAccess;
 use crate::parser::unified::debezium::MongoJsonAccess;
 use crate::source::SourceColumnDesc;
 
-pub mod avro;
 pub mod bytes;
 pub mod debezium;
 pub mod json;

From d1d66fa40d1963854e4319212e43c219584cb0de Mon Sep 17 00:00:00 2001
From: xxchan
Date: Thu, 20 Jun 2024 17:28:54 +0800
Subject: [PATCH 4/7] revert toml fmt change

Signed-off-by: xxchan
---
 src/connector/Cargo.toml | 21 +++------------------
 1 file changed, 3 insertions(+), 18 deletions(-)

diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 41c97b0f8a64d..2740a3a8754a4 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -61,10 +61,7 @@
 gcp-bigquery-client = "0.18.0"
 glob = "0.3"
 google-cloud-bigquery = { version = "0.9.0", features = ["auth"] }
 google-cloud-gax = "0.17.0"
-google-cloud-googleapis = { version = "0.13", features = [
-    "pubsub",
-    "bigquery",
-] }
+google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] }
 google-cloud-pubsub = "0.25"
 http = "0.2" # TODO: remove this once the opendal version is updated to 0.47
@@ -110,12 +107,7 @@ rdkafka = { workspace = true, features = [
     "gssapi",
     "zstd",
 ] }
-redis = { version = "0.25", features = [
-    "aio",
-    "tokio-comp",
-    "async-std-comp",
-    "cluster-async",
-] }
+redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
 regex = "1.4"
 reqwest = { version = "0.12.2", features = ["json", "stream"] }
 risingwave_common = { workspace = true }
@@ -139,14 +131,7 @@ strum_macros = "0.26"
 tempfile = "3"
 thiserror = "1"
 thiserror-ext = { workspace = true }
-tiberius = { version = "0.12", default-features = false, features = [
-    "chrono",
-    "time",
-    "tds73",
-    "rust_decimal",
-    "bigdecimal",
-    "rustls",
-] }
+tiberius = { version = "0.12", default-features = false, features = ["chrono", "time", "tds73", "rust_decimal", "bigdecimal", "rustls"] }
 time = "0.3.30"
 tokio = { version = "0.2", package = "madsim-tokio", features = [
     "rt",

From 86b47d129c2d1a7ee2c19df11d15bbe07318396a Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 21 Jun 2024 11:56:43 +0800
Subject: [PATCH 5/7] Revert "remove dead code"

Signed-off-by: xxchan
---
 src/connector/src/parser/mod.rs | 62 ++++++++++++++++++++++++++++++---
 1 file changed, 57 insertions(+), 5 deletions(-)

diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 10858aefa92a3..26cf746b535dc 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -245,9 +245,11 @@ impl<'a> MessageMeta<'a> {
 }
 
 trait OpAction {
-    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a>;
+    type Output<'a>;
 
-    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>);
+    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;
+
+    fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);
 
     fn rollback(builder: &mut ArrayBuilderImpl);
 
@@ -257,8 +259,10 @@ trait OpAction {
 struct OpActionInsert;
 
 impl OpAction for OpActionInsert {
+    type Output<'a> = DatumCow<'a>;
+
     #[inline(always)]
-    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a> {
+    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
         datum.into()
     }
 
@@ -281,8 +285,10 @@ impl OpAction for OpActionInsert {
 struct OpActionDelete;
 
 impl OpAction for OpActionDelete {
+    type Output<'a> = DatumCow<'a>;
+
     #[inline(always)]
-    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> DatumCow<'a> {
+    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
         datum.into()
     }
 
@@ -302,6 +308,36 @@ impl OpAction for OpActionDelete {
     }
 }
 
+struct OpActionUpdate;
+
+impl OpAction for OpActionUpdate {
+    type Output<'a> = (DatumCow<'a>, DatumCow<'a>);
+
+    #[inline(always)]
+    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
+        let datum = datum.into();
+        (datum.clone(), datum)
+    }
+
+    #[inline(always)]
+    fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
+        builder.append(output.0);
+        builder.append(output.1);
+    }
+
+    #[inline(always)]
+    fn rollback(builder: &mut ArrayBuilderImpl) {
+        builder.pop().unwrap();
+        builder.pop().unwrap();
+    }
+
+    #[inline(always)]
+    fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) {
+        writer.append_op(Op::UpdateDelete);
+        writer.append_op(Op::UpdateInsert);
+    }
+}
+
 impl SourceStreamChunkRowWriter<'_> {
     fn append_op(&mut self, op: Op) {
         self.op_builder.push(op);
@@ -310,7 +346,7 @@ impl SourceStreamChunkRowWriter<'_> {
 
     fn do_action<'a, A: OpAction>(
         &'a mut self,
-        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<DatumCow<'a>>,
+        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
     ) -> AccessResult<()> {
         let mut parse_field = |desc: &SourceColumnDesc| {
             match f(desc) {
@@ -507,6 +543,22 @@ impl SourceStreamChunkRowWriter<'_> {
     {
         self.do_action::<OpActionDelete>(|desc| f(desc).map(Into::into))
     }
+
+    /// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that
+    /// produces two [`Datum`]s as old and new value by corresponding [`SourceColumnDesc`].
+    ///
+    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
+    #[inline(always)]
+    pub fn do_update<'a, D1, D2>(
+        &mut self,
+        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
+    ) -> AccessResult<()>
+    where
+        D1: Into<DatumCow<'a>>,
+        D2: Into<DatumCow<'a>>,
+    {
+        self.do_action::<OpActionUpdate>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
+    }
 }
 
 /// Transaction control message. Currently only used by Debezium messages.

From d4ae5fc6c37154e545173ca09b3193f9fce061df Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 21 Jun 2024 12:00:48 +0800
Subject: [PATCH 6/7] B happy

Signed-off-by: xxchan
---
 src/connector/codec/src/decoder/json/mod.rs | 28 +++++++++++----------
 src/connector/codec/src/lib.rs              | 10 ++++++--
 src/connector/src/parser/json_parser.rs     |  8 +++---
 3 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/src/connector/codec/src/decoder/json/mod.rs b/src/connector/codec/src/decoder/json/mod.rs
index 651ac21f70e2f..b56ce3abbee25 100644
--- a/src/connector/codec/src/decoder/json/mod.rs
+++ b/src/connector/codec/src/decoder/json/mod.rs
@@ -17,17 +17,19 @@ use risingwave_pb::plan_common::ColumnDesc;
 
 use super::avro::{avro_schema_to_column_descs, MapHandling};
 
-/// FIXME: when the JSON schema is invalid, it will panic.
-///
-/// ## Notes on type conversion
-/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
-/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
-///
-///
-/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
-pub fn json_schema_to_columns(json_schema: &serde_json::Value) -> anyhow::Result<Vec<ColumnDesc>> {
-    let avro_schema = jst::convert_avro(json_schema, jst::Context::default()).to_string();
-    let schema =
-        apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
-    avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
+impl crate::JsonSchema {
+    /// FIXME: when the JSON schema is invalid, it will panic.
+    ///
+    /// ## Notes on type conversion
+    /// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
+    /// When an object has `properties` and `additionalProperties`, the latter will be ignored.
+    ///
+    ///
+    /// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
+    pub fn json_schema_to_columns(&self) -> anyhow::Result<Vec<ColumnDesc>> {
+        let avro_schema = jst::convert_avro(&self.0, jst::Context::default()).to_string();
+        let schema =
+            apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
+        avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
+    }
 }
diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs
index c0483087eb78a..6b1335ab32e26 100644
--- a/src/connector/codec/src/lib.rs
+++ b/src/connector/codec/src/lib.rs
@@ -43,9 +43,8 @@
 /// The core API is [`decoder::Access`].
 pub mod decoder;
 
-pub use risingwave_pb::plan_common::ColumnDesc;
-pub type RisingWaveSchema = Vec<ColumnDesc>;
 pub use apache_avro::schema::Schema as AvroSchema;
+pub use risingwave_pb::plan_common::ColumnDesc;
 pub struct JsonSchema(pub serde_json::Value);
 impl JsonSchema {
     pub fn parse_str(schema: &str) -> anyhow::Result<Self> {
@@ -54,4 +53,11 @@ impl JsonSchema {
         let value = serde_json::from_str(schema).context("failed to parse json schema")?;
         Ok(Self(value))
     }
+
+    pub fn parse_bytes(schema: &[u8]) -> anyhow::Result<Self> {
+        use anyhow::Context;
+
+        let value = serde_json::from_slice(schema).context("failed to parse json schema")?;
+        Ok(Self(value))
+    }
 }
diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs
index 081df749d3b38..4ed9af1d30dfc 100644
--- a/src/connector/src/parser/json_parser.rs
+++ b/src/connector/src/parser/json_parser.rs
@@ -24,7 +24,7 @@
 use std::collections::BTreeMap;
 
 use anyhow::Context as _;
-use risingwave_connector_codec::decoder::json::json_schema_to_columns;
+use risingwave_connector_codec::JsonSchema;
 use risingwave_pb::plan_common::ColumnDesc;
 
 use super::util::{bytes_from_url, get_kafka_topic};
@@ -90,13 +90,13 @@ pub async fn fetch_json_schema_and_map_to_columns(
         let schema = client
             .get_schema_by_subject(&format!("{}-value", topic))
             .await?;
-        serde_json::from_str(&schema.content)?
+        JsonSchema::parse_str(&schema.content)?
     } else {
         let url = url.first().unwrap();
        let bytes = bytes_from_url(url, None).await?;
-        serde_json::from_slice(&bytes)?
+        JsonSchema::parse_bytes(&bytes)?
     };
-    json_schema_to_columns(&json_schema).map_err(Into::into)
+    json_schema.json_schema_to_columns().map_err(Into::into)
 }
 
 #[cfg(test)]

From c28b986dd8840ec0796a70fa1e4b846b8a1828f2 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 21 Jun 2024 12:02:27 +0800
Subject: [PATCH 7/7] fix lock

Signed-off-by: xxchan
---
 Cargo.lock | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 94222a2ea252e..cbbb3db4da6a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3151,9 +3151,9 @@ checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
 
 [[package]]
 name = "crc32c"
-version = "0.6.8"
+version = "0.6.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
+checksum = "0227b9f93e535d49bc7ce914c066243424ce85ed90864cebd0874b184e9b6947"
 dependencies = [
  "rustc_version 0.4.0",
 ]
@@ -3396,16 +3396,15 @@ dependencies = [
 [[package]]
 name = "curve25519-dalek"
-version = "4.1.2"
+version = "4.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348"
+checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
 dependencies = [
  "cfg-if",
  "cpufeatures",
  "curve25519-dalek-derive",
  "digest",
  "fiat-crypto",
- "platforms",
  "rustc_version 0.4.0",
  "subtle",
  "zeroize",
 ]
@@ -9125,12 +9124,6 @@ version = "0.3.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
 
-[[package]]
-name = "platforms"
-version = "3.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8"
-
 [[package]]
 name = "plotters"
 version = "0.3.5"
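
A minimal sketch of how the relocated API fits together after this series. The names `risingwave_connector_codec::JsonSchema`, `JsonSchema::parse_str`, and `json_schema_to_columns` come from the patches above; the sample schema and the surrounding `main`/`anyhow` scaffolding are illustrative assumptions only, not code from the series:

    use risingwave_connector_codec::JsonSchema;

    fn main() -> anyhow::Result<()> {
        // Illustrative JSON schema: a plain string field plus an object that only
        // declares `additionalProperties`, which the jsonschema-transpiler path
        // converts to an Avro map and hence (MapHandling::Jsonb) to a JSONB column.
        let text = r#"{
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "attrs": { "type": "object", "additionalProperties": { "type": "string" } }
            }
        }"#;

        // JSON schema -> Avro schema -> RisingWave ColumnDescs, as implemented in
        // src/connector/codec/src/decoder/json/mod.rs.
        let schema = JsonSchema::parse_str(text)?;
        let columns = schema.json_schema_to_columns()?;
        println!("derived {} columns", columns.len());
        Ok(())
    }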