From 25b0cc9362ea931367e245c45d6ebea85804c286 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 18 Sep 2024 15:43:18 +0800 Subject: [PATCH] refactor(connector): replace `protobuf-native` with `protox` for schema compiling (#18543) Signed-off-by: Bugen Zhao --- Cargo.lock | 179 ++++++++---------- src/connector/codec/Cargo.toml | 3 +- src/connector/codec/build.rs | 6 - .../codec/src/common/protobuf/compiler.rs | 103 ++++------ .../codec/tests/integration_tests/protobuf.rs | 16 +- src/connector/src/schema/protobuf.rs | 32 ++-- 6 files changed, 152 insertions(+), 187 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 640a843a83a1e..fcce5f2c9db4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1266,15 +1266,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "autotools" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aef8da1805e028a172334c3b680f93e71126f2327622faef2ec3d893c0a4ad77" -dependencies = [ - "cc", -] - [[package]] name = "await-tree" version = "0.2.1" @@ -2045,6 +2036,12 @@ version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f40afb3abbf90895dda3ddbc6d8734d24215130a22d646067690f5e318f81bc" +[[package]] +name = "beef" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1" + [[package]] name = "bigdecimal" version = "0.3.1" @@ -2831,16 +2828,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "codespan-reporting" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" -dependencies = [ - "termcolor", - "unicode-width", -] - [[package]] name = "colorchoice" version = "1.0.0" @@ -3462,50 +3449,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "cxx" -version = "1.0.107" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe98ba1789d56fb3db3bee5e032774d4f421b685de7ba703643584ba24effbe" -dependencies = [ - "cc", - "cxxbridge-flags", - "cxxbridge-macro", - "link-cplusplus", -] - -[[package]] -name = "cxx-build" -version = "1.0.107" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4ce20f6b8433da4841b1dadfb9468709868022d829d5ca1f2ffbda928455ea3" -dependencies = [ - "cc", - "codespan-reporting", - "once_cell", - "proc-macro2", - "quote", - "scratch", - "syn 2.0.66", -] - -[[package]] -name = "cxxbridge-flags" -version = "1.0.107" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20888d9e1d2298e2ff473cee30efe7d5036e437857ab68bbfea84c74dba91da2" - -[[package]] -name = "cxxbridge-macro" -version = "1.0.107" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fa16a70dd58129e4dfffdff535fb1bce66673f7bbeec4a5a1765a504e1ccd84" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", -] - [[package]] name = "darling" version = "0.13.4" @@ -6739,15 +6682,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "link-cplusplus" -version = "1.0.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d240c6f7e1ba3a28b0249f774e6a9dd0175054b52dfbb61b16eb8505c3785c9" -dependencies = [ - "cc", -] - [[package]] name = "linked-hash-map" version = "0.5.6" @@ -6818,6 +6752,39 @@ dependencies = [ "value-bag", ] +[[package]] +name = "logos" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff1ceb190eb9bdeecdd8f1ad6a71d6d632a50905948771718741b5461fb01e13" +dependencies = [ + "logos-derive", +] + +[[package]] +name = "logos-codegen" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90be66cb7bd40cb5cc2e9cfaf2d1133b04a3d93b72344267715010a466e0915a" +dependencies = [ + "beef", + "fnv", + "lazy_static", + "proc-macro2", + "quote", + "regex-syntax 0.8.2", + "syn 2.0.66", +] + +[[package]] +name = "logos-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45154231e8e96586b39494029e58f12f8ffcb5ecf80333a603a13aa205ea8cbd" +dependencies = [ + "logos-codegen", +] + [[package]] name = "lru" version = "0.7.6" @@ -7177,6 +7144,29 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "miette" +version = "7.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4edc8853320c2a0dab800fbda86253c8938f6ea88510dc92c5f1ed20e794afc1" +dependencies = [ + "cfg-if", + "miette-derive", + "thiserror", + "unicode-width", +] + +[[package]] +name = "miette-derive" +version = "7.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf09caffaac8068c346b6df2a7fc27a177fd20b39421a39ce0a211bde679a6c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "mime" version = "0.3.17" @@ -9239,6 +9229,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55a6a9143ae25c25fa7b6a48d6cc08b10785372060009c25140a4e7c340e95af" dependencies = [ "base64 0.22.0", + "logos", + "miette", "once_cell", "prost 0.13.1", "prost-types 0.13.1", @@ -9280,26 +9272,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" [[package]] -name = "protobuf-native" -version = "0.2.2+3.19.1" +name = "protox" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "577feb02952883f2348ee6f36afeeb4e0ee22be0e5b707d60bdc2f5aab038e8f" +checksum = "873f359bdecdfe6e353752f97cb9ee69368df55b16363ed2216da85e03232a58" dependencies = [ - "cxx", - "cxx-build", - "paste", - "pretty_assertions", - "protobuf-src", - "tempfile", + "bytes", + "miette", + "prost 0.13.1", + "prost-reflect", + "prost-types 0.13.1", + "protox-parse", + "thiserror", ] [[package]] -name = "protobuf-src" -version = "1.1.0+21.5" +name = "protox-parse" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7ac8852baeb3cc6fb83b93646fb93c0ffe5d14bf138c945ceb4b9948ee0e3c1" +checksum = "a3a462d115462c080ae000c29a47f0b3985737e5d3a995fcdbcaa5c782068dde" dependencies = [ - "autotools", + "logos", + "miette", + "prost-types 0.13.1", + "thiserror", ] [[package]] @@ -10751,8 +10747,7 @@ dependencies = [ "prost-build 0.12.1", "prost-reflect", "prost-types 0.13.1", - "protobuf-native", - "protobuf-src", + "protox", "risingwave_common", "risingwave_pb", "rust_decimal", @@ -12406,12 +12401,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "scratch" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3cf7c11c38cb994f3d40e8a8cde3bbd1f72a435e4c49e85d6553d8312306152" - [[package]] name = "scrypt" version = "0.11.0" @@ -14982,9 +14971,9 @@ checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" [[package]] name = "unicode-width" -version = "0.1.10" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" [[package]] name = "unicode-xid" diff --git a/src/connector/codec/Cargo.toml b/src/connector/codec/Cargo.toml index 5848c236dbd4d..bdb43ff0a9062 100644 --- a/src/connector/codec/Cargo.toml +++ b/src/connector/codec/Cargo.toml @@ -29,7 +29,7 @@ num-bigint = "0.4" prost = { workspace = true, features = ["no-recursion-limit"] } prost-reflect = { version = "0.14", features = ["serde"] } prost-types = "0.13" -protobuf-native = "0.2.2" +protox = "0.7" risingwave_common = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = "1" @@ -47,7 +47,6 @@ tokio = { version = "0.2", package = "madsim-tokio" } [build-dependencies] prost-build = "0.12" -protobuf-src = "1" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../../workspace-hack" } diff --git a/src/connector/codec/build.rs b/src/connector/codec/build.rs index 8a9438d59b9e8..7cf027a4338ad 100644 --- a/src/connector/codec/build.rs +++ b/src/connector/codec/build.rs @@ -26,10 +26,4 @@ fn main() { .out_dir("./tests/integration_tests/protobuf") .compile_protos(&protos, &Vec::::new()) .unwrap(); - - let proto_include_path = protobuf_src::include(); - println!( - "cargo:rustc-env=PROTO_INCLUDE={}", - proto_include_path.to_str().unwrap() - ); } diff --git a/src/connector/codec/src/common/protobuf/compiler.rs b/src/connector/codec/src/common/protobuf/compiler.rs index 80e86d002d4aa..ce9470d2c130b 100644 --- a/src/connector/codec/src/common/protobuf/compiler.rs +++ b/src/connector/codec/src/common/protobuf/compiler.rs @@ -12,75 +12,52 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::path::{Path, PathBuf}; +use std::collections::HashMap; -use itertools::Itertools; - -macro_rules! embed_wkts { - [$( $path:literal ),+ $(,)?] => { - &[$( - ( - concat!("google/protobuf/", $path), - include_bytes!(concat!(env!("PROTO_INCLUDE"), "/google/protobuf/", $path)).as_slice(), - ) - ),+] - }; -} -const WELL_KNOWN_TYPES: &[(&str, &[u8])] = embed_wkts![ - "any.proto", - "api.proto", - "compiler/plugin.proto", - "descriptor.proto", - "duration.proto", - "empty.proto", - "field_mask.proto", - "source_context.proto", - "struct.proto", - "timestamp.proto", - "type.proto", - "wrappers.proto", -]; - -#[derive(Debug, thiserror::Error)] -pub enum PbCompileError { - #[error("build_file_descriptor_set failed\n{}", errs.iter().map(|e| format!("\t{e}")).join("\n"))] - Build { - errs: Vec, - }, - #[error("serialize descriptor set failed")] - Serialize, -} +use prost_types::FileDescriptorSet; +use protox::file::{ChainFileResolver, File, FileResolver, GoogleFileResolver}; +use protox::Error; +// name -> content pub fn compile_pb( - main_file: (PathBuf, Vec), - dependencies: impl IntoIterator)>, -) -> Result, PbCompileError> { - use protobuf_native::compiler::{ - SimpleErrorCollector, SourceTreeDescriptorDatabase, VirtualSourceTree, - }; - use protobuf_native::MessageLite; + main_file: (String, String), + dependencies: impl IntoIterator, +) -> Result { + struct MyResolver { + map: HashMap, + } - let root = main_file.0.clone(); + impl MyResolver { + fn new( + main_file: (String, String), + dependencies: impl IntoIterator, + ) -> Self { + let map = std::iter::once(main_file).chain(dependencies).collect(); - let mut source_tree = VirtualSourceTree::new(); - for (path, bytes) in std::iter::once(main_file).chain(dependencies.into_iter()) { - source_tree.as_mut().add_file(&path, bytes); - } - for (path, bytes) in WELL_KNOWN_TYPES { - source_tree - .as_mut() - .add_file(Path::new(path), bytes.to_vec()); + Self { map } + } } - let mut error_collector = SimpleErrorCollector::new(); - // `db` needs to be dropped before we can iterate on `error_collector`. - let fds = { - let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut()); - db.as_mut().record_errors_to(error_collector.as_mut()); - db.as_mut().build_file_descriptor_set(&[root]) + impl FileResolver for MyResolver { + fn open_file(&self, name: &str) -> Result { + if let Some(content) = self.map.get(name) { + Ok(File::from_source(name, content)?) + } else { + Err(Error::file_not_found(name)) + } + } } - .map_err(|_| PbCompileError::Build { - errs: error_collector.as_mut().collect(), - })?; - fds.serialize().map_err(|_| PbCompileError::Serialize) + + let main_file_name = main_file.0.clone(); + + let mut resolver = ChainFileResolver::new(); + resolver.add(GoogleFileResolver::new()); + resolver.add(MyResolver::new(main_file, dependencies)); + + let fd = protox::Compiler::with_file_resolver(resolver) + .include_imports(true) + .open_file(&main_file_name)? + .file_descriptor_set(); + + Ok(fd) } diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index 9a70ef5e5c7a9..0b022f603a46d 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -19,7 +19,6 @@ mod recursive; #[allow(clippy::all)] mod all_types; use std::collections::HashMap; -use std::path::PathBuf; use anyhow::Context; use prost::Message; @@ -89,14 +88,15 @@ fn load_message_descriptor( message_name: &str, ) -> anyhow::Result { let location = "tests/test_data/".to_string() + file_name; - let file_content = fs_err::read(&location).unwrap(); - let schema_bytes = if file_name.ends_with(".proto") { - compile_pb((PathBuf::from(&location), file_content), [])? + let file_content = fs_err::read_to_string(&location).unwrap(); + + let pool = if file_name.ends_with(".proto") { + let fd_set = compile_pb((location.clone(), file_content), [])?; + DescriptorPool::from_file_descriptor_set(fd_set) } else { - file_content - }; - let pool = DescriptorPool::decode(schema_bytes.as_slice()) - .with_context(|| format!("cannot build descriptor pool from schema `{location}`"))?; + DescriptorPool::decode(file_content.as_bytes()) + } + .with_context(|| format!("cannot build descriptor pool from schema `{location}`"))?; pool.get_message_by_name(message_name).with_context(|| { format!( diff --git a/src/connector/src/schema/protobuf.rs b/src/connector/src/schema/protobuf.rs index 634d692066ac1..38161d2d2cc56 100644 --- a/src/connector/src/schema/protobuf.rs +++ b/src/connector/src/schema/protobuf.rs @@ -13,9 +13,10 @@ // limitations under the License. use std::collections::BTreeMap; -use std::path::PathBuf; +use anyhow::Context as _; use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor}; +use prost_types::FileDescriptorSet; use risingwave_connector_codec::common::protobuf::compile_pb; use super::loader::{LoadedSchema, SchemaLoader}; @@ -99,29 +100,34 @@ pub async fn fetch_from_registry( impl LoadedSchema for FileDescriptor { fn compile(primary: Subject, references: Vec) -> Result { let primary_name = primary.name.clone(); - let compiled_pb = compile_pb_subject(primary, references)?; - let pool = DescriptorPool::decode(compiled_pb.as_slice()) - .map_err(|e| SchemaFetchError::SchemaCompile(e.into()))?; - pool.get_file_by_name(&primary_name).ok_or_else(|| { - SchemaFetchError::SchemaCompile( - anyhow::anyhow!("{primary_name} lost after compilation").into(), - ) - }) + + match compile_pb_subject(primary, references) + .context("failed to compile protobuf schema into fd set") + { + Err(e) => Err(SchemaFetchError::SchemaCompile(e.into())), + Ok(fd_set) => DescriptorPool::from_file_descriptor_set(fd_set) + .context("failed to convert fd set to descriptor pool") + .and_then(|pool| { + pool.get_file_by_name(&primary_name) + .context("file lost after compilation") + }) + .map_err(|e| SchemaFetchError::SchemaCompile(e.into())), + } } } fn compile_pb_subject( primary_subject: Subject, dependency_subjects: Vec, -) -> Result, SchemaFetchError> { +) -> Result { compile_pb( ( - PathBuf::from(&primary_subject.name), - primary_subject.schema.content.as_bytes().to_vec(), + primary_subject.name.clone(), + primary_subject.schema.content.clone(), ), dependency_subjects .into_iter() - .map(|s| (PathBuf::from(&s.name), s.schema.content.as_bytes().to_vec())), + .map(|s| (s.name.clone(), s.schema.content.clone())), ) .map_err(|e| SchemaFetchError::SchemaCompile(e.into())) }