diff --git a/Cargo.lock b/Cargo.lock index 3f7df00f7648d..93dd074beda3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,13 +2,22 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97" +dependencies = [ + "gimli 0.27.3", +] + [[package]] name = "addr2line" version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" dependencies = [ - "gimli", + "gimli 0.28.0", ] [[package]] @@ -83,6 +92,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +[[package]] +name = "ambient-authority" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9d4ee0d472d1cd2e28c97dfa124b3d8d992e10eb0a035f33f5d12e3a177ba3b" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -229,6 +244,12 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "arbitrary" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2e1373abdaa212b704512ec2bd8b26bd0b7d5c3f70117411a5d9a451383c859" + [[package]] name = "arc-swap" version = "1.6.0" @@ -1136,12 +1157,12 @@ version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" dependencies = [ - "addr2line", + "addr2line 0.21.0", "cc", "cfg-if", "libc", "miniz_oxide", - "object", + "object 0.32.1", "rustc-demangle", ] @@ -1534,6 +1555,69 @@ dependencies = [ "serde", ] +[[package]] +name = "cap-fs-ext" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58bc48200a1a0fa6fba138b1802ad7def18ec1cdd92f7b2a04e21f1bd887f7b9" 
+dependencies = [ + "cap-primitives", + "cap-std", + "io-lifetimes 1.0.11", + "windows-sys 0.48.0", +] + +[[package]] +name = "cap-primitives" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4b6df5b295dca8d56f35560be8c391d59f0420f72e546997154e24e765e6451" +dependencies = [ + "ambient-authority", + "fs-set-times", + "io-extras", + "io-lifetimes 1.0.11", + "ipnet", + "maybe-owned", + "rustix 0.37.23", + "windows-sys 0.48.0", + "winx 0.35.1", +] + +[[package]] +name = "cap-rand" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d25555efacb0b5244cf1d35833d55d21abc916fff0eaad254b8e2453ea9b8ab" +dependencies = [ + "ambient-authority", + "rand", +] + +[[package]] +name = "cap-std" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3373a62accd150b4fcba056d4c5f3b552127f0ec86d3c8c102d60b978174a012" +dependencies = [ + "cap-primitives", + "io-extras", + "io-lifetimes 1.0.11", + "rustix 0.37.23", +] + +[[package]] +name = "cap-time-ext" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e95002993b7baee6b66c8950470e59e5226a23b3af39fc59c47fe416dd39821a" +dependencies = [ + "cap-primitives", + "once_cell", + "rustix 0.37.23", + "winx 0.35.1", +] + [[package]] name = "cargo-platform" version = "0.1.3" @@ -1953,6 +2037,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "cpp_demangle" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeaa953eaad386a53111e47172c2fedba671e5684c8dd601a5f474f4f118710f" +dependencies = [ + "cfg-if", +] + [[package]] name = "cpp_demangle" version = "0.4.3" @@ -1971,6 +2064,114 @@ dependencies = [ "libc", ] +[[package]] +name = "cranelift-bforest" +version = "0.97.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aae6f552c4c0ccfb30b9559b77bc985a387d998e1736cbbe6b14c903f3656cf" +dependencies = [ + "cranelift-entity", +] + +[[package]] +name = "cranelift-codegen" +version = "0.97.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95551de96900cefae691ce895ff2abc691ae3a0b97911a76b45faf99e432937b" +dependencies = [ + "bumpalo", + "cranelift-bforest", + "cranelift-codegen-meta", + "cranelift-codegen-shared", + "cranelift-control", + "cranelift-entity", + "cranelift-isle", + "gimli 0.27.3", + "hashbrown 0.13.2", + "log", + "regalloc2", + "smallvec", + "target-lexicon", +] + +[[package]] +name = "cranelift-codegen-meta" +version = "0.97.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36a3ad7b2bb03de3383f258b00ca29d80234bebd5130cb6ef3bae37ada5baab0" +dependencies = [ + "cranelift-codegen-shared", +] + +[[package]] +name = "cranelift-codegen-shared" +version = "0.97.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915918fee4142c85fb04bafe0bcd697e2fd6c15a260301ea6f8d2ea332a30e86" + +[[package]] +name = "cranelift-control" +version = "0.97.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e447d548cd7f4fcb87fbd10edbd66a4f77966d17785ed50a08c8f3835483c8" +dependencies = [ + "arbitrary", +] + +[[package]] +name = "cranelift-entity" +version = "0.97.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d8ab3352a1e5966968d7ab424bd3de8e6b58314760745c3817c2eec3fa2f918" +dependencies = [ + "serde", +] + +[[package]] +name = "cranelift-frontend" +version = "0.97.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bffa38431f7554aa1594f122263b87c9e04abc55c9f42b81d37342ac44f79f0" +dependencies = [ + "cranelift-codegen", + "log", + "smallvec", + "target-lexicon", +] + +[[package]] +name = "cranelift-isle" +version = "0.97.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "84cef66a71c77938148b72bf006892c89d6be9274a08f7e669ff15a56145d701" + +[[package]] +name = "cranelift-native" +version = "0.97.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f33c7e5eb446e162d2d10b17fe68e1f091020cc2e4e38b5501c21099600b0a1b" +dependencies = [ + "cranelift-codegen", + "libc", + "target-lexicon", +] + +[[package]] +name = "cranelift-wasm" +version = "0.97.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632f7b64fa6a8c5b980eb6a17ef22089e15cb9f779f1ed3bd3072beab0686c09" +dependencies = [ + "cranelift-codegen", + "cranelift-entity", + "cranelift-frontend", + "itertools 0.10.5", + "log", + "smallvec", + "wasmparser 0.107.0", + "wasmtime-types", +] + [[package]] name = "crc" version = "3.0.1" @@ -2099,7 +2300,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset", + "memoffset 0.9.0", "scopeguard", ] @@ -2543,6 +2744,47 @@ dependencies = [ "subtle", ] +[[package]] +name = "directories-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339ee130d97a610ea5a5872d2bbb130fdf68884ff09d3028b81bec8a1ac23bbc" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + 
"libc", + "redox_users", + "winapi", +] + [[package]] name = "dissimilar" version = "1.0.7" @@ -2886,6 +3128,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "fd-lock" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b0377f1edc77dbd1118507bc7a66e4ab64d2b90c66f90726dc801e73a8c68f9" +dependencies = [ + "cfg-if", + "rustix 0.38.11", + "windows-sys 0.48.0", +] + [[package]] name = "fiat-crypto" version = "0.2.1" @@ -2901,6 +3154,16 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "file-per-thread-logger" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a3cc21c33af89af0930c8cae4ade5e6fdc17b5d2c97b3d2e2edb67a1cf683f3" +dependencies = [ + "env_logger", + "log", +] + [[package]] name = "findshlibs" version = "0.10.2" @@ -3048,7 +3311,7 @@ dependencies = [ "foyer-common", "foyer-workspace-hack", "itertools 0.11.0", - "memoffset", + "memoffset 0.9.0", "parking_lot 0.12.1", "paste", "tracing", @@ -3073,7 +3336,7 @@ dependencies = [ "itertools 0.11.0", "libc", "madsim-tokio", - "memoffset", + "memoffset 0.9.0", "nix 0.27.1", "parking_lot 0.12.1", "paste", @@ -3177,6 +3440,17 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0845fa252299212f0389d64ba26f34fa32cfe41588355f21ed507c59a0f64541" +[[package]] +name = "fs-set-times" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d167b646a876ba8fda6b50ac645cfd96242553cbaf0ca4fccaa39afcbf0801f" +dependencies = [ + "io-lifetimes 1.0.11", + "rustix 0.38.11", + "windows-sys 0.48.0", +] + [[package]] name = "fs2" version = "0.4.3" @@ -3360,6 +3634,19 @@ dependencies = [ "byteorder", ] +[[package]] +name = "fxprof-processed-profile" +version = "0.6.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "27d12c0aed7f1e24276a241aadc4cb8ea9f83000f34bc062b7cc2d51e3b0fabd" +dependencies = [ + "bitflags 2.4.0", + "debugid", + "fxhash", + "serde", + "serde_json", +] + [[package]] name = "generator" version = "0.7.5" @@ -3395,6 +3682,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gimli" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +dependencies = [ + "fallible-iterator", + "indexmap 1.9.3", + "stable_deref_trait", +] + [[package]] name = "gimli" version = "0.28.0" @@ -3893,6 +4191,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "id-arena" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" + [[package]] name = "ident_case" version = "1.0.1" @@ -4010,6 +4314,16 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-extras" +version = "0.17.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde93d48f0d9277f977a333eca8313695ddd5301dc96f7e02aeddcb0dd99096f" +dependencies = [ + "io-lifetimes 1.0.11", + "windows-sys 0.48.0", +] + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -4021,6 +4335,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-lifetimes" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffb4def18c48926ccac55c1223e02865ce1a821751a95920448662696e7472c" + [[package]] name = "ipnet" version = "2.8.0" @@ -4062,6 +4382,26 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "ittapi" 
+version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25a5c0b993601cad796222ea076565c5d9f337d35592f8622c753724f06d7271" +dependencies = [ + "anyhow", + "ittapi-sys", + "log", +] + +[[package]] +name = "ittapi-sys" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7b5e473765060536a660eed127f758cf1a810c73e49063264959c60d1727d9" +dependencies = [ + "cc", +] + [[package]] name = "java-locator" version = "0.1.5" @@ -4188,6 +4528,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leb128" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" + [[package]] name = "lexical" version = "6.1.1" @@ -4469,6 +4815,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "mach2" version = "0.4.1" @@ -4655,6 +5010,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" +[[package]] +name = "maybe-owned" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4" + [[package]] name = "md-5" version = "0.10.5" @@ -4688,6 +5049,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "memfd" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" +dependencies = [ + "rustix 0.38.11", +] + [[package]] name = 
"memmap2" version = "0.5.10" @@ -4697,6 +5067,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.9.0" @@ -5225,6 +5604,18 @@ dependencies = [ "url", ] +[[package]] +name = "object" +version = "0.30.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03b4680b86d9cfafba8fc491dc9b6df26b68cf40e9e6cd73909194759a63c385" +dependencies = [ + "crc32fast", + "hashbrown 0.13.2", + "indexmap 1.9.3", + "memchr", +] + [[package]] name = "object" version = "0.32.1" @@ -6430,6 +6821,15 @@ dependencies = [ "autotools", ] +[[package]] +name = "psm" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5787f7cda34e3033a72192c018bc5883100330f362ef279a8cbccfce8bb4e874" +dependencies = [ + "cc", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -6450,6 +6850,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pulldown-cmark" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffade02495f22453cd593159ea2f59827aae7f53fa8323f756799b670881dcf8" +dependencies = [ + "bitflags 1.3.2", + "memchr", + "unicase", +] + [[package]] name = "pulldown-cmark" version = "0.9.3" @@ -6684,6 +7095,30 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_users" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" +dependencies = [ + "getrandom", + "redox_syscall 0.2.16", + "thiserror", +] + +[[package]] +name = "regalloc2" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad156d539c879b7a24a363a2016d77961786e71f48f2e2fc8302a92abd2429a6" +dependencies 
= [ + "hashbrown 0.13.2", + "log", + "rustc-hash", + "slice-group-by", + "smallvec", +] + [[package]] name = "regex" version = "1.10.0" @@ -7491,6 +7926,7 @@ dependencies = [ "async-recursion", "async-trait", "auto_enums", + "base64 0.21.4", "bk-tree", "bytes", "clap", @@ -7602,7 +8038,7 @@ dependencies = [ "bytes", "futures", "futures-async-stream", - "itertools 0.10.5", + "itertools 0.11.0", "madsim-tokio", "mockall", "parking_lot 0.12.1", @@ -8205,14 +8641,22 @@ version = "0.1.0" dependencies = [ "arrow-array", "arrow-flight", + "arrow-ipc", "arrow-schema", "arrow-select", + "base64 0.21.4", + "bytes", "cfg-or-panic", "futures-util", + "itertools 0.11.0", "madsim-tokio", "madsim-tonic", + "risingwave_object_store", "static_assertions", "thiserror", + "tracing", + "wasmtime", + "wasmtime-wasi", ] [[package]] @@ -8344,7 +8788,7 @@ checksum = "6da3636faa25820d8648e0e31c5d519bbb01f72fdf57131f0f5f7da5fed36eab" dependencies = [ "bitflags 1.3.2", "errno", - "io-lifetimes", + "io-lifetimes 1.0.11", "libc", "linux-raw-sys 0.1.4", "windows-sys 0.45.0", @@ -8358,9 +8802,11 @@ checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" dependencies = [ "bitflags 1.3.2", "errno", - "io-lifetimes", + "io-lifetimes 1.0.11", + "itoa", "libc", "linux-raw-sys 0.3.8", + "once_cell", "windows-sys 0.48.0", ] @@ -9027,6 +9473,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" +[[package]] +name = "shellexpand" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ccc8076840c4da029af4f87e4e8daeb0fca6b87bbb02e10cb60b791450e11e4" +dependencies = [ + "dirs", +] + [[package]] name = "shlex" version = "1.2.0" @@ -9146,7 +9601,7 @@ dependencies = [ "cargo_metadata", "error-chain", "glob", - "pulldown-cmark", + "pulldown-cmark 0.9.3", "tempfile", "walkdir", ] @@ -9176,6 +9631,12 @@ dependencies = [ "parking_lot 
0.11.2", ] +[[package]] +name = "slice-group-by" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826167069c09b99d56f31e9ae5c99049e932a98c9dc2dac47645b08dbbf76ba7" + [[package]] name = "smallbitset" version = "0.7.1" @@ -9255,6 +9716,12 @@ dependencies = [ "der", ] +[[package]] +name = "sptr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b9b39299b249ad65f3b7e96443bad61c02ca5cd3589f46cb6d610a0fd6c0d6a" + [[package]] name = "sqlformat" version = "0.2.2" @@ -9648,7 +10115,7 @@ version = "12.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "691e53bdc0702aba3a5abc2cffff89346fcbd4050748883c7e2f714b33a69045" dependencies = [ - "cpp_demangle", + "cpp_demangle 0.4.3", "rustc-demangle", "symbolic-common", ] @@ -9705,6 +10172,22 @@ dependencies = [ "winapi", ] +[[package]] +name = "system-interface" +version = "0.25.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10081a99cbecbc363d381b9503563785f0b02735fccbb0d4c1a2cb3d39f7e7fe" +dependencies = [ + "bitflags 2.4.0", + "cap-fs-ext", + "cap-std", + "fd-lock", + "io-lifetimes 2.0.2", + "rustix 0.38.11", + "windows-sys 0.48.0", + "winx 0.36.2", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -9717,6 +10200,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "target-lexicon" +version = "0.12.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c39fd04924ca3a864207c66fc2cd7d22d7c016007f9ce846cbb9326331930a" + [[package]] name = "task_stats_alloc" version = "0.1.11" @@ -10486,6 +10975,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +[[package]] +name = "unicode-xid" +version = 
"0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" + [[package]] name = "unicode_categories" version = "0.1.1" @@ -10638,6 +11133,50 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi-cap-std-sync" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2fe3aaf51c1e1a04a490e89f0a9cab789d21a496c0ce398d49a24f8df883a58" +dependencies = [ + "anyhow", + "async-trait", + "cap-fs-ext", + "cap-rand", + "cap-std", + "cap-time-ext", + "fs-set-times", + "io-extras", + "io-lifetimes 1.0.11", + "is-terminal", + "once_cell", + "rustix 0.37.23", + "system-interface", + "tracing", + "wasi-common", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasi-common" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e74e9a2c8bfda59870a8bff38a31b9ba80b6fdb7abdfd2487177b85537d2e8a8" +dependencies = [ + "anyhow", + "bitflags 1.3.2", + "cap-rand", + "cap-std", + "io-extras", + "log", + "rustix 0.37.23", + "thiserror", + "tracing", + "wasmtime", + "wiggle", + "windows-sys 0.48.0", +] + [[package]] name = "wasm-bindgen" version = "0.2.87" @@ -10704,6 +11243,24 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-encoder" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18c41dbd92eaebf3612a39be316540b8377c871cb9bde6b064af962984912881" +dependencies = [ + "leb128", +] + +[[package]] +name = "wasm-encoder" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"9ca90ba1b5b0a70d3d49473c5579951f3bddc78d47b59256d2f9d4922b150aca" +dependencies = [ + "leb128", +] + [[package]] name = "wasm-streams" version = "0.3.0" @@ -10717,6 +11274,370 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.107.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e3ac9b780c7dda0cac7a52a5d6d2d6707cc6e3451c9db209b6c758f40d7acb" +dependencies = [ + "indexmap 1.9.3", + "semver", +] + +[[package]] +name = "wasmparser" +version = "0.115.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e06c0641a4add879ba71ccb3a1e4278fd546f76f1eafb21d8f7b07733b547cd5" +dependencies = [ + "indexmap 2.0.0", + "semver", +] + +[[package]] +name = "wasmprinter" +version = "0.2.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e74458a9bc5cc9c7108abfa0fe4dc88d5abf1f3baf194df3264985f17d559b5e" +dependencies = [ + "anyhow", + "wasmparser 0.115.0", +] + +[[package]] +name = "wasmtime" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc104ced94ff0a6981bde77a0bc29aab4af279914a4143b8d1af9fd4b2c9d41" +dependencies = [ + "anyhow", + "async-trait", + "bincode 1.3.3", + "bumpalo", + "cfg-if", + "encoding_rs", + "fxprof-processed-profile", + "indexmap 1.9.3", + "libc", + "log", + "object 0.30.4", + "once_cell", + "paste", + "psm", + "rayon", + "serde", + "serde_json", + "target-lexicon", + "wasmparser 0.107.0", + "wasmtime-cache", + "wasmtime-component-macro", + "wasmtime-component-util", + "wasmtime-cranelift", + "wasmtime-environ", + "wasmtime-fiber", + "wasmtime-jit", + "wasmtime-runtime", + "wasmtime-winch", + "wat", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-asm-macros" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b28e5661a9b5f7610a62ab3c69222fa161f7bd31d04529e856461d8c3e706b" +dependencies = [ + "cfg-if", +] + +[[package]] +name = 
"wasmtime-cache" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f58ddfe801df3886feaf466d883ea37e941bcc6d841b9f644a08c7acabfe7f8" +dependencies = [ + "anyhow", + "base64 0.21.4", + "bincode 1.3.3", + "directories-next", + "file-per-thread-logger", + "log", + "rustix 0.37.23", + "serde", + "sha2", + "toml 0.5.11", + "windows-sys 0.48.0", + "zstd 0.11.2+zstd.1.5.2", +] + +[[package]] +name = "wasmtime-component-macro" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39725d9633fb064bd3a6d83c5ea5077289256de0862d3d96295822edb13419c0" +dependencies = [ + "anyhow", + "proc-macro2", + "quote", + "syn 1.0.109", + "wasmtime-component-util", + "wasmtime-wit-bindgen", + "wit-parser", +] + +[[package]] +name = "wasmtime-component-util" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1153feafc824f95dc69472cb89a3396b3b05381f781a7508b01840f9df7b1a51" + +[[package]] +name = "wasmtime-cranelift" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fc1e39ce9aa0fa0b319541ed423960b06cfa7343eca1574f811ea34275739c2" +dependencies = [ + "anyhow", + "cranelift-codegen", + "cranelift-control", + "cranelift-entity", + "cranelift-frontend", + "cranelift-native", + "cranelift-wasm", + "gimli 0.27.3", + "log", + "object 0.30.4", + "target-lexicon", + "thiserror", + "wasmparser 0.107.0", + "wasmtime-cranelift-shared", + "wasmtime-environ", +] + +[[package]] +name = "wasmtime-cranelift-shared" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dd32739326690e51c76551d7cbf29d371e7de4dc7b37d2d503be314ab5b7d04" +dependencies = [ + "anyhow", + "cranelift-codegen", + "cranelift-control", + "cranelift-native", + "gimli 0.27.3", + "object 0.30.4", + "target-lexicon", + "wasmtime-environ", +] + +[[package]] +name = "wasmtime-environ" +version = "10.0.2" +source 
= "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32b60e4ae5c9ae81750d8bc59110bf25444aa1d9266c19999c3b64b801db3c73" +dependencies = [ + "anyhow", + "cranelift-entity", + "gimli 0.27.3", + "indexmap 1.9.3", + "log", + "object 0.30.4", + "serde", + "target-lexicon", + "thiserror", + "wasm-encoder 0.29.0", + "wasmparser 0.107.0", + "wasmprinter", + "wasmtime-component-util", + "wasmtime-types", +] + +[[package]] +name = "wasmtime-fiber" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd40c8d869916ee6b1f3fcf1858c52041445475ca8550aee81c684c0eb530ca" +dependencies = [ + "cc", + "cfg-if", + "rustix 0.37.23", + "wasmtime-asm-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-jit" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "655b23a10eddfe7814feb548a466f3f25aa4bb4f43098a147305c544a2de28e1" +dependencies = [ + "addr2line 0.19.0", + "anyhow", + "bincode 1.3.3", + "cfg-if", + "cpp_demangle 0.3.5", + "gimli 0.27.3", + "ittapi", + "log", + "object 0.30.4", + "rustc-demangle", + "rustix 0.37.23", + "serde", + "target-lexicon", + "wasmtime-environ", + "wasmtime-jit-debug", + "wasmtime-jit-icache-coherence", + "wasmtime-runtime", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-jit-debug" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e46b7e98979a69d3df093076bde8431204e3c96a770e8d216fea365c627d88a4" +dependencies = [ + "object 0.30.4", + "once_cell", + "rustix 0.37.23", +] + +[[package]] +name = "wasmtime-jit-icache-coherence" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb1e7c68ede63dc7a98c3e473162954e224951854e229c8b4e74697fe17dbdd" +dependencies = [ + "cfg-if", + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-runtime" +version = "10.0.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "843e33bf9e0f0c57902c87a1dea1389cc23865c65f007214318dbdfcb3fd4ae5" +dependencies = [ + "anyhow", + "cc", + "cfg-if", + "encoding_rs", + "indexmap 1.9.3", + "libc", + "log", + "mach", + "memfd", + "memoffset 0.8.0", + "paste", + "rand", + "rustix 0.37.23", + "sptr", + "wasmtime-asm-macros", + "wasmtime-environ", + "wasmtime-fiber", + "wasmtime-jit-debug", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-types" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7473a07bebd85671bada453123e3d465c8e0a59668ff79f5004076e6a2235ef5" +dependencies = [ + "cranelift-entity", + "serde", + "thiserror", + "wasmparser 0.107.0", +] + +[[package]] +name = "wasmtime-wasi" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aff7b3b3272ad5b4ba63c9aac6248da6f06a8227d0c0d6017d89225d794e966c" +dependencies = [ + "anyhow", + "async-trait", + "bitflags 1.3.2", + "cap-fs-ext", + "cap-rand", + "cap-std", + "cap-time-ext", + "fs-set-times", + "io-extras", + "libc", + "rustix 0.37.23", + "system-interface", + "thiserror", + "tracing", + "wasi-cap-std-sync", + "wasi-common", + "wasmtime", + "wiggle", + "windows-sys 0.48.0", +] + +[[package]] +name = "wasmtime-winch" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "351c9d4e60658dd0cf616c12c5508f86cc2cefcc0cff307eed0a31b23d3c0b70" +dependencies = [ + "anyhow", + "cranelift-codegen", + "gimli 0.27.3", + "object 0.30.4", + "target-lexicon", + "wasmparser 0.107.0", + "wasmtime-cranelift-shared", + "wasmtime-environ", + "winch-codegen", +] + +[[package]] +name = "wasmtime-wit-bindgen" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f114407efbd09e4ef67053b6ae54c16455a821ef2f6096597fcba83b7625e59c" +dependencies = [ + "anyhow", + "heck 0.4.1", + "wit-parser", +] + +[[package]] +name 
= "wast" +version = "35.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ef140f1b49946586078353a453a1d28ba90adfc54dde75710bc1931de204d68" +dependencies = [ + "leb128", +] + +[[package]] +name = "wast" +version = "66.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93cb43b0ac6dd156f2c375735ccfd72b012a7c0a6e6d09503499b8d3cb6e6072" +dependencies = [ + "leb128", + "memchr", + "unicode-width", + "wasm-encoder 0.35.0", +] + +[[package]] +name = "wat" +version = "1.0.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e367582095d2903caeeea9acbb140e1db9c7677001efa4347c3687fd34fe7072" +dependencies = [ + "wast 66.0.2", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -10774,6 +11695,48 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wiggle" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e63f150c6e39ef29a58139564c5ed7a0ef34d6df8a8eecd4233af85a576968d9" +dependencies = [ + "anyhow", + "async-trait", + "bitflags 1.3.2", + "thiserror", + "tracing", + "wasmtime", + "wiggle-macro", +] + +[[package]] +name = "wiggle-generate" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f31e961fb0a5ad3ff10689c85f327f4abf10b4cac033b9d7372ccbb106aea24" +dependencies = [ + "anyhow", + "heck 0.4.1", + "proc-macro2", + "quote", + "shellexpand", + "syn 1.0.109", + "witx", +] + +[[package]] +name = "wiggle-macro" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a28ae3d6b90f212beca7fab5910d0a3b1a171290c06eaa81bb39f41e6f74589" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", + "wiggle-generate", +] + [[package]] name = "winapi" version = "0.3.9" @@ -10805,6 +11768,22 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "winch-codegen" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1bf2ac354be169bb201de7867b84f45d91d0ef812f67f11c33f74a7f5a24e56" +dependencies = [ + "anyhow", + "cranelift-codegen", + "gimli 0.27.3", + "regalloc2", + "smallvec", + "target-lexicon", + "wasmparser 0.107.0", + "wasmtime-environ", +] + [[package]] name = "windows" version = "0.48.0" @@ -10965,6 +11944,55 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winx" +version = "0.35.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c52a121f0fbf9320d5f2a9a5d82f6cb7557eda5e8b47fc3e7f359ec866ae960" +dependencies = [ + "bitflags 1.3.2", + "io-lifetimes 1.0.11", + "windows-sys 0.48.0", +] + +[[package]] +name = "winx" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357bb8e2932df531f83b052264b050b81ba0df90ee5a59b2d1d3949f344f81e5" +dependencies = [ + "bitflags 2.4.0", + "windows-sys 0.48.0", +] + +[[package]] +name = "wit-parser" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6daec9f093dbaea0e94043eeb92ece327bbbe70c86b1f41aca9bbfefd7f050f0" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 1.9.3", + "log", + "pulldown-cmark 0.8.0", + "semver", + "unicode-xid", + "url", +] + +[[package]] +name = "witx" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e366f27a5cabcddb2706a78296a40b8fcc451e1a6aba2fc1d94b4a01bdaaef4b" +dependencies = [ + "anyhow", + "log", + "thiserror", + "wast 35.0.2", +] + [[package]] name = "workspace-config" version = "1.3.0-alpha" @@ -10998,6 +12026,7 @@ dependencies = [ "clap", "clap_builder", "combine", + "crc32fast", "crossbeam-epoch", "crossbeam-queue", "crossbeam-utils", @@ -11005,6 +12034,7 @@ dependencies = [ "digest", "either", "fail", + 
"fallible-iterator", "fixedbitset", "flate2", "frunk_core", @@ -11016,6 +12046,7 @@ dependencies = [ "futures-task", "futures-util", "hashbrown 0.12.3", + "hashbrown 0.13.2", "hashbrown 0.14.0", "hyper", "indexmap 1.9.3", @@ -11065,6 +12096,8 @@ dependencies = [ "reqwest", "ring", "rust_decimal", + "rustc-hash", + "rustix 0.38.11", "rustls 0.21.7", "scopeguard", "sea-orm", @@ -11082,6 +12115,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", + "stable_deref_trait", "strum 0.25.0", "subtle", "syn 1.0.109", @@ -11190,6 +12224,15 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe 5.0.2+zstd.1.5.2", +] + [[package]] name = "zstd" version = "0.12.4" @@ -11208,6 +12251,16 @@ dependencies = [ "zstd-safe 7.0.0", ] +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", +] + [[package]] name = "zstd-safe" version = "6.0.6" diff --git a/Cargo.toml b/Cargo.toml index ac533e733f7a8..cddb598e6f0dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ arrow-buffer = "48" arrow-flight = "48" arrow-select = "48" arrow-ord = "48" +arrow-ipc = "48" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ "profiling", diff --git a/proto/catalog.proto b/proto/catalog.proto index c966b7bbe5eb0..12c72726ad57b 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -193,6 +193,7 @@ message 
Function { repeated data.DataType arg_types = 5; data.DataType return_type = 6; string language = 7; + // external function service only string link = 8; string identifier = 10; @@ -204,6 +205,11 @@ message Function { message ScalarFunction {} message TableFunction {} message AggregateFunction {} + + oneof extra { + expr.ExternalUdfExtra external = 14; + expr.WasmUdfExtra wasm = 15; + } } // See `TableCatalog` struct in frontend crate for more information. diff --git a/proto/expr.proto b/proto/expr.proto index 2f252d67c8400..745231a9cd2b5 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -233,6 +233,7 @@ message ExprNode { COL_DESCRIPTION = 2100; CAST_REGCLASS = 2101; } + // Only use this field for function call. For other types of expression, it should be UNSPECIFIED. Type function_type = 1; data.DataType return_type = 3; oneof rex_node { @@ -417,15 +418,39 @@ message WindowFunction { WindowFrame frame = 5; } +// Note: due to historic reasons, UserDefinedFunction is a oneof variant parallel to FunctionCall, +// while UserDefinedTableFunction is embedded as a field in TableFunction. + message UserDefinedFunction { repeated ExprNode children = 1; string name = 2; repeated data.DataType arg_types = 3; string language = 4; + // external function service only string link = 5; + // An unique identifier for the function. Different kinds of UDF may handle this field differently. + // - For external UDF, it's the name of the function in the external function service. + // It doesn't need to be unique across different external function servers. + // - For wasm UDF, it's the name of the function stored in remote object store, and needs to be globally unique. 
string identifier = 6; + + oneof extra { + ExternalUdfExtra external = 7; + WasmUdfExtra wasm = 8; + } +} + +// extra information for external functions +message ExternalUdfExtra {} + +// extra information for wasm functions +message WasmUdfExtra { + // We store the url of the remote object store here, as it's inconvenient to pass it to build expr. + // Maybe we can deprecate it in the future. + string wasm_storage_url = 1; } +// Additional information for user defined table functions. message UserDefinedTableFunction { repeated data.DataType arg_types = 3; string language = 4; diff --git a/proto/meta.proto b/proto/meta.proto index f2375eed7653a..f9947d278b468 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -514,6 +514,7 @@ message SystemParams { optional uint32 parallel_compact_size_mb = 11; optional uint32 max_concurrent_creating_streaming_jobs = 12; optional bool pause_on_next_bootstrap = 13; + optional string wasm_storage_url = 14; } message GetSystemParamsRequest {} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 9e515ba471967..4188c3faa2b18 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -844,6 +844,9 @@ pub struct SystemConfig { /// Whether to pause all data sources on next bootstrap. #[serde(default = "default::system::pause_on_next_bootstrap")] pub pause_on_next_bootstrap: Option, + + #[serde(default = "default::system::wasm_storage_url")] + pub wasm_storage_url: Option, } impl SystemConfig { @@ -863,6 +866,7 @@ impl SystemConfig { max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs, pause_on_next_bootstrap: self.pause_on_next_bootstrap, telemetry_enabled: None, // deprecated + wasm_storage_url: self.wasm_storage_url, } } } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 0b35d53ae7e8c..ce29b7ab6a4d1 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -55,6 +55,7 @@ macro_rules! 
for_all_params { { backup_storage_directory, String, Some("backup".to_string()), true }, { max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true }, { pause_on_next_bootstrap, bool, Some(false), true }, + { wasm_storage_url, String, Some("fs://@/tmp/risingwave".to_string()), false }, } }; } diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 643905c89f919..8333e224a04a9 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -80,6 +80,10 @@ impl SystemParamsReader { self.prost.pause_on_next_bootstrap.unwrap_or(false) } + pub fn wasm_storage_url(&self) -> &str { + self.prost.wasm_storage_url.as_ref().unwrap() + } + pub fn to_kv(&self) -> Vec<(String, String)> { system_params_to_kv(&self.prost).unwrap() } diff --git a/src/config/example.toml b/src/config/example.toml index 141078ddf8a54..3af89710d47c6 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -166,3 +166,4 @@ backup_storage_url = "memory" backup_storage_directory = "backup" max_concurrent_creating_streaming_jobs = 1 pause_on_next_bootstrap = false +wasm_storage_url = "fs://@/tmp/risingwave" diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index ada9a3639525c..9801c2767c18b 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -44,6 +44,9 @@ risingwave_udf = { workspace = true } smallvec = "1" static_assertions = "1" thiserror = "1" +tokio = { version = "0.2", package = "madsim-tokio", features = [ + "rt-multi-thread", +] } tracing = "0.1" [target.'cfg(not(madsim))'.dependencies] diff --git a/src/expr/core/src/error.rs b/src/expr/core/src/error.rs index 7204b851e5b2b..ffb0e004c29e8 100644 --- a/src/expr/core/src/error.rs +++ b/src/expr/core/src/error.rs @@ -80,7 +80,9 @@ pub enum ExprError { Internal(#[from] anyhow::Error), #[error("UDF error: {0}")] - Udf(#[from] risingwave_udf::Error), + ExternalUdf(#[from] risingwave_udf::Error), + #[error("UDF error: {0}")] + 
WasmUdf(#[from] risingwave_udf::wasm::WasmUdfError), #[error("not a constant")] NotConstant, @@ -98,7 +100,7 @@ pub enum ExprError { InvalidState(String), } -static_assertions::const_assert_eq!(std::mem::size_of::(), 40); +static_assertions::const_assert_eq!(std::mem::size_of::(), 48); impl From for RwError { fn from(s: ExprError) -> Self { diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index a11af2434b4f9..9d694f4c4fda1 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -22,8 +22,11 @@ use cfg_or_panic::cfg_or_panic; use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; -use risingwave_pb::expr::ExprNode; +use risingwave_pb::expr::user_defined_function::PbExtra; +use risingwave_pb::expr::{ExprNode, PbExternalUdfExtra, PbWasmUdfExtra}; +use risingwave_udf::wasm::{InstantiatedComponent, WasmEngine}; use risingwave_udf::ArrowFlightUdfClient; +use tracing::Instrument; use super::{BoxedExpression, Build}; use crate::expr::Expression; @@ -35,11 +38,36 @@ pub struct UdfExpression { arg_types: Vec, return_type: DataType, arg_schema: Arc, - client: Arc, - identifier: String, + imp: UdfImpl, span: await_tree::Span, } +enum UdfImpl { + External { + client: Arc, + identifier: String, + }, + Wasm { + component: InstantiatedComponent, + }, +} + +impl std::fmt::Debug for UdfImpl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::External { client, identifier } => f + .debug_struct("External") + .field("client", client) + .field("identifier", identifier) + .finish(), + Self::Wasm { component: _ } => f + .debug_struct("Wasm") + // .field("component", component) + .finish(), + } + } +} + #[async_trait::async_trait] impl Expression for UdfExpression { fn return_type(&self) -> DataType { @@ -98,11 +126,20 @@ impl UdfExpression { ) .expect("failed to build record batch"); - let 
output = self - .client - .call(&self.identifier, input) - .instrument_await(self.span.clone()) - .await?; + let output: arrow_array::RecordBatch = match &self.imp { + UdfImpl::Wasm { component } => { + component + .eval(input) + .instrument_await(self.span.clone()) + .await? + } + UdfImpl::External { client, identifier } => { + client + .call(identifier, input) + .instrument_await(self.span.clone()) + .await? + } + }; if output.num_rows() != vis.count_ones() { bail!( "UDF returned {} rows, but expected {}", @@ -110,7 +147,6 @@ impl UdfExpression { vis.len(), ); } - let data_chunk = DataChunk::try_from(&output).expect("failed to convert UDF output to DataChunk"); let output = data_chunk.uncompact(vis.clone()); @@ -139,8 +175,26 @@ impl Build for UdfExpression { let return_type = DataType::from(prost.get_return_type().unwrap()); let udf = prost.get_rex_node().unwrap().as_udf().unwrap(); - // connect to UDF service - let client = get_or_create_client(&udf.link)?; + let imp = match &udf.extra { + None | Some(PbExtra::External(PbExternalUdfExtra {})) => UdfImpl::External { + client: get_or_create_flight_client(&udf.link)?, + identifier: udf.identifier.clone(), + }, + Some(PbExtra::Wasm(PbWasmUdfExtra { wasm_storage_url })) => { + let wasm_engine = WasmEngine::get_or_create(); + // Use `block_in_place` as an escape hatch to run async code here in sync context. + // Calling `block_on` directly will panic. 
+ let component = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on({ + wasm_engine + .load_component(wasm_storage_url, &udf.identifier) + .instrument(tracing::info_span!("load_component", %udf.identifier)) + }) + })?; + + UdfImpl::Wasm { component } + } + }; let arg_schema = Arc::new(Schema::new( udf.arg_types @@ -162,8 +216,7 @@ impl Build for UdfExpression { arg_types: udf.arg_types.iter().map(|t| t.into()).collect(), return_type, arg_schema, - client, - identifier: udf.identifier.clone(), + imp, span: format!("expr_udf_call ({})", udf.identifier).into(), }) } @@ -173,7 +226,7 @@ impl Build for UdfExpression { /// Get or create a client for the given UDF service. /// /// There is a global cache for clients, so that we can reuse the same client for the same service. -pub(crate) fn get_or_create_client(link: &str) -> Result> { +pub(crate) fn get_or_create_flight_client(link: &str) -> Result> { static CLIENTS: LazyLock>>> = LazyLock::new(Default::default); let mut clients = CLIENTS.lock().unwrap(); diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index 60fde34f9df1f..88d3252338fed 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -147,7 +147,7 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result()?, )); // connect to UDF service - let client = crate::expr::expr_udf::get_or_create_client(&udtf.link)?; + let client = crate::expr::expr_udf::get_or_create_flight_client(&udtf.link)?; Ok(UserDefinedTableFunction { children: prost.args.iter().map(expr_build_from_prost).try_collect()?, diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 37f9f6326faea..513f81234a3ce 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -21,6 +21,7 @@ arrow-schema = { workspace = true } async-recursion = "1.0.5" async-trait = "0.1" auto_enums = { version = "0.8", features 
= ["futures03"] } +base64 = "0.21" bk-tree = "0.5.0" bytes = "1" clap = { version = "4", features = ["derive"] } diff --git a/src/frontend/src/catalog/function_catalog.rs b/src/frontend/src/catalog/function_catalog.rs index e56a36d85f8c2..492b017f19621 100644 --- a/src/frontend/src/catalog/function_catalog.rs +++ b/src/frontend/src/catalog/function_catalog.rs @@ -17,6 +17,7 @@ use risingwave_common::catalog::FunctionId; use risingwave_common::types::DataType; use risingwave_pb::catalog::function::PbKind; use risingwave_pb::catalog::PbFunction; +use risingwave_pb::expr::user_defined_function::PbExtra; use crate::catalog::OwnedByUserCatalog; @@ -31,6 +32,8 @@ pub struct FunctionCatalog { pub language: String, pub identifier: String, pub link: String, + // for backward compatibility, newly added fields should be optional + pub extra: Option, } #[derive(Clone, Display, PartialEq, Eq, Hash, Debug)] @@ -64,6 +67,7 @@ impl From<&PbFunction> for FunctionCatalog { language: prost.language.clone(), identifier: prost.identifier.clone(), link: prost.link.clone(), + extra: prost.extra.clone().map(Into::into), } } } diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs index 1c9d06320ba15..131a5e34e1531 100644 --- a/src/frontend/src/expr/user_defined_function.rs +++ b/src/frontend/src/expr/user_defined_function.rs @@ -56,6 +56,7 @@ impl UserDefinedFunction { language: udf.get_language().clone(), identifier: udf.get_identifier().clone(), link: udf.get_link().clone(), + extra: udf.extra.clone(), }; Ok(Self { @@ -88,6 +89,8 @@ impl Expr for UserDefinedFunction { language: self.catalog.language.clone(), identifier: self.catalog.identifier.clone(), link: self.catalog.link.clone(), + + extra: self.catalog.extra.clone(), })), } } diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 9d9db08204e49..a57b9d26ddf59 100644 --- a/src/frontend/src/handler/create_function.rs +++ 
b/src/frontend/src/handler/create_function.rs @@ -18,12 +18,15 @@ use itertools::Itertools; use pgwire::pg_response::StatementType; use risingwave_common::catalog::FunctionId; use risingwave_common::types::DataType; -use risingwave_pb::catalog::function::{Kind, ScalarFunction, TableFunction}; +use risingwave_pb::catalog::function::{Kind, PbExtra, ScalarFunction, TableFunction}; use risingwave_pb::catalog::Function; +use risingwave_pb::expr::{PbExternalUdfExtra, PbWasmUdfExtra}; use risingwave_sqlparser::ast::{ CreateFunctionBody, FunctionDefinition, ObjectName, OperateFunctionArg, }; +use risingwave_udf::wasm::WasmEngine; use risingwave_udf::ArrowFlightUdfClient; +use tracing::Instrument; use super::*; use crate::catalog::CatalogError; @@ -56,7 +59,7 @@ pub async fn handle_create_function( Some(lang) => { let lang = lang.real_value().to_lowercase(); match &*lang { - "python" | "java" => lang, + "python" | "java" | "wasm_v1" => lang, _ => { return Err(ErrorCode::InvalidParameterValue(format!( "language {} is not supported", @@ -70,12 +73,6 @@ pub async fn handle_create_function( // correct protocol. 
None => "".to_string(), }; - let Some(FunctionDefinition::SingleQuotedDef(identifier)) = params.as_ else { - return Err(ErrorCode::InvalidParameterValue("AS must be specified".to_string()).into()); - }; - let Some(CreateFunctionUsing::Link(link)) = params.using else { - return Err(ErrorCode::InvalidParameterValue("USING must be specified".to_string()).into()); - }; let return_type; let kind = match returns { Some(CreateFunctionReturns::Value(data_type)) => { @@ -88,14 +85,10 @@ pub async fn handle_create_function( return_type = bind_data_type(&columns[0].data_type)?; } else { // return type is a struct for multiple columns - let datatypes = columns - .iter() - .map(|c| bind_data_type(&c.data_type)) - .collect::>>()?; - let names = columns - .iter() - .map(|c| c.name.real_value()) - .collect::>(); + let it = columns + .into_iter() + .map(|c| bind_data_type(&c.data_type).map(|ty| (ty, c.name.real_value()))); + let (datatypes, names) = itertools::process_results(it, |it| it.unzip())?; return_type = DataType::new_struct(datatypes, names); } Kind::Table(TableFunction {}) @@ -108,6 +101,10 @@ pub async fn handle_create_function( } }; + let Some(using) = params.using else { + return Err(ErrorCode::InvalidParameterValue("USING must be specified".to_string()).into()); + }; + let mut arg_types = vec![]; for arg in args.unwrap_or_default() { arg_types.push(bind_data_type(&arg.data_type)?); @@ -119,7 +116,7 @@ pub async fn handle_create_function( let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?; let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; - // check if function exists + // check if the function exists in the catalog if (session.env().catalog_reader().read_guard()) .get_schema_by_id(&database_id, &schema_id)? 
.get_function_by_name_args(&function_name, &arg_types) @@ -132,32 +129,80 @@ pub async fn handle_create_function( return Err(CatalogError::Duplicated("function", name).into()); } - // check the service - let client = ArrowFlightUdfClient::connect(&link) - .await - .map_err(|e| anyhow!(e))?; - /// A helper function to create a unnamed field from data type. - fn to_field(data_type: arrow_schema::DataType) -> arrow_schema::Field { - arrow_schema::Field::new("", data_type, true) - } - let args = arrow_schema::Schema::new( - arg_types - .iter() - .map::, _>(|t| Ok(to_field(t.try_into()?))) - .try_collect::<_, Fields, _>()?, - ); - let returns = arrow_schema::Schema::new(match kind { - Kind::Scalar(_) => vec![to_field(return_type.clone().try_into()?)], - Kind::Table(_) => vec![ - arrow_schema::Field::new("row_index", arrow_schema::DataType::Int32, true), - to_field(return_type.clone().try_into()?), - ], - _ => unreachable!(), - }); - client - .check(&identifier, &args, &returns) - .await - .map_err(|e| anyhow!(e))?; + let link; + let identifier; + + // judge the type of the UDF, and do some type-specific checks correspondingly. + let extra = match using { + CreateFunctionUsing::Link(l) => { + let Some(FunctionDefinition::SingleQuotedDef(id)) = params.as_ else { + return Err(ErrorCode::InvalidParameterValue( + "AS must be specified for USING link".to_string(), + ) + .into()); + }; + identifier = id; + link = l; + + // check UDF server + { + let client = ArrowFlightUdfClient::connect(&link) + .await + .map_err(|e| anyhow!(e))?; + /// A helper function to create a unnamed field from data type. 
+ fn to_field(data_type: arrow_schema::DataType) -> arrow_schema::Field { + arrow_schema::Field::new("", data_type, true) + } + let args = arrow_schema::Schema::new( + arg_types + .iter() + .map::, _>(|t| Ok(to_field(t.try_into()?))) + .try_collect::<_, Fields, _>()?, + ); + let returns = arrow_schema::Schema::new(match kind { + Kind::Scalar(_) => vec![to_field(return_type.clone().try_into()?)], + Kind::Table(_) => vec![ + arrow_schema::Field::new("row_index", arrow_schema::DataType::Int32, true), + to_field(return_type.clone().try_into()?), + ], + _ => unreachable!(), + }); + client + .check(&identifier, &args, &returns) + .await + .map_err(|e| anyhow!(e))?; + } + + PbExtra::External(PbExternalUdfExtra {}) + } + CreateFunctionUsing::Base64(module) => { + link = String::new(); + identifier = format!("{}.{}.{}", database_id, schema_id, function_name); + if language != "wasm_v1" { + return Err(ErrorCode::InvalidParameterValue( + "LANGUAGE should be wasm_v1 for USING base64".to_string(), + ) + .into()); + } + + use base64::prelude::{Engine, BASE64_STANDARD}; + let module = BASE64_STANDARD.decode(module).map_err(|e| anyhow!(e))?; + + let system_params = session.env().meta_client().get_system_params().await?; + let wasm_storage_url = system_params.wasm_storage_url(); + + let wasm_engine = WasmEngine::get_or_create(); + wasm_engine + .compile_and_upload_component(module, wasm_storage_url, &identifier) + .instrument(tracing::info_span!("compile_and_upload_component", %identifier)) + .await + .map_err(|e| anyhow!(e))?; + + PbExtra::Wasm(PbWasmUdfExtra { + wasm_storage_url: wasm_storage_url.to_string(), + }) + } + }; let function = Function { id: FunctionId::placeholder().0, @@ -171,6 +216,7 @@ pub async fn handle_create_function( identifier, link, owner: session.user_id(), + extra: Some(extra), }; let catalog_writer = session.catalog_writer()?; diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index ab4696e62f9bd..47d14330f5fb2 
100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -320,6 +320,7 @@ mod tests { SystemParams { state_store: Some("state_store".to_string()), data_directory: Some("data_directory".to_string()), + wasm_storage_url: None, ..SystemConfig::default().into_init_system_params() } } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index d2007dcab45d6..d50c8bc5d31e3 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -585,6 +585,7 @@ impl CatalogManager { #[cfg(not(test))] user_core.ensure_user_id(function.owner)?; + tracing::debug!("create function: {:?}", function); let mut functions = BTreeMapTransaction::new(&mut database_core.functions); functions.insert(function.id, function.clone()); commit_meta!(self, functions)?; diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 96e58397dfa82..38b826920d158 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -46,7 +46,7 @@ pub trait ObjectRangeBounds = RangeBounds + Clone + Send + Sync + std::fm /// Partitions a set of given paths into two vectors. The first vector contains all local paths, and /// the second contains all remote paths. -pub fn partition_object_store_paths(paths: &[String]) -> Vec { +fn partition_object_store_paths(paths: &[String]) -> Vec { // ToDo: Currently the result is a copy of the input. Would it be worth it to use an in-place // partition instead? let mut vec_rem = vec![]; @@ -785,6 +785,11 @@ impl MonitoredObjectStore { } } +/// Creates a new [`ObjectStore`] from the given `url`. Credentials are configured via environment +/// variables. +/// +/// # Panics +/// If the `url` is invalid. Therefore, it is only suitable to be used during startup. 
pub async fn parse_remote_object_store( url: &str, metrics: Arc, diff --git a/src/prost/build.rs b/src/prost/build.rs index 5722a04767962..12476f60b9ac0 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -101,6 +101,9 @@ fn main() -> Result<(), Box> { .type_attribute("data.Datum", "#[derive(Eq, Hash)]") .type_attribute("expr.FunctionCall", "#[derive(Eq, Hash)]") .type_attribute("expr.UserDefinedFunction", "#[derive(Eq, Hash)]") + .type_attribute("expr.UserDefinedFunction.extra", "#[derive(Eq, Hash)]") + .type_attribute("expr.ExternalUdfExtra", "#[derive(Eq, Hash)]") + .type_attribute("expr.WasmUdfExtra", "#[derive(Eq, Hash)]") .type_attribute( "plan_common.ColumnDesc.generated_or_default_column", "#[derive(Eq, Hash)]", diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index a5e0cf0e82664..b89a471c223de 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -166,6 +166,25 @@ impl FromStr for crate::expr::table_function::PbType { } } +// They are the same oneof, but different types +impl From for expr::user_defined_function::Extra { + fn from(value: catalog::function::Extra) -> Self { + match value { + catalog::function::Extra::External(v) => Self::External(v), + catalog::function::Extra::Wasm(v) => Self::Wasm(v), + } + } +} + +impl From for catalog::function::Extra { + fn from(value: expr::user_defined_function::Extra) -> Self { + match value { + expr::user_defined_function::Extra::External(v) => Self::External(v), + expr::user_defined_function::Extra::Wasm(v) => Self::Wasm(v), + } + } +} + #[cfg(test)] mod tests { use crate::data::{data_type, DataType}; diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 5d802bae99cdc..9470724f9a0c7 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2576,6 +2576,7 @@ impl fmt::Display for CreateFunctionBody { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum CreateFunctionUsing { Link(String), + Base64(String), } impl 
fmt::Display for CreateFunctionUsing { @@ -2583,6 +2584,9 @@ impl fmt::Display for CreateFunctionUsing { write!(f, "USING ")?; match self { CreateFunctionUsing::Link(uri) => write!(f, "LINK '{uri}'"), + CreateFunctionUsing::Base64(s) => { + write!(f, "BASE64 '{s}'") + } } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 4188f06f76ae3..338fffc174bb1 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -92,6 +92,7 @@ define_keywords!( ATOMIC, AUTHORIZATION, AVG, + BASE64, BEGIN, BEGIN_FRAME, BEGIN_PARTITION, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 5cc094a204268..a6a08ab089ae9 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2303,16 +2303,18 @@ impl Parser { } fn parse_create_function_using(&mut self) -> Result { - let keyword = self.expect_one_of_keywords(&[Keyword::LINK])?; - - let uri = self.parse_literal_string()?; + let keyword = self.expect_one_of_keywords(&[Keyword::LINK, Keyword::BASE64])?; match keyword { - Keyword::LINK => Ok(CreateFunctionUsing::Link(uri)), - _ => self.expected( - "LINK, got {:?}", - TokenWithLocation::wrap(Token::make_keyword(format!("{keyword:?}").as_str())), - ), + Keyword::LINK => { + let uri = self.parse_literal_string()?; + Ok(CreateFunctionUsing::Link(uri)) + } + Keyword::BASE64 => { + let base64 = self.parse_literal_string()?; + Ok(CreateFunctionUsing::Base64(base64)) + } + _ => unreachable!("{}", keyword), } } diff --git a/src/storage/hummock_trace/Cargo.toml b/src/storage/hummock_trace/Cargo.toml index 150b35b79cda0..316447fb41934 100644 --- a/src/storage/hummock_trace/Cargo.toml +++ b/src/storage/hummock_trace/Cargo.toml @@ -25,7 +25,7 @@ tokio = { version = "0.2", package = "madsim-tokio" } tracing = "0.1" [dev-dependencies] -itertools = "0.10.5" +itertools = "0.11" mockall = "0.11.4" [lints] diff --git a/src/udf/Cargo.toml b/src/udf/Cargo.toml index 2d13f39bdddc4..0f8e80c42612f 100644 --- 
a/src/udf/Cargo.toml +++ b/src/udf/Cargo.toml @@ -13,14 +13,22 @@ normal = ["workspace-hack"] [dependencies] arrow-array = { workspace = true } arrow-flight = { workspace = true } +arrow-ipc = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } +base64 = "0.21" +bytes = "1.4" cfg-or-panic = "0.2" futures-util = "0.3.28" +itertools = "0.11" +risingwave_object_store = { workspace = true } static_assertions = "1" thiserror = "1" tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "macros"] } tonic = { workspace = true } +tracing = "0.1" +wasmtime = { version = "10", features = ["component-model"] } +wasmtime-wasi = { version = "10" } [lints] workspace = true diff --git a/src/udf/src/lib.rs b/src/udf/src/lib.rs index 513551a9108af..f303d56a850ef 100644 --- a/src/udf/src/lib.rs +++ b/src/udf/src/lib.rs @@ -12,8 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(lazy_cell)] +#![feature(lint_reasons)] + mod error; mod external; pub use error::{Error, Result}; pub use external::ArrowFlightUdfClient; +pub mod wasm; diff --git a/src/udf/src/wasm.rs b/src/udf/src/wasm.rs new file mode 100644 index 0000000000000..06c84c9baf7db --- /dev/null +++ b/src/udf/src/wasm.rs @@ -0,0 +1,307 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#![expect(dead_code)] + +use std::sync::Arc; + +use bytes::Bytes; +use itertools::Itertools; +use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; +use risingwave_object_store::object::{parse_remote_object_store, ObjectStoreImpl}; +use tokio::sync::Mutex; +use tracing::debug; +use wasmtime::component::{Component, Linker}; +use wasmtime::{Config, Store, WasmBacktraceDetails}; + +pub mod component { + mod bindgen { + wasmtime::component::bindgen!({ + world: "udf", + path: "wit/udf.wit", + async: true // required for wasi + }); + } + pub use bindgen::{EvalErrno, RecordBatch as WasmRecordBatch, Schema, Udf}; +} + +/// Host state +/// +/// Currently this is only a placeholder. No states. +struct WasmState { + wasi_ctx: wasmtime_wasi::preview2::WasiCtx, + table: wasmtime_wasi::preview2::Table, +} + +impl WasmState { + pub fn try_new() -> WasmUdfResult { + let mut table = wasmtime_wasi::preview2::Table::new(); + + let wasi_ctx = wasmtime_wasi::preview2::WasiCtxBuilder::new() + // Note: panic message is printed, and only available in WASI. + // TODO: redirect to tracing to make it clear it's from WASM. 
+ .inherit_stdout() + .inherit_stderr() + .build(&mut table)?; + Ok(Self { wasi_ctx, table }) + } +} + +impl wasmtime_wasi::preview2::WasiView for WasmState { + fn table(&self) -> &wasmtime_wasi::preview2::Table { + &self.table + } + + fn table_mut(&mut self) -> &mut wasmtime_wasi::preview2::Table { + &mut self.table + } + + fn ctx(&self) -> &wasmtime_wasi::preview2::WasiCtx { + &self.wasi_ctx + } + + fn ctx_mut(&mut self) -> &mut wasmtime_wasi::preview2::WasiCtx { + &mut self.wasi_ctx + } +} + +type ArrowResult = std::result::Result; +type WasmtimeResult = std::result::Result; + +pub struct InstantiatedComponent { + store: Arc>>, + bindings: component::Udf, + #[expect(dead_code)] + instance: wasmtime::component::Instance, +} + +use convert::*; +mod convert { + use super::*; + + pub fn to_wasm_batch( + batch: arrow_array::RecordBatch, + ) -> WasmUdfResult { + let mut buf = vec![]; + { + let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())?; + writer.write(&batch)?; + writer.finish()?; + } + Ok(buf) + } + + pub fn from_wasm_batch( + batch: &component::WasmRecordBatch, + ) -> WasmUdfResult> + '_> { + let reader = arrow_ipc::reader::StreamReader::try_new(&batch[..], None).unwrap(); + + Ok(reader) + } + + // pub fn from_wasm_schema(schema: &component::Schema) -> WasmUdfResult + // { } +} + +impl InstantiatedComponent { + pub async fn eval( + &self, + input: arrow_array::RecordBatch, + ) -> WasmUdfResult { + // let input_schema = self.bindings.call_input_schema(&mut self.store)?; + // let output_schema = self.bindings.call_output_schema(&mut self.store)?; + + let input = to_wasm_batch(input)?; + // TODO: Use tokio Mutex to use it across the await here. Does it make sense? 
+ let result = self + .bindings + .call_eval(&mut *self.store.lock().await, &input) + .await??; + let result = from_wasm_batch(&result)?; + let Some((record_batch,)) = result.collect_tuple() else { + return Err(WasmUdfError::Encoding( + "should return only one record batch in IPC buffer".to_string(), + )); + }; + Ok(record_batch?) + } +} + +/// The interface to interact with the wasm engine. +/// +/// It can be safely shared across threads and is a cheap cloneable handle to the actual engine. +#[derive(Clone)] +pub struct WasmEngine { + engine: wasmtime::Engine, +} + +impl WasmEngine { + #[expect(clippy::new_without_default)] + pub fn new() -> Self { + // Is this expensive? + let mut config = Config::new(); + config + .wasm_component_model(true) + // required for wasi + .async_support(true) + .wasm_backtrace(true) + .wasm_backtrace_details(WasmBacktraceDetails::Enable); + + Self { + engine: wasmtime::Engine::new(&config).expect("failed to create wasm engine"), + } + } + + pub fn get_or_create() -> Self { + use std::sync::LazyLock; + static WASM_ENGINE: LazyLock = LazyLock::new(WasmEngine::new); + WASM_ENGINE.clone() + } + + pub async fn compile_and_upload_component( + &self, + binary: Vec, + wasm_storage_url: &str, + identifier: &str, + ) -> WasmUdfResult<()> { + let object_store = get_wasm_storage(wasm_storage_url).await?; + let binary: Bytes = binary.into(); + object_store + .upload(&raw_path(identifier), binary.clone()) + .await?; + + // This is expensive. + let component = Component::from_binary(&self.engine, &binary[..])?; + tracing::info!("wasm component loaded"); + + // This function is similar to the Engine::precompile_module method where it produces an + // artifact of Wasmtime which is suitable to later pass into Module::deserialize. 
If a + // module is never instantiated then it’s recommended to use Engine::precompile_module + // instead of this method, but if a module is both instantiated and serialized then this + // method can be useful to get the serialized version without compiling twice. + let serialized = component.serialize()?; + debug!( + "compile component, size: {} -> {}", + binary.len(), + serialized.len() + ); + + // check the component can be instantiated + let mut linker = Linker::new(&self.engine); + // A Store is intended to be a short-lived object in a program. No form of GC is + // implemented at this time so once an instance is created within a Store it will not be + // deallocated until the Store itself is dropped. This makes Store unsuitable for + // creating an unbounded number of instances in it because Store will never release this + // memory. It's recommended to have a Store correspond roughly to the lifetime of a + // "main instance" that an embedding is interested in executing. + + // So creating a Store is cheap? + + let mut store = Store::new(&self.engine, WasmState::try_new()?); + wasmtime_wasi::preview2::wasi::command::add_to_linker(&mut linker)?; + let (_bindings, _instance) = + component::Udf::instantiate_async(&mut store, &component, &linker).await?; + + object_store + .upload(&compiled_path(identifier), serialized.into()) + .await?; + + tracing::debug!( + path = compiled_path(identifier), + "wasm component compiled and uploaded", + ); + + Ok(()) + } + + pub async fn load_component( + &self, + wasm_storage_url: &str, + identifier: &str, + ) -> WasmUdfResult { + let object_store = get_wasm_storage(wasm_storage_url).await?; + let serialized_component = object_store.read(&compiled_path(identifier), ..).await?; + + // This is fast. + let component = unsafe { + // safety: it's serialized by ourself + // https://docs.rs/wasmtime/latest/wasmtime/struct.Module.html#unsafety-1 + Component::deserialize(&self.engine, serialized_component)? 
+ }; + + let mut linker = Linker::new(&self.engine); + let mut store = Store::new(&self.engine, WasmState::try_new()?); + wasmtime_wasi::preview2::wasi::command::add_to_linker(&mut linker)?; + let (bindings, instance) = + component::Udf::instantiate_async(&mut store, &component, &linker).await?; + + Ok(InstantiatedComponent { + store: Arc::new(Mutex::new(store)), + bindings, + instance, + }) + } +} + +pub type WasmUdfResult = std::result::Result; + +#[derive(thiserror::Error, Debug)] +pub enum WasmUdfError { + #[error("wasm error: {0:#}")] + Wasmtime(#[from] wasmtime::Error), + #[error("arrow error: {0}")] + Arrow(#[from] arrow_schema::ArrowError), + #[error("eval error: {0}")] + Eval(#[from] component::EvalErrno), + #[error("{0}")] + Encoding(String), + #[error("object store error: {0}")] + ObjectStore(#[from] Box), + #[error("object store error: {0}")] + ObjectStore1(String), +} + +const WASM_RAW_MODULE_DIR: &str = "wasm/raw"; +const WASM_COMPILED_MODULE_DIR: &str = "wasm/compiled"; + +fn raw_path(identifier: &str) -> String { + format!("{}/{}", WASM_RAW_MODULE_DIR, identifier) +} + +fn compiled_path(identifier: &str) -> String { + format!("{}/{}", WASM_COMPILED_MODULE_DIR, identifier) +} + +async fn get_wasm_storage(wasm_storage_url: &str) -> WasmUdfResult { + if wasm_storage_url.starts_with("memory") { + // because we create a new store every time... + return Err(WasmUdfError::ObjectStore1( + "memory storage is not supported".to_string(), + )); + } + // Note: it will panic if the url is invalid. We did a validation on meta startup. 
+ let object_store = parse_remote_object_store( + wasm_storage_url, + Arc::new(ObjectStoreMetrics::unused()), + "Wasm Engine", + ) + .await; + Ok(object_store) +} + +impl From for WasmUdfError { + fn from(e: risingwave_object_store::object::ObjectError) -> Self { + WasmUdfError::ObjectStore(Box::new(e)) + } +} diff --git a/src/udf/wit/udf.wit b/src/udf/wit/udf.wit new file mode 100644 index 0000000000000..22b629b8d8cbc --- /dev/null +++ b/src/udf/wit/udf.wit @@ -0,0 +1,49 @@ +package risingwave:udf // Maybe we can use a different package name? udf as a world name is enough. + +world udf { + use types.{schema, record-batch, eval-errno} + + // TODO: is schema needed? since record-batch already contains schema. + export input-schema: func() -> schema + export output-schema: func() -> schema + + // export init: func(inputs: list) -> result<_, init-errno> + export eval: func(batch: record-batch) -> result +} + +interface types { + // in Arrow IPC Streaming Format + type record-batch = list + + enum data-type { + dt-i16, + dt-i32, + dt-i64, + dt-bool, + dt-string, + } + + type schema = list + + record field { + name: string, + data-type: data-type, + } + + enum init-errno { + invalid-params, + } + + enum eval-errno { + numeric-overflow, + division-by-zero, + } + + union scalar { + s16, + s32, + s64, + bool, + string, + } +} diff --git a/src/udf/wit_example/.gitignore b/src/udf/wit_example/.gitignore new file mode 100644 index 0000000000000..cb53a1ef0fa8d --- /dev/null +++ b/src/udf/wit_example/.gitignore @@ -0,0 +1,4 @@ +*.sql +*.wasm +Cargo.lock +tinygo/gen diff --git a/src/udf/wit_example/README.md b/src/udf/wit_example/README.md new file mode 100644 index 0000000000000..39ae9984dd87a --- /dev/null +++ b/src/udf/wit_example/README.md @@ -0,0 +1,54 @@ +# WASM UDF examples + +TODO: +- [ ] error handing +- [ ] schema validation + +## Required tools + +- [wasm-tools](https://github.com/bytecodealliance/wasm-tools): to create WASM component from WASM module. 
+- [wit-bindgen](https://github.com/bytecodealliance/wit-bindgen) CLI: to generate guest code from a WIT file. (Not required for Rust guests.) + +``` +cargo install wasm-tools@1.0.35 +cargo install wit-bindgen-cli@0.8.0 +``` + +> **Note** +> +> The WASM component model IS NOT stable and may change. Please use the versions specified above. + +## Examples for different guest languages + +Refer to each language's directory for an example. Some additional notes are listed below. +Generally, you just need to copy the `wit` directory and follow the examples. + +The examples are not guaranteed to work if you use different versions of the toolchains or project dependencies. + +### Rust + +Nothing special. + +### Golang + +#### TinyGo + +[TinyGo](https://tinygo.org/getting-started/install/) is an alternative Go compiler for small devices. It also supports WASM. + +Tested under: +``` +> tinygo version +tinygo version 0.28.1 darwin/amd64 (using go version go1.20.2 and LLVM version 15.0.0) +``` + +- TinyGo cannot compile the lz4 package ([Add support for reading Go assembly files by aykevl · Pull Request #3103 · tinygo-org/tinygo](https://github.com/tinygo-org/tinygo/pull/3103)), which is used by Arrow. As a workaround, use the forked version of Arrow, which has lz4 removed: 
+ +``` +replace github.com/apache/arrow/go/v13 => github.com/xxchan/arrow/go/v13 v13.0.0-20230713134335-45002b4934f9 +``` + +#### TODO: Go 1.21 + +(requires full wasi_snapshot_preview1) +- [Go 1.21 Release Notes - The Go Programming Language](https://tip.golang.org/doc/go1.21) +- [all: add GOOS=wasip1 GOARCH=wasm port · Issue #58141 · golang/go](https://github.com/golang/go/issues/58141) diff --git a/src/udf/wit_example/build.sh b/src/udf/wit_example/build.sh new file mode 100755 index 0000000000000..01b70932cc7a3 --- /dev/null +++ b/src/udf/wit_example/build.sh @@ -0,0 +1,95 @@ +#!/bin/bash + +set -e + +# usage: ./build.sh --lang [rust|go] [--rebuild] + +while [[ $# -gt 0 ]]; do + key="$1" + + case $key in + --lang) + lang="$2" + shift # past argument + shift # past value + ;; + --rebuild) + rebuild="true" + shift # past argument + ;; + *) # unknown option + shift # past argument + ;; + esac +done + +if [ -z "$lang" ]; then + echo "Please specify --lang [rust|go]" + exit 1 +fi + +if [ "$(wasm-tools -V)" != "wasm-tools 1.0.35" ]; then + echo "wasm-tools 1.0.35 is required" + exit 1 +fi + +path=$(dirname "$0") + +function build_rust() { + echo "--- Build Rust guest component" + + cd "$path/rust" + + rustup target add wasm32-wasi + + profile=release + if [ "$profile" == "dev" ]; then + target_dir=debug + else + target_dir=$profile + fi + + cargo build --target=wasm32-wasi --profile "${profile}" + mv ./target/wasm32-wasi/"${target_dir}"/my_udf.wasm ../my_udf.rust.wasm + + cd .. +} + +function build_go() { + echo "--- Build TinyGo guest component" + # Note: TinyGo will rebuild the whole binary every time and it's slow. + + cd "$path/tinygo" + go generate # generate bindings for Go + tinygo build -target=wasi -o my_udf.go.wasm my_udf.go + wasm-tools component embed ../../wit my_udf.go.wasm -o my_udf.go.wasm + + mv ./my_udf.go.wasm .. + cd .. +} + +# if the file "my_udf.$lang.wasm" does not exist, or --rebuild is specified, rebuild it +if [ ! 
-f "my_udf.$lang.wasm" ] || [ "$rebuild" == "true" ]; then + if [ "$lang" == "rust" ]; then + build_rust + elif [ "$lang" == "go" ]; then + build_go + else + echo "Unknown language: $lang" + exit 1 + fi +else + echo "my_udf.$lang.wasm exists, skip building" +fi + + + +# (WASI adaptor) if file not found, download from +if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then + wget https://github.com/bytecodealliance/wasmtime/releases/download/v10.0.1/wasi_snapshot_preview1.reactor.wasm +fi + +echo wasm-tools component new "my_udf.$lang.wasm" -o my_udf.component.wasm +wasm-tools component new "my_udf.$lang.wasm" -o my_udf.component.wasm \ + --adapt wasi_snapshot_preview1=./wasi_snapshot_preview1.reactor.wasm +wasm-tools validate my_udf.component.wasm --features component-model diff --git a/src/udf/wit_example/create.sh b/src/udf/wit_example/create.sh new file mode 100755 index 0000000000000..f55e94683a2c5 --- /dev/null +++ b/src/udf/wit_example/create.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +set -e + +path=$(dirname "$0") +cd "$path" + +if [ ! 
-f "./my_udf.component.wasm" ]; then + echo "my_udf.component.wasm not found, please run ./build.sh first" + exit 1 +fi + +echo "size of wasm: $(stat -f "%z" my_udf.component.wasm) bytes" +encoded=$(base64 -i my_udf.component.wasm) +echo "size of encoded wasm: ${#encoded} bytes" +# debug: 23557258 +# release: 12457072 + +psql -h localhost -p 4566 -d dev -U root -c "DROP FUNCTION IF EXISTS count_char;" +sql="CREATE FUNCTION count_char (s varchar, c varchar) RETURNS BIGINT LANGUAGE wasm_v1 USING BASE64 '$encoded';" +echo "$sql" > create.sql +psql -h localhost -p 4566 -d dev -U root -v "ON_ERROR_STOP=1" -f ./create.sql + +# test +psql -h localhost -p 4566 -d dev -U root -c "SELECT count_char('aabca', 'a');" +psql -h localhost -p 4566 -d dev -U root -c "SELECT count_char('aabca', 'b');" +psql -h localhost -p 4566 -d dev -U root -c "SELECT count_char('aabca', 'd');" diff --git a/src/udf/wit_example/rust/Cargo.toml b/src/udf/wit_example/rust/Cargo.toml new file mode 100644 index 0000000000000..76a25356a885a --- /dev/null +++ b/src/udf/wit_example/rust/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "my_udf" +version = "0.1.0" +edition = "2021" +homepage = "https://github.com/risingwavelabs/risingwave" +license = "Apache-2.0" +repository = "https://github.com/risingwavelabs/risingwave" + +[workspace] + +[dependencies] +wit-bindgen = "0.8" +arrow-array = "43" +arrow-schema = "43" +arrow-ipc = "43" +arrow = "43" +arrow-data = "43" +arrow-buffer = "43" + + +[lib] +crate-type = ["cdylib"] + +[profile.release] +debug = 1 \ No newline at end of file diff --git a/src/udf/wit_example/rust/src/lib.rs b/src/udf/wit_example/rust/src/lib.rs new file mode 100644 index 0000000000000..2c90529aea8b1 --- /dev/null +++ b/src/udf/wit_example/rust/src/lib.rs @@ -0,0 +1,118 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_array::Array; + +use crate::risingwave::udf::types::{DataType, Field}; + +wit_bindgen::generate!({ + // optional, since there's only one world. We make it explicit here. + world: "udf", + // path is relative to Cargo.toml + path:"../../wit" +}); + +// Define a custom type and implement the generated `Udf` trait for it which +// represents implementing all the necesssary exported interfaces for this +// component. +/// User defined function tou count number of specified characters. +/// Ref +struct CountChar; + +export_udf!(CountChar); + +fn count_char(s: &str, char: &str) -> i64 { + let mut count = 0; + let char = char.bytes().next().unwrap(); + + for c in s.bytes() { + if c == char { + count += 1; + } + } + count +} + +impl Udf for CountChar { + fn eval(batch: RecordBatch) -> Result { + // Read data from IPC buffer + let batch = arrow_ipc::reader::StreamReader::try_new(batch.as_slice(), None).unwrap(); + + // Do UDF computation (for each batch, for each row, do scalar -> scalar) + let mut ret = arrow_array::builder::Int64Builder::new(); + for batch in batch { + let batch = batch.unwrap(); + for i in 0..batch.num_rows() { + let s = batch + .column(0) + .as_any() + .downcast_ref::() + .expect( + format!( + "expected StringArray, got {:?}", + batch.column(0).data_type() + ) + .as_str(), + ) + .value(i); + let c = batch + .column(1) + .as_any() + .downcast_ref::() + .expect( + format!( + "expected StringArray, got {:?}", + batch.column(1).data_type() + ) + .as_str(), + ) + .value(i); + ret.append_value(count_char(s, 
c)); + } + } + + // Write data to IPC buffer + let mut buf = vec![]; + { + let array = ret.finish(); + let schema = arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "result", + arrow_schema::DataType::Int64, + false, + )]); + let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut buf, &schema).unwrap(); + let batch = + arrow_array::RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + Ok(buf) + } + + fn input_schema() -> Schema { + vec![Field { + name: "input".to_string(), + data_type: DataType::DtString, + }] + } + + fn output_schema() -> Schema { + vec![Field { + name: "result".to_string(), + data_type: DataType::DtI64, + }] + } +} diff --git a/src/udf/wit_example/tinygo/go.mod b/src/udf/wit_example/tinygo/go.mod new file mode 100644 index 0000000000000..9e2110de9ad42 --- /dev/null +++ b/src/udf/wit_example/tinygo/go.mod @@ -0,0 +1,19 @@ +module github.com/my_account/my_udf + +go 1.20 + +require github.com/apache/arrow/go/v13 v13.0.0-20230712165359-085a0baf7868 + +require ( + github.com/goccy/go-json v0.10.0 // indirect + github.com/google/flatbuffers v23.5.26+incompatible // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.3 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/mod v0.8.0 // indirect + golang.org/x/sys v0.5.0 // indirect + golang.org/x/tools v0.6.0 // indirect + golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect +) + +replace github.com/apache/arrow/go/v13 => github.com/xxchan/arrow/go/v13 v13.0.0-20230713134335-45002b4934f9 diff --git a/src/udf/wit_example/tinygo/go.sum b/src/udf/wit_example/tinygo/go.sum new file mode 100644 index 0000000000000..347e7e70feda6 --- /dev/null +++ b/src/udf/wit_example/tinygo/go.sum @@ -0,0 +1,30 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/goccy/go-json v0.10.0 
h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= +github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= +github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU= +github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/xxchan/arrow/go/v13 v13.0.0-20230713134335-45002b4934f9 h1:gsKJOVHt6vUME28wICV67tIIGAxpIsVJN3Ip3IuxNcU= +github.com/xxchan/arrow/go/v13 v13.0.0-20230713134335-45002b4934f9/go.mod h1:t8LhfrObSFaJZGpfHtA6BSRnP0sl0yaK/rDHBEdvWBU= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/src/udf/wit_example/tinygo/my_udf.go b/src/udf/wit_example/tinygo/my_udf.go new file mode 100644 index 0000000000000..4fc14a463d6c6 --- /dev/null +++ b/src/udf/wit_example/tinygo/my_udf.go @@ -0,0 +1,89 @@ +package main + +import ( + "bytes" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/ipc" + "github.com/apache/arrow/go/v13/arrow/memory" + gen "github.com/my_account/my_udf/gen" +) + +type MyUdf struct { +} + +func init() { + a := MyUdf{} + gen.SetUdf(a) +} + +func (u MyUdf) InputSchema() gen.RisingwaveUdfTypesSchema { + return gen.RisingwaveUdfTypesSchema{} +} +func (u MyUdf) OutputSchema() gen.RisingwaveUdfTypesSchema { + return gen.RisingwaveUdfTypesSchema{} +} +func (u MyUdf) Eval(batch []uint8) gen.Result[[]uint8, gen.RisingwaveUdfTypesEvalErrno] { + reader, err := ipc.NewReader(bytes.NewReader(batch)) + if err != nil { + panic(err) + } + builder := array.NewBooleanBuilder(memory.NewGoAllocator()) + for reader.Next() { + rec := reader.Record() + col := rec.Column(0).(*array.Int64).Int64Values() + + for i := 0; i < int(rec.NumRows()); i++ { + builder.Append(col[i] > 0) + } + } + arr := builder.NewArray() + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "result", Type: &arrow.BooleanType{}}, + }, + nil, + ) + record := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len())) + + buffer := 
new(bytes.Buffer) + writer := ipc.NewWriter(buffer, ipc.WithSchema(record.Schema())) + writer.Write(record) + + return gen.Result[[]uint8, gen.RisingwaveUdfTypesEvalErrno]{ + Kind: gen.Ok, + Val: buffer.Bytes(), + Err: gen.RisingwaveUdfTypesEvalErrno{}, + } +} + +//go:generate wit-bindgen tiny-go ../../wit --out-dir=gen +func main() { + // Just for testing + + builder := array.NewInt64Builder(memory.NewGoAllocator()) + builder.Append(-1) + builder.Append(0) + builder.Append(1) + arr := builder.NewArray() + record := array.NewRecord( + arrow.NewSchema( + []arrow.Field{ + {Name: "input", Type: &arrow.Int64Type{}}, + }, + nil, + ), + []arrow.Array{arr}, + int64(arr.Len()), + ) + + buffer := new(bytes.Buffer) + writer := ipc.NewWriter(buffer, ipc.WithSchema(record.Schema())) + writer.Write(record) + + udf := MyUdf{} + + result := udf.Eval(buffer.Bytes()) + println("result:", result.Kind) +} diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index ff705025a0d64..d2b6770211c09 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -52,7 +52,7 @@ static RW_QUERY_LOG_TRUNCATE_LEN: LazyLock = Ok(len) if len.parse::().is_ok() => len.parse::().unwrap(), _ => { if cfg!(debug_assertions) { - usize::MAX + 65536 } else { 1024 } diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 916dd93d7a32b..a86c585c0a3fd 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -156,7 +156,6 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { .with_target("aws_sdk_ec2", Level::INFO) .with_target("aws_sdk_s3", Level::INFO) .with_target("aws_config", Level::WARN) - // Only enable WARN and ERROR for 3rd-party crates .with_target("aws_endpoint", Level::WARN) .with_target("aws_credential_types::cache::lazy_caching", Level::WARN) .with_target("hyper", Level::WARN) @@ -166,7 +165,9 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { 
.with_target("isahc", Level::WARN) .with_target("console_subscriber", Level::WARN) .with_target("reqwest", Level::WARN) - .with_target("sled", Level::INFO); + .with_target("sled", Level::INFO) + .with_target("cranelift", Level::INFO) + .with_target("wasmtime", Level::INFO); // For all other crates, apply default level depending on the deployment and `debug_assertions` flag. let default_level = match deployment { diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 67b218c787652..d21f92780d053 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -34,6 +34,7 @@ chrono = { version = "0.4", features = ["serde"] } clap = { version = "4", features = ["cargo", "derive", "env"] } clap_builder = { version = "4", default-features = false, features = ["cargo", "color", "env", "help", "std", "suggestions", "usage"] } combine = { version = "4", features = ["tokio"] } +crc32fast = { version = "1" } crossbeam-epoch = { version = "0.9" } crossbeam-queue = { version = "0.3" } crossbeam-utils = { version = "0.8" } @@ -41,6 +42,7 @@ deranged = { version = "0.3", default-features = false, features = ["powerfmt", digest = { version = "0.10", features = ["mac", "oid", "std"] } either = { version = "1", features = ["serde"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] } +fallible-iterator = { version = "0.2" } fixedbitset = { version = "0.4" } flate2 = { version = "1", features = ["zlib"] } frunk_core = { version = "0.4", default-features = false, features = ["std"] } @@ -52,9 +54,10 @@ futures-sink = { version = "0.3" } futures-task = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } +hashbrown-594e8ee84c453af0 = { package = "hashbrown", version = "0.13", features = ["raw"] } hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", 
features = ["nightly", "raw"] } hyper = { version = "0.14", features = ["full"] } -indexmap = { version = "1", default-features = false, features = ["serde", "std"] } +indexmap = { version = "1", default-features = false, features = ["serde-1", "std"] } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" } itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } jni = { version = "0.21", features = ["invocation"] } @@ -100,6 +103,8 @@ regex-syntax = { version = "0.8" } reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } +rustc-hash = { version = "1" } +rustix = { version = "0.38", features = ["fs"] } rustls = { version = "0.21" } scopeguard = { version = "1" } sea-orm = { version = "0.12", features = ["runtime-tokio-native-tls", "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite"] } @@ -117,6 +122,7 @@ sqlx-core = { version = "0.7", features = ["_rt-tokio", "_tls-native-tls", "bigd sqlx-mysql = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "rust_decimal", "time", "uuid"] } sqlx-postgres = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "rust_decimal", "time", "uuid"] } sqlx-sqlite = { version = "0.7", default-features = false, features = ["chrono", "json", "time", "uuid"] } +stable_deref_trait = { version = "1" } strum = { version = "0.25", features = ["derive"] } subtle = { version = "2" } time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] } @@ -152,6 +158,8 @@ either = { version = "1", features = ["serde"] } fixedbitset = { version = "0.4" } frunk_core = { version = "0.4", default-features = false, features = ["std"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } +hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", 
version = "0.12", features = ["nightly", "raw"] } +indexmap = { version = "1", default-features = false, features = ["serde-1", "std"] } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" } itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } @@ -174,13 +182,19 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.8" } +rustc-hash = { version = "1" } +rustix = { version = "0.38", features = ["fs"] } serde = { version = "1", features = ["alloc", "derive", "rc"] } serde_json = { version = "1", features = ["alloc", "raw_value"] } syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] } time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] } time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] } +tinyvec = { version = "1", features = ["alloc", "grab_spare_slice", "rustc_1_55"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } toml_edit = { version = "0.19", features = ["serde"] } +unicode-bidi = { version = "0.3" } +unicode-normalization = { version = "0.1" } +url = { version = "2", features = ["serde"] } ### END HAKARI SECTION