diff --git a/Cargo.lock b/Cargo.lock index dae1149309..fd6afac44c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,7 +113,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ - "quote 1.0.33", + "quote 1.0.35", "syn 2.0.39", ] @@ -247,8 +247,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb1f50ebbb30eca122b188319a4398b3f7bb4a8cdf50ecfb73bfc6a3c3ce54f5" dependencies = [ "actix-router", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -816,8 +816,8 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "726535892e8eae7e70657b4c8ea93d26b8553afb1ce617caee529ef96d7dee6c" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", "synstructure", ] @@ -828,8 +828,8 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2777730b2039ac0f95f093556e61b6d26cebed5393ca6f152717777cec3a42ed" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -840,8 +840,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c09c69dffe06d222d072c878c3afe86eee2179806f20503faec97250268b4c24" dependencies = [ "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "swc_macros_common", "syn 2.0.39", ] @@ -871,8 +871,8 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -893,8 +893,8 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -904,8 +904,8 @@ version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -1505,8 +1505,8 @@ dependencies = [ "log", "peeking_take_while", "prettyplease 0.2.15", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "regex", "rustc-hash", "shlex", @@ -1526,8 +1526,8 @@ dependencies = [ "lazy_static", "lazycell", "peeking_take_while", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "regex", "rustc-hash", "shlex", @@ -1678,8 +1678,8 @@ checksum = "f404657a7ea7b5249e36808dff544bc88a28f26e0ac40009f674b7a009d14be3" dependencies = [ "once_cell", "proc-macro-crate 2.0.0", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", "syn_derive", ] @@ -1726,6 +1726,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "bstr" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "542f33a8835a0884b006a0c3df3dadd99c0c3f296ed26c2fdc8028e01ad6230c" +dependencies = [ + "memchr", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -1755,8 +1764,8 @@ version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7ec4c6f261935ad534c0c22dbef2201b45918860eb1c574b972bd213a76af61" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -2023,8 +2032,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" dependencies = [ "heck", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -2043,6 +2052,48 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" +[[package]] +name = "clickhouse" +version = "0.11.6" +source = "git+https://github.com/getdozer/clickhouse.rs.git#4c7d6f03caa933aeda5af7bff31536fccb97fba1" +dependencies = [ + "bstr", + "bytes", + "clickhouse-derive", + "clickhouse-rs-cityhash-sys", + "futures", + "hyper 0.14.27", + "hyper-tls", + "lz4", + "quanta 0.12.2", + "sealed", + "serde", + "static_assertions", + "thiserror", + "tokio", + "url", +] + +[[package]] +name = "clickhouse-derive" +version = "0.1.1" +source = "git+https://github.com/getdozer/clickhouse.rs.git#4c7d6f03caa933aeda5af7bff31536fccb97fba1" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "serde_derive_internals 0.29.0", + "syn 2.0.39", +] + +[[package]] +name = "clickhouse-rs-cityhash-sys" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4baf9d4700a28d6cb600e17ed6ae2b43298a5245f1f76b4eab63027ebfd592b9" +dependencies = [ + "cc", +] + [[package]] name = "clipboard-win" version = "4.5.0" @@ -2502,8 +2553,8 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -2544,8 +2595,8 @@ checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" dependencies = [ "fnv", "ident_case", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "strsim", "syn 1.0.109", ] @@ -2558,8 +2609,8 @@ checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" dependencies = [ "fnv", "ident_case", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "strsim", "syn 2.0.39", ] @@ -2571,7 +2622,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ "darling_core 0.13.4", - "quote 1.0.33", + "quote 1.0.35", "syn 1.0.109", ] @@ -2582,7 +2633,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", - "quote 1.0.33", + "quote 1.0.35", "syn 2.0.39", ] @@ -2905,7 +2956,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c65c2ffdafc1564565200967edc4851c7b55422d3913466688907efd05ea26f" dependencies = [ "deno-proc-macro-rules-macros", - "proc-macro2 1.0.69", + "proc-macro2 1.0.78", "syn 2.0.39", ] @@ -2916,8 +2967,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3047b312b7451e3190865713a4dd6e1f821aed614ada219766ebc3024a690435" dependencies = [ "once_cell", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -3325,8 +3376,8 @@ dependencies = [ "once_cell", "pmutil", "proc-macro-crate 1.3.1", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "regex", "strum 0.25.0", "strum_macros 0.25.3", @@ -3545,8 +3596,8 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -3556,8 +3607,8 @@ version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3418329ca0ad70234b9735dc4ceed10af4df60eff9c8e7b06cb5e520d92c3535" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -3567,8 +3618,8 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -3579,8 +3630,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" dependencies = [ "convert_case", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "rustc_version 0.4.0", "syn 1.0.109", ] @@ -3648,8 +3699,8 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -3780,6 +3831,7 @@ dependencies = [ "dozer-lambda", "dozer-recordstore", "dozer-sink-aerospike", + "dozer-sink-clickhouse", "dozer-sinks", "dozer-sql", "dozer-storage", @@ -4090,6 +4142,17 @@ dependencies = [ "dozer-types", ] +[[package]] +name = "dozer-sink-clickhouse" +version = "0.1.0" +dependencies = [ + "clickhouse", + "dozer-core", + "dozer-log", + "dozer-recordstore", + "dozer-types", +] + [[package]] name = "dozer-sinks" version = "0.1.0" @@ -4312,8 +4375,8 @@ dependencies = [ "byteorder", "lazy_static", "proc-macro-error 1.0.4", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -4380,8 +4443,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f0042ff8246a363dbe77d2ceedb073339e85a804b9a47636c6e016a9a32c05f" dependencies = [ "enum-ordinalize", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -4468,8 +4531,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21cdad81446a7f7dc43f6a77409efeb9733d2fa65553efef6018ef257c959b73" dependencies = [ "heck", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -4480,8 +4543,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116" dependencies = [ "heck", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -4493,8 +4556,8 @@ checksum = "1bf1fa3f06bbff1ea5b1a9c7b14aa992a39657db60a2759457328d7e058f49ee" dependencies = [ "num-bigint", "num-traits", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -4505,8 +4568,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f33313078bb8d4d05a2733a94ac4c2d8a0df9a2b84424ebf4f33bfc224a890e" dependencies = [ "once_cell", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -4749,8 +4812,8 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4c81935e123ab0741c4c4f0d9b8377e5fb21d3de7e062fa4b1263b1fbcba1ea" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -4847,8 +4910,8 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -4880,7 +4943,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03ec5dc38ee19078d84a692b1c41181ff9f94331c76cee66ff0208c770b5e54f" dependencies = [ "pmutil", - "proc-macro2 1.0.69", + "proc-macro2 1.0.78", "swc_macros_common", "syn 2.0.39", ] @@ -4909,7 +4972,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0fa992f1656e1707946bbba340ad244f0814009ef8c0118eb7b658395f19a2e" dependencies = [ "frunk_proc_macro_helpers", - "quote 1.0.33", + "quote 1.0.35", "syn 2.0.39", ] @@ -4920,8 +4983,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35b54add839292b743aeda6ebedbd8b11e93404f902c56223e51b9ec18a13d2c" dependencies = [ "frunk_core", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -4933,7 +4996,7 @@ checksum = "71b85a1d4a9a6b300b41c05e8e13ef2feca03e0334127f29eca9506a7fe13a93" dependencies = [ "frunk_core", "frunk_proc_macro_helpers", - "quote 1.0.33", + "quote 1.0.35", "syn 2.0.39", ] @@ -5048,8 +5111,8 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -5124,8 +5187,8 @@ checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" dependencies = [ "proc-macro-error 0.4.12", "proc-macro-hack", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -5239,8 +5302,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9" dependencies = [ "proc-macro-error 1.0.4", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -5781,8 +5844,8 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11d7a9f6330b71fea57921c9b61c47ee6e84f72d394754eff6163ae67e7395eb" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -5801,8 +5864,8 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b139284b5cf57ecfa712bcc66950bb635b31aff41c188e8a4cfc758eca374a3f" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", ] [[package]] @@ -5926,8 +5989,8 @@ checksum = "f4467ed1321b310c2625c5aa6c1b1ffc5de4d9e42668cf697a08fb033ee8265e" dependencies = [ "Inflector", "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -6098,8 +6161,8 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44bcd58e6c97a7fcbaffcdc95728b393b8d98933bfadad49ed4097845b57ef0b" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "regex", "syn 2.0.39", ] @@ -6439,8 +6502,8 @@ checksum = "dc487311295e0002e452025d6b580b77bb17286de87b57138f3b5db711cded68" dependencies = [ "beef", "fnv", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "regex-syntax 0.6.29", "syn 2.0.39", ] @@ -6472,6 +6535,16 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "lz4" +version = "1.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" +dependencies = [ + "libc", + "lz4-sys", +] + [[package]] name = "lz4-sys" version = "1.9.4" @@ -6644,7 +6717,7 @@ dependencies = [ "ipnet", "metrics", "metrics-util", - "quanta", + "quanta 0.11.1", "thiserror", "tokio", "tracing", @@ -6656,8 +6729,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -6672,7 +6745,7 @@ dependencies = [ "hashbrown 0.13.1", "metrics", "num_cpus", - "quanta", + "quanta 0.11.1", "sketches-ddsketch", ] @@ -6798,8 +6871,8 @@ dependencies = [ "num-bigint", "proc-macro-crate 1.3.1", "proc-macro-error 1.0.4", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", "termcolor", "thiserror", @@ -6966,7 +7039,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7288eac8b54af7913c60e0eb0e2a7683020dffa342ab3fd15e28f035ba897cf" dependencies = [ - "quote 1.0.33", + "quote 1.0.35", "syn 1.0.109", "syn-mid", ] @@ -7238,8 +7311,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" dependencies = [ "proc-macro-crate 1.3.1", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -7250,8 +7323,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c11e44798ad209ccdd91fc192f0526a369a01234f7373e1b141c96d7cee4f0e" dependencies = [ "proc-macro-crate 2.0.0", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -7437,8 +7510,8 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -7711,8 +7784,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1557010476e0595c9b568d16dcfb81b93cdeb157612726f5170d31aa707bed27" dependencies = [ "proc-macro-crate 1.3.1", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -7909,8 +7982,8 @@ checksum = "68bd1206e71118b5356dae5ddc61c8b11e28b09ef6a31acbd15ea48a28e0c227" dependencies = [ "pest", "pest_meta", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -8033,8 +8106,8 @@ dependencies = [ "phf_generator 0.10.0", "phf_shared 0.10.0", "proc-macro-hack", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -8071,8 +8144,8 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -8166,8 +8239,8 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52a40bc70c2c58040d2d8b167ba9a5ff59fc9dab7ad44771cfde3dcfde7a09c6" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -8272,7 +8345,7 @@ version = "0.1.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.78", "syn 1.0.109", ] @@ -8282,7 +8355,7 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.78", "syn 2.0.39", ] @@ -8358,8 +8431,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" dependencies = [ "proc-macro-error-attr 0.4.12", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", "version_check", ] @@ -8371,8 +8444,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ "proc-macro-error-attr 1.0.4", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", "version_check", ] @@ -8383,8 +8456,8 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", "syn-mid", "version_check", @@ -8396,8 +8469,8 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "version_check", ] @@ -8418,9 +8491,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -8529,8 +8602,8 @@ checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", "itertools 0.10.5", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -8542,8 +8615,8 @@ checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", "itertools 0.11.0", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -8610,8 +8683,8 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -8681,9 +8754,9 @@ version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d39c55dab3fc5a4b25bbd1ac10a2da452c4aca13bb450f22818a002e29648d" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.78", "pyo3-macros-backend", - "quote 1.0.33", + "quote 1.0.35", "syn 1.0.109", ] @@ -8693,8 +8766,8 @@ version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97daff08a4c48320587b5224cc98d609e3c27b6d437315bd40b605c98eeb5918" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -8714,7 +8787,22 @@ dependencies = [ "libc", "mach2", "once_cell", - "raw-cpuid", + "raw-cpuid 10.7.0", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + +[[package]] +name = "quanta" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid 11.0.1", "wasi 0.11.0+wasi-snapshot-preview1", "web-sys", "winapi", @@ -8747,11 +8835,11 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.78", ] [[package]] @@ -8863,6 +8951,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "raw-cpuid" +version = "11.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" +dependencies = [ + "bitflags 2.4.1", +] + [[package]] name = "raw-window-handle" version = "0.5.2" @@ -9178,8 +9275,8 @@ version = "0.7.42" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2e06b915b5c230a17d7a736d1e2e63ee753c256a8614ef3f5147b13a4f5541d" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -9492,8 +9589,8 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a32af5427251d2e4be14fc151eabe18abb4a7aad5efee7044da9f096c906a43" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -9569,9 +9666,9 @@ version = "0.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", - "serde_derive_internals", + "proc-macro2 1.0.78", + "quote 1.0.35", + "serde_derive_internals 0.26.0", "syn 1.0.109", ] @@ -9621,6 +9718,18 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "sealed" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" +dependencies = [ + "heck", + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.39", +] + [[package]] name = "sec1" version = "0.3.0" @@ -9770,8 +9879,8 @@ version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -9781,11 +9890,22 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "330f01ce65a3a5fe59a60c82f3c9a024b573b8a6e875bd233fe5f934e71d54e3" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.39", +] + [[package]] name = "serde_json" version = "1.0.108" @@ -9843,8 +9963,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" dependencies = [ "darling 0.13.4", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -9895,8 +10015,8 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "079a83df15f85d89a68d64ae1238f142f172b1fa915d0d76b26a7cba1b659a69" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -9906,8 +10026,8 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -10101,8 +10221,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" dependencies = [ "heck", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -10272,8 +10392,8 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -10330,8 +10450,8 @@ checksum = "6bb30289b722be4ff74a408c3cc27edeaad656e06cb1fe8fa9231fa59c728988" dependencies = [ "phf_generator 0.10.0", "phf_shared 0.10.0", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", ] [[package]] @@ -10341,8 +10461,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fa4d4f81d7c05b9161f8de839975d3326328b8ba2831164b465524cc2f55252" dependencies = [ "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "swc_macros_common", "syn 2.0.39", ] @@ -10386,8 +10506,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ "heck", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "rustversion", "syn 1.0.109", ] @@ -10399,8 +10519,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" dependencies = [ "heck", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "rustversion", "syn 2.0.39", ] @@ -10491,8 +10611,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5b5aaca9a0082be4515f0fbbecc191bf5829cd25b5b9c0a2810f6a2bb0d6829" dependencies = [ "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "swc_macros_common", "syn 2.0.39", ] @@ -10540,8 +10660,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcdff076dccca6cc6a0e0b2a2c8acfb066014382bc6df98ec99e755484814384" dependencies = [ "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "swc_macros_common", "syn 2.0.39", ] @@ -10623,8 +10743,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8188eab297da773836ef5cf2af03ee5cca7a563e1be4b146f8141452c28cc690" dependencies = [ "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "swc_macros_common", "syn 2.0.39", ] @@ -10728,8 +10848,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05a95d367e228d52484c53336991fdcf47b6b553ef835d9159db4ba40efb0ee8" dependencies = [ "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -10740,8 +10860,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a273205ccb09b51fabe88c49f3b34c5a4631c4c00a16ae20e03111d6a42e832" dependencies = [ "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -10763,8 +10883,8 @@ checksum = "0f322730fb82f3930a450ac24de8c98523af7d34ab8cb2f46bcb405839891a99" dependencies = [ "Inflector", "pmutil", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "swc_macros_common", "syn 2.0.39", ] @@ -10786,8 +10906,8 @@ version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "unicode-ident", ] @@ -10797,8 +10917,8 @@ version = "2.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "unicode-ident", ] @@ -10808,8 +10928,8 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -10820,8 +10940,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1329189c02ff984e9736652b1631330da25eaa6bc639089ed4915d25446cbe7b" dependencies = [ "proc-macro-error 1.0.4", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -10837,8 +10957,8 @@ version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", "unicode-xid 0.2.4", ] @@ -10966,8 +11086,8 @@ version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -11111,8 +11231,8 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -11353,9 +11473,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ "prettyplease 0.2.15", - "proc-macro2 1.0.69", + "proc-macro2 1.0.78", "prost-build 0.12.3", - "quote 1.0.33", + "quote 1.0.35", "syn 2.0.39", ] @@ -11485,8 +11605,8 @@ version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -11685,8 +11805,8 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89851716b67b937e393b3daa8423e67ddfc4bbbf1654bcf05488e95e0828db0c" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 1.0.109", ] @@ -12004,8 +12124,8 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d257817081c7dffcdbab24b9e62d2def62e2ff7d00b1c20062551e6cccc145ff" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", ] [[package]] @@ -12073,8 +12193,8 @@ dependencies = [ "bumpalo", "log", "once_cell", - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", "wasm-bindgen-shared", ] @@ -12097,7 +12217,7 @@ version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" dependencies = [ - "quote 1.0.33", + "quote 1.0.35", "wasm-bindgen-macro-support", ] @@ -12107,8 +12227,8 @@ version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", "wasm-bindgen-backend", "wasm-bindgen-shared", @@ -12730,8 +12850,8 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "855e0f6af9cd72b87d8a6c586f3cb583f5cdcc62c2c80869d8cd7e96fdf7ee20" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -12741,8 +12861,8 @@ version = "0.7.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] @@ -12761,8 +12881,8 @@ version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", + "proc-macro2 1.0.78", + "quote 1.0.35", "syn 2.0.39", ] diff --git a/Cargo.toml b/Cargo.toml index 440736f99d..d762377d0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "dozer-lambda", "dozer-sinks", "dozer-sink-aerospike", + "dozer-sink-clickhouse", ] resolver = "2" diff --git a/dozer-cli/Cargo.toml b/dozer-cli/Cargo.toml index bc573cb72f..2f50c7d855 100644 --- a/dozer-cli/Cargo.toml +++ b/dozer-cli/Cargo.toml @@ -22,6 +22,7 @@ dozer-recordstore = { path = "../dozer-recordstore" } dozer-lambda = { path = "../dozer-lambda" } dozer-sinks = { path = "../dozer-sinks" } dozer-sink-aerospike = { path = "../dozer-sink-aerospike" } +dozer-sink-clickhouse = { path = "../dozer-sink-clickhouse" } uuid = { version = "1.6.1", features = ["v4", "serde"] } tokio = { version = "1", features = ["full"] } diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs index a5a0bb58de..8f6347f7eb 100644 --- a/dozer-cli/src/pipeline/builder.rs +++ b/dozer-cli/src/pipeline/builder.rs @@ -15,7 +15,7 @@ use dozer_sql::builder::{OutputNodeInfo, QueryContext}; use dozer_tracing::LabelsAndProgress; use dozer_types::log::debug; use dozer_types::models::connection::Connection; -use dozer_types::models::endpoint::AerospikeSinkConfig; +use dozer_types::models::endpoint::{AerospikeSinkConfig, ClickhouseSinkConfig}; use dozer_types::models::flags::Flags; use dozer_types::models::source::Source; use dozer_types::models::udf_config::UdfConfig; @@ -26,6 +26,7 @@ use tokio::sync::Mutex; use crate::pipeline::dummy_sink::DummySinkFactory; use crate::pipeline::LogSinkFactory; use dozer_sink_aerospike::AerospikeSinkFactory; +use dozer_sink_clickhouse::ClickhouseSinkFactory; use super::connector_source::ConnectorSourceFactoryError; use super::source_builder::SourceBuilder; @@ -60,6 +61,7 @@ pub enum EndpointLogKind { Api { log: Arc> }, Dummy, Aerospike { config: AerospikeSinkConfig }, + Clickhouse { config: ClickhouseSinkConfig }, } pub struct PipelineBuilder<'a> { @@ -290,6 +292,9 @@ impl<'a> PipelineBuilder<'a> { EndpointLogKind::Aerospike { config } => { Box::new(AerospikeSinkFactory::new(config)) } + EndpointLogKind::Clickhouse { config } => { + Box::new(ClickhouseSinkFactory::new(config.clone(), runtime.clone())) + } }; match table_info { diff --git a/dozer-cli/src/pipeline/dummy_sink.rs b/dozer-cli/src/pipeline/dummy_sink.rs index 1a08995dc8..20b084674e 100644 --- a/dozer-cli/src/pipeline/dummy_sink.rs +++ b/dozer-cli/src/pipeline/dummy_sink.rs @@ -1,3 +1,4 @@ +use dozer_api::async_trait::async_trait; use std::{collections::HashMap, time::Instant}; use dozer_cache::dozer_log::storage::Queue; @@ -17,6 +18,7 @@ use dozer_types::{ #[derive(Debug)] pub struct DummySinkFactory; +#[async_trait] impl SinkFactory for DummySinkFactory { fn get_input_ports(&self) -> Vec { vec![DEFAULT_PORT_HANDLE] @@ -26,7 +28,7 @@ impl SinkFactory for DummySinkFactory { Ok(()) } - fn build( + async fn build( &self, input_schemas: HashMap, ) -> Result, BoxedError> { diff --git a/dozer-cli/src/pipeline/log_sink.rs b/dozer-cli/src/pipeline/log_sink.rs index 26fa524fd2..ff00cee91b 100644 --- a/dozer-cli/src/pipeline/log_sink.rs +++ b/dozer-cli/src/pipeline/log_sink.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc}; +use dozer_api::async_trait::async_trait; use dozer_cache::dozer_log::{ replication::{Log, LogOperation}, storage::Queue, @@ -40,6 +41,7 @@ impl LogSinkFactory { } } +#[async_trait] impl SinkFactory for LogSinkFactory { fn get_input_ports(&self) -> Vec { vec![DEFAULT_PORT_HANDLE] @@ -50,7 +52,7 @@ impl SinkFactory for LogSinkFactory { Ok(()) } - fn build( + async fn build( &self, _input_schemas: HashMap, ) -> Result, BoxedError> { diff --git a/dozer-cli/src/simple/executor.rs b/dozer-cli/src/simple/executor.rs index 91d838731e..c356527441 100644 --- a/dozer-cli/src/simple/executor.rs +++ b/dozer-cli/src/simple/executor.rs @@ -5,7 +5,9 @@ use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir}; use dozer_cache::dozer_log::replication::Log; use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint}; use dozer_tracing::LabelsAndProgress; -use dozer_types::models::endpoint::{AerospikeSinkConfig, Endpoint, EndpointKind}; +use dozer_types::models::endpoint::{ + AerospikeSinkConfig, ClickhouseSinkConfig, Endpoint, EndpointKind, +}; use dozer_types::models::flags::Flags; use tokio::runtime::Runtime; use tokio::sync::Mutex; @@ -45,6 +47,7 @@ enum ExecutorEndpointKind { Api { log_endpoint: LogEndpoint }, Dummy, Aerospike { config: AerospikeSinkConfig }, + Clickhouse { config: ClickhouseSinkConfig }, } impl<'a> Executor<'a> { @@ -89,6 +92,9 @@ impl<'a> Executor<'a> { EndpointKind::Aerospike(config) => ExecutorEndpointKind::Aerospike { config: config.clone(), }, + EndpointKind::Clickhouse(config) => ExecutorEndpointKind::Clickhouse { + config: config.clone(), + }, }; executor_endpoints.push(ExecutorEndpoint { @@ -147,6 +153,9 @@ impl<'a> Executor<'a> { ExecutorEndpointKind::Aerospike { config } => { EndpointLogKind::Aerospike { config } } + ExecutorEndpointKind::Clickhouse { config } => { + EndpointLogKind::Clickhouse { config } + } }; EndpointLog { table_name: endpoint.table_name, diff --git a/dozer-core/src/builder_dag.rs b/dozer-core/src/builder_dag.rs index 1645931378..6e53e39dd1 100644 --- a/dozer-core/src/builder_dag.rs +++ b/dozer-core/src/builder_dag.rs @@ -118,6 +118,7 @@ impl BuilderDag { .remove(&node_index) .expect("we collected all input schemas"), ) + .await .map_err(ExecutionError::Factory)?; NodeType { handle: node.handle, diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs index 5b3a629d51..166eb94092 100644 --- a/dozer-core/src/node.rs +++ b/dozer-core/src/node.rs @@ -103,10 +103,11 @@ pub trait Processor: Send + Sync + Debug { ) -> Result<(), BoxedError>; } +#[async_trait] pub trait SinkFactory: Send + Sync + Debug { fn get_input_ports(&self) -> Vec; fn prepare(&self, input_schemas: HashMap) -> Result<(), BoxedError>; - fn build( + async fn build( &self, input_schemas: HashMap, ) -> Result, BoxedError>; diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs index 883865cf35..604d72107a 100644 --- a/dozer-core/src/tests/dag_base_errors.rs +++ b/dozer-core/src/tests/dag_base_errors.rs @@ -419,6 +419,7 @@ impl ErrSinkFactory { } } +#[async_trait] impl SinkFactory for ErrSinkFactory { fn get_input_ports(&self) -> Vec { vec![COUNTING_SINK_INPUT_PORT] @@ -428,7 +429,7 @@ impl SinkFactory for ErrSinkFactory { Ok(()) } - fn build( + async fn build( &self, _input_schemas: HashMap, ) -> Result, BoxedError> { diff --git a/dozer-core/src/tests/dag_schemas.rs b/dozer-core/src/tests/dag_schemas.rs index 7312332c03..7122af32c0 100644 --- a/dozer-core/src/tests/dag_schemas.rs +++ b/dozer-core/src/tests/dag_schemas.rs @@ -165,6 +165,7 @@ impl ProcessorFactory for TestJoinProcessorFactory { #[derive(Debug)] struct TestSinkFactory {} +#[async_trait] impl SinkFactory for TestSinkFactory { fn get_input_ports(&self) -> Vec { vec![DEFAULT_PORT_HANDLE] @@ -174,7 +175,7 @@ impl SinkFactory for TestSinkFactory { Ok(()) } - fn build( + async fn build( &self, _input_schemas: HashMap, ) -> Result, BoxedError> { diff --git a/dozer-core/src/tests/sinks.rs b/dozer-core/src/tests/sinks.rs index c6ba8f53d6..7b10f478d7 100644 --- a/dozer-core/src/tests/sinks.rs +++ b/dozer-core/src/tests/sinks.rs @@ -9,6 +9,7 @@ use dozer_types::types::{Operation, Schema}; use dozer_types::log::debug; use std::collections::HashMap; +use dozer_types::tonic::async_trait; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -29,6 +30,7 @@ impl CountingSinkFactory { } } +#[async_trait] impl SinkFactory for CountingSinkFactory { fn get_input_ports(&self) -> Vec { vec![COUNTING_SINK_INPUT_PORT] @@ -38,7 +40,7 @@ impl SinkFactory for CountingSinkFactory { Ok(()) } - fn build( + async fn build( &self, _input_schemas: HashMap, ) -> Result, BoxedError> { @@ -104,6 +106,7 @@ impl Sink for CountingSink { #[derive(Debug)] pub struct ConnectivityTestSinkFactory; +#[async_trait] impl SinkFactory for ConnectivityTestSinkFactory { fn get_input_ports(&self) -> Vec { vec![DEFAULT_PORT_HANDLE] @@ -113,7 +116,7 @@ impl SinkFactory for ConnectivityTestSinkFactory { unimplemented!("This struct is for connectivity test, only input ports are defined") } - fn build( + async fn build( &self, _input_schemas: HashMap, ) -> Result, BoxedError> { @@ -124,6 +127,7 @@ impl SinkFactory for ConnectivityTestSinkFactory { #[derive(Debug)] pub struct NoInputPortSinkFactory; +#[async_trait] impl SinkFactory for NoInputPortSinkFactory { fn get_input_ports(&self) -> Vec { vec![] @@ -133,7 +137,7 @@ impl SinkFactory for NoInputPortSinkFactory { unimplemented!("This struct is for connectivity test, only input ports are defined") } - fn build( + async fn build( &self, _input_schemas: HashMap, ) -> Result, BoxedError> { diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs index beaa696d98..aaa06e5a39 100644 --- a/dozer-sink-aerospike/src/lib.rs +++ b/dozer-sink-aerospike/src/lib.rs @@ -34,6 +34,7 @@ use dozer_types::errors::internal::BoxedError; use dozer_types::geo::{Coord, Point}; use dozer_types::log::{error, info}; use dozer_types::ordered_float::OrderedFloat; +use dozer_types::tonic::async_trait; use dozer_types::types::DozerPoint; use dozer_types::{ errors::types::TypeError, @@ -217,6 +218,7 @@ impl AerospikeSinkFactory { } } +#[async_trait] impl SinkFactory for AerospikeSinkFactory { fn get_input_ports(&self) -> Vec { vec![DEFAULT_PORT_HANDLE] @@ -227,7 +229,7 @@ impl SinkFactory for AerospikeSinkFactory { Ok(()) } - fn build( + async fn build( &self, mut input_schemas: HashMap, ) -> Result, BoxedError> { @@ -920,6 +922,7 @@ impl Sink for AerospikeSink { #[cfg(test)] mod tests { + use dozer_log::tokio; use std::time::Duration; use dozer_recordstore::ProcessorRecordStore; @@ -944,12 +947,12 @@ mod tests { const N_RECORDS: usize = 1000; const BATCH_SIZE: usize = 100; - #[test] + #[tokio::test] #[ignore] - fn test_inserts() { + async fn test_inserts() { let rs = ProcessorRecordStore::new(dozer_types::models::app_config::RecordStore::InMemory) .unwrap(); - let mut sink = sink("inserts"); + let mut sink = sink("inserts").await; for i in 0..N_RECORDS { sink.process( DEFAULT_PORT_HANDLE, @@ -962,9 +965,9 @@ mod tests { } } - #[test] + #[tokio::test] #[ignore] - fn test_inserts_batch() { + async fn test_inserts_batch() { let mut batches = Vec::with_capacity(N_RECORDS / BATCH_SIZE); for i in 0..N_RECORDS / BATCH_SIZE { let mut batch = Vec::with_capacity(BATCH_SIZE); @@ -973,7 +976,7 @@ mod tests { } batches.push(batch); } - let mut sink = sink("inserts_batch"); + let mut sink = sink("inserts_batch").await; let rs = ProcessorRecordStore::new(dozer_types::models::app_config::RecordStore::InMemory) .unwrap(); for batch in batches { @@ -986,7 +989,7 @@ mod tests { } } - fn sink(set: &str) -> Box { + async fn sink(set: &str) -> Box { let mut schema = Schema::new(); schema .field(f("uint", FieldType::UInt), true) @@ -1020,6 +1023,7 @@ mod tests { }); factory .build([(DEFAULT_PORT_HANDLE, schema)].into()) + .await .unwrap() } diff --git a/dozer-sink-clickhouse/Cargo.toml b/dozer-sink-clickhouse/Cargo.toml new file mode 100644 index 0000000000..57682d5f7c --- /dev/null +++ b/dozer-sink-clickhouse/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "dozer-sink-clickhouse" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dozer-core = { path = "../dozer-core" } +dozer-types = { path = "../dozer-types" } +dozer-log = { path = "../dozer-log" } +dozer-recordstore = { path = "../dozer-recordstore" } +clickhouse = { git = "https://github.com/getdozer/clickhouse.rs.git" } \ No newline at end of file diff --git a/dozer-sink-clickhouse/src/ddl.rs b/dozer-sink-clickhouse/src/ddl.rs new file mode 100644 index 0000000000..ae4f925b7f --- /dev/null +++ b/dozer-sink-clickhouse/src/ddl.rs @@ -0,0 +1,108 @@ +use dozer_types::log::warn; +use dozer_types::models::endpoint::ClickhouseSinkTableOptions; +use dozer_types::types::{FieldDefinition, FieldType, Schema}; + +pub struct ClickhouseDDL {} + +const DEFAULT_TABLE_ENGINE: &str = "MergeTree()"; + +impl ClickhouseDDL { + pub fn get_create_table_query( + table_name: String, + schema: Schema, + sink_options: Option, + primary_keys: Option>, + ) -> String { + let mut parts = schema + .fields + .iter() + .map(|field| { + let typ = Self::map_field_to_type(field); + format!("{} {}", field.name, typ) + }) + .collect::>(); + + let engine = sink_options + .as_ref() + .map(|options| { + options + .engine + .clone() + .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()) + }) + .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()); + + if let Some(pk) = primary_keys { + parts.push(format!("PRIMARY KEY ({})", pk.join(", "))); + } + + let query = parts.join(",\n"); + + let partition_by = sink_options + .as_ref() + .and_then(|options| options.partition_by.clone()) + .map_or("".to_string(), |partition_by| { + format!("PARTITION BY {}\n", partition_by) + }); + let sample_by = sink_options + .as_ref() + .and_then(|options| options.sample_by.clone()) + .map_or("".to_string(), |partition_by| { + format!("SAMPLE BY {}\n", partition_by) + }); + let order_by = sink_options + .as_ref() + .and_then(|options| options.order_by.clone()) + .map_or("".to_string(), |order_by| { + format!("ORDER BY ({})\n", order_by.join(", ")) + }); + let cluster = sink_options + .as_ref() + .and_then(|options| options.cluster.clone()) + .map_or("".to_string(), |cluster| { + format!("ON CLUSTER {}\n", cluster) + }); + + format!( + "CREATE TABLE IF NOT EXISTS {table_name} {cluster} ( + {query} + ) + ENGINE = {engine} + {order_by} + {partition_by} + {sample_by} + ", + ) + } + + pub fn map_field_to_type(field: &FieldDefinition) -> String { + let typ = match field.typ { + FieldType::UInt => "UInt64", + FieldType::U128 => "UInt128", + FieldType::Int => "Int64", + FieldType::I128 => "Int128", + FieldType::Float => "Float64", + FieldType::Boolean => "Boolean", + FieldType::String => "String", + FieldType::Text => "String", + FieldType::Binary => "Array(UInt8)", + FieldType::Decimal => "Decimal", + FieldType::Timestamp => "DateTime64(3)", + FieldType::Date => "Date", + FieldType::Json => "JSON", + FieldType::Point => "Point", + FieldType::Duration => unimplemented!(), + }; + + if field.nullable { + if field.typ != FieldType::Binary { + format!("Nullable({})", typ) + } else { + warn!("Binary field cannot be nullable, ignoring nullable flag"); + typ.to_string() + } + } else { + typ.to_string() + } + } +} diff --git a/dozer-sink-clickhouse/src/lib.rs b/dozer-sink-clickhouse/src/lib.rs new file mode 100644 index 0000000000..55dc5b3981 --- /dev/null +++ b/dozer-sink-clickhouse/src/lib.rs @@ -0,0 +1,450 @@ +mod ddl; +mod schema; +#[cfg(test)] +mod tests; + +use crate::ClickhouseSinkError::{ + PrimaryKeyNotFound, SchemaFieldNotFoundByIndex, UnsupportedOperation, +}; +use clickhouse::inserter::Inserter; +use clickhouse::Client; +use dozer_core::epoch::Epoch; +use dozer_core::node::{PortHandle, Sink, SinkFactory}; +use dozer_core::DEFAULT_PORT_HANDLE; +use dozer_log::storage::Queue; +use dozer_log::tokio::runtime::Runtime; +use dozer_recordstore::ProcessorRecordStore; +use dozer_types::errors::internal::BoxedError; +use dozer_types::log::debug; +use dozer_types::models::endpoint::ClickhouseSinkConfig; + +use dozer_types::serde::Serialize; +use dozer_types::tonic::async_trait; +use dozer_types::types::{DozerDuration, DozerPoint, Field, FieldType, Operation, Record, Schema}; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; + +use crate::schema::{ClickhouseSchema, ClickhouseTable}; +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate}; +use dozer_types::json_types::JsonValue; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::serde_bytes; +use dozer_types::thiserror::{self, Error}; + +pub const BATCH_SIZE: usize = 100; + +#[derive(Error, Debug)] +enum ClickhouseSinkError { + #[error("Only MergeTree engine is supported for delete operation")] + UnsupportedOperation, + + #[error("Column {0} not found in sink table")] + ColumnNotFound(String), + + #[error("Column {0} has type {1} in dozer schema but type {2} in sink table")] + ColumnTypeMismatch(String, String, String), + + #[error("Clickhouse query error: {0}")] + ClickhouseQueryError(#[from] clickhouse::error::Error), + + #[error("Primary key not found")] + PrimaryKeyNotFound, + + #[error("Type {1} is not supported for column {0}")] + TypeNotSupported(String, String), + + #[error("Sink table does not exist and create_table_options is not set")] + SinkTableDoesNotExist, + + #[error("Expected primary key {0:?} but got {1:?}")] + PrimaryKeyMismatch(Vec, Vec), + + #[error("Schema field not found by index {0}")] + SchemaFieldNotFoundByIndex(usize), +} + +#[derive(Debug)] +pub struct ClickhouseSinkFactory { + runtime: Arc, + config: ClickhouseSinkConfig, +} + +#[derive(Debug, Serialize)] +#[serde(crate = "dozer_types::serde", untagged)] +pub enum FieldWrapper { + UInt(u64), + U128(u128), + Int(i64), + I128(i128), + Float(#[cfg_attr(feature= "arbitrary", arbitrary(with = arbitrary_float))] OrderedFloat), + Boolean(bool), + String(String), + Text(String), + Binary(#[serde(with = "serde_bytes")] Vec), + Decimal(Decimal), + Timestamp(DateTime), + Date(NaiveDate), + Json(#[cfg_attr(feature= "arbitrary", arbitrary(with = arb_json::arbitrary_json))] JsonValue), + Point(DozerPoint), + Duration(DozerDuration), + OptionalUInt(Option), + OptionalU128(Option), + OptionalInt(Option), + OptionalI128(Option), + OptionalFloat( + #[cfg_attr(feature= "arbitrary", arbitrary(with = arbitrary_float))] + Option>, + ), + OptionalBoolean(Option), + OptionalString(Option), + OptionalText(Option), + OptionalDecimal(Option), + OptionalTimestamp(Option>), + OptionalDate(Option), + OptionalJson( + #[cfg_attr(feature= "arbitrary", arbitrary(with = arb_json::arbitrary_json))] + Option, + ), + OptionalPoint(Option), + OptionalDuration(Option), + Null(Option<()>), +} + +fn convert_field_to_ff(field: Field, nullable: bool) -> FieldWrapper { + if nullable { + match field { + Field::UInt(v) => FieldWrapper::OptionalUInt(Some(v)), + Field::U128(v) => FieldWrapper::OptionalU128(Some(v)), + Field::Int(v) => FieldWrapper::OptionalInt(Some(v)), + Field::I128(v) => FieldWrapper::OptionalI128(Some(v)), + Field::Float(v) => FieldWrapper::OptionalFloat(Some(v)), + Field::Boolean(v) => FieldWrapper::OptionalBoolean(Some(v)), + Field::String(v) => FieldWrapper::OptionalString(Some(v)), + Field::Text(v) => FieldWrapper::OptionalText(Some(v)), + Field::Binary(v) => FieldWrapper::Binary(v), + Field::Decimal(v) => FieldWrapper::OptionalDecimal(Some(v)), + Field::Timestamp(v) => FieldWrapper::OptionalTimestamp(Some(v)), + Field::Date(v) => FieldWrapper::OptionalDate(Some(v)), + Field::Json(v) => FieldWrapper::OptionalJson(Some(v)), + Field::Point(v) => FieldWrapper::OptionalPoint(Some(v)), + Field::Duration(v) => FieldWrapper::OptionalDuration(Some(v)), + Field::Null => FieldWrapper::Null(None), + } + } else { + match field { + Field::UInt(v) => FieldWrapper::UInt(v), + Field::U128(v) => FieldWrapper::U128(v), + Field::Int(v) => FieldWrapper::Int(v), + Field::I128(v) => FieldWrapper::I128(v), + Field::Float(v) => FieldWrapper::Float(v), + Field::Boolean(v) => FieldWrapper::Boolean(v), + Field::String(v) => FieldWrapper::String(v), + Field::Text(v) => FieldWrapper::Text(v), + Field::Binary(v) => FieldWrapper::Binary(v), + Field::Decimal(v) => FieldWrapper::Decimal(v), + Field::Timestamp(v) => FieldWrapper::Timestamp(v), + Field::Date(v) => FieldWrapper::Date(v), + Field::Json(v) => FieldWrapper::Json(v), + Field::Point(v) => FieldWrapper::Point(v), + Field::Duration(v) => FieldWrapper::Duration(v), + Field::Null => FieldWrapper::Null(None), + } + } +} +impl ClickhouseSinkFactory { + pub fn new(config: ClickhouseSinkConfig, runtime: Arc) -> Self { + Self { config, runtime } + } +} + +#[async_trait] +impl SinkFactory for ClickhouseSinkFactory { + fn get_input_ports(&self) -> Vec { + vec![DEFAULT_PORT_HANDLE] + } + + fn prepare(&self, input_schemas: HashMap) -> Result<(), BoxedError> { + debug_assert!(input_schemas.len() == 1); + Ok(()) + } + + async fn build( + &self, + mut input_schemas: HashMap, + ) -> Result, BoxedError> { + let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap(); + let mut client = Client::default() + .with_url(self.config.database_url.clone()) + .with_user(self.config.user.clone()) + .with_database(self.config.database.clone()); + + if let Some(password) = self.config.password.clone() { + client = client.with_password(password); + } + + let table = ClickhouseSchema::get_clickhouse_table(&client, &self.config, &schema).await?; + let primary_key_field_names = + ClickhouseSchema::get_primary_keys(&client, &self.config).await?; + + let primary_key_fields_indexes: Result, ClickhouseSinkError> = + primary_key_field_names + .iter() + .map(|primary_key| { + schema + .fields + .iter() + .position(|field| field.name == *primary_key) + .ok_or(PrimaryKeyNotFound) + }) + .collect(); + + let sink = ClickhouseSink::new( + client, + self.config.clone(), + schema, + self.runtime.clone(), + table, + primary_key_fields_indexes?, + ); + + Ok(Box::new(sink)) + } +} + +pub(crate) struct ClickhouseSink { + pub(crate) client: Client, + pub(crate) runtime: Arc, + pub(crate) schema: Schema, + pub(crate) inserter: Inserter, + pub(crate) sink_table_name: String, + pub(crate) table: ClickhouseTable, + pub(crate) primary_key_fields_indexes: Vec, +} + +impl Debug for ClickhouseSink { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ClickhouseSink") + .field("sink_table_name", &self.sink_table_name) + .field( + "primary_key_fields_indexes", + &self.primary_key_fields_indexes, + ) + .field("table", &self.table) + .field("schema", &self.schema) + .finish() + } +} + +impl ClickhouseSink { + pub fn new( + client: Client, + config: ClickhouseSinkConfig, + schema: Schema, + runtime: Arc, + table: ClickhouseTable, + primary_key_fields_indexes: Vec, + ) -> Self { + let fields_list = schema + .fields + .iter() + .map(|field| field.name.as_str()) + .collect::>(); + + let inserter = client + .inserter(&config.sink_table_name, fields_list.as_slice()) + .unwrap() + .with_max_rows((BATCH_SIZE * schema.fields.len()).try_into().unwrap()); + + Self { + client, + runtime, + schema, + inserter, + sink_table_name: config.sink_table_name, + table, + primary_key_fields_indexes, + } + } + + pub fn commit_insert(&mut self) -> Result<(), BoxedError> { + self.runtime.block_on(async { + let stats = self.inserter.commit().await?; + if stats.rows > 0 { + debug!( + "{} bytes, {} rows, {} transactions have been inserted", + stats.bytes, stats.rows, stats.transactions, + ); + } + + Ok::<(), BoxedError>(()) + }) + } + + fn map_fields(&self, record: Record) -> Result, ClickhouseSinkError> { + record + .values + .into_iter() + .enumerate() + .map(|(index, mut field)| match self.schema.fields.get(index) { + Some(schema_field) => { + if schema_field.r#typ == FieldType::Binary && Field::Null == field { + field = Field::Binary(vec![]); + } + + Ok(convert_field_to_ff(field.clone(), schema_field.nullable)) + } + None => Err(SchemaFieldNotFoundByIndex(index)), + }) + .collect() + } +} + +impl Sink for ClickhouseSink { + fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> { + self.commit_insert() + } + + fn process( + &mut self, + _from_port: PortHandle, + _record_store: &ProcessorRecordStore, + op: Operation, + ) -> Result<(), BoxedError> { + match op { + Operation::Insert { new } => { + let values = self.map_fields(new)?; + self.runtime.block_on(async { + for value in values { + self.inserter.write(&value)?; + } + + Ok::<(), BoxedError>(()) + })?; + + self.commit_insert()?; + } + Operation::Delete { old } => { + if self.table.engine != "MergeTree" { + return Err(BoxedError::from(UnsupportedOperation)); + } + let mut conditions = vec![]; + + for (index, field) in old.values.iter().enumerate() { + if *field != Field::Null { + let field_name = self.schema.fields.get(index).unwrap().name.clone(); + conditions.push(format!("{field_name} = ?")); + } + } + + let mut query = self.client.query(&format!( + "DELETE FROM {table_name} WHERE {condition}", + condition = conditions.join(" AND "), + table_name = self.sink_table_name + )); + + for field in old.values.iter() { + if *field != Field::Null { + query = query.bind(field); + } + } + + self.runtime.block_on(async { + query.execute().await.unwrap(); + + Ok::<(), BoxedError>(()) + })?; + } + Operation::Update { new, old } => { + let mut updates = vec![]; + + for (index, field) in new.values.iter().enumerate() { + if self.primary_key_fields_indexes.contains(&index) { + continue; + } + let schema_field = self.schema.fields.get(index).unwrap(); + let field_name = schema_field.name.clone(); + if *field == Field::Null { + updates.push(format!("{field_name} = NULL")); + } else { + updates.push(format!("{field_name} = ?")); + } + } + + let mut conditions = vec![]; + + for (index, field) in old.values.iter().enumerate() { + if !self.primary_key_fields_indexes.contains(&index) { + continue; + } + + if *field != Field::Null { + let field_name = self.schema.fields.get(index).unwrap().name.clone(); + conditions.push(format!("{field_name} = ?")); + } + } + + let mut query = self.client.query(&format!( + "ALTER TABLE {table_name} UPDATE {updates} WHERE {condition}", + condition = conditions.join(" AND "), + updates = updates.join(", "), + table_name = self.sink_table_name + )); + + for (index, field) in new.values.iter().enumerate() { + if *field == Field::Null || self.primary_key_fields_indexes.contains(&index) { + continue; + } + + query = query.bind(field); + } + + for (index, field) in old.values.iter().enumerate() { + if !self.primary_key_fields_indexes.contains(&index) { + continue; + } + + if *field != Field::Null { + query = query.bind(field); + } + } + + self.runtime.block_on(async { + query.execute().await.unwrap(); + + Ok::<(), BoxedError>(()) + })?; + } + Operation::BatchInsert { new } => { + for record in new { + for f in self.map_fields(record)? { + self.runtime.block_on(async { + self.inserter.write(&f)?; + Ok::<(), BoxedError>(()) + })?; + } + } + + self.commit_insert()?; + } + } + + Ok(()) + } + + fn persist(&mut self, _epoch: &Epoch, _queue: &Queue) -> Result<(), BoxedError> { + Ok(()) + } + + fn on_source_snapshotting_started( + &mut self, + _connection_name: String, + ) -> Result<(), BoxedError> { + Ok(()) + } + + fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> { + Ok(()) + } +} diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs new file mode 100644 index 0000000000..6852de8ddb --- /dev/null +++ b/dozer-sink-clickhouse/src/schema.rs @@ -0,0 +1,198 @@ +use crate::ClickhouseSinkError::SinkTableDoesNotExist; +use crate::{ddl, ClickhouseSinkError}; +use clickhouse::{Client, Row}; +use dozer_types::errors::internal::BoxedError; +use dozer_types::models::endpoint::ClickhouseSinkConfig; +use dozer_types::serde::{Deserialize, Serialize}; +use dozer_types::types::{FieldType, Schema}; + +#[derive(Debug, Row, Deserialize, Serialize)] +#[serde(crate = "dozer_types::serde")] +pub(crate) struct ClickhouseSchemaColumn { + pub(crate) name: String, + pub(crate) r#type: String, + pub(crate) default_type: String, + pub(crate) default_expression: String, + pub(crate) comment: String, + pub(crate) codec_expression: String, + pub(crate) ttl_expression: String, +} + +#[derive(Debug, Row, Deserialize, Serialize, Clone)] +#[serde(crate = "dozer_types::serde")] +pub(crate) struct ClickhouseTable { + pub(crate) database: String, + pub(crate) name: String, + pub(crate) engine: String, + pub(crate) engine_full: String, +} + +#[derive(Debug, Row, Deserialize, Serialize)] +#[serde(crate = "dozer_types::serde")] +pub(crate) struct ClickhouseKeyColumnDef { + pub(crate) column_name: Option, + pub(crate) constraint_name: Option, + pub(crate) constraint_schema: String, +} + +pub struct ClickhouseSchema {} + +impl ClickhouseSchema { + pub async fn get_clickhouse_table( + client: &Client, + config: &ClickhouseSinkConfig, + dozer_schema: &Schema, + ) -> Result { + match Self::fetch_sink_table_info(client, &config.sink_table_name).await { + Ok(table) => { + ClickhouseSchema::compare_with_dozer_schema( + client, + dozer_schema.clone(), + table.clone(), + ) + .await?; + Ok(table) + } + Err(ClickhouseSinkError::ClickhouseQueryError( + clickhouse::error::Error::RowNotFound, + )) => { + if config.create_table_options.is_none() { + Err(SinkTableDoesNotExist) + } else { + let create_table_query = ddl::ClickhouseDDL::get_create_table_query( + config.sink_table_name.clone(), + dozer_schema.clone(), + config.create_table_options.clone(), + config.primary_keys.clone(), + ); + + client.query(&create_table_query).execute().await?; + Self::fetch_sink_table_info(client, &config.sink_table_name).await + } + } + Err(e) => Err(e), + } + } + + pub async fn get_primary_keys( + client: &Client, + config: &ClickhouseSinkConfig, + ) -> Result, BoxedError> { + let existing_pk = + Self::fetch_primary_keys(client, &config.sink_table_name, &config.database).await?; + + if let Some(expected_pk) = &config.primary_keys { + if expected_pk.len() != existing_pk.len() { + return Err(ClickhouseSinkError::PrimaryKeyMismatch( + expected_pk.clone(), + existing_pk.clone(), + ) + .into()); + } + + for pk in expected_pk { + if !existing_pk.iter().any(|existing_pk| existing_pk == pk) { + return Err(ClickhouseSinkError::PrimaryKeyMismatch( + expected_pk.clone(), + existing_pk.clone(), + ) + .into()); + } + } + } + + Ok(existing_pk) + } + async fn compare_with_dozer_schema( + client: &Client, + schema: Schema, + table: ClickhouseTable, + ) -> Result<(), ClickhouseSinkError> { + let columns: Vec = client + .query(&format!( + "DESCRIBE TABLE {database}.{table_name}", + table_name = table.name, + database = table.database + )) + .fetch_all::() + .await?; + + for field in schema.fields { + let Some(column) = columns.iter().find(|column| column.name == field.name) else { + return Err(ClickhouseSinkError::ColumnNotFound(field.name)); + }; + + let mut expected_type = match field.typ { + FieldType::UInt => "UInt64", + FieldType::U128 => "Uint128", + FieldType::Int => "Int64", + FieldType::I128 => "Int128", + FieldType::Float => "Float64", + FieldType::Boolean => "Bool", + FieldType::String | FieldType::Text => "String", + FieldType::Binary => "UInt8", + FieldType::Decimal => "Decimal64", + FieldType::Timestamp => "DateTime64(9)", + FieldType::Date => "Date", + FieldType::Json => "Json", + FieldType::Point => "Point", + FieldType::Duration => { + return Err(ClickhouseSinkError::TypeNotSupported( + field.name, + "Duration".to_string(), + )) + } + } + .to_string(); + + if field.nullable { + expected_type = format!("Nullable({expected_type})"); + } + + if field.typ == FieldType::Binary { + expected_type = format!("Array({expected_type})"); + } + + let column_type = column.r#type.clone(); + if expected_type != column_type { + return Err(ClickhouseSinkError::ColumnTypeMismatch( + field.name, + expected_type.to_string(), + column_type.to_string(), + )); + } + } + + Ok(()) + } + + async fn fetch_sink_table_info( + client: &Client, + sink_table_name: &str, + ) -> Result { + Ok(client + .query("SELECT database, name, engine, engine_full FROM system.tables WHERE name = ?") + .bind(sink_table_name) + .fetch_one::() + .await?) + } + + async fn fetch_primary_keys( + client: &Client, + sink_table_name: &str, + schema: &str, + ) -> Result, ClickhouseSinkError> { + Ok(client + .query("SELECT ?fields FROM INFORMATION_SCHEMA.key_column_usage WHERE table_name = ? AND constraint_schema = ?") + .bind(sink_table_name) + .bind(schema) + .fetch_all::() + .await? + .iter() + .filter_map(|key_column_def| { + key_column_def.column_name.clone() + }) + .collect() + ) + } +} diff --git a/dozer-sink-clickhouse/src/tests.rs b/dozer-sink-clickhouse/src/tests.rs new file mode 100644 index 0000000000..723e043573 --- /dev/null +++ b/dozer-sink-clickhouse/src/tests.rs @@ -0,0 +1,93 @@ +use crate::schema::ClickhouseSchema; +use crate::ClickhouseSinkError; +use clickhouse::Client; +use dozer_log::tokio; +use dozer_types::models::endpoint::ClickhouseSinkConfig; +use dozer_types::types::{FieldDefinition, FieldType, Schema}; + +fn get_client() -> Client { + Client::default() + .with_url("http://localhost:8123") + .with_user("default") + .with_database("default") +} + +fn get_sink_config() -> ClickhouseSinkConfig { + ClickhouseSinkConfig { + sink_table_name: "sink_table".to_string(), + create_table_options: None, + primary_keys: Some(vec!["id".to_string()]), + user: "default".to_string(), + password: Some("default".to_string()), + database: "default".to_string(), + database_url: "http://localhost:8123".to_string(), + } +} + +fn get_dozer_schema() -> Schema { + Schema { + fields: vec![ + FieldDefinition { + name: "id".to_string(), + typ: FieldType::UInt, + nullable: false, + source: Default::default(), + }, + FieldDefinition { + name: "data".to_string(), + typ: FieldType::String, + nullable: false, + source: Default::default(), + }, + ], + primary_index: vec![0], + } +} + +async fn create_table(table_name: &str) { + let client = get_client(); + client + .query(&format!("DROP TABLE IF EXISTS {table_name}")) + .execute() + .await + .unwrap(); + + client + .query(&format!("CREATE TABLE {table_name}(id UInt64, data String, PRIMARY KEY id) ENGINE = MergeTree ORDER BY id")) + .execute() + .await + .unwrap(); +} + +#[tokio::test] +#[ignore] +async fn test_get_clickhouse_table() { + let client = get_client(); + let sink_config = get_sink_config(); + create_table(&sink_config.sink_table_name).await; + let schema = get_dozer_schema(); + + let clickhouse_table = ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema) + .await + .unwrap(); + assert_eq!(clickhouse_table.name, sink_config.sink_table_name); +} + +#[tokio::test] +#[ignore] +async fn test_get_not_existing_clickhouse_table() { + let client = get_client(); + let mut sink_config = get_sink_config(); + create_table("not_existing").await; + let schema = get_dozer_schema(); + + sink_config.create_table_options = None; + + let clickhouse_table = + ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema).await; + eprintln!("CT {:?}", clickhouse_table); + assert!(matches!( + clickhouse_table, + Err(ClickhouseSinkError::SinkTableDoesNotExist) + )); +} diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs index bec44aa143..0f56e9ae43 100644 --- a/dozer-sql/src/tests/builder_test.rs +++ b/dozer-sql/src/tests/builder_test.rs @@ -154,12 +154,13 @@ impl TestSinkFactory { } } +#[async_trait] impl SinkFactory for TestSinkFactory { fn get_input_ports(&self) -> Vec { self.input_ports.clone() } - fn build( + async fn build( &self, _input_schemas: HashMap, ) -> Result, BoxedError> { diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs index 29461f7dd2..cf28034124 100644 --- a/dozer-tests/src/sql_tests/helper/pipeline.rs +++ b/dozer-tests/src/sql_tests/helper/pipeline.rs @@ -174,6 +174,7 @@ impl TestSinkFactory { } } +#[async_trait] impl SinkFactory for TestSinkFactory { fn get_input_ports(&self) -> Vec { self.input_ports.clone() @@ -183,7 +184,7 @@ impl SinkFactory for TestSinkFactory { Ok(()) } - fn build( + async fn build( &self, _input_schemas: HashMap, ) -> Result, BoxedError> { diff --git a/dozer-types/src/models/endpoint.rs b/dozer-types/src/models/endpoint.rs index 0f4373292c..2ba4ddc97a 100644 --- a/dozer-types/src/models/endpoint.rs +++ b/dozer-types/src/models/endpoint.rs @@ -110,6 +110,7 @@ pub enum EndpointKind { Api(ApiEndpoint), Dummy, Aerospike(AerospikeSinkConfig), + Clickhouse(ClickhouseSinkConfig), } #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] @@ -122,6 +123,27 @@ pub struct AerospikeSinkConfig { pub set_name: String, } +#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] +pub struct ClickhouseSinkConfig { + pub database_url: String, + pub user: String, + #[serde(default)] + pub password: Option, + pub database: String, + pub sink_table_name: String, + pub primary_keys: Option>, + pub create_table_options: Option, +} + +#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] +pub struct ClickhouseSinkTableOptions { + pub engine: Option, + pub partition_by: Option, + pub sample_by: Option, + pub order_by: Option>, + pub cluster: Option, +} + #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] #[serde(deny_unknown_fields)] pub struct ApiEndpoint { diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index aa4ded69f6..9af199f8f6 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -406,6 +406,93 @@ }, "additionalProperties": false }, + "ClickhouseSinkConfig": { + "type": "object", + "required": [ + "database", + "database_url", + "sink_table_name", + "user" + ], + "properties": { + "create_table_options": { + "anyOf": [ + { + "$ref": "#/definitions/ClickhouseSinkTableOptions" + }, + { + "type": "null" + } + ] + }, + "database": { + "type": "string" + }, + "database_url": { + "type": "string" + }, + "password": { + "default": null, + "type": [ + "string", + "null" + ] + }, + "primary_keys": { + "type": [ + "array", + "null" + ], + "items": { + "type": "string" + } + }, + "sink_table_name": { + "type": "string" + }, + "user": { + "type": "string" + } + } + }, + "ClickhouseSinkTableOptions": { + "type": "object", + "properties": { + "cluster": { + "type": [ + "string", + "null" + ] + }, + "engine": { + "type": [ + "string", + "null" + ] + }, + "order_by": { + "type": [ + "array", + "null" + ], + "items": { + "type": "string" + } + }, + "partition_by": { + "type": [ + "string", + "null" + ] + }, + "sample_by": { + "type": [ + "string", + "null" + ] + } + } + }, "Cloud": { "type": "object", "properties": { @@ -887,6 +974,18 @@ } }, "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Clickhouse" + ], + "properties": { + "Clickhouse": { + "$ref": "#/definitions/ClickhouseSinkConfig" + } + }, + "additionalProperties": false } ] },