From a24df58004e03f80bceaaa9e4613d0314e3fa981 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 18 Jun 2024 15:52:28 +0800 Subject: [PATCH] introduce iceberg-rust catalog --- Cargo.lock | 474 ++++++++++++++++-- Cargo.toml | 2 + src/connector/Cargo.toml | 4 + src/connector/src/error.rs | 1 + src/connector/src/sink/iceberg/jni_catalog.rs | 166 +++++- src/connector/src/sink/iceberg/mod.rs | 79 ++- .../src/sink/iceberg/storage_catalog.rs | 213 ++++++++ 7 files changed, 891 insertions(+), 48 deletions(-) create mode 100644 src/connector/src/sink/iceberg/storage_catalog.rs diff --git a/Cargo.lock b/Cargo.lock index 69e17e94bce33..8ca397a034aa8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,29 @@ dependencies = [ "backtrace", ] +[[package]] +name = "apache-avro" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb7c683b2f8f40970b70e39ff8be514c95b96fcb9c4af87e1ed2cb2e10801a0" +dependencies = [ + "digest", + "lazy_static", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand", + "regex-lite", + "serde", + "serde_json", + "strum 0.25.0", + "strum_macros 0.25.3", + "thiserror", + "typed-builder 0.16.2", + "uuid", +] + [[package]] name = "apache-avro" version = "0.16.0" @@ -312,6 +335,12 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b3d0060af21e8d11a926981cc00c6c1541aa91dd64b9f881985c3da1094425f" +[[package]] +name = "array-init" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc" + [[package]] name = "array-util" version = "1.0.1" @@ -404,6 +433,21 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-arith" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7029a5b3efbeafbf4a12d12dc16b8f9e9bff20a410b8c25c5d28acc089e1043" +dependencies = [ + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-data 52.0.0", + "arrow-schema 52.0.0", + "chrono", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-array" version = "48.0.1" @@ -437,6 +481,22 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-array" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d33238427c60271710695f17742f45b1a5dc5bcfc5c15331c25ddfe7abf70d97" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer 52.0.0", + "arrow-data 52.0.0", + "arrow-schema 52.0.0", + "chrono", + "half 2.3.1", + "hashbrown 0.14.3", + "num", +] + [[package]] name = "arrow-buffer" version = "48.0.1" @@ -459,6 +519,17 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-buffer" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9b95e825ae838efaf77e366c00d3fc8cca78134c9db497d6bda425f2e7b7c1" +dependencies = [ + "bytes", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-cast" version = "48.0.1" @@ -495,6 +566,26 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-cast" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cf8385a9d5b5fcde771661dd07652b79b9139fea66193eda6a88664400ccab" +dependencies = [ + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-data 52.0.0", + "arrow-schema 52.0.0", + "arrow-select 52.0.0", + "atoi", + "base64 0.22.0", + "chrono", + "half 2.3.1", + "lexical-core", + "num", + "ryu", +] + [[package]] name = "arrow-csv" version = "48.0.1" @@ -538,6 +629,18 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-data" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb29be98f987bcf217b070512bb7afba2f65180858bca462edf4a39d84a23e10" +dependencies = [ + "arrow-buffer 52.0.0", + "arrow-schema 52.0.0", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-flight" version = "50.0.0" @@ -569,7 +672,7 @@ dependencies = [ "arrow-cast 48.0.1", "arrow-data 48.0.1", "arrow-schema 48.0.1", - "flatbuffers", + "flatbuffers 23.5.26", ] [[package]] @@ -583,7 +686,21 @@ dependencies = [ "arrow-cast 50.0.0", "arrow-data 50.0.0", "arrow-schema 50.0.0", - "flatbuffers", + "flatbuffers 23.5.26", +] + +[[package]] +name = "arrow-ipc" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc68f6523970aa6f7ce1dc9a33a7d9284cfb9af77d4ad3e617dbe5d79cc6ec8" +dependencies = [ + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-cast 52.0.0", + "arrow-data 52.0.0", + "arrow-schema 52.0.0", + "flatbuffers 24.3.25", ] [[package]] @@ -636,6 +753,21 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-ord" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb56ed1547004e12203652f12fe12e824161ff9d1e5cf2a7dc4ff02ba94f413" +dependencies = [ + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-data 52.0.0", + "arrow-schema 52.0.0", + "arrow-select 52.0.0", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-row" version = "48.0.1" @@ -681,6 +813,12 @@ version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029" +[[package]] +name = "arrow-schema" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32aae6a60458a2389c0da89c9de0b7932427776127da1a738e2efc21d32f3393" + [[package]] name = "arrow-select" version = "48.0.1" @@ -709,6 +847,20 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-select" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de36abaef8767b4220d7b4a8c2fe5ffc78b47db81b03d77e2136091c3ba39102" +dependencies = [ + "ahash 0.8.11", + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-data 52.0.0", + "arrow-schema 52.0.0", + "num", +] + [[package]] name = "arrow-string" version = "48.0.1" @@ -741,6 +893,23 @@ dependencies = [ "regex-syntax 0.8.2", ] +[[package]] +name = "arrow-string" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e435ada8409bcafc910bc3e0077f532a4daa20e99060a496685c0e3e53cc2597" +dependencies = [ + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-data 52.0.0", + "arrow-schema 52.0.0", + "arrow-select 52.0.0", + "memchr", + "num", + "regex", + "regex-syntax 0.8.2", +] + [[package]] name = "arrow-udf-flight" version = "0.1.0" @@ -915,7 +1084,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5" dependencies = [ - "brotli", + "brotli 3.5.0", "bzip2", "flate2", "futures-core", @@ -1560,7 +1729,7 @@ dependencies = [ "aws-smithy-types", "bytes", "http 0.2.9", - "http 1.0.0", + "http 1.1.0", "pin-project-lite", "tokio", "tracing", @@ -1663,7 +1832,7 @@ dependencies = [ "axum-core 0.4.3", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "hyper 1.1.0", @@ -1713,7 +1882,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "mime", @@ -1733,7 +1902,7 @@ checksum = "077959a7f8cf438676af90b483304528eb7e16eadadb7f44e9ada4f9dceb9e62" dependencies = [ "axum-core 0.4.3", "chrono", - "http 1.0.0", + "http 1.1.0", "mime_guess", "rust-embed", "tower-service", @@ -1897,6 +2066,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bincode" version = "1.3.3" @@ -1934,7 +2109,7 @@ dependencies = [ "bitflags 2.5.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "log", @@ -2087,7 +2262,18 @@ checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor", + "brotli-decompressor 2.5.1", +] + +[[package]] +name = "brotli" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor 4.0.1", ] [[package]] @@ -2100,6 +2286,16 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "brotli-decompressor" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bstr" version = "1.6.2" @@ -2971,9 +3167,9 @@ checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" [[package]] name = "crc32c" -version = "0.6.5" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" dependencies = [ "rustc_version 0.4.0", ] @@ -3890,7 +4086,7 @@ dependencies = [ "deno_core", "deno_tls", "dyn-clone", - "http 1.0.0", + "http 1.1.0", "pin-project", "reqwest 0.12.4", "serde", @@ -3907,14 +4103,14 @@ dependencies = [ "async-compression", "async-trait", "base64 0.21.7", - "brotli", + "brotli 3.5.0", "bytes", "cache_control", "deno_core", "deno_net", "deno_websocket", "flate2", - "http 1.0.0", + "http 1.1.0", "httparse", "hyper 0.14.27", "hyper 1.1.0", @@ -4092,7 +4288,7 @@ dependencies = [ "deno_tls", "fastwebsockets", "h2 0.4.4", - "http 1.0.0", + "http 1.1.0", "http-body-util", "hyper 1.1.0", "hyper-util", @@ -5056,6 +5252,16 @@ dependencies = [ "rustc_version 0.4.0", ] +[[package]] +name = "flatbuffers" +version = "24.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +dependencies = [ + "bitflags 1.3.2", + "rustc_version 0.4.0", +] + [[package]] name = "flate2" version = "1.0.27" @@ -5884,7 +6090,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 1.0.0", + "http 1.1.0", "indexmap 2.0.0", "slab", "tokio", @@ -6076,9 +6282,9 @@ dependencies = [ [[package]] name = "http" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -6103,7 +6309,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.0.0", + "http 1.1.0", ] [[package]] @@ -6114,7 +6320,7 @@ checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" dependencies = [ "bytes", "futures-core", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "pin-project-lite", ] @@ -6177,7 +6383,7 @@ dependencies = [ "futures-channel", "futures-util", "h2 0.4.4", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "httparse", "httpdate", @@ -6211,7 +6417,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http 1.0.0", + "http 1.1.0", "hyper 1.1.0", "hyper-util", "rustls 0.22.4", @@ -6271,7 +6477,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "hyper 1.1.0", "pin-project-lite", @@ -6318,6 +6524,72 @@ dependencies = [ "cc", ] +[[package]] +name = "iceberg" +version = "0.2.0" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=e6ae6229dfd0c0a8793cde42a7d626c704ea9088#e6ae6229dfd0c0a8793cde42a7d626c704ea9088" +dependencies = [ + "anyhow", + "apache-avro 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", + "array-init", + "arrow-arith 52.0.0", + "arrow-array 52.0.0", + "arrow-ord 52.0.0", + "arrow-schema 52.0.0", + "arrow-select 52.0.0", + "arrow-string 52.0.0", + "async-stream", + "async-trait", + "bimap", + "bitvec", + "bytes", + "chrono", + "derive_builder 0.20.0", + "either", + "fnv", + "futures", + "itertools 0.13.0", + "lazy_static", + "log", + "murmur3", + "once_cell", + "opendal 0.47.0", + "ordered-float 4.1.1", + "parquet 52.0.0", + "reqwest 0.12.4", + "rust_decimal", + "serde", + "serde_bytes", + "serde_derive", + "serde_json", + "serde_repr", + "serde_with", + "tokio", + "typed-builder 0.18.0", + "url", + "urlencoding", + "uuid", +] + +[[package]] +name = "iceberg-catalog-rest" +version = "0.2.0" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=e6ae6229dfd0c0a8793cde42a7d626c704ea9088#e6ae6229dfd0c0a8793cde42a7d626c704ea9088" +dependencies = [ + "async-trait", + "chrono", + "iceberg", + "itertools 0.13.0", + "log", + "reqwest 0.12.4", + "serde", + "serde_derive", + "serde_json", + "typed-builder 0.18.0", + "urlencoding", + "uuid", +] + [[package]] name = "icelake" version = "0.0.10" @@ -6347,7 +6619,7 @@ dependencies = [ "log", "murmur3", "once_cell", - "opendal", + "opendal 0.45.1", "ordered-float 3.9.1", "parquet 50.0.0", "prometheus", @@ -7167,7 +7439,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http 1.0.0", + "http 1.1.0", "madsim", "spin 0.9.8", "tracing", @@ -8050,7 +8322,7 @@ dependencies = [ "percent-encoding", "prometheus", "quick-xml 0.31.0", - "reqsign", + "reqsign 0.14.9", "reqwest 0.11.20", "serde", "serde_json", @@ -8059,6 +8331,36 @@ dependencies = [ "uuid", ] +[[package]] +name = "opendal" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3ba698f2258bebdf7a3a38862bb6ef1f96d351627002686dacc228f805bdd6" +dependencies = [ + "anyhow", + "async-trait", + "backon", + "base64 0.22.0", + "bytes", + "chrono", + "crc32c", + "flagset", + "futures", + "getrandom", + "http 1.1.0", + "log", + "md-5", + "once_cell", + "percent-encoding", + "quick-xml 0.31.0", + "reqsign 0.15.2", + "reqwest 0.12.4", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "openidconnect" version = "3.4.0" @@ -8238,6 +8540,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "536900a8093134cf9ccf00a27deb3532421099e958d9dd431135d0c7543ca1e8" dependencies = [ "num-traits", + "rand", + "serde", ] [[package]] @@ -8455,7 +8759,7 @@ dependencies = [ "arrow-schema 48.0.1", "arrow-select 48.0.1", "base64 0.21.7", - "brotli", + "brotli 3.5.0", "bytes", "chrono", "flate2", @@ -8489,7 +8793,7 @@ dependencies = [ "arrow-schema 50.0.0", "arrow-select 50.0.0", "base64 0.21.7", - "brotli", + "brotli 3.5.0", "bytes", "chrono", "flate2", @@ -8508,6 +8812,41 @@ dependencies = [ "zstd 0.13.0", ] +[[package]] +name = "parquet" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c3b5322cc1bbf67f11c079c42be41a55949099b78732f7dba9e15edde40eab" +dependencies = [ + "ahash 0.8.11", + "arrow-array 52.0.0", + "arrow-buffer 52.0.0", + "arrow-cast 52.0.0", + "arrow-data 52.0.0", + "arrow-ipc 52.0.0", + "arrow-schema 52.0.0", + "arrow-select 52.0.0", + "base64 0.22.0", + "brotli 6.0.0", + "bytes", + "chrono", + "flate2", + "futures", + "half 2.3.1", + "hashbrown 0.14.3", + "lz4_flex", + "num", + "num-bigint", + "paste", + "seq-macro", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd 0.13.0", + "zstd-sys", +] + [[package]] name = "parse-display" version = "0.9.0" @@ -9313,7 +9652,7 @@ checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.11.0", + "itertools 0.10.5", "log", "multimap 0.8.3", "once_cell", @@ -9347,7 +9686,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.57", @@ -9508,7 +9847,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.1", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -9641,6 +9980,7 @@ dependencies = [ "libc", "rand_chacha", "rand_core", + "serde", ] [[package]] @@ -9660,6 +10000,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ "getrandom", + "serde", ] [[package]] @@ -9903,7 +10244,35 @@ dependencies = [ "rand", "reqwest 0.11.20", "rsa", - "rust-ini", + "rust-ini 0.20.0", + "serde", + "serde_json", + "sha1", + "sha2", +] + +[[package]] +name = "reqsign" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70fe66d4cd0b5ed9b1abbfe639bf6baeaaf509f7da2d51b31111ba945be59286" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.0", + "chrono", + "form_urlencoded", + "getrandom", + "hex", + "hmac", + "home", + "http 1.1.0", + "log", + "percent-encoding", + "quick-xml 0.31.0", + "rand", + "reqwest 0.12.4", + "rust-ini 0.21.0", "serde", "serde_json", "sha1", @@ -9969,7 +10338,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.4.4", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "hyper 1.1.0", @@ -10016,7 +10385,7 @@ checksum = "a45d100244a467870f6cb763c4484d010a6bed6bd610b3676e3825d93fb4cfbd" dependencies = [ "anyhow", "async-trait", - "http 1.0.0", + "http 1.1.0", "reqwest 0.12.4", "serde", "thiserror", @@ -10199,7 +10568,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "memcomparable", - "opendal", + "opendal 0.45.1", "parking_lot 0.12.1", "paste", "prometheus", @@ -10635,7 +11004,7 @@ name = "risingwave_connector" version = "1.11.0-alpha" dependencies = [ "anyhow", - "apache-avro 0.16.0", + "apache-avro 0.16.0 (git+https://github.com/risingwavelabs/avro?rev=25113ba88234a9ae23296e981d8302c290fdaa4b)", "arrow-array 50.0.0", "arrow-row 50.0.0", "arrow-schema 50.0.0", @@ -10679,6 +11048,8 @@ dependencies = [ "google-cloud-googleapis", "google-cloud-pubsub", "http 0.2.9", + "iceberg", + "iceberg-catalog-rest", "icelake", "indexmap 1.9.3", "itertools 0.12.1", @@ -10695,9 +11066,10 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", - "opendal", + "opendal 0.45.1", "openssl", "parking_lot 0.12.1", + "parquet 50.0.0", "paste", "pg_bigdecimal", "postgres-openssl", @@ -10751,6 +11123,7 @@ dependencies = [ "tracing", "tracing-subscriber", "tracing-test", + "typed-builder 0.18.0", "url", "urlencoding", "uuid", @@ -10765,7 +11138,7 @@ name = "risingwave_connector_codec" version = "1.11.0-alpha" dependencies = [ "anyhow", - "apache-avro 0.16.0", + "apache-avro 0.16.0 (git+https://github.com/risingwavelabs/avro?rev=25113ba88234a9ae23296e981d8302c290fdaa4b)", "chrono", "easy-ext", "itertools 0.12.1", @@ -11424,7 +11797,7 @@ dependencies = [ "madsim", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal", + "opendal 0.45.1", "prometheus", "reqwest 0.11.20", "risingwave_common", @@ -12000,6 +12373,17 @@ dependencies = [ "ordered-multimap", ] +[[package]] +name = "rust-ini" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41" +dependencies = [ + "cfg-if", + "ordered-multimap", + "trim-in-place", +] + [[package]] name = "rust_decimal" version = "1.35.0" @@ -12299,9 +12683,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.15" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "ryu-js" @@ -14893,7 +15277,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "http-range-header", @@ -15062,6 +15446,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "triomphe" version = "0.1.11" diff --git a/Cargo.toml b/Cargo.toml index 7e7910234bd7b..46977de279202 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,6 +128,8 @@ prost = { version = "0.12" } icelake = { git = "https://github.com/icelake-io/icelake", rev = "54fd72fbd1dd8c592f05eeeb79223c8a6a33c297", features = [ "prometheus", ] } +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "e6ae6229dfd0c0a8793cde42a7d626c704ea9088" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "e6ae6229dfd0c0a8793cde42a7d626c704ea9088" } arrow-array = "50" arrow-arith = "50" arrow-cast = "50" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 8d4d51e6e4233..f6a640daf4a6c 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -67,6 +67,8 @@ google-cloud-gax = "0.17.0" google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] } google-cloud-pubsub = "0.25" http = "0.2" +iceberg = { workspace = true } +iceberg-catalog-rest = { workspace = true } icelake = { workspace = true } indexmap = { version = "1.9.3", features = ["serde"] } itertools = { workspace = true } @@ -87,6 +89,7 @@ num-bigint = "0.4" opendal = "0.45" openssl = "0.10" parking_lot = { workspace = true } +parquet = { workspace = true } paste = "1" pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" } postgres-openssl = "0.5.0" @@ -153,6 +156,7 @@ tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io"] } tonic = { workspace = true } tracing = "0.1" +typed-builder = "^0.18" url = "2" urlencoding = "2" uuid = { version = "1", features = ["v4", "fast-rng"] } diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index ab4b3e7bc37b5..dec3a35bad08b 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -57,6 +57,7 @@ def_anyhow_newtype! { async_nats::jetstream::context::CreateStreamError => "Nats error", async_nats::jetstream::stream::ConsumerError => "Nats error", icelake::Error => "Iceberg error", + iceberg::Error => "IcebergV2 error", redis::RedisError => "Redis error", arrow_schema::ArrowError => "Arrow error", google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error", diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs index d88a63d398c65..4d3a1d11313d1 100644 --- a/src/connector/src/sink/iceberg/jni_catalog.rs +++ b/src/connector/src/sink/iceberg/jni_catalog.rs @@ -15,15 +15,23 @@ //! This module provide jni catalog. use std::collections::HashMap; +use std::fmt::Debug; use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; +use iceberg::io::FileIO; +use iceberg::table::Table as TableV2; +use iceberg::{ + Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; +use iceberg_catalog_rest::LoadTableResponse; use icelake::catalog::models::{CommitTableRequest, CommitTableResponse, LoadTableResult}; use icelake::catalog::{ - BaseCatalogConfig, Catalog, CatalogRef, IcebergTableIoArgs, OperatorCreator, UpdateTable, + BaseCatalogConfig, Catalog, IcebergTableIoArgs, OperatorCreator, UpdateTable, }; use icelake::{ErrorKind, Table, TableIdentifier}; +use itertools::Itertools; use jni::objects::{GlobalRef, JObject}; use jni::JavaVM; use risingwave_jni_core::call_method; @@ -31,6 +39,7 @@ use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM}; use crate::error::ConnectorResult; +#[derive(Debug)] pub struct JniCatalog { java_catalog: GlobalRef, jvm: &'static JavaVM, @@ -138,13 +147,140 @@ impl Catalog for JniCatalog { } } +#[async_trait] +impl CatalogV2 for JniCatalog { + /// List namespaces from table. + async fn list_namespaces( + &self, + _parent: Option<&NamespaceIdent>, + ) -> iceberg::Result> { + todo!() + } + + /// Create a new namespace inside the catalog. + async fn create_namespace( + &self, + _namespace: &iceberg::NamespaceIdent, + _properties: HashMap, + ) -> iceberg::Result { + todo!() + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result { + todo!() + } + + /// Check if namespace exists in catalog. + async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result { + todo!() + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> { + todo!() + } + + /// List tables from namespace. + async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result> { + todo!() + } + + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> iceberg::Result<()> { + todo!() + } + + /// Create a new table inside the namespace. + async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> iceberg::Result { + todo!() + } + + /// Load table from the catalog. + async fn load_table(&self, table: &TableIdent) -> iceberg::Result { + execute_with_jni_env(self.jvm, |env| { + let table_name_str = format!( + "{}.{}", + table.namespace().clone().inner().into_iter().join("."), + table.name() + ); + + let table_name_jstr = env.new_string(&table_name_str).unwrap(); + + let result_json = + call_method!(env, self.java_catalog.as_obj(), {String loadTable(String)}, + &table_name_jstr) + .with_context(|| format!("Failed to load iceberg table: {table_name_str}"))?; + + let rust_json_str = jobj_to_str(env, result_json)?; + + let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?; + + let metadata_location = resp.metadata_location.ok_or_else(|| { + icelake::Error::new( + ErrorKind::IcebergFeatureUnsupported, + "Loading uncommitted table is not supported!", + ) + })?; + + tracing::info!("Table metadata location of {table_name_str} is {metadata_location}"); + + let table_metadata = resp.metadata; + + let file_io = FileIO::from_path(&metadata_location)? + .with_props(self.config.table_io_configs.iter()) + .build()?; + + Ok(TableV2::builder() + .file_io(file_io) + .identifier(table.clone()) + .metadata(table_metadata) + .build()) + }) + .map_err(|e| { + iceberg::Error::new( + iceberg::ErrorKind::Unexpected, + "Failed to load iceberg table.", + ) + .with_source(e) + }) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> { + todo!() + } + + /// Check if a table exists in the catalog. + async fn table_exists(&self, _table: &TableIdent) -> iceberg::Result { + todo!() + } + + /// Rename a table in the catalog. + async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> { + todo!() + } + + /// Update a table to the catalog. + async fn update_table(&self, _commit: TableCommit) -> iceberg::Result { + todo!() + } +} + impl JniCatalog { - pub fn build( + fn build( base_config: BaseCatalogConfig, name: impl ToString, catalog_impl: impl ToString, java_catalog_props: HashMap, - ) -> ConnectorResult { + ) -> ConnectorResult { let jvm = JVM.get_or_init()?; execute_with_jni_env(jvm, |env| { @@ -175,12 +311,32 @@ impl JniCatalog { let jni_catalog = env.new_global_ref(jni_catalog_wrapper.l().unwrap())?; - Ok(Arc::new(Self { + Ok(Self { java_catalog: jni_catalog, jvm, config: base_config, - }) as CatalogRef) + }) }) .map_err(Into::into) } + + pub fn build_catalog( + base_config: BaseCatalogConfig, + name: impl ToString, + catalog_impl: impl ToString, + java_catalog_props: HashMap, + ) -> ConnectorResult> { + let catalog = Self::build(base_config, name, catalog_impl, java_catalog_props)?; + Ok(Arc::new(catalog) as Arc) + } + + pub fn build_catalog_v2( + base_config: BaseCatalogConfig, + name: impl ToString, + catalog_impl: impl ToString, + java_catalog_props: HashMap, + ) -> ConnectorResult> { + let catalog = Self::build(base_config, name, catalog_impl, java_catalog_props)?; + Ok(Arc::new(catalog) as Arc) + } } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 7d40570fc8fc2..492fa02a1fc44 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -15,6 +15,7 @@ mod jni_catalog; mod mock_catalog; mod prometheus; +mod storage_catalog; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; @@ -27,6 +28,8 @@ use arrow_schema::{ DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, }; use async_trait::async_trait; +use iceberg::table::Table as TableV2; +use iceberg::{Catalog as CatalogV2, TableIdent}; use icelake::catalog::{ load_catalog, load_iceberg_base_catalog_config, BaseCatalogConfig, CatalogRef, CATALOG_NAME, CATALOG_TYPE, @@ -49,6 +52,7 @@ use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; use serde_derive::Deserialize; +use storage_catalog::StorageCatalogConfig; use thiserror_ext::AsReport; use url::Url; use with_options::WithOptions; @@ -401,7 +405,7 @@ impl IcebergConfig { _ => unreachable!(), }; - jni_catalog::JniCatalog::build( + jni_catalog::JniCatalog::build_catalog( base_catalog_config, self.catalog_name(), catalog_impl, @@ -432,6 +436,79 @@ impl IcebergConfig { } } +impl IcebergConfig { + fn full_table_name_v2(&self) -> Result { + let ret = if let Some(database_name) = &self.database_name { + TableIdent::from_strs(vec![database_name, &self.table_name]) + } else { + TableIdent::from_strs(vec![&self.table_name]) + }; + + ret.context("Failed to create table identifier") + .map_err(|e| SinkError::Iceberg(anyhow!(e))) + } + + async fn create_catalog_v2(&self) -> ConnectorResult> { + match self.catalog_type() { + "storage" => { + let config = StorageCatalogConfig::builder() + .warehouse(self.path.clone()) + .access_key(self.access_key.clone()) + .secret_key(self.secret_key.clone()) + .region(self.region.clone()) + .endpoint(self.endpoint.clone()) + .build(); + let catalog = storage_catalog::StorageCatalog::new(config)?; + Ok(Arc::new(catalog)) + } + "rest" => { + let config = iceberg_catalog_rest::RestCatalogConfig::builder() + .uri(self.uri.clone().ok_or_else(|| { + SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog")) + })?) + .build(); + let catalog = iceberg_catalog_rest::RestCatalog::new(config).await?; + Ok(Arc::new(catalog)) + } + catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => { + // Create java catalog + let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?; + let catalog_impl = match catalog_type { + "hive" => "org.apache.iceberg.hive.HiveCatalog", + "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", + _ => unreachable!(), + }; + + jni_catalog::JniCatalog::build_catalog_v2( + base_catalog_config, + self.catalog_name(), + catalog_impl, + java_catalog_props, + ) + } + _ => { + bail!( + "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`", + self.catalog_type() + ) + } + } + } + + pub async fn load_table_v2(&self) -> ConnectorResult { + let catalog = self + .create_catalog_v2() + .await + .context("Unable to load iceberg catalog")?; + + let table_id = self + .full_table_name_v2() + .context("Unable to parse table name")?; + + catalog.load_table(&table_id).await.map_err(Into::into) + } +} + pub struct IcebergSink { config: IcebergConfig, param: SinkParam, diff --git a/src/connector/src/sink/iceberg/storage_catalog.rs b/src/connector/src/sink/iceberg/storage_catalog.rs new file mode 100644 index 0000000000000..d440da1d04615 --- /dev/null +++ b/src/connector/src/sink/iceberg/storage_catalog.rs @@ -0,0 +1,213 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This module provide storage catalog. + +use std::collections::HashMap; + +use async_trait::async_trait; +use iceberg::io::{FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::spec::TableMetadata; +use iceberg::table::Table; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, + TableIdent, +}; +use thiserror_ext::AsReport; +use typed_builder::TypedBuilder; + +#[derive(Debug, TypedBuilder)] +pub struct StorageCatalogConfig { + warehouse: String, + access_key: String, + secret_key: String, + endpoint: Option, + region: Option, +} + +/// File system catalog. +#[derive(Debug)] +pub struct StorageCatalog { + warehouse: String, + file_io: FileIO, +} + +impl StorageCatalog { + pub fn new(config: StorageCatalogConfig) -> Result { + let mut file_io_builder = FileIO::from_path(&config.warehouse)? + .with_prop(S3_ACCESS_KEY_ID, config.access_key) + .with_prop(S3_SECRET_ACCESS_KEY, config.secret_key); + file_io_builder = if let Some(endpoint) = config.endpoint { + file_io_builder.with_prop(S3_ENDPOINT, endpoint) + } else { + file_io_builder + }; + file_io_builder = if let Some(region) = config.region { + file_io_builder.with_prop(S3_REGION, region) + } else { + file_io_builder + }; + + Ok(StorageCatalog { + warehouse: config.warehouse, + file_io: file_io_builder.build()?, + }) + } + + /// Check if version hint file exist. + /// + /// `table_path`: relative path of table dir under warehouse root. + async fn is_version_hint_exist(&self, table_path: &str) -> Result { + self.file_io + .is_exist(format!("{table_path}/metadata/version-hint.text").as_str()) + .await + .map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("check if version hint exist failed: {}", err.as_report()), + ) + }) + } + + /// Read version hint of table. + /// + /// `table_path`: relative path of table dir under warehouse root. + async fn read_version_hint(&self, table_path: &str) -> Result { + let content = self + .file_io + .new_input(format!("{table_path}/metadata/version-hint.text").as_str())? + .read() + .await?; + let version_hint = String::from_utf8(content.to_vec()).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Fail to covert version_hint from utf8 to string: {}", + err.as_report() + ), + ) + })?; + + version_hint + .parse() + .map_err(|_| Error::new(ErrorKind::DataInvalid, "parse version hint failed")) + } +} + +#[async_trait] +impl Catalog for StorageCatalog { + /// List namespaces from table. + async fn list_namespaces( + &self, + _parent: Option<&NamespaceIdent>, + ) -> iceberg::Result> { + todo!() + } + + /// Create a new namespace inside the catalog. + async fn create_namespace( + &self, + _namespace: &iceberg::NamespaceIdent, + _properties: HashMap, + ) -> iceberg::Result { + todo!() + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result { + todo!() + } + + /// Check if namespace exists in catalog. + async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> iceberg::Result { + todo!() + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> iceberg::Result<()> { + todo!() + } + + /// List tables from namespace. + async fn list_tables(&self, _namespace: &NamespaceIdent) -> iceberg::Result> { + todo!() + } + + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> iceberg::Result<()> { + todo!() + } + + /// Create a new table inside the namespace. + async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> iceberg::Result { + todo!() + } + + /// Load table from the catalog. + async fn load_table(&self, table: &TableIdent) -> iceberg::Result
{ + let table_path = { + let mut names = table.namespace.clone().inner(); + names.push(table.name.to_string()); + format!("{}/{}", self.warehouse, names.join("/")) + }; + let path = if self.is_version_hint_exist(&table_path).await? { + let version_hint = self.read_version_hint(&table_path).await?; + format!("{table_path}/metadata/v{}.metadata.json", version_hint) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "no table version hint found", + )); + }; + + let metadata_file = self.file_io.new_input(path)?; + let metadata_file_content = metadata_file.read().await?; + let table_metadata = serde_json::from_slice::(&metadata_file_content)?; + + Ok(Table::builder() + .metadata(table_metadata) + .identifier(table.clone()) + .file_io(self.file_io.clone()) + // Only support readonly table for storage catalog now. + .readonly(true) + .build()) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> { + todo!() + } + + /// Check if a table exists in the catalog. + async fn table_exists(&self, _table: &TableIdent) -> iceberg::Result { + todo!() + } + + /// Rename a table in the catalog. + async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> iceberg::Result<()> { + todo!() + } + + /// Update a table to the catalog. + async fn update_table(&self, _commit: TableCommit) -> iceberg::Result
{ + todo!() + } +}