From 6d55b5cf93b7e78857b873632e0750f91c4f7496 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 10 Jan 2024 10:30:56 +0800 Subject: [PATCH] add external/iceberg and construct_struct expression --- Cargo.lock | 292 ++++++-------- Cargo.toml | 2 +- proto/expr.proto | 10 + src/expr/core/Cargo.toml | 1 + src/expr/core/src/expr/build.rs | 16 +- src/expr/core/src/expr/expr_some_all.rs | 10 +- src/expr/core/src/expr/external/iceberg.rs | 399 +++++++++++++++++++ src/expr/core/src/expr/external/mod.rs | 15 + src/expr/core/src/expr/mod.rs | 1 + src/expr/impl/src/scalar/construct_struct.rs | 109 +++++ src/expr/impl/src/scalar/mod.rs | 1 + src/frontend/src/expr/pure.rs | 9 +- src/workspace-hack/Cargo.toml | 8 +- 13 files changed, 702 insertions(+), 171 deletions(-) create mode 100644 src/expr/core/src/expr/external/iceberg.rs create mode 100644 src/expr/core/src/expr/external/mod.rs create mode 100644 src/expr/impl/src/scalar/construct_struct.rs diff --git a/Cargo.lock b/Cargo.lock index 4368e3b689ded..15047f3fb52c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,7 +45,7 @@ dependencies = [ "getrandom", "once_cell", "version_check", - "zerocopy 0.7.31", + "zerocopy", ] [[package]] @@ -164,40 +164,40 @@ dependencies = [ [[package]] name = "apache-avro" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c0fdddc3fdac97394ffcc5c89c634faa9c1c166ced54189af34e407c97b6ee7" +version = "0.16.0" +source = "git+https://github.com/risingwavelabs/avro?rev=d0846a16ce813a225af04ade35b3b8117b137a29#d0846a16ce813a225af04ade35b3b8117b137a29" dependencies = [ - "apache-avro-derive", - "byteorder", + "bzip2", + "crc32fast", "digest", "lazy_static", - "libflate 1.4.0", + "libflate", "log", "num-bigint", "quad-rand", "rand", - "regex", + "regex-lite", "serde", "serde_json", + "snap", "strum", "strum_macros", "thiserror", - "typed-builder 0.14.0", + "typed-builder 0.16.2", "uuid", - "zerocopy 0.6.6", + "xz2", + "zstd 0.12.4", ] [[package]] name = "apache-avro" -version = "0.16.0" -source = "git+https://github.com/risingwavelabs/avro?rev=d0846a16ce813a225af04ade35b3b8117b137a29#d0846a16ce813a225af04ade35b3b8117b137a29" +version = "0.17.0" +source = "git+https://github.com/icelake-io/avro.git?rev=4866a4ad0ed5d6af7160c9b52af898ab6d0551f9#4866a4ad0ed5d6af7160c9b52af898ab6d0551f9" dependencies = [ - "bzip2", - "crc32fast", + "apache-avro-derive", + "bigdecimal 0.4.2", "digest", - "lazy_static", - "libflate 2.0.0", + "libflate", "log", "num-bigint", "quad-rand", @@ -205,27 +205,23 @@ dependencies = [ "regex-lite", "serde", "serde_json", - "snap", "strum", "strum_macros", "thiserror", - "typed-builder 0.16.2", + "typed-builder 0.18.0", "uuid", - "xz2", - "zstd 0.12.4", ] [[package]] name = "apache-avro-derive" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6686cd705badba064ec2322b9c3d72f5c70db8394e486bbb56e84fbdb3fa158c" +version = "0.17.0" +source = "git+https://github.com/icelake-io/avro.git?rev=4866a4ad0ed5d6af7160c9b52af898ab6d0551f9#4866a4ad0ed5d6af7160c9b52af898ab6d0551f9" dependencies = [ "darling 0.20.3", "proc-macro2", "quote", "serde_json", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -778,7 +774,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -827,7 +823,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -844,7 +840,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -877,7 +873,7 @@ dependencies = [ "derive_utils", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -1651,7 +1647,7 @@ dependencies = [ "proc-macro-crate 2.0.0", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", "syn_derive", ] @@ -1865,7 +1861,7 @@ checksum = "bc7cb2538d4ecc42b6c3b57a83094d8c69894e74468d18cd045a09fdea807358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -1982,7 +1978,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -2299,7 +2295,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -2513,7 +2509,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -2541,7 +2537,7 @@ checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -2568,7 +2564,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -2585,7 +2581,7 @@ checksum = "2fa16a70dd58129e4dfffdff535fb1bce66673f7bbeec4a5a1765a504e1ccd84" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -2657,7 +2653,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -2690,7 +2686,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3130,7 +3126,7 @@ checksum = "9abcad25e9720609ccb3dcdb795d845e37d8ce34183330a9f48b03a1a71c8e21" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3318,7 +3314,7 @@ dependencies = [ "enum-ordinalize 4.3.0", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3395,7 +3391,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3435,7 +3431,7 @@ checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3448,7 +3444,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3468,7 +3464,7 @@ checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3488,7 +3484,7 @@ checksum = "f95e2801cd355d4a1a3e3953ce6ee5ae9603a5c833455343a8bfe3f44d418246" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3949,7 +3945,7 @@ checksum = "b0fa992f1656e1707946bbba340ad244f0814009ef8c0118eb7b658395f19a2e" dependencies = [ "frunk_proc_macro_helpers", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3961,7 +3957,7 @@ dependencies = [ "frunk_core", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -3973,7 +3969,7 @@ dependencies = [ "frunk_core", "frunk_proc_macro_helpers", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -4050,7 +4046,7 @@ checksum = "5df2c13d48c8cb8a3ec093ede6f0f4482f327d7bb781120c5fb483ef0f17e758" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -4120,7 +4116,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -4707,10 +4703,10 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=3f7b53ba5b563524212c25810345d1314678e7fc#3f7b53ba5b563524212c25810345d1314678e7fc" +source = "git+https://github.com/icelake-io/icelake?rev=cc27aa20d34b1d252bd7b8aaadee64c34140e687#cc27aa20d34b1d252bd7b8aaadee64c34140e687" dependencies = [ "anyhow", - "apache-avro 0.15.0", + "apache-avro 0.17.0", "arrow-arith 49.0.0", "arrow-array 49.0.0", "arrow-buffer 49.0.0", @@ -4729,6 +4725,7 @@ dependencies = [ "faster-hex", "futures", "itertools 0.11.0", + "lazy_static", "log", "murmur3", "once_cell", @@ -4839,7 +4836,7 @@ checksum = "ce243b1bfa62ffc028f1cc3b6034ec63d649f3031bc8a4fbbb004e1ac17d1f68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -5154,17 +5151,6 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" -[[package]] -name = "libflate" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ff4ae71b685bbad2f2f391fe74f6b7659a34871c08b210fdc039e43bee07d18" -dependencies = [ - "adler32", - "crc32fast", - "libflate_lz77 1.2.0", -] - [[package]] name = "libflate" version = "2.0.0" @@ -5175,16 +5161,7 @@ dependencies = [ "core2", "crc32fast", "dary_heap", - "libflate_lz77 2.0.0", -] - -[[package]] -name = "libflate_lz77" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf" -dependencies = [ - "rle-decode-fast", + "libflate_lz77", ] [[package]] @@ -5540,7 +5517,7 @@ dependencies = [ "proc-macro2", "prost-build 0.12.1", "quote", - "syn 2.0.37", + "syn 2.0.48", "tonic-build", ] @@ -5762,7 +5739,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", "termcolor", "thiserror", ] @@ -6288,7 +6265,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -6487,7 +6464,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -6696,7 +6673,7 @@ dependencies = [ "regex", "regex-syntax 0.7.5", "structmeta", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -6895,7 +6872,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -7026,7 +7003,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -7174,7 +7151,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -7237,9 +7214,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.67" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" +checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" dependencies = [ "unicode-ident", ] @@ -7361,7 +7338,7 @@ checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.11.0", + "itertools 0.10.5", "log", "multimap 0.8.3", "once_cell", @@ -7370,7 +7347,7 @@ dependencies = [ "prost 0.12.1", "prost-types 0.12.1", "regex", - "syn 2.0.37", + "syn 2.0.48", "tempfile", "which", ] @@ -7395,10 +7372,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.10.5", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -7407,7 +7384,7 @@ version = "0.1.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -7593,9 +7570,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -7764,7 +7741,7 @@ checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -8025,7 +8002,7 @@ dependencies = [ "prettyplease 0.2.15", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -8616,6 +8593,7 @@ dependencies = [ "expect-test", "futures-async-stream", "futures-util", + "icelake", "itertools 0.12.0", "madsim-tokio", "num-traits", @@ -8677,7 +8655,7 @@ dependencies = [ "itertools 0.12.0", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -9584,7 +9562,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.37", + "syn 2.0.48", "walkdir", ] @@ -9841,7 +9819,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -9899,7 +9877,7 @@ dependencies = [ "proc-macro2", "quote", "sea-bae", - "syn 2.0.37", + "syn 2.0.48", "unicode-ident", ] @@ -10082,9 +10060,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.188" +version = "1.0.195" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" dependencies = [ "serde_derive", ] @@ -10131,13 +10109,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.195" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -10153,9 +10131,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.107" +version = "1.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" dependencies = [ "itoa", "ryu", @@ -10198,7 +10176,7 @@ checksum = "8725e1dfadb3a50f7e5ce0b1a540466f6ed3fe7a0fca2ac2b8b831d31316bd00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -10248,7 +10226,7 @@ dependencies = [ "darling 0.20.3", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -10286,7 +10264,7 @@ checksum = "b93fb4adc70021ac1b47f7d45e8cc4169baaa7ea58483bc5b721d19a26202212" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -10941,7 +10919,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -10952,7 +10930,7 @@ checksum = "a60bcaff7397072dca0017d1db428e30d5002e00b6847703e2e42005c95fbe00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -10966,15 +10944,15 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.25.2" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" dependencies = [ "heck 0.4.1", "proc-macro2", "quote", "rustversion", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -11039,9 +11017,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.37" +version = "2.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" dependencies = [ "proc-macro2", "quote", @@ -11057,7 +11035,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -11140,9 +11118,9 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "thiserror" -version = "1.0.48" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" dependencies = [ "thiserror-impl", ] @@ -11166,18 +11144,18 @@ dependencies = [ "either", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] name = "thiserror-impl" -version = "1.0.48" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -11352,7 +11330,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -11587,7 +11565,7 @@ dependencies = [ "proc-macro2", "prost-build 0.12.1", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -11668,7 +11646,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -11807,22 +11785,20 @@ dependencies = [ [[package]] name = "typed-builder" -version = "0.14.0" +version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64cba322cb9b7bc6ca048de49e83918223f35e7a86311267013afff257004870" +checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", + "typed-builder-macro 0.16.2", ] [[package]] name = "typed-builder" -version = "0.16.2" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" +checksum = "e47c0496149861b7c95198088cbf36645016b1a0734cf350c50e2a38e070f38a" dependencies = [ - "typed-builder-macro", + "typed-builder-macro 0.18.0", ] [[package]] @@ -11833,7 +11809,18 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", +] + +[[package]] +name = "typed-builder-macro" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "982ee4197351b5c9782847ef5ec1fdcaf50503fb19d68f9771adae314e72b492" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", ] [[package]] @@ -12072,7 +12059,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", "wasm-bindgen-shared", ] @@ -12106,7 +12093,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -12450,7 +12437,7 @@ version = "1.5.0-alpha" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] @@ -12481,6 +12468,7 @@ dependencies = [ "aws-smithy-types", "axum", "base64 0.21.4", + "bigdecimal 0.4.2", "bit-vec", "bitflags 2.4.0", "byteorder", @@ -12515,7 +12503,7 @@ dependencies = [ "hmac", "hyper", "indexmap 1.9.3", - "itertools 0.11.0", + "itertools 0.10.5", "jni", "lazy_static", "lexical-core", @@ -12578,7 +12566,7 @@ dependencies = [ "strum", "subtle", "syn 1.0.109", - "syn 2.0.37", + "syn 2.0.48", "time", "time-macros", "tinyvec", @@ -12599,6 +12587,9 @@ dependencies = [ "uuid", "whoami", "zeroize", + "zstd 0.13.0", + "zstd-safe 7.0.0", + "zstd-sys", ] [[package]] @@ -12689,34 +12680,13 @@ version = "3.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a599daf1b507819c1121f0bf87fa37eb19daac6aff3aefefd4e6e2e0f2020fc" -[[package]] -name = "zerocopy" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854e949ac82d619ee9a14c66a1b674ac730422372ccb759ce0c39cabcf2bf8e6" -dependencies = [ - "byteorder", - "zerocopy-derive 0.6.6", -] - [[package]] name = "zerocopy" version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d" dependencies = [ - "zerocopy-derive 0.7.31", -] - -[[package]] -name = "zerocopy-derive" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "125139de3f6b9d625c39e2efdd73d41bdac468ccd556556440e322be0e1bbd91" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.37", + "zerocopy-derive", ] [[package]] @@ -12727,7 +12697,7 @@ checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.48", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b17c2e605f110..0c0fc44b3a857 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [ +icelake = { git = "https://github.com/icelake-io/icelake", rev = "cc27aa20d34b1d252bd7b8aaadee64c34140e687", features = [ "prometheus", ] } arrow-array = "49" diff --git a/proto/expr.proto b/proto/expr.proto index 31835ac905705..527d245442ea8 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -259,6 +259,8 @@ message ExprNode { JSONB_PATH_QUERY_ARRAY = 622; JSONB_PATH_QUERY_FIRST = 623; + CONSTRUCT_STRUCT = 624; + // Non-pure functions below (> 1000) // ------------------------ // Internal functions @@ -275,6 +277,14 @@ message ExprNode { PG_GET_INDEXDEF = 2400; COL_DESCRIPTION = 2401; PG_GET_VIEWDEF = 2402; + + // EXTERNAL + ICEBERG_BUCKET = 2201; + ICEBERG_TRUNCATE = 2202; + ICEBERG_YEAR = 2203; + ICEBERG_MONTH = 2204; + ICEBERG_DAY = 2205; + ICEBERG_HOUR = 2206; } Type function_type = 1; data.DataType return_type = 3; diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 1b5fcb7287ebf..a260d252b4b97 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -34,6 +34,7 @@ either = "1" enum-as-inner = "0.6" futures-async-stream = { workspace = true } futures-util = "0.3" +icelake = { workspace = true } itertools = "0.12" num-traits = "0.2" parse-display = "0.8" diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs index 16319a01bbb5f..e3f6c108dfa3b 100644 --- a/src/expr/core/src/expr/build.rs +++ b/src/expr/core/src/expr/build.rs @@ -27,7 +27,7 @@ use super::wrapper::non_strict::NonStrict; use super::wrapper::EvalErrorReport; use super::NonStrictExpression; use crate::expr::{ - BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression, + external, BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression, }; use crate::sig::FUNCTION_REGISTRY; use crate::{bail, ExprError, Result}; @@ -110,6 +110,16 @@ where // Dedicated types E::All | E::Some => SomeAllExpression::build_boxed(prost, build_child), + // Iceberg partition transform functions + // # TODO + // Move to general types + E::IcebergBucket => external::iceberg::Bucket::build_boxed(prost, build_child), + E::IcebergTruncate => external::iceberg::Truncate::build_boxed(prost, build_child), + E::IcebergYear => external::iceberg::Year::build_boxed(prost, build_child), + E::IcebergMonth => external::iceberg::Month::build_boxed(prost, build_child), + E::IcebergDay => external::iceberg::Day::build_boxed(prost, build_child), + E::IcebergHour => external::iceberg::Hour::build_boxed(prost, build_child), + // General types, lookup in the function signature map _ => FuncCallBuilder::build_boxed(prost, build_child), }, @@ -216,7 +226,9 @@ pub fn build_func_non_strict( Ok(wrapped) } -pub(super) fn get_children_and_return_type(prost: &ExprNode) -> Result<(&[ExprNode], DataType)> { +pub fn get_children_and_return_type_for_func_call( + prost: &ExprNode, +) -> Result<(&[ExprNode], DataType)> { let ret_type = DataType::from(prost.get_return_type().unwrap()); if let RexNode::FuncCall(func_call) = prost.get_rex_node().unwrap() { Ok((func_call.get_children(), ret_type)) diff --git a/src/expr/core/src/expr/expr_some_all.rs b/src/expr/core/src/expr/expr_some_all.rs index 9250aa6d877cf..cb408211b7a3c 100644 --- a/src/expr/core/src/expr/expr_some_all.rs +++ b/src/expr/core/src/expr/expr_some_all.rs @@ -22,7 +22,7 @@ use risingwave_common::{bail, ensure}; use risingwave_pb::expr::expr_node::{RexNode, Type}; use risingwave_pb::expr::{ExprNode, FunctionCall}; -use super::build::get_children_and_return_type; +use super::build::get_children_and_return_type_for_func_call; use super::{BoxedExpression, Build, Expression}; use crate::Result; @@ -211,17 +211,19 @@ impl Build for SomeAllExpression { build_child: impl Fn(&ExprNode) -> Result, ) -> Result { let outer_expr_type = prost.get_function_type().unwrap(); - let (outer_children, outer_return_type) = get_children_and_return_type(prost)?; + let (outer_children, outer_return_type) = + get_children_and_return_type_for_func_call(prost)?; ensure!(matches!(outer_return_type, DataType::Boolean)); let mut inner_expr_type = outer_children[0].get_function_type().unwrap(); let (mut inner_children, mut inner_return_type) = - get_children_and_return_type(&outer_children[0])?; + get_children_and_return_type_for_func_call(&outer_children[0])?; let mut stack = vec![]; while inner_children.len() != 2 { stack.push((inner_expr_type, inner_return_type)); inner_expr_type = inner_children[0].get_function_type().unwrap(); - (inner_children, inner_return_type) = get_children_and_return_type(&inner_children[0])?; + (inner_children, inner_return_type) = + get_children_and_return_type_for_func_call(&inner_children[0])?; } let left_expr = build_child(&inner_children[0])?; diff --git a/src/expr/core/src/expr/external/iceberg.rs b/src/expr/core/src/expr/external/iceberg.rs new file mode 100644 index 0000000000000..bfd0ec1494653 --- /dev/null +++ b/src/expr/core/src/expr/external/iceberg.rs @@ -0,0 +1,399 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use icelake::types::{ + Bucket as BucketTransform, Day as DayTransform, Hour as HourTransform, Month as MonthTransform, + TransformFunction, Truncate as TruncateTransform, Year as YearTransform, +}; +use risingwave_common::array::{ArrayRef, DataChunk}; +use risingwave_common::ensure; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, Datum}; +use risingwave_expr::expr::{get_children_and_return_type_for_func_call, BoxedExpression, Build}; +use risingwave_expr::Result; +use risingwave_pb::expr::ExprNode; + +/// This module contains the iceberg expression for computing the partition value. +/// spec ref: + +pub struct Bucket { + child: BoxedExpression, + n: i32, + transform: BucketTransform, +} + +impl Debug for Bucket { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Bucket({})", self.n) + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Bucket { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Bucket { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + ensure!(children.len() == 2); + ensure!(matches!( + children[0].get_return_type().unwrap().into(), + DataType::Int32 + | DataType::Int64 + | DataType::Decimal + | DataType::Date + | DataType::Time + | DataType::Timestamp + | DataType::Timestamptz + | DataType::Varchar + | DataType::Bytea + )); + ensure!(DataType::Int32 == children[1].get_return_type().unwrap().into()); + ensure!(res_type == DataType::Int32); + + // Get the second child as const param + let literal = build_child(&children[1])?; + let n = *literal.eval_const()?.unwrap().as_int32(); + + // Build the child + let child = build_child(&children[0])?; + Ok(Bucket { + child, + n, + transform: BucketTransform::new(n), + }) + } +} + +pub struct Truncate { + child: BoxedExpression, + w: i32, + transform: TruncateTransform, +} + +impl Debug for Truncate { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Truncate({})", self.w) + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Truncate { + fn return_type(&self) -> DataType { + self.child.return_type() + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Truncate { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + ensure!(children.len() == 2); + ensure!(matches!( + children[0].get_return_type().unwrap().into(), + DataType::Int32 | DataType::Int64 | DataType::Decimal | DataType::Varchar + )); + ensure!(DataType::Int32 == children[1].get_return_type().unwrap().into()); + ensure!(res_type == children[0].get_return_type().unwrap().into()); + + // Get the second child as const param + let literal = build_child(&children[1])?; + let w = *literal.eval_const()?.unwrap().as_int32(); + + // Build the child + let child = build_child(&children[0])?; + Ok(Truncate { + child, + w, + transform: TruncateTransform::new(w), + }) + } +} + +// Year +pub struct Year { + child: BoxedExpression, + transform: YearTransform, +} + +impl Debug for Year { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Year") + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Year { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Year { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + ensure!(children.len() == 1); + ensure!(matches!( + children[0].get_return_type().unwrap().into(), + DataType::Date | DataType::Timestamp | DataType::Timestamptz + )); + ensure!(res_type == DataType::Int32); + + // Build the child + let child = build_child(&children[0])?; + Ok(Year { + child, + transform: YearTransform {}, + }) + } +} + +// Month +pub struct Month { + child: BoxedExpression, + transform: MonthTransform, +} + +impl Debug for Month { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Month") + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Month { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Month { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + ensure!(children.len() == 1); + ensure!(matches!( + children[0].get_return_type().unwrap().into(), + DataType::Date | DataType::Timestamp | DataType::Timestamptz + )); + ensure!(res_type == DataType::Int32); + + // Build the child + let child = build_child(&children[0])?; + Ok(Month { + child, + transform: MonthTransform {}, + }) + } +} + +// Day +pub struct Day { + child: BoxedExpression, + transform: DayTransform, +} + +impl Debug for Day { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Day") + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Day { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Day { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + ensure!(children.len() == 1); + ensure!(matches!( + children[0].get_return_type().unwrap().into(), + DataType::Date | DataType::Timestamp | DataType::Timestamptz + )); + ensure!(res_type == DataType::Int32); + + // Build the child + let child = build_child(&children[0])?; + Ok(Day { + child, + transform: DayTransform {}, + }) + } +} + +// Hour +pub struct Hour { + child: BoxedExpression, + transform: HourTransform, +} + +impl Debug for Hour { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Hour") + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Hour { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Hour { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + ensure!(children.len() == 1); + ensure!(matches!( + children[0].get_return_type().unwrap().into(), + DataType::Timestamp | DataType::Timestamptz + )); + ensure!(res_type == DataType::Int32); + + // Build the child + let child = build_child(&children[0])?; + Ok(Hour { + child, + transform: HourTransform {}, + }) + } +} diff --git a/src/expr/core/src/expr/external/mod.rs b/src/expr/core/src/expr/external/mod.rs new file mode 100644 index 0000000000000..a05cbd983f772 --- /dev/null +++ b/src/expr/core/src/expr/external/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod iceberg; diff --git a/src/expr/core/src/expr/mod.rs b/src/expr/core/src/expr/mod.rs index 951ef4bb99765..ad91640f5a0a5 100644 --- a/src/expr/core/src/expr/mod.rs +++ b/src/expr/core/src/expr/mod.rs @@ -40,6 +40,7 @@ pub(crate) mod expr_udf; pub(crate) mod wrapper; mod build; +mod external; pub mod test_utils; mod value; diff --git a/src/expr/impl/src/scalar/construct_struct.rs b/src/expr/impl/src/scalar/construct_struct.rs new file mode 100644 index 0000000000000..e347bfc19753f --- /dev/null +++ b/src/expr/impl/src/scalar/construct_struct.rs @@ -0,0 +1,109 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use risingwave_common::array::{ArrayImpl, ArrayRef, DataChunk, StructArray}; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, Datum, ScalarImpl, StructValue}; +use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_expr::expr::{BoxedExpression, Expression}; +use risingwave_expr::{build_function, Result}; + +#[derive(Debug)] +pub struct ConstructStructExpression { + return_type: DataType, + children: Vec, +} + +#[async_trait::async_trait] +impl Expression for ConstructStructExpression { + fn return_type(&self) -> DataType { + self.return_type.clone() + } + + async fn eval(&self, input: &DataChunk) -> Result { + let mut struct_cols = Vec::with_capacity(self.children.len()); + for child in &self.children { + let res = child.eval(input).await?; + struct_cols.push(res); + } + Ok(Arc::new(ArrayImpl::Struct(StructArray::new( + self.return_type.as_struct().clone(), + struct_cols, + input.visibility().clone(), + )))) + } + + async fn eval_row(&self, input: &OwnedRow) -> Result { + let mut datums = Vec::with_capacity(self.children.len()); + for child in &self.children { + let res = child.eval_row(input).await?; + datums.push(res); + } + Ok(Some(ScalarImpl::Struct(StructValue::new(datums)))) + } +} + +#[build_function("construct_struct(...) -> struct", type_infer = "panic")] +fn build(return_type: DataType, children: Vec) -> Result { + assert!(return_type.is_struct()); + return_type + .as_struct() + .types() + .zip_eq_fast(children.iter()) + .for_each(|(ty, child)| { + assert_eq!(*ty, child.return_type()); + }); + + Ok(Box::new(ConstructStructExpression { + return_type, + children, + })) +} + +#[cfg(test)] +mod tests { + use risingwave_common::array::DataChunk; + use risingwave_common::row::Row; + use risingwave_common::test_prelude::DataChunkTestExt; + use risingwave_common::types::ToOwnedDatum; + use risingwave_common::util::iter_util::ZipEqDebug; + use risingwave_expr::expr::build_from_pretty; + + #[tokio::test] + async fn test_construct_struct_expr() { + let expr = build_from_pretty( + "(construct_struct:struct $0:int4 $1:int4 $2:int4)", + ); + let (input, expected) = DataChunk::from_pretty( + "i i i + 1 2 3 (1,2,3) + 4 2 1 (4,2,1) + 9 1 3 (9,1,3) + 1 1 1 (1,1,1)", + ) + .split_column_at(3); + + // test eval + let output = expr.eval(&input).await.unwrap(); + assert_eq!(&output, expected.column_at(0)); + + // test eval_row + for (row, expected) in input.rows().zip_eq_debug(expected.rows()) { + let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); + assert_eq!(result, expected.datum_at(0).to_owned_datum()); + } + } +} diff --git a/src/expr/impl/src/scalar/mod.rs b/src/expr/impl/src/scalar/mod.rs index c76a7b48663ee..9bde2180d644c 100644 --- a/src/expr/impl/src/scalar/mod.rs +++ b/src/expr/impl/src/scalar/mod.rs @@ -78,6 +78,7 @@ mod to_char; mod to_jsonb; mod vnode; pub use to_jsonb::*; +mod construct_struct; mod to_timestamp; mod translate; mod trigonometric; diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index 08b6bb47fa042..45cf443e876a7 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -225,7 +225,14 @@ impl ExprVisitor for ImpureAnalyzer { | expr_node::Type::Greatest | expr_node::Type::Least | expr_node::Type::ConvertFrom - | expr_node::Type::ConvertTo => + | expr_node::Type::ConvertTo + | expr_node::Type::ConstructStruct + | expr_node::Type::IcebergBucket + | expr_node::Type::IcebergTruncate + | expr_node::Type::IcebergYear + | expr_node::Type::IcebergMonth + | expr_node::Type::IcebergDay + | expr_node::Type::IcebergHour => // expression output is deterministic(same result for the same input) { func_call diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 09e5b2e6ce6c7..bff0846a4634b 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -30,6 +30,7 @@ aws-smithy-runtime = { version = "1", default-features = false, features = ["cli aws-smithy-types = { version = "1", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "hyper-0-14-x", "rt-tokio"] } axum = { version = "0.6" } base64 = { version = "0.21", features = ["alloc"] } +bigdecimal = { version = "0.4" } bit-vec = { version = "0.6" } bitflags = { version = "2", default-features = false, features = ["serde", "std"] } byteorder = { version = "1" } @@ -63,7 +64,7 @@ hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features hmac = { version = "0.12", default-features = false, features = ["reset"] } hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["serde", "std"] } -itertools = { version = "0.11" } +itertools = { version = "0.10" } jni = { version = "0.21", features = ["invocation"] } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } lexical-core = { version = "0.8", features = ["format"] } @@ -145,6 +146,9 @@ url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } whoami = { version = "1" } zeroize = { version = "1" } +zstd = { version = "0.13" } +zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] } +zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] } [build-dependencies] ahash = { version = "0.8" } @@ -161,7 +165,7 @@ fixedbitset = { version = "0.4" } frunk_core = { version = "0.4", default-features = false, features = ["std"] } generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } -itertools = { version = "0.11" } +itertools = { version = "0.10" } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } libc = { version = "0.2", features = ["extra_traits"] } log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }