From 7c3396257d0090f6330a8b9a30dda0371298531b Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Fri, 12 Jan 2024 14:06:27 +0800 Subject: [PATCH] feat(udf): add initial support for WASM-based Rust UDF (#14271) Signed-off-by: Runji Wang Co-authored-by: xxchan Co-authored-by: wangrunji0408 --- Cargo.lock | 1162 ++++++++++++++++- Cargo.toml | 3 +- ci/scripts/build-other.sh | 8 + ci/scripts/run-e2e-test.sh | 5 + e2e_test/udf/wasm/.cargo/config.toml | 2 + e2e_test/udf/wasm/.gitignore | 2 + e2e_test/udf/wasm/Cargo.toml | 14 + e2e_test/udf/wasm/src/lib.rs | 80 ++ e2e_test/udf/wasm_udf.slt | 93 ++ proto/expr.proto | 10 + proto/meta.proto | 1 + src/common/src/config.rs | 4 + src/common/src/system_param/mod.rs | 2 + src/common/src/system_param/reader.rs | 4 + src/config/example.toml | 1 + src/expr/core/Cargo.toml | 3 + src/expr/core/src/expr/build.rs | 4 +- src/expr/core/src/expr/expr_udf.rs | 145 +- src/expr/core/src/expr/mod.rs | 1 + .../core/src/table_function/user_defined.rs | 53 +- src/expr/udf/README.md | 118 ++ src/frontend/Cargo.toml | 3 + src/frontend/src/binder/expr/function.rs | 7 - src/frontend/src/handler/create_function.rs | 220 +++- src/meta/src/backup_restore/restore.rs | 1 + src/meta/src/manager/catalog/mod.rs | 1 + src/object_store/src/object/mod.rs | 9 +- src/sqlparser/src/ast/mod.rs | 4 + src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 18 +- src/storage/hummock_trace/Cargo.toml | 2 +- src/utils/pgwire/src/pg_protocol.rs | 2 +- src/utils/runtime/src/logger.rs | 3 +- src/workspace-hack/Cargo.toml | 9 +- 34 files changed, 1864 insertions(+), 131 deletions(-) create mode 100644 e2e_test/udf/wasm/.cargo/config.toml create mode 100644 e2e_test/udf/wasm/.gitignore create mode 100644 e2e_test/udf/wasm/Cargo.toml create mode 100644 e2e_test/udf/wasm/src/lib.rs create mode 100644 e2e_test/udf/wasm_udf.slt create mode 100644 src/expr/udf/README.md diff --git a/Cargo.lock b/Cargo.lock index 713e70c52449e..ff20f999220dc 100644 --- a/Cargo.lock +++ 
b/Cargo.lock @@ -84,6 +84,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +[[package]] +name = "ambient-authority" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9d4ee0d472d1cd2e28c97dfa124b3d8d992e10eb0a035f33f5d12e3a177ba3b" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -228,6 +234,12 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "arbitrary" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d5a26814d8dcb93b0e5a0ff3c6d80a8843bafb21b39e8e18a6f05471870e110" + [[package]] name = "arc-swap" version = "1.6.0" @@ -619,6 +631,22 @@ dependencies = [ "regex-syntax 0.8.0", ] +[[package]] +name = "arrow-udf-wasm" +version = "0.1.0" +source = "git+https://github.com/risingwavelabs/arrow-udf.git?rev=f9a9e0d#f9a9e0d41d1a4ae26a6d90ac8aebf2e38a0c8a55" +dependencies = [ + "anyhow", + "arrow-array 49.0.0", + "arrow-ipc 49.0.0", + "arrow-schema 49.0.0", + "base64 0.21.4", + "genawaiter", + "lazy_static", + "wasmtime", + "wasmtime-wasi", +] + [[package]] name = "assert_matches" version = "1.5.0" @@ -886,7 +914,7 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fee3da8ef1276b0bee5dd1c7258010d8fffd31801447323115a25560e1327b89" dependencies = [ - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 1.0.109", @@ -1391,7 +1419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33b8de67cc41132507eeece2584804efcb15f85ba516e34c944b7667f480397a" dependencies = [ "heck 0.3.3", - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 1.0.109", @@ -1808,6 +1836,81 @@ dependencies = [ "serde", ] +[[package]] +name = "cap-fs-ext" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b779b2d0a001c125b4584ad586268fb4b92d957bff8d26d7fe0dd78283faa814" +dependencies = [ + "cap-primitives", + "cap-std", + "io-lifetimes 2.0.3", + "windows-sys 0.48.0", +] + +[[package]] +name = "cap-net-ext" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ffc30dee200c20b4dcb80572226f42658e1d9c4b668656d7cc59c33d50e396e" +dependencies = [ + "cap-primitives", + "cap-std", + "rustix 0.38.28", + "smallvec", +] + +[[package]] +name = "cap-primitives" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf30c373a3bee22c292b1b6a7a26736a38376840f1af3d2d806455edf8c3899" +dependencies = [ + "ambient-authority", + "fs-set-times", + "io-extras", + "io-lifetimes 2.0.3", + "ipnet", + "maybe-owned", + "rustix 0.38.28", + "windows-sys 0.48.0", + "winx", +] + +[[package]] +name = "cap-rand" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "577de6cff7c2a47d6b13efe5dd28bf116bd7f8f7db164ea95b7cc2640711f522" +dependencies = [ + "ambient-authority", + "rand", +] + +[[package]] +name = "cap-std" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84bade423fa6403efeebeafe568fdb230e8c590a275fba2ba978dd112efcf6e9" +dependencies = [ + "cap-primitives", + "io-extras", + "io-lifetimes 2.0.3", + "rustix 0.38.28", +] + +[[package]] +name = "cap-time-ext" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8f52b3c8f4abfe3252fd0a071f3004aaa3b18936ec97bdbd8763ce03aff6247" +dependencies = [ + "cap-primitives", + "once_cell", + "rustix 0.38.28", + "winx", +] + [[package]] name = "cargo-platform" version = "0.1.3" @@ -2248,6 +2351,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "cpp_demangle" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeaa953eaad386a53111e47172c2fedba671e5684c8dd601a5f474f4f118710f" 
+dependencies = [ + "cfg-if", +] + [[package]] name = "cpp_demangle" version = "0.4.3" @@ -2266,6 +2378,115 @@ dependencies = [ "libc", ] +[[package]] +name = "cranelift-bforest" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c22542c0b95bd3302f7ed6839869c561f2324bac2fd5e7e99f5cfa65fdc8b92" +dependencies = [ + "cranelift-entity", +] + +[[package]] +name = "cranelift-codegen" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b3db903ef2e9c8a4de2ea6db5db052c7857282952f9df604aa55d169e6000d8" +dependencies = [ + "bumpalo", + "cranelift-bforest", + "cranelift-codegen-meta", + "cranelift-codegen-shared", + "cranelift-control", + "cranelift-entity", + "cranelift-isle", + "gimli", + "hashbrown 0.14.0", + "log", + "regalloc2", + "smallvec", + "target-lexicon", +] + +[[package]] +name = "cranelift-codegen-meta" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6590feb5a1d6438f974bf6a5ac4dddf69fca14e1f07f3265d880f69e61a94463" +dependencies = [ + "cranelift-codegen-shared", +] + +[[package]] +name = "cranelift-codegen-shared" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7239038c56fafe77fddc8788fc8533dd6c474dc5bdc5637216404f41ba807330" + +[[package]] +name = "cranelift-control" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7dc9c595341404d381d27a3d950160856b35b402275f0c3990cd1ad683c8053" +dependencies = [ + "arbitrary", +] + +[[package]] +name = "cranelift-entity" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44e3ee532fc4776c69bcedf7e62f9632cbb3f35776fa9a525cdade3195baa3f7" +dependencies = [ + "serde", + "serde_derive", +] + +[[package]] +name = "cranelift-frontend" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a612c94d09e653662ec37681dc2d6fd2b9856e6df7147be0afc9aabb0abf19df" +dependencies = [ + "cranelift-codegen", + "log", + "smallvec", + "target-lexicon", +] + +[[package]] +name = "cranelift-isle" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85db9830abeb1170b7d29b536ffd55af1d4d26ac8a77570b5d1aca003bf225cc" + +[[package]] +name = "cranelift-native" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301ef0edafeaeda5771a5d2db64ac53e1818ae3111220a185677025fe91db4a1" +dependencies = [ + "cranelift-codegen", + "libc", + "target-lexicon", +] + +[[package]] +name = "cranelift-wasm" +version = "0.103.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "380f0abe8264e4570ac615fc31cef32a3b90a77f7eb97b08331f9dd357b1f500" +dependencies = [ + "cranelift-codegen", + "cranelift-entity", + "cranelift-frontend", + "itertools 0.10.5", + "log", + "smallvec", + "wasmparser", + "wasmtime-types", +] + [[package]] name = "crc" version = "3.0.1" @@ -2306,7 +2527,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a572c5a5165c71c6a34cd5391521faf590f0e216031574375149fd9666ec5cad" dependencies = [ "petgraph", - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 2.0.37", @@ -3180,6 +3401,47 @@ dependencies = [ "subtle", ] +[[package]] +name = "directories-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339ee130d97a610ea5a5872d2bbb130fdf68884ff09d3028b81bec8a1ac23bbc" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dissimilar" version = "1.0.7" @@ -3607,6 +3869,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + [[package]] name = "fancy-regex" version = "0.13.0" @@ -3642,6 +3910,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "fd-lock" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93f7a0db71c99f68398f80653ed05afb0b00e062e1a20c7ff849c4edfabbbcc" +dependencies = [ + "cfg-if", + "rustix 0.38.28", + "windows-sys 0.52.0", +] + [[package]] name = "ff" version = "0.12.1" @@ -3995,6 +4274,17 @@ dependencies = [ "autocfg", ] +[[package]] +name = "fs-set-times" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "033b337d725b97690d86893f9de22b67b80dcc4e9ad815f348254c38119db8fb" +dependencies = [ + "io-lifetimes 2.0.3", + "rustix 0.38.28", + "windows-sys 0.52.0", +] + [[package]] name = "fs2" version = "0.4.3" @@ -4180,6 +4470,19 @@ dependencies = [ "byteorder", ] +[[package]] +name = "fxprof-processed-profile" +version = "0.6.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "27d12c0aed7f1e24276a241aadc4cb8ea9f83000f34bc062b7cc2d51e3b0fabd" +dependencies = [ + "bitflags 2.4.0", + "debugid", + "fxhash", + "serde", + "serde_json", +] + [[package]] name = "gcp-bigquery-client" version = "0.18.0" @@ -4203,6 +4506,36 @@ dependencies = [ "yup-oauth2", ] +[[package]] +name = "genawaiter" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" +dependencies = [ + "genawaiter-macro", + "genawaiter-proc-macro", + "proc-macro-hack", +] + +[[package]] +name = "genawaiter-macro" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" + +[[package]] +name = "genawaiter-proc-macro" +version = "0.99.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" +dependencies = [ + "proc-macro-error 0.4.12", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "generator" version = "0.7.5" @@ -4244,6 +4577,11 @@ name = "gimli" version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +dependencies = [ + "fallible-iterator 0.3.0", + "indexmap 2.0.0", + "stable_deref_trait", +] [[package]] name = "glob" @@ -4760,6 +5098,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "id-arena" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" + [[package]] name = "ident_case" version = "1.0.1" @@ -4883,6 +5227,16 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-extras" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c301e73fb90e8a29e600a9f402d095765f74310d582916a952f618836a1bd1ed" +dependencies = [ + "io-lifetimes 2.0.3", + "windows-sys 0.52.0", +] + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -4894,6 +5248,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-lifetimes" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a611371471e98973dbcab4e0ec66c31a10bc356eeb4d54a0e05eac8158fe38c" + [[package]] name = "ipnet" version = "2.8.0" @@ -4944,6 +5304,26 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "ittapi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b996fe614c41395cdaedf3cf408a9534851090959d90d54a535f675550b64b1" +dependencies = [ + "anyhow", + "ittapi-sys", + "log", +] + +[[package]] +name = "ittapi-sys" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f5385394064fa2c886205dba02598013ce83d3e92d33dbdc0c52fe0e7bf4fc" +dependencies = [ + "cc", +] + [[package]] name = "java-locator" version = "0.1.5" @@ -5094,6 +5474,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leb128" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" + [[package]] name = "lexical-core" version = "0.8.5" @@ -5224,6 +5610,17 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" +[[package]] +name = "libredox" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" +dependencies = [ + "bitflags 2.4.0", + "libc", + "redox_syscall 0.4.1", +] + [[package]] name = "libsqlite3-sys" version = "0.26.0" @@ -5400,8 +5797,17 @@ dependencies = [ ] [[package]] -name = "mach2" -version = "0.4.1" +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + +[[package]] +name = "mach2" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" dependencies = [ @@ -5587,6 +5993,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" +[[package]] +name = "maybe-owned" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4" + [[package]] name = "md-5" version = "0.10.5" @@ -5620,6 +6032,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "memfd" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" +dependencies = [ + "rustix 0.38.28", +] + [[package]] name = "memmap2" version = "0.5.10" @@ -5769,7 +6190,7 @@ dependencies = [ "heck 0.4.1", "num-bigint", "proc-macro-crate 1.3.1", - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 2.0.37", @@ -6165,6 +6586,9 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ + "crc32fast", + "hashbrown 0.14.0", + "indexmap 2.0.0", "memchr", ] @@ -6494,7 +6918,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec4c6225c69b4ca778c0aea097321a64c421cf4577b331c61b229267edabb6f8" dependencies = [ "heck 0.4.1", - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 2.0.37", @@ -7020,7 +7444,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bed5017bc2ff49649c0075d0d7a9d676933c1292480c1d137776fb205b5cd18" dependencies = [ "bytes", - "fallible-iterator", + "fallible-iterator 0.2.0", "futures-util", "log", "tokio", @@ -7048,7 +7472,7 @@ dependencies = [ "base64 0.21.4", "byteorder", "bytes", - "fallible-iterator", + "fallible-iterator 0.2.0", "hmac", "md-5", "memchr", @@ -7065,7 +7489,7 @@ checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" dependencies = [ "bytes", "chrono", - "fallible-iterator", + "fallible-iterator 0.2.0", "postgres-derive", "postgres-protocol", "serde", @@ -7215,16 +7639,42 @@ dependencies = [ "toml_edit 0.20.2", ] +[[package]] +name = "proc-macro-error" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" +dependencies = [ + "proc-macro-error-attr 0.4.12", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + [[package]] name = "proc-macro-error" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ - "proc-macro-error-attr", + "proc-macro-error-attr 1.0.4", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" +dependencies = [ "proc-macro2", "quote", "syn 1.0.109", + "syn-mid", "version_check", ] @@ -7478,6 +7928,15 @@ dependencies = [ "autotools", ] +[[package]] +name = "psm" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5787f7cda34e3033a72192c018bc5883100330f362ef279a8cbccfce8bb4e874" +dependencies = [ + "cc", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -7757,6 +8216,17 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_users" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4" +dependencies = [ + "getrandom", + "libredox", + "thiserror", +] + [[package]] name = "ref-cast" version = "1.0.20" @@ -7777,6 +8247,19 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "regalloc2" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad156d539c879b7a24a363a2016d77961786e71f48f2e2fc8302a92abd2429a6" +dependencies = [ + "hashbrown 0.13.2", + "log", + "rustc-hash", + "slice-group-by", + "smallvec", +] + [[package]] name = "regex" version = "1.10.0" @@ -8319,7 +8802,7 @@ name = "risingwave_common_proc_macro" version = "1.5.0-alpha" dependencies = [ "bae", - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 1.0.109", @@ -8614,6 +9097,7 @@ dependencies = [ "anyhow", "arrow-array 49.0.0", "arrow-schema 49.0.0", + "arrow-udf-wasm", "async-trait", "auto_impl", "await-tree", @@ -8629,11 +9113,13 @@ dependencies = [ "futures-util", "itertools 0.12.0", "madsim-tokio", + "moka", "num-traits", "parse-display", "paste", "risingwave_common", "risingwave_expr_macro", + "risingwave_object_store", "risingwave_pb", "risingwave_udf", "smallvec", @@ -8698,11 +9184,13 @@ dependencies = [ "anyhow", "arc-swap", "arrow-schema 49.0.0", + "arrow-udf-wasm", 
"assert_matches", "async-recursion", "async-trait", "auto_enums", "auto_impl", + "base64 0.21.4", "bk-tree", "bytes", "clap", @@ -8741,6 +9229,7 @@ dependencies = [ "risingwave_expr", "risingwave_expr_impl", "risingwave_hummock_sdk", + "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "risingwave_source", @@ -8819,7 +9308,7 @@ dependencies = [ "bytes", "futures", "futures-async-stream", - "itertools 0.10.5", + "itertools 0.12.0", "madsim-tokio", "mockall", "parking_lot 0.12.1", @@ -9666,7 +10155,7 @@ checksum = "6da3636faa25820d8648e0e31c5d519bbb01f72fdf57131f0f5f7da5fed36eab" dependencies = [ "bitflags 1.3.2", "errno", - "io-lifetimes", + "io-lifetimes 1.0.11", "libc", "linux-raw-sys 0.1.4", "windows-sys 0.45.0", @@ -9680,7 +10169,7 @@ checksum = "84f3f8f960ed3b5a59055428714943298bf3fa2d4a1d53135084e0544829d995" dependencies = [ "bitflags 1.3.2", "errno", - "io-lifetimes", + "io-lifetimes 1.0.11", "libc", "linux-raw-sys 0.3.8", "windows-sys 0.48.0", @@ -9694,8 +10183,10 @@ checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" dependencies = [ "bitflags 2.4.0", "errno", + "itoa", "libc", "linux-raw-sys 0.4.12", + "once_cell", "windows-sys 0.52.0", ] @@ -9849,7 +10340,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3bd3534a9978d0aa7edd2808dc1f8f31c4d0ecd31ddf71d997b3c98e9f3c9114" dependencies = [ "heck 0.4.1", - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 2.0.37", @@ -10353,6 +10844,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" +[[package]] +name = "shellexpand" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ccc8076840c4da029af4f87e4e8daeb0fca6b87bbb02e10cb60b791450e11e4" +dependencies = [ + "dirs", +] + [[package]] name = "shlex" version = "1.2.0" @@ -10513,6 +11013,12 @@ dependencies = 
[ "parking_lot 0.11.2", ] +[[package]] +name = "slice-group-by" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826167069c09b99d56f31e9ae5c99049e932a98c9dc2dac47645b08dbbf76ba7" + [[package]] name = "smallbitset" version = "0.7.1" @@ -10624,6 +11130,12 @@ dependencies = [ "der 0.7.8", ] +[[package]] +name = "sptr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b9b39299b249ad65f3b7e96443bad61c02ca5cd3589f46cb6d610a0fd6c0d6a" + [[package]] name = "sql-json-path" version = "0.1.0" @@ -11032,7 +11544,7 @@ version = "12.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "691e53bdc0702aba3a5abc2cffff89346fcbd4050748883c7e2f714b33a69045" dependencies = [ - "cpp_demangle", + "cpp_demangle 0.4.3", "rustc-demangle", "symbolic-common", ] @@ -11059,13 +11571,24 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn-mid" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "syn_derive" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1329189c02ff984e9736652b1631330da25eaa6bc639089ed4915d25446cbe7b" dependencies = [ - "proc-macro-error", + "proc-macro-error 1.0.4", "proc-macro2", "quote", "syn 2.0.37", @@ -11101,6 +11624,22 @@ dependencies = [ "windows 0.51.1", ] +[[package]] +name = "system-interface" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27ce32341b2c0b70c144bbf35627fdc1ef18c76ced5e5e7b3ee8b5ba6b2ab6a0" +dependencies = [ + "bitflags 2.4.0", + "cap-fs-ext", + "cap-std", + "fd-lock", + "io-lifetimes 2.0.3", + "rustix 0.38.28", + "windows-sys 0.48.0", + "winx", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -11113,6 +11652,12 @@ 
version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "target-lexicon" +version = "0.12.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c39fd04924ca3a864207c66fc2cd7d22d7c016007f9ce846cbb9326331930a" + [[package]] name = "task_stats_alloc" version = "0.1.11" @@ -11408,7 +11953,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "fallible-iterator", + "fallible-iterator 0.2.0", "futures-channel", "futures-util", "log", @@ -11469,6 +12014,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" +dependencies = [ + "serde", +] + [[package]] name = "toml" version = "0.7.8" @@ -11904,6 +12458,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +[[package]] +name = "unicode-xid" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" + [[package]] name = "unicode_categories" version = "0.1.1" @@ -12062,6 +12622,49 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi-cap-std-sync" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "154528979a211aa28d969846e883df75705809ed9bcc70aba61460683ea7355b" +dependencies = [ + "anyhow", + "async-trait", + "cap-fs-ext", + "cap-rand", + "cap-std", + "cap-time-ext", + "fs-set-times", + "io-extras", + "io-lifetimes 2.0.3", + "once_cell", + "rustix 0.38.28", + 
"system-interface", + "tracing", + "wasi-common", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasi-common" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d888b611fee7d273dd057dc009d2dd3132736f36710ffd65657ac83628d1e3b" +dependencies = [ + "anyhow", + "bitflags 2.4.0", + "cap-rand", + "cap-std", + "io-extras", + "log", + "rustix 0.38.28", + "thiserror", + "tracing", + "wasmtime", + "wiggle", + "windows-sys 0.48.0", +] + [[package]] name = "wasm-bindgen" version = "0.2.87" @@ -12128,6 +12731,15 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-encoder" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ad2b51884de9c7f4fe2fd1043fccb8dcad4b1e29558146ee57a144d15779f3f" +dependencies = [ + "leb128", +] + [[package]] name = "wasm-streams" version = "0.3.0" @@ -12141,6 +12753,398 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.118.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95ee9723b928e735d53000dec9eae7b07a60e490c85ab54abb66659fc61bfcd9" +dependencies = [ + "indexmap 2.0.0", + "semver", +] + +[[package]] +name = "wasmprinter" +version = "0.2.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d027eb8294904fc715ac0870cebe6b0271e96b90605ee21511e7565c4ce568c" +dependencies = [ + "anyhow", + "wasmparser", +] + +[[package]] +name = "wasmtime" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8e539fded2495422ea3c4dfa7beeddba45904eece182cf315294009e1a323bf" +dependencies = [ + "anyhow", + "async-trait", + "bincode 1.3.3", + "bumpalo", + "cfg-if", + "encoding_rs", + "fxprof-processed-profile", + "indexmap 2.0.0", + "libc", + "log", + "object", + "once_cell", + "paste", + "rayon", + 
"serde", + "serde_derive", + "serde_json", + "target-lexicon", + "wasm-encoder", + "wasmparser", + "wasmtime-cache", + "wasmtime-component-macro", + "wasmtime-component-util", + "wasmtime-cranelift", + "wasmtime-environ", + "wasmtime-fiber", + "wasmtime-jit", + "wasmtime-runtime", + "wasmtime-winch", + "wat", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-asm-macros" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "660ba9143e15a2acd921820df221b73aee256bd3ca2d208d73d8adc9587ccbb9" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "wasmtime-cache" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3ce373743892002f9391c6741ef0cb0335b55ec899d874f311222b7e36f4594" +dependencies = [ + "anyhow", + "base64 0.21.4", + "bincode 1.3.3", + "directories-next", + "log", + "rustix 0.38.28", + "serde", + "serde_derive", + "sha2", + "toml 0.5.11", + "windows-sys 0.48.0", + "zstd 0.11.2+zstd.1.5.2", +] + +[[package]] +name = "wasmtime-component-macro" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12ef32643324e564e1c359e9044daa06cbf90d7e2d6c99a738d17a12959f01a5" +dependencies = [ + "anyhow", + "proc-macro2", + "quote", + "syn 2.0.37", + "wasmtime-component-util", + "wasmtime-wit-bindgen", + "wit-parser", +] + +[[package]] +name = "wasmtime-component-util" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c87d06c18d21a4818f354c00a85f4ebc62b2270961cd022968452b0e4dbed9d" + +[[package]] +name = "wasmtime-cranelift" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d648c8b4064a7911093b02237cd5569f71ca171d3a0a486bf80600b19e1cba2" +dependencies = [ + "anyhow", + "cfg-if", + "cranelift-codegen", + "cranelift-control", + "cranelift-entity", + "cranelift-frontend", + "cranelift-native", + "cranelift-wasm", + "gimli", + "log", + 
"object", + "target-lexicon", + "thiserror", + "wasmparser", + "wasmtime-cranelift-shared", + "wasmtime-environ", + "wasmtime-versioned-export-macros", +] + +[[package]] +name = "wasmtime-cranelift-shared" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290a89027688782da8ff60b12bb95695494b1874e0d0ba2ba387d23dace6d70c" +dependencies = [ + "anyhow", + "cranelift-codegen", + "cranelift-control", + "cranelift-native", + "gimli", + "object", + "target-lexicon", + "wasmtime-environ", +] + +[[package]] +name = "wasmtime-environ" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61eb64fb3e0da883e2df4a13a81d6282e072336e6cb6295021d0f7ab2e352754" +dependencies = [ + "anyhow", + "cranelift-entity", + "gimli", + "indexmap 2.0.0", + "log", + "object", + "serde", + "serde_derive", + "target-lexicon", + "thiserror", + "wasm-encoder", + "wasmparser", + "wasmprinter", + "wasmtime-component-util", + "wasmtime-types", +] + +[[package]] +name = "wasmtime-fiber" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ecf1d3a838b0956b71ad3f8cb80069a228339775bf02dd35d86a5a68bbe443" +dependencies = [ + "anyhow", + "cc", + "cfg-if", + "rustix 0.38.28", + "wasmtime-asm-macros", + "wasmtime-versioned-export-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-jit" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f485336add49267d8859e8f8084d2d4b9a4b1564496b6f30ba5b168d50c10ceb" +dependencies = [ + "addr2line", + "anyhow", + "bincode 1.3.3", + "cfg-if", + "cpp_demangle 0.3.5", + "gimli", + "ittapi", + "log", + "object", + "rustc-demangle", + "rustix 0.38.28", + "serde", + "serde_derive", + "target-lexicon", + "wasmtime-environ", + "wasmtime-jit-debug", + "wasmtime-jit-icache-coherence", + "wasmtime-runtime", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-jit-debug" +version 
= "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65e119affec40edb2fab9044f188759a00c2df9c3017278d047012a2de1efb4f" +dependencies = [ + "object", + "once_cell", + "rustix 0.38.28", + "wasmtime-versioned-export-macros", +] + +[[package]] +name = "wasmtime-jit-icache-coherence" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b6d197fcc34ad32ed440e1f9552fd57d1f377d9699d31dee1b5b457322c1f8a" +dependencies = [ + "cfg-if", + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-runtime" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "794b2bb19b99ef8322ff0dd9fe1ba7e19c41036dfb260b3f99ecce128c42ff92" +dependencies = [ + "anyhow", + "cc", + "cfg-if", + "encoding_rs", + "indexmap 2.0.0", + "libc", + "log", + "mach", + "memfd", + "memoffset", + "paste", + "psm", + "rustix 0.38.28", + "sptr", + "wasm-encoder", + "wasmtime-asm-macros", + "wasmtime-environ", + "wasmtime-fiber", + "wasmtime-jit-debug", + "wasmtime-versioned-export-macros", + "wasmtime-wmemcheck", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-types" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d995db8bb56f2cd8d2dc0ed5ffab94ffb435283b0fe6747f80f7aab40b2d06a1" +dependencies = [ + "cranelift-entity", + "serde", + "serde_derive", + "thiserror", + "wasmparser", +] + +[[package]] +name = "wasmtime-versioned-export-macros" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f55c5565959287c21dd0f4277ae3518dd2ae62679f655ee2dbc4396e19d210db" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + +[[package]] +name = "wasmtime-wasi" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccd8370078149d49a3a47e93741553fd79b700421464b6a27ca32718192ab130" +dependencies = [ + "anyhow", + "async-trait", + 
"bitflags 2.4.0", + "bytes", + "cap-fs-ext", + "cap-net-ext", + "cap-rand", + "cap-std", + "cap-time-ext", + "fs-set-times", + "futures", + "io-extras", + "io-lifetimes 2.0.3", + "libc", + "log", + "once_cell", + "rustix 0.38.28", + "system-interface", + "thiserror", + "tokio", + "tracing", + "url", + "wasi-cap-std-sync", + "wasi-common", + "wasmtime", + "wiggle", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-winch" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6f945ff9bad96e0a69973d74f193c19f627c8adbf250e7cb73ae7564b6cc8a" +dependencies = [ + "anyhow", + "cranelift-codegen", + "gimli", + "object", + "target-lexicon", + "wasmparser", + "wasmtime-cranelift-shared", + "wasmtime-environ", + "winch-codegen", +] + +[[package]] +name = "wasmtime-wit-bindgen" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f328b2d4a690270324756e886ed5be3a4da4c00be0eea48253f4595ad068062b" +dependencies = [ + "anyhow", + "heck 0.4.1", + "indexmap 2.0.0", + "wit-parser", +] + +[[package]] +name = "wasmtime-wmemcheck" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67761d8f8c0b3c13a5d34356274b10a40baba67fe9cfabbfc379a8b414e45de2" + +[[package]] +name = "wast" +version = "35.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ef140f1b49946586078353a453a1d28ba90adfc54dde75710bc1931de204d68" +dependencies = [ + "leb128", +] + +[[package]] +name = "wast" +version = "69.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1ee37317321afde358e4d7593745942c48d6d17e0e6e943704de9bbee121e7a" +dependencies = [ + "leb128", + "memchr", + "unicode-width", + "wasm-encoder", +] + +[[package]] +name = "wat" +version = "1.0.82" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeb338ee8dee4d4cd05e6426683f21c5087dc7cfc8903e839ccf48d43332da3c" 
+dependencies = [ + "wast 69.0.1", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -12179,6 +13183,48 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wiggle" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0afb26cd3269289bb314a361ff0a6685e5ce793b62181a9fe3f81ace15051697" +dependencies = [ + "anyhow", + "async-trait", + "bitflags 2.4.0", + "thiserror", + "tracing", + "wasmtime", + "wiggle-macro", +] + +[[package]] +name = "wiggle-generate" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef2868fed7584d2b552fa317104858ded80021d23b073b2d682d3c932a027bd" +dependencies = [ + "anyhow", + "heck 0.4.1", + "proc-macro2", + "quote", + "shellexpand", + "syn 2.0.37", + "witx", +] + +[[package]] +name = "wiggle-macro" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31ae1ec11a17ea481539ee9a5719a278c9790d974060fbf71db4b2c05378780b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", + "wiggle-generate", +] + [[package]] name = "winapi" version = "0.3.9" @@ -12210,6 +13256,22 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "winch-codegen" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58e58c236a6abdd9ab454552b4f29e16cfa837a86897c1503313b2e62e7609ec" +dependencies = [ + "anyhow", + "cranelift-codegen", + "gimli", + "regalloc2", + "smallvec", + "target-lexicon", + "wasmparser", + "wasmtime-environ", +] + [[package]] name = "windows" version = "0.48.0" @@ -12455,6 +13517,33 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winx" +version = "0.36.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9643b83820c0cd246ecabe5fa454dd04ba4fa67996369466d0747472d337346" +dependencies = [ 
+ "bitflags 2.4.0", + "windows-sys 0.52.0", +] + +[[package]] +name = "wit-parser" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15df6b7b28ce94b8be39d8df5cb21a08a4f3b9f33b631aedb4aa5776f785ead3" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.0.0", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", +] + [[package]] name = "with_options" version = "1.5.0-alpha" @@ -12464,6 +13553,18 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "witx" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e366f27a5cabcddb2706a78296a40b8fcc451e1a6aba2fc1d94b4a01bdaaef4b" +dependencies = [ + "anyhow", + "log", + "thiserror", + "wast 35.0.2", +] + [[package]] name = "workspace-config" version = "1.5.0-alpha" @@ -12526,6 +13627,7 @@ dependencies = [ "hmac", "hyper", "indexmap 1.9.3", + "indexmap 2.0.0", "itertools 0.11.0", "jni", "lazy_static", @@ -12571,6 +13673,8 @@ dependencies = [ "reqwest", "ring 0.16.20", "rust_decimal", + "rustc-hash", + "rustix 0.38.28", "scopeguard", "sea-orm", "sea-query", @@ -12610,6 +13714,7 @@ dependencies = [ "uuid", "whoami", "zeroize", + "zstd-sys", ] [[package]] @@ -12747,6 +13852,15 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe 5.0.2+zstd.1.5.2", +] + [[package]] name = "zstd" version = "0.12.4" @@ -12765,6 +13879,16 @@ dependencies = [ "zstd-safe 7.0.0", ] +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", +] + [[package]] name = "zstd-safe" version = "6.0.6" diff --git a/Cargo.toml b/Cargo.toml index 4359d3569aca3..b09dd5def90b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ members = [ "src/utils/workspace-config", "src/workspace-hack", ] -exclude = ["lints"] +exclude = ["e2e_test/udf/wasm", "lints"] resolver = "2" [workspace.package] @@ -132,6 +132,7 @@ arrow-flight = "49" arrow-select = "49" arrow-ord = "49" arrow-row = "49" +arrow-udf-wasm = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "f9a9e0d" } arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" } arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" } arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" } diff --git a/ci/scripts/build-other.sh b/ci/scripts/build-other.sh index 9cd44dc78c95a..e303a5f5160cb 100755 --- a/ci/scripts/build-other.sh +++ b/ci/scripts/build-other.sh @@ -6,6 +6,12 @@ set -euo pipefail source ci/scripts/common.sh +echo "--- Build Rust UDF" +cd e2e_test/udf/wasm +rustup target add wasm32-wasi +cargo build --release +cd ../../.. 
+ echo "--- Build Java packages" cd java mvn -B package -Dmaven.test.skip=true @@ -26,6 +32,8 @@ tar --zstd -cf java-binding-integration-test.tar.zst bin java/java-binding-integ echo "--- Upload Java artifacts" cp java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz ./risingwave-connector.tar.gz cp java/udf-example/target/risingwave-udf-example.jar ./risingwave-udf-example.jar +cp e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm udf.wasm buildkite-agent artifact upload ./risingwave-connector.tar.gz buildkite-agent artifact upload ./risingwave-udf-example.jar buildkite-agent artifact upload ./java-binding-integration-test.tar.zst +buildkite-agent artifact upload ./udf.wasm diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index 50c1d9d7edb94..86eae7a146f5f 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -54,6 +54,8 @@ download_and_prepare_rw "$profile" common echo "--- Download artifacts" download-and-decompress-artifact e2e_test_generated ./ download-and-decompress-artifact risingwave_e2e_extended_mode_test-"$profile" target/debug/ +mkdir -p e2e_test/udf/wasm/target/wasm32-wasi/release/ +buildkite-agent artifact download udf.wasm e2e_test/udf/wasm/target/wasm32-wasi/release/ buildkite-agent artifact download risingwave-udf-example.jar ./ mv target/debug/risingwave_e2e_extended_mode_test-"$profile" target/debug/risingwave_e2e_extended_mode_test @@ -97,6 +99,9 @@ sleep 1 sqllogictest -p 4566 -d dev './e2e_test/udf/udf.slt' pkill java +echo "--- e2e, $mode, wasm udf" +sqllogictest -p 4566 -d dev './e2e_test/udf/wasm_udf.slt' + echo "--- Kill cluster" cluster_stop diff --git a/e2e_test/udf/wasm/.cargo/config.toml b/e2e_test/udf/wasm/.cargo/config.toml new file mode 100644 index 0000000000000..6b77899cb3333 --- /dev/null +++ b/e2e_test/udf/wasm/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +target = "wasm32-wasi" diff --git a/e2e_test/udf/wasm/.gitignore b/e2e_test/udf/wasm/.gitignore new file mode 
100644 index 0000000000000..fa8d85ac52f19 --- /dev/null +++ b/e2e_test/udf/wasm/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +target diff --git a/e2e_test/udf/wasm/Cargo.toml b/e2e_test/udf/wasm/Cargo.toml new file mode 100644 index 0000000000000..5e413a40e37b8 --- /dev/null +++ b/e2e_test/udf/wasm/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "udf" +version = "0.1.0" +edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +crate-type = ["cdylib"] + +[dependencies] +arrow-udf = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "f9a9e0d" } +genawaiter = "0.99" +rust_decimal = "1" +serde_json = "1" diff --git a/e2e_test/udf/wasm/src/lib.rs b/e2e_test/udf/wasm/src/lib.rs new file mode 100644 index 0000000000000..522dd471daf50 --- /dev/null +++ b/e2e_test/udf/wasm/src/lib.rs @@ -0,0 +1,80 @@ +use arrow_udf::function; +use rust_decimal::Decimal; + +#[function("int_42() -> int")] +fn int_42() -> i32 { + 42 +} + +#[function("gcd(int, int) -> int")] +fn gcd(mut a: i32, mut b: i32) -> i32 { + while b != 0 { + let t = b; + b = a % b; + a = t; + } + a +} + +#[function("gcd(int, int, int) -> int")] +fn gcd3(a: i32, b: i32, c: i32) -> i32 { + gcd(gcd(a, b), c) +} + +#[function("sleep(int) -> int")] +fn sleep(second: i32) -> i32 { + std::thread::sleep(std::time::Duration::from_secs(second as u64)); + 0 +} + +#[function("segfault() -> int")] +fn segfault() -> i32 { + unsafe { (usize::MAX as *const i32).read_volatile() } +} + +#[function("oom() -> int")] +fn oom() -> i32 { + _ = vec![0u8; usize::MAX]; + 0 +} + +#[function("create_file() -> int")] +fn create_file() -> i32 { + std::fs::File::create("test").unwrap(); + 0 +} + +#[function("length(varchar) -> int")] +#[function("length(bytea) -> int")] +fn length(s: impl AsRef<[u8]>) -> i32 { + s.as_ref().len() as i32 +} + +#[function("extract_tcp_info(bytea) -> struct")] +fn extract_tcp_info(tcp_packet: &[u8]) -> (String, String, i16, i16) { + let src_addr 
= std::net::Ipv4Addr::from(<[u8; 4]>::try_from(&tcp_packet[12..16]).unwrap()); + let dst_addr = std::net::Ipv4Addr::from(<[u8; 4]>::try_from(&tcp_packet[16..20]).unwrap()); + let src_port = u16::from_be_bytes(<[u8; 2]>::try_from(&tcp_packet[20..22]).unwrap()); + let dst_port = u16::from_be_bytes(<[u8; 2]>::try_from(&tcp_packet[22..24]).unwrap()); + ( + src_addr.to_string(), + dst_addr.to_string(), + src_port as i16, + dst_port as i16, + ) +} + +#[function("decimal_add(decimal, decimal) -> decimal")] +fn decimal_add(a: Decimal, b: Decimal) -> Decimal { + a + b +} + +#[function("jsonb_access(json, int) -> json")] +fn jsonb_access(json: serde_json::Value, index: i32) -> Option { + json.get(index as usize).cloned() +} + +#[function("series(int) -> setof int")] +fn series(n: i32) -> impl Iterator { + 0..n +} diff --git a/e2e_test/udf/wasm_udf.slt b/e2e_test/udf/wasm_udf.slt new file mode 100644 index 0000000000000..d182e8ff0f993 --- /dev/null +++ b/e2e_test/udf/wasm_udf.slt @@ -0,0 +1,93 @@ +# Before running this test: +# cd e2e_test/udf/wasm && cargo build --release + +statement ok +create function int_42() returns int +language wasm using link 'fs://e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm'; + +statement ok +create function gcd(int, int) returns int +language wasm using link 'fs://e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm'; + +statement ok +create function gcd(int, int, int) returns int +language wasm using link 'fs://e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm'; + +statement ok +create function extract_tcp_info(bytea) returns struct +language wasm using link 'fs://e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm'; + +statement ok +create function decimal_add(decimal, decimal) returns decimal +language wasm using link 'fs://e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm'; + +statement ok +create function jsonb_access(jsonb, int) returns jsonb +language wasm using link 
'fs://e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm'; + +statement ok +create function series(int) returns table (x int) +language wasm using link 'fs://e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm'; + +query I +select int_42(); +---- +42 + +query I +select gcd(25, 15); +---- +5 + +query I +select gcd(25, 15, 3); +---- +1 + +query T +select extract_tcp_info(E'\\x45000034a8a8400040065b8ac0a8000ec0a80001035d20b6d971b900000000080020200493310000020405b4' :: bytea); +---- +(192.168.0.14,192.168.0.1,861,8374) + +query R +select decimal_add(1.11, 2.22); +---- +3.33 + +query T +select jsonb_access(a::jsonb, 1) from +(values ('["a", "b", "c"]'), (null), ('[0, false]')) t(a); +---- +"b" +NULL +false + +query I +select series(5); +---- +0 +1 +2 +3 +4 + +statement ok +drop function int_42; + +statement ok +drop function gcd(int,int); + +statement ok +drop function gcd(int,int,int); + +statement ok +drop function extract_tcp_info; + +statement ok +drop function decimal_add; + +statement ok +drop function jsonb_access; + +statement ok +drop function series; diff --git a/proto/expr.proto b/proto/expr.proto index 31835ac905705..e02f7e45bb19a 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -276,6 +276,7 @@ message ExprNode { COL_DESCRIPTION = 2401; PG_GET_VIEWDEF = 2402; } + // Only use this field for function call. For other types of expression, it should be UNSPECIFIED. Type function_type = 1; data.DataType return_type = 3; oneof rex_node { @@ -461,15 +462,24 @@ message WindowFunction { WindowFrame frame = 5; } +// Note: due to historic reasons, UserDefinedFunction is a oneof variant parallel to FunctionCall, +// while UserDefinedTableFunction is embedded as a field in TableFunction. + message UserDefinedFunction { repeated ExprNode children = 1; string name = 2; repeated data.DataType arg_types = 3; string language = 4; + // For external UDF: the link to the external function service. + // For WASM UDF: the link to the wasm binary file. 
string link = 5; + // A unique identifier for the function. + // For external UDF, it's the name of the function in the external function service. + // For WASM UDF, it's the name of the function in the wasm binary file. string identifier = 6; } +// Additional information for user defined table functions. message UserDefinedTableFunction { repeated data.DataType arg_types = 3; string language = 4; diff --git a/proto/meta.proto b/proto/meta.proto index a54375be80827..0ce5fc887fe9d 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -551,6 +551,7 @@ message SystemParams { optional uint32 parallel_compact_size_mb = 11; optional uint32 max_concurrent_creating_streaming_jobs = 12; optional bool pause_on_next_bootstrap = 13; + optional string wasm_storage_url = 14; } message GetSystemParamsRequest {} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index d7e307b30a072..3fb00ffff198f 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -907,6 +907,9 @@ pub struct SystemConfig { /// Whether to pause all data sources on next bootstrap. #[serde(default = "default::system::pause_on_next_bootstrap")] pub pause_on_next_bootstrap: Option, + + #[serde(default = "default::system::wasm_storage_url")] + pub wasm_storage_url: Option, } /// The subsections `[storage.object_store]`. @@ -961,6 +964,7 @@ impl SystemConfig { max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs, pause_on_next_bootstrap: self.pause_on_next_bootstrap, telemetry_enabled: None, // deprecated + wasm_storage_url: self.wasm_storage_url, } } } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 1449093db6b3a..366cc61d2dd53 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -55,6 +55,7 @@ macro_rules!
for_all_params { { backup_storage_directory, String, Some("backup".to_string()), true }, { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true }, { pause_on_next_bootstrap, bool, Some(false), true }, + { wasm_storage_url, String, Some("fs://.risingwave/data".to_string()), false }, } }; } @@ -357,6 +358,7 @@ mod tests { (BACKUP_STORAGE_DIRECTORY_KEY, "a"), (MAX_CONCURRENT_CREATING_STREAMING_JOBS_KEY, "1"), (PAUSE_ON_NEXT_BOOTSTRAP_KEY, "false"), + (WASM_STORAGE_URL_KEY, "a"), ("a_deprecated_param", "foo"), ]; diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 5eabb445903f1..0059974203c6d 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -80,6 +80,10 @@ impl SystemParamsReader { self.prost.pause_on_next_bootstrap.unwrap_or(false) } + pub fn wasm_storage_url(&self) -> &str { + self.prost.wasm_storage_url.as_ref().unwrap() + } + pub fn to_kv(&self) -> Vec<(String, String)> { system_params_to_kv(&self.prost).unwrap() } diff --git a/src/config/example.toml b/src/config/example.toml index 851716714ee9a..22c5b2cd559bc 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -189,3 +189,4 @@ backup_storage_url = "memory" backup_storage_directory = "backup" max_concurrent_creating_streaming_jobs = 1 pause_on_next_bootstrap = false +wasm_storage_url = "fs://.risingwave/data" diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 1b5fcb7287ebf..ff1910e9a0182 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -19,6 +19,7 @@ normal = ["workspace-hack", "ctor"] anyhow = "1" arrow-array = { workspace = true } arrow-schema = { workspace = true } +arrow-udf-wasm = { workspace = true } async-trait = "0.1" auto_impl = "1" await-tree = { workspace = true } @@ -35,11 +36,13 @@ enum-as-inner = "0.6" futures-async-stream = { workspace = true } futures-util = "0.3" itertools = "0.12" +moka = { version = "0.12", features = ["future"] } 
num-traits = "0.2" parse-display = "0.8" paste = "1" risingwave_common = { workspace = true } risingwave_expr_macro = { path = "../macro" } +risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_udf = { workspace = true } smallvec = "1" diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs index 50fd2e4497ec3..51e9b87de19fe 100644 --- a/src/expr/core/src/expr/build.rs +++ b/src/expr/core/src/expr/build.rs @@ -20,7 +20,7 @@ use risingwave_pb::expr::expr_node::{PbType, RexNode}; use risingwave_pb::expr::ExprNode; use super::expr_some_all::SomeAllExpression; -use super::expr_udf::UdfExpression; +use super::expr_udf::UserDefinedFunction; use super::strict::Strict; use super::wrapper::checked::Checked; use super::wrapper::non_strict::NonStrict; @@ -104,7 +104,7 @@ where match prost.get_rex_node()? { RexNode::InputRef(_) => InputRefExpression::build_boxed(prost, build_child), RexNode::Constant(_) => LiteralExpression::build_boxed(prost, build_child), - RexNode::Udf(_) => UdfExpression::build_boxed(prost, build_child), + RexNode::Udf(_) => UserDefinedFunction::build_boxed(prost, build_child), RexNode::FuncCall(_) => match prost.function_type() { // Dedicated types diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index cd11974eb986b..60f04799838ba 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -15,14 +15,21 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::sync::atomic::{AtomicU8, Ordering}; -use std::sync::{Arc, LazyLock, Mutex, Weak}; +use std::sync::{Arc, LazyLock, Weak}; +use std::time::Duration; +use anyhow::Context; use arrow_schema::{Field, Fields, Schema}; +use arrow_udf_wasm::Runtime as WasmRuntime; use await_tree::InstrumentAwait; use cfg_or_panic::cfg_or_panic; +use moka::future::Cache; use risingwave_common::array::{ArrayError, ArrayRef, DataChunk}; +use 
risingwave_common::config::ObjectStoreConfig; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; +use risingwave_object_store::object::build_remote_object_store; +use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_pb::expr::ExprNode; use risingwave_udf::ArrowFlightUdfClient; use thiserror_ext::AsReport; @@ -32,12 +39,12 @@ use crate::expr::Expression; use crate::{bail, Result}; #[derive(Debug)] -pub struct UdfExpression { +pub struct UserDefinedFunction { children: Vec, arg_types: Vec, return_type: DataType, arg_schema: Arc, - client: Arc, + imp: UdfImpl, identifier: String, span: await_tree::Span, /// Number of remaining successful calls until retry is enabled. @@ -50,8 +57,14 @@ pub struct UdfExpression { const INITIAL_RETRY_COUNT: u8 = 16; +#[derive(Debug)] +enum UdfImpl { + External(Arc), + Wasm(Arc), +} + #[async_trait::async_trait] -impl Expression for UdfExpression { +impl Expression for UserDefinedFunction { fn return_type(&self) -> DataType { self.return_type.clone() } @@ -82,7 +95,7 @@ impl Expression for UdfExpression { } } -impl UdfExpression { +impl UserDefinedFunction { async fn eval_inner( &self, columns: Vec, @@ -108,35 +121,39 @@ impl UdfExpression { ) .expect("failed to build record batch"); - let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed); - let result = if disable_retry_count != 0 { - self.client - .call(&self.identifier, input) - .instrument_await(self.span.clone()) - .await - } else { - self.client - .call_with_retry(&self.identifier, input) - .instrument_await(self.span.clone()) - .await + let output: arrow_array::RecordBatch = match &self.imp { + UdfImpl::Wasm(runtime) => runtime.call(&self.identifier, &input)?, + UdfImpl::External(client) => { + let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed); + let result = if disable_retry_count != 0 { + client + .call(&self.identifier, input) + 
.instrument_await(self.span.clone()) + .await + } else { + client + .call_with_retry(&self.identifier, input) + .instrument_await(self.span.clone()) + .await + }; + let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed); + let connection_error = matches!(&result, Err(e) if e.is_connection_error()); + if connection_error && disable_retry_count != INITIAL_RETRY_COUNT { + // reset count on connection error + self.disable_retry_count + .store(INITIAL_RETRY_COUNT, Ordering::Relaxed); + } else if !connection_error && disable_retry_count != 0 { + // decrease count on success, ignore if exchange failed + _ = self.disable_retry_count.compare_exchange( + disable_retry_count, + disable_retry_count - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ); + } + result? + } }; - let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed); - let connection_error = matches!(&result, Err(e) if e.is_connection_error()); - if connection_error && disable_retry_count != INITIAL_RETRY_COUNT { - // reset count on connection error - self.disable_retry_count - .store(INITIAL_RETRY_COUNT, Ordering::Relaxed); - } else if !connection_error && disable_retry_count != 0 { - // decrease count on success, ignore if exchange failed - _ = self.disable_retry_count.compare_exchange( - disable_retry_count, - disable_retry_count - 1, - Ordering::Relaxed, - Ordering::Relaxed, - ); - } - let output = result?; - if output.num_rows() != vis.count_ones() { bail!( "UDF returned {} rows, but expected {}", @@ -164,7 +181,7 @@ impl UdfExpression { } #[cfg_or_panic(not(madsim))] -impl Build for UdfExpression { +impl Build for UserDefinedFunction { fn build( prost: &ExprNode, build_child: impl Fn(&ExprNode) -> Result, @@ -172,8 +189,17 @@ impl Build for UdfExpression { let return_type = DataType::from(prost.get_return_type().unwrap()); let udf = prost.get_rex_node().unwrap().as_udf().unwrap(); - // connect to UDF service - let client = get_or_create_client(&udf.link)?; + let imp = 
match udf.language.as_str() { + "wasm" => { + // Use `block_in_place` as an escape hatch to run async code here in sync context. + // Calling `block_on` directly will panic. + UdfImpl::Wasm(tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(get_or_create_wasm_runtime(&udf.link)) + })?) + } + _ => UdfImpl::External(get_or_create_flight_client(&udf.link)?), + }; let arg_schema = Arc::new(Schema::new( udf.arg_types @@ -195,9 +221,9 @@ impl Build for UdfExpression { arg_types: udf.arg_types.iter().map(|t| t.into()).collect(), return_type, arg_schema, - client, + imp, identifier: udf.identifier.clone(), - span: format!("expr_udf_call ({})", udf.identifier).into(), + span: format!("udf_call({})", udf.identifier).into(), disable_retry_count: AtomicU8::new(0), }) } @@ -207,8 +233,8 @@ impl Build for UdfExpression { /// Get or create a client for the given UDF service. /// /// There is a global cache for clients, so that we can reuse the same client for the same service. 
-pub(crate) fn get_or_create_client(link: &str) -> Result> { - static CLIENTS: LazyLock>>> = +pub(crate) fn get_or_create_flight_client(link: &str) -> Result> { + static CLIENTS: LazyLock>>> = LazyLock::new(Default::default); let mut clients = CLIENTS.lock().unwrap(); if let Some(client) = clients.get(link).and_then(|c| c.upgrade()) { @@ -221,3 +247,42 @@ pub(crate) fn get_or_create_client(link: &str) -> Result Result> { + static RUNTIMES: LazyLock>> = LazyLock::new(|| { + Cache::builder() + .time_to_idle(Duration::from_secs(60)) + .build() + }); + + if let Some(runtime) = RUNTIMES.get(link).await { + return Ok(runtime.clone()); + } + + // create new runtime + let (wasm_storage_url, object_name) = link + .rsplit_once('/') + .context("invalid link for wasm function")?; + + // load wasm binary from object store + let object_store = build_remote_object_store( + wasm_storage_url, + Arc::new(ObjectStoreMetrics::unused()), + "Wasm Engine", + ObjectStoreConfig::default(), + ) + .await; + let binary = object_store + .read(object_name, ..) 
+ .await + .context("failed to load wasm binary from object storage")?; + + let runtime = Arc::new(arrow_udf_wasm::Runtime::new(&binary)?); + RUNTIMES.insert(link.into(), runtime.clone()).await; + Ok(runtime) +} diff --git a/src/expr/core/src/expr/mod.rs b/src/expr/core/src/expr/mod.rs index 951ef4bb99765..d1ced2b3322c9 100644 --- a/src/expr/core/src/expr/mod.rs +++ b/src/expr/core/src/expr/mod.rs @@ -51,6 +51,7 @@ use risingwave_common::types::{DataType, Datum}; pub use self::build::*; pub use self::expr_input_ref::InputRefExpression; pub use self::expr_literal::LiteralExpression; +pub use self::expr_udf::get_or_create_wasm_runtime; pub use self::value::{ValueImpl, ValueRef}; pub use self::wrapper::*; pub use super::{ExprError, Result}; diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index e3850e6e75615..83658026ed56d 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema, SchemaRef}; +use arrow_udf_wasm::Runtime as WasmRuntime; use cfg_or_panic::cfg_or_panic; use futures_util::stream; use risingwave_common::array::{ArrayError, DataChunk, I32Array}; @@ -31,12 +32,18 @@ pub struct UserDefinedTableFunction { #[allow(dead_code)] arg_schema: SchemaRef, return_type: DataType, - client: Arc, + client: UdfImpl, identifier: String, #[allow(dead_code)] chunk_size: usize, } +#[derive(Debug)] +enum UdfImpl { + External(Arc), + Wasm(Arc), +} + #[async_trait::async_trait] impl TableFunction for UserDefinedTableFunction { fn return_type(&self) -> DataType { @@ -49,6 +56,29 @@ impl TableFunction for UserDefinedTableFunction { } } +#[cfg(not(madsim))] +impl UdfImpl { + #[try_stream(ok = RecordBatch, error = ExprError)] + async fn call_table_function<'a>(&'a self, identifier: &'a str, input: RecordBatch) { + match self { + UdfImpl::External(client) 
=> { + #[for_await] + for res in client + .call_stream(identifier, stream::once(async { input })) + .await? + { + yield res?; + } + } + UdfImpl::Wasm(runtime) => { + for res in runtime.call_table_function(identifier, &input)? { + yield res?; + } + } + } + } +} + #[cfg(not(madsim))] impl UserDefinedTableFunction { #[try_stream(boxed, ok = DataChunk, error = ExprError)] @@ -70,8 +100,7 @@ impl UserDefinedTableFunction { #[for_await] for res in self .client - .call_stream(&self.identifier, stream::once(async { arrow_input })) - .await? + .call_table_function(&self.identifier, arrow_input) { let output = DataChunk::try_from(&res?)?; self.check_output(&output)?; @@ -147,8 +176,22 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result()?, )); - // connect to UDF service - let client = crate::expr::expr_udf::get_or_create_client(&udtf.link)?; + + let client = match udtf.language.as_str() { + "wasm" => { + // Use `block_in_place` as an escape hatch to run async code here in sync context. + // Calling `block_on` directly will panic. + UdfImpl::Wasm(tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on( + crate::expr::expr_udf::get_or_create_wasm_runtime(&udtf.link), + ) + })?) + } + // connect to UDF service + _ => UdfImpl::External(crate::expr::expr_udf::get_or_create_flight_client( + &udtf.link, + )?), + }; Ok(UserDefinedTableFunction { children: prost.args.iter().map(expr_build_from_prost).try_collect()?, diff --git a/src/expr/udf/README.md b/src/expr/udf/README.md new file mode 100644 index 0000000000000..d9428cc547249 --- /dev/null +++ b/src/expr/udf/README.md @@ -0,0 +1,118 @@ +# Use UDFs in Rust + +This article provides a step-by-step guide for defining Rust functions in RisingWave. + +Rust functions are compiled into WebAssembly modules and then run on the embedded WebAssembly virtual machine in RisingWave. 
Compared to Python and Java, Rust UDFs offer **higher performance** (near native) and are **managed by the RisingWave kernel**, eliminating the need for additional maintenance. However, since they run embedded in the kernel, for security reasons, Rust UDFs currently **do not support access to external networks and are limited to computational tasks only**, with restricted CPU and memory resources. Therefore, we recommend using Rust UDFs for **computationally intensive tasks**, such as packet parsing and format conversion. + +## Prerequisites + +- Ensure that you have [Rust toolchain](https://rustup.rs) (stable channel) installed on your computer. +- Ensure that the Rust standard library for `wasm32-wasi` target is installed: + ```shell + rustup target add wasm32-wasi + ``` + +## 1. Create a project + +Create a Rust project named `udf`: + +```shell +cargo new --lib udf +cd udf +``` + +Add the following lines to `Cargo.toml`: + +```toml +[lib] +crate-type = ["cdylib"] + +[dependencies] +arrow-udf = "0.1" +``` + +## 2. Define your functions + +In `src/lib.rs`, define your functions using the `function` macro: + +```rust +use arrow_udf::function; + +// define a scalar function +#[function("gcd(int, int) -> int")] +fn gcd(mut x: i32, mut y: i32) -> i32 { + while y != 0 { + (x, y) = (y, x % y); + } + x +} + +// define a table function +#[function("series(int) -> setof int")] +fn series(n: i32) -> impl Iterator<Item = i32> { + 0..n +} +``` + +You can find more usages in the [documentation](https://docs.rs/arrow_udf/0.1.0/arrow_udf/attr.function.html) and more examples in the [tests](https://github.com/risingwavelabs/arrow-udf/blob/main/arrow-udf/tests/tests.rs). + +Currently we only support a limited set of data types. `timestamptz` and complex array types are not supported yet. + +## 3.
Build the project + +Build your functions into a WebAssembly module: + +```shell +cargo build --release --target wasm32-wasi +``` + +You can find the generated WASM module at `target/wasm32-wasi/release/udf.wasm`. + +Optional: It is recommended to strip the binary to reduce its size: + +```shell +# Install wasm-tools +cargo install wasm-tools + +# Strip the binary +wasm-tools strip ./target/wasm32-wasi/release/udf.wasm > udf.wasm +``` + +## 4. Declare your functions in RisingWave + +In RisingWave, use the `CREATE FUNCTION` command to declare the functions you defined. + +There are two ways to load the WASM module: + +1. The WASM binary can be embedded in the SQL statement using the base64 encoding. +You can use the following shell script to encode the binary and generate the SQL statement: + ```shell + encoded=$(base64 -i udf.wasm) + sql="CREATE FUNCTION gcd(int, int) RETURNS int LANGUAGE wasm USING BASE64 '$encoded';" + echo "$sql" > create_function.sql + ``` + When created successfully, the WASM binary will be automatically uploaded to the object store. + +2. The WASM binary can be loaded from the object store. + ```sql + CREATE FUNCTION gcd(int, int) RETURNS int + LANGUAGE wasm USING LINK 's3://bucket/path/to/udf.wasm'; + + CREATE FUNCTION series(int) RETURNS TABLE (x int) + LANGUAGE wasm USING LINK 's3://bucket/path/to/udf.wasm'; + ``` + + Or if you run RisingWave locally, you can use the local file system: + ```sql + CREATE FUNCTION gcd(int, int) RETURNS int + LANGUAGE wasm USING LINK 'fs://path/to/udf.wasm'; + ``` + +## 5. Use your functions in RisingWave + +Once the UDFs are created in RisingWave, you can use them in SQL queries just like any built-in functions. 
For example: + +```sql +SELECT gcd(25, 15); +SELECT series(5); +``` diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 43c22fdc35f88..d6996008507a9 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -18,10 +18,12 @@ normal = ["workspace-hack"] anyhow = "1" arc-swap = "1" arrow-schema = { workspace = true } +arrow-udf-wasm = { workspace = true } async-recursion = "1.0.5" async-trait = "0.1" auto_enums = { version = "0.8", features = ["futures03"] } auto_impl = "1" +base64 = "0.21" bk-tree = "0.5.0" bytes = "1" clap = { version = "4", features = ["derive"] } @@ -57,6 +59,7 @@ risingwave_common_service = { workspace = true } risingwave_connector = { workspace = true } risingwave_expr = { workspace = true } risingwave_hummock_sdk = { workspace = true } +risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_source = { workspace = true } diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index a0545b81b17d6..8c9532f50a9a7 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -294,13 +294,6 @@ impl Binder { .into()); } } else { - // Note that `language` may be empty for external udf - if !func.language.is_empty() { - debug_assert!( - func.language == "python" || func.language == "java", - "only `python` and `java` are currently supported for general udf" - ); - } match &func.kind { Scalar { .. } => { return Ok(UserDefinedFunction::new(func.clone(), inputs).into()) diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 4557b71223b98..bc10bdea4431b 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -12,17 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use anyhow::anyhow; +use anyhow::{anyhow, Context}; use arrow_schema::Fields; +use bytes::Bytes; use itertools::Itertools; use pgwire::pg_response::StatementType; use risingwave_common::catalog::FunctionId; use risingwave_common::types::DataType; +use risingwave_expr::expr::get_or_create_wasm_runtime; +use risingwave_object_store::object::{build_remote_object_store, ObjectStoreConfig}; use risingwave_pb::catalog::function::{Kind, ScalarFunction, TableFunction}; use risingwave_pb::catalog::Function; use risingwave_sqlparser::ast::{ CreateFunctionBody, FunctionDefinition, ObjectName, OperateFunctionArg, }; +use risingwave_storage::monitor::ObjectStoreMetrics; use risingwave_udf::ArrowFlightUdfClient; use super::*; @@ -49,7 +53,7 @@ pub async fn handle_create_function( Some(lang) => { let lang = lang.real_value().to_lowercase(); match &*lang { - "python" | "java" => lang, + "python" | "java" | "wasm" => lang, _ => { return Err(ErrorCode::InvalidParameterValue(format!( "language {} is not supported", @@ -63,12 +67,6 @@ pub async fn handle_create_function( // correct protocol. 
None => "".to_string(), }; - let Some(FunctionDefinition::SingleQuotedDef(identifier)) = params.as_ else { - return Err(ErrorCode::InvalidParameterValue("AS must be specified".to_string()).into()); - }; - let Some(CreateFunctionUsing::Link(link)) = params.using else { - return Err(ErrorCode::InvalidParameterValue("USING must be specified".to_string()).into()); - }; let return_type; let kind = match returns { Some(CreateFunctionReturns::Value(data_type)) => { @@ -81,14 +79,10 @@ pub async fn handle_create_function( return_type = bind_data_type(&columns[0].data_type)?; } else { // return type is a struct for multiple columns - let datatypes = columns - .iter() - .map(|c| bind_data_type(&c.data_type)) - .collect::>>()?; - let names = columns - .iter() - .map(|c| c.name.real_value()) - .collect::>(); + let it = columns + .into_iter() + .map(|c| bind_data_type(&c.data_type).map(|ty| (ty, c.name.real_value()))); + let (datatypes, names) = itertools::process_results(it, |it| it.unzip())?; return_type = DataType::new_struct(datatypes, names); } Kind::Table(TableFunction {}) @@ -101,6 +95,10 @@ pub async fn handle_create_function( } }; + let Some(using) = params.using else { + return Err(ErrorCode::InvalidParameterValue("USING must be specified".to_string()).into()); + }; + let mut arg_types = vec![]; for arg in args.unwrap_or_default() { arg_types.push(bind_data_type(&arg.data_type)?); @@ -112,7 +110,7 @@ pub async fn handle_create_function( let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?; let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; - // check if function exists + // check if the function exists in the catalog if (session.env().catalog_reader().read_guard()) .get_schema_by_id(&database_id, &schema_id)? 
.get_function_by_name_args(&function_name, &arg_types) @@ -125,32 +123,93 @@ pub async fn handle_create_function( return Err(CatalogError::Duplicated("function", name).into()); } - // check the service - let client = ArrowFlightUdfClient::connect(&link) - .await - .map_err(|e| anyhow!(e))?; - /// A helper function to create a unnamed field from data type. - fn to_field(data_type: arrow_schema::DataType) -> arrow_schema::Field { - arrow_schema::Field::new("", data_type, true) - } - let args = arrow_schema::Schema::new( - arg_types - .iter() - .map::, _>(|t| Ok(to_field(t.try_into()?))) - .try_collect::<_, Fields, _>()?, - ); - let returns = arrow_schema::Schema::new(match kind { - Kind::Scalar(_) => vec![to_field(return_type.clone().try_into()?)], - Kind::Table(_) => vec![ - arrow_schema::Field::new("row_index", arrow_schema::DataType::Int32, true), - to_field(return_type.clone().try_into()?), - ], - _ => unreachable!(), - }); - client - .check(&identifier, &args, &returns) - .await - .map_err(|e| anyhow!(e))?; + let link; + let identifier; + + match language.as_str() { + "python" | "java" | "" => { + let CreateFunctionUsing::Link(l) = using else { + return Err(ErrorCode::InvalidParameterValue( + "USING LINK must be specified".to_string(), + ) + .into()); + }; + let Some(FunctionDefinition::SingleQuotedDef(id)) = params.as_ else { + return Err( + ErrorCode::InvalidParameterValue("AS must be specified".to_string()).into(), + ); + }; + identifier = id; + link = l; + + // check UDF server + { + let client = ArrowFlightUdfClient::connect(&link) + .await + .map_err(|e| anyhow!(e))?; + /// A helper function to create a unnamed field from data type. 
+ fn to_field(data_type: arrow_schema::DataType) -> arrow_schema::Field { + arrow_schema::Field::new("", data_type, true) + } + let args = arrow_schema::Schema::new( + arg_types + .iter() + .map::, _>(|t| Ok(to_field(t.try_into()?))) + .try_collect::<_, Fields, _>()?, + ); + let returns = arrow_schema::Schema::new(match kind { + Kind::Scalar(_) => vec![to_field(return_type.clone().try_into()?)], + Kind::Table(_) => vec![ + arrow_schema::Field::new("row_index", arrow_schema::DataType::Int32, true), + to_field(return_type.clone().try_into()?), + ], + _ => unreachable!(), + }); + client + .check(&identifier, &args, &returns) + .await + .context("failed to check UDF signature")?; + } + } + "wasm" => { + identifier = wasm_identifier( + &function_name, + &arg_types, + &return_type, + matches!(kind, Kind::Table(_)), + ); + + link = match using { + CreateFunctionUsing::Link(link) => { + let runtime = get_or_create_wasm_runtime(&link).await?; + check_wasm_function(&runtime, &identifier)?; + link + } + CreateFunctionUsing::Base64(encoded) => { + // decode wasm binary from base64 + use base64::prelude::{Engine, BASE64_STANDARD}; + let wasm_binary = BASE64_STANDARD + .decode(encoded) + .context("invalid base64 encoding")?; + + let runtime = arrow_udf_wasm::Runtime::new(&wasm_binary)?; + check_wasm_function(&runtime, &identifier)?; + + let system_params = session.env().meta_client().get_system_params().await?; + let object_name = format!("{:?}.wasm", md5::compute(&wasm_binary)); + upload_wasm_binary( + system_params.wasm_storage_url(), + &object_name, + wasm_binary.into(), + ) + .await?; + + format!("{}/{}", system_params.wasm_storage_url(), object_name) + } + }; + } + _ => unreachable!("invalid language: {language}"), + }; let function = Function { id: FunctionId::placeholder().0, @@ -172,3 +231,78 @@ pub async fn handle_create_function( Ok(PgResponse::empty_result(StatementType::CREATE_FUNCTION)) } + +/// Upload wasm binary to object store. 
+async fn upload_wasm_binary( + wasm_storage_url: &str, + object_name: &str, + wasm_binary: Bytes, +) -> Result<()> { + // Note: it will panic if the url is invalid. We did a validation on meta startup. + let object_store = build_remote_object_store( + wasm_storage_url, + Arc::new(ObjectStoreMetrics::unused()), + "Wasm Engine", + ObjectStoreConfig::default(), + ) + .await; + object_store + .upload(object_name, wasm_binary) + .await + .context("failed to upload wasm binary to object store")?; + Ok(()) +} + +/// Check if the function exists in the wasm binary. +fn check_wasm_function(runtime: &arrow_udf_wasm::Runtime, identifier: &str) -> Result<()> { + if !runtime.functions().contains(&identifier) { + return Err(ErrorCode::InvalidParameterValue(format!( + "function not found in wasm binary: \"{}\"\nHINT: available functions:\n {}", + identifier, + runtime.functions().join("\n ") + )) + .into()); + } + Ok(()) +} + +/// Generate the function identifier in wasm binary. +fn wasm_identifier(name: &str, args: &[DataType], ret: &DataType, table_function: bool) -> String { + format!( + "{}({}){}{}", + name, + args.iter().map(datatype_name).join(","), + if table_function { "->>" } else { "->" }, + datatype_name(ret) + ) +} + +/// Convert a data type to string used in identifier. 
+fn datatype_name(ty: &DataType) -> String { + match ty { + DataType::Boolean => "boolean".to_string(), + DataType::Int16 => "int2".to_string(), + DataType::Int32 => "int4".to_string(), + DataType::Int64 => "int8".to_string(), + DataType::Float32 => "float4".to_string(), + DataType::Float64 => "float8".to_string(), + DataType::Date => "date".to_string(), + DataType::Time => "time".to_string(), + DataType::Timestamp => "timestamp".to_string(), + DataType::Timestamptz => "timestamptz".to_string(), + DataType::Interval => "interval".to_string(), + DataType::Decimal => "decimal".to_string(), + DataType::Jsonb => "json".to_string(), + DataType::Serial => "serial".to_string(), + DataType::Int256 => "int256".to_string(), + DataType::Bytea => "bytea".to_string(), + DataType::Varchar => "varchar".to_string(), + DataType::List(inner) => format!("{}[]", datatype_name(inner)), + DataType::Struct(s) => format!( + "struct<{}>", + s.iter() + .map(|(name, ty)| format!("{}:{}", name, datatype_name(ty))) + .join(",") + ), + } +} diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index 0ed838644cfec..084aa8ad7192c 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -241,6 +241,7 @@ mod tests { SystemParams { state_store: Some("state_store".to_string()), data_directory: Some("data_directory".to_string()), + wasm_storage_url: Some("wasm_storage_url".to_string()), ..SystemConfig::default().into_init_system_params() } } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index f272e6d1ff198..cace146d7fb62 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -639,6 +639,7 @@ impl CatalogManager { #[cfg(not(test))] user_core.ensure_user_id(function.owner)?; + tracing::debug!("create function: {:?}", function); let mut functions = BTreeMapTransaction::new(&mut database_core.functions); functions.insert(function.id, 
function.clone()); commit_meta!(self, functions)?; diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 6684b45b6d66c..32adba09d2bb1 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -29,7 +29,7 @@ pub mod s3; use await_tree::InstrumentAwait; use futures::stream::BoxStream; use futures::StreamExt; -use risingwave_common::config::ObjectStoreConfig; +pub use risingwave_common::config::ObjectStoreConfig; pub use s3::*; pub mod error; @@ -47,7 +47,7 @@ pub trait ObjectRangeBounds = RangeBounds + Clone + Send + Sync + std::fm /// Partitions a set of given paths into two vectors. The first vector contains all local paths, and /// the second contains all remote paths. -pub fn partition_object_store_paths(paths: &[String]) -> Vec { +fn partition_object_store_paths(paths: &[String]) -> Vec { // ToDo: Currently the result is a copy of the input. Would it be worth it to use an in-place // partition instead? let mut vec_rem = vec![]; @@ -782,6 +782,11 @@ impl MonitoredObjectStore { } } +/// Creates a new [`ObjectStore`] from the given `url`. Credentials are configured via environment +/// variables. +/// +/// # Panics +/// If the `url` is invalid. Therefore, it is only suitable to be used during startup. 
pub async fn build_remote_object_store( url: &str, metrics: Arc, diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index d6c56dff7b29f..eef14722ee841 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2712,6 +2712,7 @@ impl fmt::Display for CreateFunctionBody { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum CreateFunctionUsing { Link(String), + Base64(String), } impl fmt::Display for CreateFunctionUsing { @@ -2719,6 +2720,9 @@ impl fmt::Display for CreateFunctionUsing { write!(f, "USING ")?; match self { CreateFunctionUsing::Link(uri) => write!(f, "LINK '{uri}'"), + CreateFunctionUsing::Base64(s) => { + write!(f, "BASE64 '{s}'") + } } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 55b7f81949719..dae6529376c4c 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -93,6 +93,7 @@ define_keywords!( AUTHORIZATION, AUTO, AVG, + BASE64, BEGIN, BEGIN_FRAME, BEGIN_PARTITION, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 59fedc43a398c..5bfda592f3b27 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2340,16 +2340,18 @@ impl Parser { } fn parse_create_function_using(&mut self) -> Result { - let keyword = self.expect_one_of_keywords(&[Keyword::LINK])?; - - let uri = self.parse_literal_string()?; + let keyword = self.expect_one_of_keywords(&[Keyword::LINK, Keyword::BASE64])?; match keyword { - Keyword::LINK => Ok(CreateFunctionUsing::Link(uri)), - _ => self.expected( - "LINK, got {:?}", - TokenWithLocation::wrap(Token::make_keyword(format!("{keyword:?}").as_str())), - ), + Keyword::LINK => { + let uri = self.parse_literal_string()?; + Ok(CreateFunctionUsing::Link(uri)) + } + Keyword::BASE64 => { + let base64 = self.parse_literal_string()?; + Ok(CreateFunctionUsing::Base64(base64)) + } + _ => unreachable!("{}", keyword), } } diff --git a/src/storage/hummock_trace/Cargo.toml 
b/src/storage/hummock_trace/Cargo.toml index 357bc121c0296..7bc7de61fefc9 100644 --- a/src/storage/hummock_trace/Cargo.toml +++ b/src/storage/hummock_trace/Cargo.toml @@ -25,7 +25,7 @@ tokio = { version = "0.2", package = "madsim-tokio" } tracing = "0.1" [dev-dependencies] -itertools = "0.10.5" +itertools = "0.12" mockall = "0.11.4" risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] } diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index ebffb24d81118..8f0d1fc5a5498 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -53,7 +53,7 @@ static RW_QUERY_LOG_TRUNCATE_LEN: LazyLock = Ok(len) if len.parse::().is_ok() => len.parse::().unwrap(), _ => { if cfg!(debug_assertions) { - usize::MAX + 65536 } else { 1024 } diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 11e82150de4a3..e636c3a72de51 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -203,7 +203,6 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { .with_target("aws_sdk_ec2", Level::INFO) .with_target("aws_sdk_s3", Level::INFO) .with_target("aws_config", Level::WARN) - // Only enable WARN and ERROR for 3rd-party crates .with_target("aws_endpoint", Level::WARN) .with_target("aws_credential_types::cache::lazy_caching", Level::WARN) .with_target("hyper", Level::WARN) @@ -214,6 +213,8 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { .with_target("console_subscriber", Level::WARN) .with_target("reqwest", Level::WARN) .with_target("sled", Level::INFO) + .with_target("cranelift", Level::INFO) + .with_target("wasmtime", Level::INFO) // Expose hyper connection socket addr log. 
.with_target("hyper::client::connect::http", Level::DEBUG); diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 09e5b2e6ce6c7..c1bf5e9370c8e 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -62,7 +62,8 @@ hashbrown-594e8ee84c453af0 = { package = "hashbrown", version = "0.13", features hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["nightly", "raw"] } hmac = { version = "0.12", default-features = false, features = ["reset"] } hyper = { version = "0.14", features = ["full"] } -indexmap = { version = "1", default-features = false, features = ["serde", "std"] } +indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["serde", "std"] } +indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] } itertools = { version = "0.11" } jni = { version = "0.21", features = ["invocation"] } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } @@ -108,6 +109,8 @@ regex-syntax = { version = "0.8" } reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } +rustc-hash = { version = "1" } +rustix = { version = "0.38", features = ["fs", "net"] } scopeguard = { version = "1" } sea-orm = { version = "0.12", features = ["runtime-tokio-native-tls", "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite"] } sea-query = { version = "0.30", default-features = false, features = ["backend-mysql", "backend-postgres", "backend-sqlite", "derive", "hashable-value", "postgres-array", "thread-safe", "with-bigdecimal", "with-chrono", "with-json", "with-rust_decimal", "with-time", "with-uuid"] } @@ -145,6 +148,7 @@ url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } whoami = { version = "1" } zeroize = { version 
= "1" } +zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] } [build-dependencies] ahash = { version = "0.8" } @@ -161,6 +165,7 @@ fixedbitset = { version = "0.4" } frunk_core = { version = "0.4", default-features = false, features = ["std"] } generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } +indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] } itertools = { version = "0.11" } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } libc = { version = "0.2", features = ["extra_traits"] } @@ -182,6 +187,8 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] } regex-syntax = { version = "0.8" } +rustc-hash = { version = "1" } +rustix = { version = "0.38", features = ["fs", "net"] } serde = { version = "1", features = ["alloc", "derive", "rc"] } serde_json = { version = "1", features = ["alloc", "raw_value"] } sha2 = { version = "0.10", features = ["oid"] }