From b2c3d0e0a6989df8d48989a732fbdc5ef0598c4f Mon Sep 17 00:00:00 2001 From: VG Date: Sat, 9 Mar 2024 05:15:57 +0800 Subject: [PATCH] feat: Refactor clickhouse sink to work with pulse (#2448) * feat: refactor type mapping * chore: use sink config * chore: fix clickhouse sink * feat: implement batching * chore: fix decimal * chore: use a common format * chore: fix table creation * chore: fix decimal * chore: fix clippy * chore: update client * chore: update pub crate * remove println Signed-off-by: VG * feat: implement clikchouse sink checkpointing --------- Signed-off-by: VG --- Cargo.lock | 254 ++++++-------- Cargo.toml | 1 - dozer-cli/src/main.rs | 17 +- dozer-sink-clickhouse/Cargo.toml | 5 +- dozer-sink-clickhouse/src/client.rs | 124 +++++++ dozer-sink-clickhouse/src/ddl.rs | 146 +++----- dozer-sink-clickhouse/src/errors.rs | 40 +++ dozer-sink-clickhouse/src/lib.rs | 473 +------------------------- dozer-sink-clickhouse/src/metadata.rs | 47 +++ dozer-sink-clickhouse/src/schema.rs | 269 +++++++-------- dozer-sink-clickhouse/src/sink.rs | 322 ++++++++++++++++++ dozer-sink-clickhouse/src/tests.rs | 50 +-- dozer-sink-clickhouse/src/types.rs | 327 ++++++++++++++++++ dozer-tracing/src/telemetry.rs | 3 +- dozer-types/src/json_types.rs | 2 +- dozer-types/src/models/sink.rs | 35 +- json_schemas/dozer.json | 40 ++- 17 files changed, 1225 insertions(+), 930 deletions(-) create mode 100644 dozer-sink-clickhouse/src/client.rs create mode 100644 dozer-sink-clickhouse/src/errors.rs create mode 100644 dozer-sink-clickhouse/src/metadata.rs create mode 100644 dozer-sink-clickhouse/src/sink.rs create mode 100644 dozer-sink-clickhouse/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index 835e0127c6..34b4fd1d70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,7 +98,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -211,7 +211,7 @@ dependencies = [ "actix-router", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -757,7 +757,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -787,7 +787,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -809,7 +809,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -820,7 +820,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -1020,7 +1020,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.39", + "syn 2.0.52", "which", ] @@ -1041,7 +1041,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -1171,7 +1171,7 @@ dependencies = [ "proc-macro-crate 2.0.0", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", "syn_derive", ] @@ -1217,15 +1217,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "bstr" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "542f33a8835a0884b006a0c3df3dadd99c0c3f296ed26c2fdc8028e01ad6230c" -dependencies = [ - "memchr", -] - [[package]] 
name = "bumpalo" version = "3.14.0" @@ -1390,9 +1381,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.8.4" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e23185c0e21df6ed832a12e2bda87c7d1def6842881fb634a8511ced741b0d76" +checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" dependencies = [ "chrono", "chrono-tz-build", @@ -1501,7 +1492,7 @@ dependencies = [ "heck", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -1520,43 +1511,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" [[package]] -name = "clickhouse" -version = "0.11.6" -source = "git+https://github.com/getdozer/clickhouse.rs.git#4c7d6f03caa933aeda5af7bff31536fccb97fba1" +name = "clickhouse-rs" +version = "1.1.0-alpha.1" +source = "git+https://github.com/suharev7/clickhouse-rs#afd8ce54e3882fec943fac42ab8aa4425e2f40e9" dependencies = [ - "bstr", - "bytes", - "clickhouse-derive", + "byteorder", + "cfg-if", + "chrono", + "chrono-tz", "clickhouse-rs-cityhash-sys", - "futures", - "hyper 0.14.27", - "hyper-tls", + "combine", + "crossbeam", + "either", + "futures-core", + "futures-sink", + "futures-util", + "hostname", + "lazy_static", + "log", "lz4", - "quanta 0.12.2", - "sealed", - "serde", - "static_assertions", + "percent-encoding", + "pin-project", "thiserror", "tokio", "url", -] - -[[package]] -name = "clickhouse-derive" -version = "0.1.1" -source = "git+https://github.com/getdozer/clickhouse.rs.git#4c7d6f03caa933aeda5af7bff31536fccb97fba1" -dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "serde_derive_internals 0.29.0", - "syn 2.0.39", + "uuid", ] [[package]] name = "clickhouse-rs-cityhash-sys" version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4baf9d4700a28d6cb600e17ed6ae2b43298a5245f1f76b4eab63027ebfd592b9" +source = "git+https://github.com/suharev7/clickhouse-rs#afd8ce54e3882fec943fac42ab8aa4425e2f40e9" dependencies = [ "cc", ] @@ -1974,7 +1959,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2031,7 +2016,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "strsim", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2053,7 +2038,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2376,7 +2361,7 @@ checksum = "3c65c2ffdafc1564565200967edc4851c7b55422d3913466688907efd05ea26f" dependencies = [ "deno-proc-macro-rules-macros", "proc-macro2 1.0.78", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2388,7 +2373,7 @@ dependencies = [ "once_cell", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2800,7 +2785,7 @@ dependencies = [ "regex", "strum 0.25.0", "strum_macros 0.25.3", - "syn 2.0.39", + "syn 2.0.52", "thiserror", ] @@ -3028,7 +3013,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -3103,7 +3088,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -3413,9 +3398,12 
@@ dependencies = [ name = "dozer-sink-clickhouse" version = "0.1.0" dependencies = [ - "clickhouse", + "chrono-tz", + "clickhouse-rs", "dozer-core", "dozer-types", + "either", + "serde", ] [[package]] @@ -3680,9 +3668,9 @@ dependencies = [ [[package]] name = "either" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" [[package]] name = "elliptic-curve" @@ -3788,7 +3776,7 @@ dependencies = [ "num-traits", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -3800,7 +3788,7 @@ dependencies = [ "once_cell", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4142,7 +4130,7 @@ dependencies = [ "pmutil", "proc-macro2 1.0.78", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4170,7 +4158,7 @@ checksum = "b0fa992f1656e1707946bbba340ad244f0814009ef8c0118eb7b658395f19a2e" dependencies = [ "frunk_proc_macro_helpers", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4182,7 +4170,7 @@ dependencies = [ "frunk_core", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4194,7 +4182,7 @@ dependencies = [ "frunk_core", "frunk_proc_macro_helpers", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4310,7 +4298,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5147,7 +5135,7 @@ dependencies = [ "pmutil", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5315,7 +5303,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "regex", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5575,6 +5563,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "serde", +] [[package]] name = "logos" @@ -5596,7 +5587,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "regex-syntax 0.6.29", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5802,7 +5793,7 @@ dependencies = [ "ipnet", "metrics", "metrics-util", - "quanta 0.11.1", + "quanta", "thiserror", "tokio", "tracing", @@ -5816,7 +5807,7 @@ checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5830,7 +5821,7 @@ dependencies = [ "hashbrown 0.13.1", "metrics", "num_cpus", - "quanta 0.11.1", + "quanta", "sketches-ddsketch", ] @@ -5952,7 +5943,7 @@ dependencies = [ "proc-macro-error 1.0.4", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", "termcolor", "thiserror", ] @@ -6446,7 +6437,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -6924,7 +6915,7 @@ dependencies = [ "pest_meta", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7058,7 +7049,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7153,7 +7144,7 @@ checksum = 
"52a40bc70c2c58040d2d8b167ba9a5ff59fc9dab7ad44771cfde3dcfde7a09c6" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7254,7 +7245,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2 1.0.78", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7487,7 +7478,7 @@ dependencies = [ "prost 0.12.3", "prost-types 0.12.3", "regex", - "syn 2.0.39", + "syn 2.0.52", "tempfile", "which", ] @@ -7515,7 +7506,7 @@ dependencies = [ "itertools 0.11.0", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7672,22 +7663,7 @@ dependencies = [ "libc", "mach2", "once_cell", - "raw-cpuid 10.7.0", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi", -] - -[[package]] -name = "quanta" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid 11.0.1", + "raw-cpuid", "wasi 0.11.0+wasi-snapshot-preview1", "web-sys", "winapi", @@ -7836,15 +7812,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "raw-cpuid" -version = "11.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" -dependencies = [ - "bitflags 2.4.1", -] - [[package]] name = "raw-window-handle" version = "0.5.2" @@ -8448,7 +8415,7 @@ checksum = "5a32af5427251d2e4be14fc151eabe18abb4a7aad5efee7044da9f096c906a43" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -8525,7 +8492,7 @@ checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "serde_derive_internals 0.26.0", + "serde_derive_internals", "syn 1.0.109", ] @@ -8575,18 +8542,6 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" -[[package]] -name = "sealed" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" -dependencies = [ - "heck", - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.39", -] - [[package]] name = "sec1" version = "0.3.0" @@ -8704,9 +8659,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.193" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] @@ -8732,13 +8687,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -8752,17 +8707,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "serde_derive_internals" -version = "0.29.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "330f01ce65a3a5fe59a60c82f3c9a024b573b8a6e875bd233fe5f934e71d54e3" -dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.39", -] - [[package]] name = "serde_json" version = "1.0.108" @@ -8885,7 +8829,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9300,7 +9244,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9358,7 +9302,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "rustversion", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9450,7 +9394,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9499,7 +9443,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9582,7 +9526,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9686,7 +9630,7 @@ dependencies = [ "pmutil", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9698,7 +9642,7 @@ dependencies = [ "pmutil", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9722,7 +9666,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9749,9 +9693,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", @@ -9778,7 +9722,7 @@ dependencies = [ "proc-macro-error 1.0.4", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9924,7 +9868,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -10069,7 +10013,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -10301,7 +10245,7 @@ dependencies = [ "proc-macro2 1.0.78", "prost-build 0.12.3", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -10419,7 +10363,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -11007,7 +10951,7 @@ dependencies = [ "once_cell", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", "wasm-bindgen-shared", ] @@ -11041,7 +10985,7 @@ checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -11500,7 +11444,7 @@ checksum = "855e0f6af9cd72b87d8a6c586f3cb583f5cdcc62c2c80869d8cd7e96fdf7ee20" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -11511,7 +11455,7 @@ checksum = 
"dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -11531,7 +11475,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fac522e8f5..4805745e66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,6 @@ resolver = "2" [workspace.dependencies] bincode = { version = "2.0.0-rc.3", features = ["derive"] } datafusion = { version = "33.0.0" } -datafusion-expr = { version = "33.0.0" } [patch.crates-io] postgres = { git = "https://github.com/getdozer/rust-postgres" } diff --git a/dozer-cli/src/main.rs b/dozer-cli/src/main.rs index 865d28eded..bdd917f6cd 100644 --- a/dozer-cli/src/main.rs +++ b/dozer-cli/src/main.rs @@ -9,7 +9,7 @@ use dozer_cli::{set_ctrl_handler, set_panic_hook}; use dozer_core::shutdown; use dozer_tracing::LabelsAndProgress; use dozer_types::models::config::Config; -use dozer_types::models::telemetry::{TelemetryConfig, TelemetryMetricsConfig}; +use dozer_types::models::telemetry::TelemetryConfig; use dozer_types::tracing::{error, error_span, info}; use futures::TryFutureExt; use std::process; @@ -45,17 +45,10 @@ fn run() -> Result<(), OrchestrationError> { .map(|(c, _)| c.cloud.app_id.as_deref().unwrap_or(&c.app_name)) .ok(); - let telemetry_config = if matches!(cli.cmd, Commands::Run) { - TelemetryConfig { - trace: None, - metrics: Some(TelemetryMetricsConfig::Prometheus), - } - } else { - config_res - .as_ref() - .map(|(c, _)| c.telemetry.clone()) - .unwrap_or_default() - }; + let telemetry_config = config_res + .as_ref() + .map(|(c, _)| c.telemetry.clone()) + .unwrap_or_default(); let _telemetry = runtime.block_on(async { Telemetry::new(app_id, &telemetry_config) }); diff --git a/dozer-sink-clickhouse/Cargo.toml b/dozer-sink-clickhouse/Cargo.toml index ce1851ad8e..4f26da67a2 100644 --- a/dozer-sink-clickhouse/Cargo.toml +++ b/dozer-sink-clickhouse/Cargo.toml @@ -8,4 +8,7 @@ edition = "2021" [dependencies] dozer-core = { path = "../dozer-core" } dozer-types = { path = "../dozer-types" } -clickhouse = { git = "https://github.com/getdozer/clickhouse.rs.git" } +clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs" } +either = "1.10.0" +chrono-tz = "0.8.6" +serde = "1.0.197" diff --git a/dozer-sink-clickhouse/src/client.rs b/dozer-sink-clickhouse/src/client.rs new file mode 100644 index 0000000000..b986ea0d49 --- /dev/null +++ b/dozer-sink-clickhouse/src/client.rs @@ -0,0 +1,124 @@ +#![allow(dead_code)] +use super::ddl::get_create_table_query; +use super::types::ValueWrapper; +use crate::errors::QueryError; +use crate::types::{insert_multi, map_value_wrapper_to_field}; +use clickhouse_rs::{ClientHandle, Pool}; +use dozer_types::log::{debug, info}; +use dozer_types::models::sink::{ClickhouseSinkConfig, ClickhouseTableOptions}; +use dozer_types::types::{Field, FieldDefinition}; +pub struct SqlResult { + pub rows: Vec>, +} + +#[derive(Clone)] +pub struct ClickhouseClient { + pool: Pool, +} + +impl ClickhouseClient { + pub fn new(config: ClickhouseSinkConfig) -> Self { + let url = Self::construct_url(&config); + let pool = Pool::new(url); + Self { pool } + } + + pub fn construct_url(config: &ClickhouseSinkConfig) -> String { + let user_password = match &config.password { + Some(password) => format!("{}:{}", config.user, password), + None => config.user.to_string(), + }; + + let url = 
format!(
+            "{}://{}@{}:{}/{}",
+            config.scheme, user_password, config.host, config.port, config.database
+        );
+        debug!("{url}");
+        url
+    }
+
+    pub async fn get_client_handle(&self) -> Result<ClientHandle, QueryError> {
+        let client = self.pool.get_handle().await?;
+        Ok(client)
+    }
+
+    pub async fn drop_table(&self, datasource_name: &str) -> Result<(), QueryError> {
+        let mut client = self.pool.get_handle().await?;
+        let ddl = format!("DROP TABLE IF EXISTS {}", datasource_name);
+        info!("#{ddl}");
+        client.execute(ddl).await?;
+        Ok(())
+    }
+
+    pub async fn create_table(
+        &self,
+        datasource_name: &str,
+        fields: &[FieldDefinition],
+        table_options: Option<ClickhouseTableOptions>,
+    ) -> Result<(), QueryError> {
+        let mut client = self.pool.get_handle().await?;
+        let ddl = get_create_table_query(datasource_name, fields, table_options);
+        info!("Creating Clickhouse Table");
+        info!("{ddl}");
+        client.execute(ddl).await?;
+        Ok(())
+    }
+
+    pub async fn fetch_all(
+        &self,
+        query: &str,
+        schema: Vec<FieldDefinition>,
+        query_id: Option<String>,
+    ) -> Result<SqlResult, QueryError> {
+        let mut client = self.pool.get_handle().await?;
+        // TODO: query_id doesn't work
+        // https://github.com/suharev7/clickhouse-rs/issues/176
+        // let query = Query::new(sql).id(query_id.to_string())
+        let query = query_id.map_or(query.to_string(), |id| {
+            format!("{0} settings log_comment = '{1}'", query, id)
+        });
+
+        let block = client.query(&query).fetch_all().await?;
+
+        let mut rows: Vec<Vec<Field>> = vec![];
+        for row in block.rows() {
+            let mut row_data = vec![];
+            for (idx, field) in schema.clone().into_iter().enumerate() {
+                let v: ValueWrapper = row.get(idx)?;
+                row_data.push(map_value_wrapper_to_field(v, field)?);
+            }
+            rows.push(row_data);
+        }
+
+        Ok(SqlResult { rows })
+    }
+
+    pub async fn check_table(&self, table_name: &str) -> Result<bool, QueryError> {
+        let mut client = self.pool.get_handle().await?;
+        let query = format!("CHECK TABLE {}", table_name);
+        client.query(query).fetch_all().await?;
+
+        // if error not found, table exists
+        Ok(true)
+    }
+
+    pub async fn insert(
+        &self,
+        table_name: &str,
+        fields: &[FieldDefinition],
+        values: &[Field],
+    ) -> Result<(), QueryError> {
+        let client = self.pool.get_handle().await?;
+        insert_multi(client, table_name, fields, &[values.to_vec()]).await
+    }
+
+    pub async fn insert_multi(
+        &self,
+        table_name: &str,
+        fields: &[FieldDefinition],
+        values: &[Vec<Field>],
+    ) -> Result<(), QueryError> {
+        let client = self.pool.get_handle().await?;
+        insert_multi(client, table_name, fields, values).await
+    }
+}
diff --git a/dozer-sink-clickhouse/src/ddl.rs b/dozer-sink-clickhouse/src/ddl.rs
index 94fbbddd05..0610b1e41a 100644
--- a/dozer-sink-clickhouse/src/ddl.rs
+++ b/dozer-sink-clickhouse/src/ddl.rs
@@ -1,70 +1,66 @@
-use dozer_types::log::warn;
-use dozer_types::models::sink::ClickhouseSinkTableOptions;
-use dozer_types::types::{FieldDefinition, FieldType, Schema};
+use dozer_types::models::sink::ClickhouseTableOptions;
+use dozer_types::types::FieldDefinition;

-pub struct ClickhouseDDL {}
+use crate::schema::map_field_to_type;

 const DEFAULT_TABLE_ENGINE: &str = "MergeTree()";

-impl ClickhouseDDL {
-    pub fn get_create_table_query(
-        table_name: String,
-        schema: Schema,
-        sink_options: Option<ClickhouseSinkTableOptions>,
-        primary_keys: Option<Vec<String>>,
-    ) -> String {
-        let mut parts = schema
-            .fields
-            .iter()
-            .map(|field| {
-                let typ = Self::map_field_to_type(field);
-                format!("{} {}", field.name, typ)
-            })
-            .collect::<Vec<String>>();
+pub fn get_create_table_query(
+    table_name: &str,
+    fields: &[FieldDefinition],
+    table_options: Option<ClickhouseTableOptions>,
+) -> String {
+    let mut parts = fields
+        .iter()
+        .map(|field| {
+            let typ = map_field_to_type(field);
+            format!("{} {}", field.name, typ)
+        })
+        .collect::<Vec<String>>();

-        let engine = sink_options
-            .as_ref()
-            .map(|options| {
-                options
-                    .engine
-                    .clone()
-                    .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string())
-            })
-            .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string());
+    let engine = table_options
+        .as_ref()
+        .and_then(|c| c.engine.clone())
+        .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string());

-        if let Some(pk) = primary_keys {
-            parts.push(format!("PRIMARY KEY ({})", pk.join(", ")));
-        }
+    parts.push(
+        table_options
+            .as_ref()
+            .and_then(|options| options.primary_keys.clone())
+            .map_or("".to_string(), |pk| {
+                format!("PRIMARY KEY ({})", pk.join(", "))
+            }),
+    );

-        let query = parts.join(",\n");
+    let query = parts.join(",\n");

-        let partition_by = sink_options
-            .as_ref()
-            .and_then(|options| options.partition_by.clone())
-            .map_or("".to_string(), |partition_by| {
-                format!("PARTITION BY {}\n", partition_by)
-            });
-        let sample_by = sink_options
-            .as_ref()
-            .and_then(|options| options.sample_by.clone())
-            .map_or("".to_string(), |partition_by| {
-                format!("SAMPLE BY {}\n", partition_by)
-            });
-        let order_by = sink_options
-            .as_ref()
-            .and_then(|options| options.order_by.clone())
-            .map_or("".to_string(), |order_by| {
-                format!("ORDER BY ({})\n", order_by.join(", "))
-            });
-        let cluster = sink_options
-            .as_ref()
-            .and_then(|options| options.cluster.clone())
-            .map_or("".to_string(), |cluster| {
-                format!("ON CLUSTER {}\n", cluster)
-            });
+    let partition_by = table_options
+        .as_ref()
+        .and_then(|options| options.partition_by.clone())
+        .map_or("".to_string(), |partition_by| {
+            format!("PARTITION BY {}\n", partition_by)
+        });
+    let sample_by = table_options
+        .as_ref()
+        .and_then(|options| options.sample_by.clone())
+        .map_or("".to_string(), |partition_by| {
+            format!("SAMPLE BY {}\n", partition_by)
+        });
+    let order_by = table_options
+        .as_ref()
+        .and_then(|options| options.order_by.clone())
+        .map_or("".to_string(), |order_by| {
+            format!("ORDER BY ({})\n", order_by.join(", "))
+        });
+    let cluster = table_options
+        .as_ref()
+        .and_then(|options| options.cluster.clone())
+        .map_or("".to_string(), |cluster| {
+            format!("ON CLUSTER {}\n", cluster)
+        });

-        format!(
-            "CREATE TABLE IF NOT EXISTS {table_name} {cluster} (
+    format!(
+        "CREATE TABLE IF NOT EXISTS {table_name} {cluster} (
 {query}
 )
 ENGINE = {engine}
@@ -72,37 +68,5 @@ impl ClickhouseDDL {
 {partition_by}
 {sample_by}
 ",
-        )
-    }
-
-    pub fn map_field_to_type(field: &FieldDefinition) -> String {
-        let typ = match field.typ {
-            FieldType::UInt => "UInt64",
-            FieldType::U128 => "UInt128",
-            FieldType::Int => "Int64",
-            FieldType::I128 => "Int128",
-            FieldType::Float => "Float64",
-            FieldType::Boolean => "Boolean",
-            FieldType::String => "String",
-            FieldType::Text => "String",
-            FieldType::Binary => "Array(UInt8)",
-            FieldType::Decimal => "Decimal",
-            FieldType::Timestamp => "DateTime64(3)",
-            FieldType::Date => "Date",
-            FieldType::Json => "JSON",
-            FieldType::Point => "Point",
-            FieldType::Duration => unimplemented!(),
-        };
-
-        if field.nullable {
-            if field.typ != FieldType::Binary {
-                format!("Nullable({})", typ)
-            } else {
-                warn!("Binary field cannot be nullable, ignoring nullable flag");
-                typ.to_string()
-            }
-        } else {
-            typ.to_string()
-        }
-    }
+    )
 }
diff --git a/dozer-sink-clickhouse/src/errors.rs b/dozer-sink-clickhouse/src/errors.rs
new file mode 100644
index 0000000000..d3fc752c5c
--- /dev/null
+++ b/dozer-sink-clickhouse/src/errors.rs
@@ -0,0 +1,40 @@
+use dozer_types::thiserror::{self, Error};
+
+#[derive(Error, Debug)]
+pub enum ClickhouseSinkError {
+    #[error("Only CollapsingMergeTree engine is supported for delete and update operations")]
+    UnsupportedOperation,
+
+    #[error("Column {0} not found in sink table")]
+    ColumnNotFound(String),
+
+    #[error("Column {0} has type {1} in dozer schema but type {2} in sink table")]
+    ColumnTypeMismatch(String, String, String),
+
+    #[error("Clickhouse error: {0:?}")]
+    ClickhouseError(#[from] clickhouse_rs::errors::Error),
+
+    #[error("Primary key not found")]
+    PrimaryKeyNotFound,
+
+    #[error("Sink table does not exist and create_table_options is not set")]
+    SinkTableDoesNotExist,
+
+    #[error("Expected primary key {0:?} but got {1:?}")]
+    PrimaryKeyMismatch(Vec<String>, Vec<String>),
+
+    #[error("QueryError: {0:?}")]
+    QueryError(#[from] QueryError),
+}
+
+#[derive(Error, Debug)]
+pub enum QueryError {
+    #[error("Clickhouse error: {0:?}")]
+    DataFetchError(#[from] clickhouse_rs::errors::Error),
+
+    #[error("Unexpected field type for {0:?}, expected {1}")]
+    TypeMismatch(String, String),
+
+    #[error("{0:?}")]
+    CustomError(String),
+}
diff --git a/dozer-sink-clickhouse/src/lib.rs b/dozer-sink-clickhouse/src/lib.rs
index b0c9051b3b..426b54ba6c 100644
--- a/dozer-sink-clickhouse/src/lib.rs
+++ b/dozer-sink-clickhouse/src/lib.rs
@@ -1,467 +1,10 @@
-mod ddl;
-mod schema;
+pub mod client;
+pub mod ddl;
+pub mod errors;
+pub mod schema;
+mod sink;
+pub use sink::ClickhouseSinkFactory;
+pub mod metadata;
 #[cfg(test)]
 mod tests;
-
-use crate::ClickhouseSinkError::{
-    PrimaryKeyNotFound, SchemaFieldNotFoundByIndex, UnsupportedOperation,
-};
-use clickhouse::inserter::Inserter;
-use clickhouse::Client;
-use dozer_core::epoch::Epoch;
-use dozer_core::event::EventHub;
-use dozer_core::node::{PortHandle, Sink, SinkFactory};
-use dozer_core::tokio::runtime::Runtime;
-use dozer_core::DEFAULT_PORT_HANDLE;
-use dozer_types::errors::internal::BoxedError;
-use dozer_types::log::debug;
-use dozer_types::models::sink::ClickhouseSinkConfig;
-use dozer_types::node::OpIdentifier;
-use dozer_types::serde::Serialize;
-use dozer_types::tonic::async_trait;
-use dozer_types::types::{
-    DozerDuration, DozerPoint, Field, FieldType, Operation, Record, Schema, TableOperation,
-};
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::sync::Arc;
-
-use crate::schema::{ClickhouseSchema, ClickhouseTable};
-use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate};
-use dozer_types::json_types::JsonValue;
-use dozer_types::ordered_float::OrderedFloat;
-use dozer_types::rust_decimal::Decimal;
-use dozer_types::serde_bytes;
-use dozer_types::thiserror::{self, Error};
-
-pub const BATCH_SIZE: usize = 100;
-
-#[derive(Error, Debug)]
-enum ClickhouseSinkError {
-    #[error("Only MergeTree engine is supported for delete operation")]
-    UnsupportedOperation,
-
-    #[error("Column {0} not found in sink table")]
-    ColumnNotFound(String),
-
-    #[error("Column {0} has type {1} in dozer schema but type {2} in sink table")]
-    ColumnTypeMismatch(String, String, String),
-
-    #[error("Clickhouse query error: {0}")]
-    ClickhouseQueryError(#[from] clickhouse::error::Error),
-
-    #[error("Primary key not found")]
-    PrimaryKeyNotFound,
-
-    #[error("Type {1} is not supported for column {0}")]
-    TypeNotSupported(String, String),
-
-    #[error("Sink table does not exist and create_table_options is not set")]
-    SinkTableDoesNotExist,
-
-    #[error("Expected primary key {0:?} but got {1:?}")]
-    PrimaryKeyMismatch(Vec<String>, Vec<String>),
-
-    #[error("Schema field not found by index {0}")]
-
SchemaFieldNotFoundByIndex(usize), -} - -#[derive(Debug)] -pub struct ClickhouseSinkFactory { - runtime: Arc, - config: ClickhouseSinkConfig, -} - -#[derive(Debug, Serialize)] -#[serde(crate = "dozer_types::serde", untagged)] -pub enum FieldWrapper { - UInt(u64), - U128(u128), - Int(i64), - I128(i128), - Float(#[cfg_attr(feature= "arbitrary", arbitrary(with = arbitrary_float))] OrderedFloat), - Boolean(bool), - String(String), - Text(String), - Binary(#[serde(with = "serde_bytes")] Vec), - Decimal(Decimal), - Timestamp(DateTime), - Date(NaiveDate), - Json(#[cfg_attr(feature= "arbitrary", arbitrary(with = arb_json::arbitrary_json))] JsonValue), - Point(DozerPoint), - Duration(DozerDuration), - OptionalUInt(Option), - OptionalU128(Option), - OptionalInt(Option), - OptionalI128(Option), - OptionalFloat( - #[cfg_attr(feature= "arbitrary", arbitrary(with = arbitrary_float))] - Option>, - ), - OptionalBoolean(Option), - OptionalString(Option), - OptionalText(Option), - OptionalDecimal(Option), - OptionalTimestamp(Option>), - OptionalDate(Option), - OptionalJson( - #[cfg_attr(feature= "arbitrary", arbitrary(with = arb_json::arbitrary_json))] - Option, - ), - OptionalPoint(Option), - OptionalDuration(Option), - Null(Option<()>), -} - -fn convert_field_to_ff(field: Field, nullable: bool) -> FieldWrapper { - if nullable { - match field { - Field::UInt(v) => FieldWrapper::OptionalUInt(Some(v)), - Field::U128(v) => FieldWrapper::OptionalU128(Some(v)), - Field::Int(v) => FieldWrapper::OptionalInt(Some(v)), - Field::I128(v) => FieldWrapper::OptionalI128(Some(v)), - Field::Float(v) => FieldWrapper::OptionalFloat(Some(v)), - Field::Boolean(v) => FieldWrapper::OptionalBoolean(Some(v)), - Field::String(v) => FieldWrapper::OptionalString(Some(v)), - Field::Text(v) => FieldWrapper::OptionalText(Some(v)), - Field::Binary(v) => FieldWrapper::Binary(v), - Field::Decimal(v) => FieldWrapper::OptionalDecimal(Some(v)), - Field::Timestamp(v) => FieldWrapper::OptionalTimestamp(Some(v)), - Field::Date(v) => FieldWrapper::OptionalDate(Some(v)), - Field::Json(v) => FieldWrapper::OptionalJson(Some(v)), - Field::Point(v) => FieldWrapper::OptionalPoint(Some(v)), - Field::Duration(v) => FieldWrapper::OptionalDuration(Some(v)), - Field::Null => FieldWrapper::Null(None), - } - } else { - match field { - Field::UInt(v) => FieldWrapper::UInt(v), - Field::U128(v) => FieldWrapper::U128(v), - Field::Int(v) => FieldWrapper::Int(v), - Field::I128(v) => FieldWrapper::I128(v), - Field::Float(v) => FieldWrapper::Float(v), - Field::Boolean(v) => FieldWrapper::Boolean(v), - Field::String(v) => FieldWrapper::String(v), - Field::Text(v) => FieldWrapper::Text(v), - Field::Binary(v) => FieldWrapper::Binary(v), - Field::Decimal(v) => FieldWrapper::Decimal(v), - Field::Timestamp(v) => FieldWrapper::Timestamp(v), - Field::Date(v) => FieldWrapper::Date(v), - Field::Json(v) => FieldWrapper::Json(v), - Field::Point(v) => FieldWrapper::Point(v), - Field::Duration(v) => FieldWrapper::Duration(v), - Field::Null => FieldWrapper::Null(None), - } - } -} -impl ClickhouseSinkFactory { - pub fn new(config: ClickhouseSinkConfig, runtime: Arc) -> Self { - Self { config, runtime } - } -} - -#[async_trait] -impl SinkFactory for ClickhouseSinkFactory { - fn type_name(&self) -> String { - "clickhouse".to_string() - } - - fn get_input_ports(&self) -> Vec { - vec![DEFAULT_PORT_HANDLE] - } - - fn get_input_port_name(&self, _port: &PortHandle) -> String { - self.config.source_table_name.clone() - } - - fn prepare(&self, input_schemas: HashMap) -> Result<(), 
BoxedError> { - debug_assert!(input_schemas.len() == 1); - Ok(()) - } - - async fn build( - &self, - mut input_schemas: HashMap, - _event_hub: EventHub, - ) -> Result, BoxedError> { - let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap(); - let mut client = Client::default() - .with_url(self.config.database_url.clone()) - .with_user(self.config.user.clone()) - .with_database(self.config.database.clone()); - - if let Some(password) = self.config.password.clone() { - client = client.with_password(password); - } - - let table = ClickhouseSchema::get_clickhouse_table(&client, &self.config, &schema).await?; - let primary_key_field_names = - ClickhouseSchema::get_primary_keys(&client, &self.config).await?; - - let primary_key_fields_indexes: Result, ClickhouseSinkError> = - primary_key_field_names - .iter() - .map(|primary_key| { - schema - .fields - .iter() - .position(|field| field.name == *primary_key) - .ok_or(PrimaryKeyNotFound) - }) - .collect(); - - let sink = ClickhouseSink::new( - client, - self.config.clone(), - schema, - self.runtime.clone(), - table, - primary_key_fields_indexes?, - ); - - Ok(Box::new(sink)) - } -} - -pub(crate) struct ClickhouseSink { - pub(crate) client: Client, - pub(crate) runtime: Arc, - pub(crate) schema: Schema, - pub(crate) inserter: Inserter, - pub(crate) sink_table_name: String, - pub(crate) table: ClickhouseTable, - pub(crate) primary_key_fields_indexes: Vec, -} - -impl Debug for ClickhouseSink { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ClickhouseSink") - .field("sink_table_name", &self.sink_table_name) - .field( - "primary_key_fields_indexes", - &self.primary_key_fields_indexes, - ) - .field("table", &self.table) - .field("schema", &self.schema) - .finish() - } -} - -impl ClickhouseSink { - pub fn new( - client: Client, - config: ClickhouseSinkConfig, - schema: Schema, - runtime: Arc, - table: ClickhouseTable, - primary_key_fields_indexes: Vec, - ) -> Self { - let fields_list = schema - .fields - .iter() - .map(|field| field.name.as_str()) - .collect::>(); - - let inserter = client - .inserter(&config.sink_table_name, fields_list.as_slice()) - .unwrap() - .with_max_rows((BATCH_SIZE * schema.fields.len()).try_into().unwrap()); - - Self { - client, - runtime, - schema, - inserter, - sink_table_name: config.sink_table_name, - table, - primary_key_fields_indexes, - } - } - - pub fn commit_insert(&mut self) -> Result<(), BoxedError> { - self.runtime.block_on(async { - let stats = self.inserter.commit().await?; - if stats.rows > 0 { - debug!( - "{} bytes, {} rows, {} transactions have been inserted", - stats.bytes, stats.rows, stats.transactions, - ); - } - - Ok::<(), BoxedError>(()) - }) - } - - fn map_fields(&self, record: Record) -> Result, ClickhouseSinkError> { - record - .values - .into_iter() - .enumerate() - .map(|(index, mut field)| match self.schema.fields.get(index) { - Some(schema_field) => { - if schema_field.r#typ == FieldType::Binary && Field::Null == field { - field = Field::Binary(vec![]); - } - - Ok(convert_field_to_ff(field.clone(), schema_field.nullable)) - } - None => Err(SchemaFieldNotFoundByIndex(index)), - }) - .collect() - } -} - -impl Sink for ClickhouseSink { - fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> { - self.commit_insert() - } - - fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> { - match op.op { - Operation::Insert { new } => { - let values = self.map_fields(new)?; - self.runtime.block_on(async { - for value in values { - 
self.inserter.write(&value)?; - } - - Ok::<(), BoxedError>(()) - })?; - - self.commit_insert()?; - } - Operation::Delete { old } => { - if self.table.engine != "MergeTree" { - return Err(BoxedError::from(UnsupportedOperation)); - } - let mut conditions = vec![]; - - for (index, field) in old.values.iter().enumerate() { - if *field != Field::Null { - let field_name = self.schema.fields.get(index).unwrap().name.clone(); - conditions.push(format!("{field_name} = ?")); - } - } - - let mut query = self.client.query(&format!( - "DELETE FROM {table_name} WHERE {condition}", - condition = conditions.join(" AND "), - table_name = self.sink_table_name - )); - - for field in old.values.iter() { - if *field != Field::Null { - query = query.bind(field); - } - } - - self.runtime.block_on(async { - query.execute().await.unwrap(); - - Ok::<(), BoxedError>(()) - })?; - } - Operation::Update { new, old } => { - let mut updates = vec![]; - - for (index, field) in new.values.iter().enumerate() { - if self.primary_key_fields_indexes.contains(&index) { - continue; - } - let schema_field = self.schema.fields.get(index).unwrap(); - let field_name = schema_field.name.clone(); - if *field == Field::Null { - updates.push(format!("{field_name} = NULL")); - } else { - updates.push(format!("{field_name} = ?")); - } - } - - let mut conditions = vec![]; - - for (index, field) in old.values.iter().enumerate() { - if !self.primary_key_fields_indexes.contains(&index) { - continue; - } - - if *field != Field::Null { - let field_name = self.schema.fields.get(index).unwrap().name.clone(); - conditions.push(format!("{field_name} = ?")); - } - } - - let mut query = self.client.query(&format!( - "ALTER TABLE {table_name} UPDATE {updates} WHERE {condition}", - condition = conditions.join(" AND "), - updates = updates.join(", "), - table_name = self.sink_table_name - )); - - for (index, field) in new.values.iter().enumerate() { - if *field == Field::Null || self.primary_key_fields_indexes.contains(&index) { - continue; - } - - query = query.bind(field); - } - - for (index, field) in old.values.iter().enumerate() { - if !self.primary_key_fields_indexes.contains(&index) { - continue; - } - - if *field != Field::Null { - query = query.bind(field); - } - } - - self.runtime.block_on(async { - query.execute().await.unwrap(); - - Ok::<(), BoxedError>(()) - })?; - } - Operation::BatchInsert { new } => { - for record in new { - for f in self.map_fields(record)? 
{ - self.runtime.block_on(async { - self.inserter.write(&f)?; - Ok::<(), BoxedError>(()) - })?; - } - } - - self.commit_insert()?; - } - } - - Ok(()) - } - - fn on_source_snapshotting_started( - &mut self, - _connection_name: String, - ) -> Result<(), BoxedError> { - Ok(()) - } - - fn on_source_snapshotting_done( - &mut self, - _connection_name: String, - _id: Option, - ) -> Result<(), BoxedError> { - Ok(()) - } - - fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> { - Ok(()) - } - - fn get_source_state(&mut self) -> Result>, BoxedError> { - Ok(None) - } - - fn get_latest_op_id(&mut self) -> Result, BoxedError> { - Ok(None) - } -} +pub mod types; diff --git a/dozer-sink-clickhouse/src/metadata.rs b/dozer-sink-clickhouse/src/metadata.rs new file mode 100644 index 0000000000..4d3b409e02 --- /dev/null +++ b/dozer-sink-clickhouse/src/metadata.rs @@ -0,0 +1,47 @@ +use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition}; + +// Replication Metadata Constants +pub const REPLICA_METADATA_TABLE: &str = "__dozer_replication_metadata"; +pub const META_TABLE_COL: &str = "table"; +pub const META_TXN_ID_COL: &str = "txn_id"; + +pub struct ReplicationMetadata { + pub schema: Schema, + pub table_name: String, +} + +impl ReplicationMetadata { + pub fn schema(&self) -> &Schema { + &self.schema + } + + pub fn get_primary_keys(&self) -> Vec { + vec![META_TABLE_COL.to_string()] + } + + pub fn get_metadata() -> ReplicationMetadata { + ReplicationMetadata { + table_name: REPLICA_METADATA_TABLE.to_string(), + schema: Schema::new() + .field( + FieldDefinition { + name: META_TABLE_COL.to_owned(), + typ: FieldType::String, + nullable: false, + source: SourceDefinition::Dynamic, + }, + true, + ) + .field( + FieldDefinition { + name: META_TXN_ID_COL.to_owned(), + typ: FieldType::UInt, + nullable: false, + source: SourceDefinition::Dynamic, + }, + false, + ) + .clone(), + } + } +} diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs index 56488af879..ca920e8562 100644 --- a/dozer-sink-clickhouse/src/schema.rs +++ b/dozer-sink-clickhouse/src/schema.rs @@ -1,162 +1,91 @@ -use crate::ClickhouseSinkError::SinkTableDoesNotExist; -use crate::{ddl, ClickhouseSinkError}; -use clickhouse::{Client, Row}; -use dozer_types::errors::internal::BoxedError; +use crate::client::ClickhouseClient; +use crate::errors::ClickhouseSinkError::{self, SinkTableDoesNotExist}; +use crate::types::DECIMAL_SCALE; +use clickhouse_rs::types::Complex; +use clickhouse_rs::{Block, ClientHandle}; +use dozer_types::log::warn; use dozer_types::models::sink::ClickhouseSinkConfig; use dozer_types::serde::{Deserialize, Serialize}; -use dozer_types::types::{FieldType, Schema}; +use dozer_types::types::{FieldDefinition, FieldType, Schema}; -#[derive(Debug, Row, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize)] #[serde(crate = "dozer_types::serde")] -pub(crate) struct ClickhouseSchemaColumn { - pub(crate) name: String, - pub(crate) r#type: String, - pub(crate) default_type: String, - pub(crate) default_expression: String, - pub(crate) comment: String, - pub(crate) codec_expression: String, - pub(crate) ttl_expression: String, +pub struct ClickhouseSchemaColumn { + name: String, + type_: String, } -#[derive(Debug, Row, Deserialize, Serialize, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(crate = "dozer_types::serde")] -pub(crate) struct ClickhouseTable { - pub(crate) database: String, - pub(crate) name: String, - pub(crate) engine: String, - 
pub(crate) engine_full: String,
+pub struct ClickhouseTable {
+    pub database: String,
+    pub name: String,
+    pub engine: String,
+    pub engine_full: String,
 }

-#[derive(Debug, Row, Deserialize, Serialize)]
+#[derive(Debug, Deserialize, Serialize)]
 #[serde(crate = "dozer_types::serde")]
-pub(crate) struct ClickhouseKeyColumnDef {
-    pub(crate) column_name: Option<String>,
-    pub(crate) constraint_name: Option<String>,
-    pub(crate) constraint_schema: String,
+pub struct ClickhouseKeyColumnDef {
+    pub column_name: Option<String>,
+    pub constraint_name: Option<String>,
+    pub constraint_schema: String,
 }

 pub struct ClickhouseSchema {}

 impl ClickhouseSchema {
     pub async fn get_clickhouse_table(
-        client: &Client,
+        client: ClickhouseClient,
         config: &ClickhouseSinkConfig,
-        dozer_schema: &Schema,
     ) -> Result<ClickhouseTable, ClickhouseSinkError> {
-        match Self::fetch_sink_table_info(client, &config.sink_table_name).await {
-            Ok(table) => {
-                ClickhouseSchema::compare_with_dozer_schema(
-                    client,
-                    dozer_schema.clone(),
-                    table.clone(),
-                )
-                .await?;
-                Ok(table)
-            }
-            Err(ClickhouseSinkError::ClickhouseQueryError(
-                clickhouse::error::Error::RowNotFound,
-            )) => {
-                if config.create_table_options.is_none() {
-                    Err(SinkTableDoesNotExist)
-                } else {
-                    let create_table_query = ddl::ClickhouseDDL::get_create_table_query(
-                        config.sink_table_name.clone(),
-                        dozer_schema.clone(),
-                        config.create_table_options.clone(),
-                        config.primary_keys.clone(),
-                    );
-
-                    client.query(&create_table_query).execute().await?;
-                    Self::fetch_sink_table_info(client, &config.sink_table_name).await
-                }
-            }
-            Err(e) => Err(e),
+        let mut client = client.get_client_handle().await?;
+        let query = format!("DESCRIBE TABLE {}", config.sink_table_name);
+        let block: Block<Complex> = client.query(&query).fetch_all().await?;
+
+        if block.row_count() == 0 {
+            Err(SinkTableDoesNotExist)
+        } else {
+            Self::fetch_sink_table_info(client, &config.sink_table_name).await
         }
     }

-    pub async fn get_primary_keys(
-        client: &Client,
-        config: &ClickhouseSinkConfig,
-    ) -> Result<Vec<String>, BoxedError> {
-        let existing_pk =
-            Self::fetch_primary_keys(client, &config.sink_table_name, &config.database).await?;
-
-        if let Some(expected_pk) = &config.primary_keys {
-            if expected_pk.len() != existing_pk.len() {
-                return Err(ClickhouseSinkError::PrimaryKeyMismatch(
-                    expected_pk.clone(),
-                    existing_pk.clone(),
-                )
-                .into());
-            }
-
-            for pk in expected_pk {
-                if !existing_pk.iter().any(|existing_pk| existing_pk == pk) {
-                    return Err(ClickhouseSinkError::PrimaryKeyMismatch(
-                        expected_pk.clone(),
-                        existing_pk.clone(),
-                    )
-                    .into());
-                }
-            }
-        }
-
-        Ok(existing_pk)
-    }
-
-    async fn compare_with_dozer_schema(
-        client: &Client,
-        schema: Schema,
-        table: ClickhouseTable,
+    pub async fn compare_with_dozer_schema(
+        client: ClickhouseClient,
+        schema: &Schema,
+        table: &ClickhouseTable,
     ) -> Result<(), ClickhouseSinkError> {
-        let columns: Vec<ClickhouseSchemaColumn> = client
+        let mut client = client.get_client_handle().await?;
+        let block: Block<Complex> = client
             .query(&format!(
                 "DESCRIBE TABLE {database}.{table_name}",
                 table_name = table.name,
                 database = table.database
             ))
-            .fetch_all::<ClickhouseSchemaColumn>()
+            .fetch_all()
             .await?;

-        for field in schema.fields {
-            let Some(column) = columns.iter().find(|column| column.name == field.name) else {
-                return Err(ClickhouseSinkError::ColumnNotFound(field.name));
-            };
-
-            let mut expected_type = match field.typ {
-                FieldType::UInt => "UInt64",
-                FieldType::U128 => "Uint128",
-                FieldType::Int => "Int64",
-                FieldType::I128 => "Int128",
-                FieldType::Float => "Float64",
-                FieldType::Boolean => "Bool",
-                FieldType::String | FieldType::Text => "String",
-                FieldType::Binary => "UInt8",
-                FieldType::Decimal => "Decimal64",
-                FieldType::Timestamp => "DateTime64(9)",
-                FieldType::Date => "Date",
-                FieldType::Json => "Json",
-                FieldType::Point => "Point",
-                FieldType::Duration => {
-                    return Err(ClickhouseSinkError::TypeNotSupported(
-                        field.name,
-                        "Duration".to_string(),
-                    ))
+        let columns: Vec<ClickhouseSchemaColumn> = block
+            .rows()
+            .map(|row| {
+                let column_name: String = row.get("name").unwrap();
+                let column_type: String = row.get("type").unwrap();
+                ClickhouseSchemaColumn {
+                    name: column_name,
+                    type_: column_type,
                 }
-            }
-            .to_string();
-
-            if field.nullable {
-                expected_type = format!("Nullable({expected_type})");
-            }
-
-            if field.typ == FieldType::Binary {
-                expected_type = format!("Array({expected_type})");
-            }
+            })
+            .collect();

-            let column_type = column.r#type.clone();
+        for field in &schema.fields {
+            let Some(column) = columns.iter().find(|column| column.name == field.name) else {
+                return Err(ClickhouseSinkError::ColumnNotFound(field.name.clone()));
+            };
+            let expected_type = map_field_to_type(field);
+            let column_type = column.type_.clone();
             if expected_type != column_type {
                 return Err(ClickhouseSinkError::ColumnTypeMismatch(
-                    field.name,
+                    field.name.clone(),
                     expected_type.to_string(),
                     column_type.to_string(),
                 ));
@@ -167,32 +96,82 @@ impl ClickhouseSchema {
     }

     async fn fetch_sink_table_info(
-        client: &Client,
+        mut handle: ClientHandle,
         sink_table_name: &str,
     ) -> Result<ClickhouseTable, ClickhouseSinkError> {
-        Ok(client
-            .query("SELECT database, name, engine, engine_full FROM system.tables WHERE name = ?")
-            .bind(sink_table_name)
-            .fetch_one::<ClickhouseTable>()
-            .await?)
+        let block = handle
+            .query(format!(
+                "SELECT database, name, engine, engine_full FROM system.tables WHERE name = '{}'",
+                sink_table_name
+            ))
+            .fetch_all()
+            .await?;
+        let row = block.rows().next().unwrap();
+        Ok(ClickhouseTable {
+            database: row.get(0)?,
+            name: row.get(1)?,
+            engine: row.get(2)?,
+            engine_full: row.get(3)?,
+        })
     }

+    #[allow(dead_code)]
     async fn fetch_primary_keys(
-        client: &Client,
+        mut handle: ClientHandle,
         sink_table_name: &str,
         schema: &str,
     ) -> Result<Vec<String>, ClickhouseSinkError> {
-        Ok(client
-            .query("SELECT ?fields FROM INFORMATION_SCHEMA.key_column_usage WHERE table_name = ? AND constraint_schema = ?")
-            .bind(sink_table_name)
-            .bind(schema)
-            .fetch_all::<ClickhouseKeyColumnDef>()
-            .await?
-            .iter()
-            .filter_map(|key_column_def| {
-                key_column_def.column_name.clone()
-            })
-            .collect()
-        )
+        let block = handle
+            .query(format!(
+                r#"
+                    SELECT column_name, constraint_name, constraint_schema
+                    FROM INFORMATION_SCHEMA.key_column_usage
+                    WHERE table_name = '{}' AND constraint_schema = '{}' and column_name is NOT NULL"#,
+                sink_table_name, schema
+            ))
+            .fetch_all()
+            .await?;
+
+        let mut keys = vec![];
+        for r in block.rows() {
+            let name: Option<String> = r.get(0)?;
+            if let Some(name) = name {
+                keys.push(name);
+            }
+        }
+
+        Ok(keys)
+    }
+}
+
+pub fn map_field_to_type(field: &FieldDefinition) -> String {
+    let decimal = format!("Decimal(10, {})", DECIMAL_SCALE);
+    let typ: &str = match field.typ {
+        FieldType::UInt => "UInt64",
+        FieldType::U128 => "UInt128",
+        FieldType::Int => "Int64",
+        FieldType::I128 => "Int128",
+        FieldType::Float => "Float64",
+        FieldType::Boolean => "Boolean",
+        FieldType::String => "String",
+        FieldType::Text => "String",
+        FieldType::Binary => "Array(UInt8)",
+        FieldType::Decimal => &decimal,
+        FieldType::Timestamp => "DateTime64(3)",
+        FieldType::Date => "Date",
+        FieldType::Json => "JSON",
+        FieldType::Point => "Point",
+        FieldType::Duration => unimplemented!(),
+    };
+
+    if field.nullable {
+        if field.typ != FieldType::Binary {
+            format!("Nullable({})", typ)
+        } else {
+            warn!("Binary field cannot be nullable, ignoring nullable flag");
+            typ.to_string()
+        }
+    } else {
+        typ.to_string()
     }
 }
diff --git a/dozer-sink-clickhouse/src/sink.rs b/dozer-sink-clickhouse/src/sink.rs
new file mode 100644
index 0000000000..b03ab68f86
--- /dev/null
+++ b/dozer-sink-clickhouse/src/sink.rs
@@ -0,0 +1,322 @@
+use dozer_core::epoch::Epoch;
+use dozer_core::event::EventHub;
+use dozer_core::node::{PortHandle, Sink, SinkFactory};
+use dozer_core::tokio::runtime::Runtime;
+use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_types::errors::internal::BoxedError;
+
+use dozer_types::log::debug;
+use dozer_types::models::sink::{ClickhouseSinkConfig, ClickhouseTableOptions};
+use dozer_types::node::OpIdentifier;
+
+use crate::client::ClickhouseClient;
+use crate::errors::ClickhouseSinkError;
+use crate::metadata::{
+    ReplicationMetadata, META_TABLE_COL, META_TXN_ID_COL, REPLICA_METADATA_TABLE,
+};
+use crate::schema::{ClickhouseSchema, ClickhouseTable};
+use dozer_types::tonic::async_trait;
+use dozer_types::types::{Field, Operation, Schema, TableOperation};
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+const BATCH_SIZE: usize = 100;
+
+#[derive(Debug)]
+pub struct ClickhouseSinkFactory {
+    runtime: Arc<Runtime>,
+    config: ClickhouseSinkConfig,
+}
+
+impl ClickhouseSinkFactory {
+    pub fn new(config: ClickhouseSinkConfig, runtime: Arc<Runtime>) -> Self {
+        Self { config, runtime }
+    }
+
+    pub async fn create_replication_metadata_table(&self) -> Result<(), BoxedError> {
+        let client = ClickhouseClient::new(self.config.clone());
+        let repl_metadata = ReplicationMetadata::get_metadata();
+
+        let primary_keys = repl_metadata.get_primary_keys();
+        let partition_by = format!("({})", primary_keys.join(","));
+        let create_table_options = ClickhouseTableOptions {
+            engine: Some("ReplacingMergeTree".to_string()),
+            primary_keys: Some(repl_metadata.get_primary_keys()),
+            partition_by: Some(partition_by),
+            // Replaced using this key
+            order_by: Some(repl_metadata.get_primary_keys()),
+            cluster: self
+                .config
+                .create_table_options
+                .as_ref()
+                .and_then(|o| o.cluster.clone()),
+            sample_by: None,
+        };
+        client
+            .create_table(
+                &repl_metadata.table_name,
&repl_metadata.schema.fields, + Some(create_table_options), + ) + .await?; + + Ok(()) + } +} + +#[async_trait] +impl SinkFactory for ClickhouseSinkFactory { + fn type_name(&self) -> String { + "clickhouse".to_string() + } + + fn get_input_ports(&self) -> Vec { + vec![DEFAULT_PORT_HANDLE] + } + + fn get_input_port_name(&self, _port: &PortHandle) -> String { + self.config.source_table_name.clone() + } + + fn prepare(&self, input_schemas: HashMap) -> Result<(), BoxedError> { + debug_assert!(input_schemas.len() == 1); + Ok(()) + } + + async fn build( + &self, + mut input_schemas: HashMap, + _event_hub: EventHub, + ) -> Result, BoxedError> { + let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap(); + + let client = ClickhouseClient::new(self.config.clone()); + + let config = &self.config; + + // Create Sink Table + self.create_replication_metadata_table().await?; + + // Create Sink Table + if self.config.create_table_options.is_some() { + client + .create_table( + &config.sink_table_name, + &schema.fields, + self.config.create_table_options.clone(), + ) + .await?; + } + let table = ClickhouseSchema::get_clickhouse_table(client.clone(), &self.config).await?; + + ClickhouseSchema::compare_with_dozer_schema(client.clone(), &schema, &table).await?; + + let sink = ClickhouseSink::new( + client, + self.config.clone(), + schema, + self.runtime.clone(), + table, + ); + + Ok(Box::new(sink)) + } +} + +pub(crate) struct ClickhouseSink { + pub(crate) client: ClickhouseClient, + pub(crate) runtime: Arc, + pub(crate) schema: Schema, + pub(crate) sink_table_name: String, + pub(crate) table: ClickhouseTable, + batch: Vec>, + metadata: ReplicationMetadata, + latest_txid: Option, +} + +impl Debug for ClickhouseSink { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ClickhouseSink") + .field("sink_table_name", &self.sink_table_name) + .field("table", &self.table) + .field("schema", &self.schema) + .finish() + } +} + +impl ClickhouseSink { + pub fn new( + client: ClickhouseClient, + config: ClickhouseSinkConfig, + schema: Schema, + runtime: Arc, + table: ClickhouseTable, + ) -> Self { + Self { + client, + runtime, + schema, + sink_table_name: config.sink_table_name, + table, + batch: Vec::new(), + latest_txid: None, + metadata: ReplicationMetadata::get_metadata(), + } + } + + pub async fn insert_metadata(&self) -> Result<(), BoxedError> { + debug!( + "[Sink] Inserting metadata record {:?} {}", + self.latest_txid, + self.sink_table_name.clone() + ); + if let Some(txid) = self.latest_txid { + self.client + .insert( + REPLICA_METADATA_TABLE, + &self.metadata.schema.fields, + &[ + Field::String(self.sink_table_name.clone()), + Field::UInt(txid), + ], + ) + .await?; + } + Ok(()) + } + + fn insert_values(&mut self, values: &[Field]) -> Result<(), BoxedError> { + // add values to batch instead of inserting immediately + self.batch.push(values.to_vec()); + Ok(()) + } + + fn commit_batch(&mut self) -> Result<(), BoxedError> { + self.runtime.block_on(async { + //Insert batch + self.client + .insert_multi(&self.sink_table_name, &self.schema.fields, &self.batch) + .await?; + + self.insert_metadata().await?; + Ok::<(), BoxedError>(()) + })?; + + self.batch.clear(); + Ok(()) + } + + fn _get_latest_op(&mut self) -> Result, BoxedError> { + let op = self.runtime.block_on(async { + let mut client = self.client.get_client_handle().await?; + let table_name = self.sink_table_name.clone(); + let query = format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{REPLICA_METADATA_TABLE}\" WHERE 
\"{META_TABLE_COL}\" = '\"{table_name}\"' ORDER BY \"{META_TXN_ID_COL}\" LIMIT 1"); + let block = client + .query(query) + .fetch_all() + .await?; + + let row = block.rows().next(); + match row { + Some(row) => { + let txid: u64 = row.get(META_TXN_ID_COL)?; + Ok::, BoxedError>(Some(OpIdentifier { txid, seq_in_tx: 0 })) + }, + None => Ok::, BoxedError>(None), + } + })?; + Ok(op) + } +} + +impl Sink for ClickhouseSink { + fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> { + Ok(()) + } + + fn flush_batch(&mut self) -> Result<(), BoxedError> { + self.commit_batch()?; + Ok(()) + } + + fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> { + self.latest_txid = op.id.map(|id| id.txid); + match op.op { + Operation::Insert { new } => { + if self.table.engine == "CollapsingMergeTree" { + let mut values = new.values; + values.push(Field::Int(1)); + + self.insert_values(&values)?; + } else { + self.insert_values(&new.values)?; + } + + if self.batch.len() > BATCH_SIZE - 1 { + self.commit_batch()?; + } + } + Operation::Delete { old } => { + if self.table.engine != "CollapsingMergeTree" { + return Err(BoxedError::from(ClickhouseSinkError::UnsupportedOperation)); + } + let mut values = old.values; + values.push(Field::Int(-1)); + self.insert_values(&values)?; + } + Operation::Update { new, old } => { + if self.table.engine != "CollapsingMergeTree" { + return Err(BoxedError::from(ClickhouseSinkError::UnsupportedOperation)); + } + let mut values = old.values; + values.push(Field::Int(-1)); + self.insert_values(&values)?; + + let mut values = new.values; + values.push(Field::Int(1)); + self.insert_values(&values)?; + } + Operation::BatchInsert { new } => { + for record in new { + let mut values = record.values; + values.push(Field::Int(1)); + self.insert_values(&values)?; + } + self.commit_batch()?; + } + } + + Ok(()) + } + + fn on_source_snapshotting_started( + &mut self, + _connection_name: String, + ) -> Result<(), BoxedError> { + Ok(()) + } + + fn on_source_snapshotting_done( + &mut self, + _connection_name: String, + id: Option, + ) -> Result<(), BoxedError> { + self.latest_txid = id.map(|opid| opid.txid); + self.commit_batch()?; + Ok(()) + } + + fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> { + Ok(()) + } + + fn get_source_state(&mut self) -> Result>, BoxedError> { + Ok(None) + } + + fn get_latest_op_id(&mut self) -> Result, BoxedError> { + // self.get_latest_op() + Ok(None) + } +} diff --git a/dozer-sink-clickhouse/src/tests.rs b/dozer-sink-clickhouse/src/tests.rs index 894a747650..3f7102fd42 100644 --- a/dozer-sink-clickhouse/src/tests.rs +++ b/dozer-sink-clickhouse/src/tests.rs @@ -1,31 +1,28 @@ +use crate::client::ClickhouseClient; use crate::schema::ClickhouseSchema; -use crate::ClickhouseSinkError; -use clickhouse::Client; use dozer_core::tokio; use dozer_types::models::sink::ClickhouseSinkConfig; use dozer_types::types::{FieldDefinition, FieldType, Schema}; -fn get_client() -> Client { - Client::default() - .with_url("http://localhost:8123") - .with_user("default") - .with_database("default") +fn get_client() -> ClickhouseClient { + ClickhouseClient::new(get_sink_config()) } fn get_sink_config() -> ClickhouseSinkConfig { ClickhouseSinkConfig { source_table_name: "source_table".to_string(), sink_table_name: "sink_table".to_string(), + scheme: "tcp".to_string(), create_table_options: None, - primary_keys: Some(vec!["id".to_string()]), user: "default".to_string(), - password: Some("default".to_string()), + password: None, 
diff --git a/dozer-sink-clickhouse/src/tests.rs b/dozer-sink-clickhouse/src/tests.rs
index 894a747650..3f7102fd42 100644
--- a/dozer-sink-clickhouse/src/tests.rs
+++ b/dozer-sink-clickhouse/src/tests.rs
@@ -1,31 +1,28 @@
+use crate::client::ClickhouseClient;
 use crate::schema::ClickhouseSchema;
-use crate::ClickhouseSinkError;
-use clickhouse::Client;
 use dozer_core::tokio;
 use dozer_types::models::sink::ClickhouseSinkConfig;
 use dozer_types::types::{FieldDefinition, FieldType, Schema};
 
-fn get_client() -> Client {
-    Client::default()
-        .with_url("http://localhost:8123")
-        .with_user("default")
-        .with_database("default")
+fn get_client() -> ClickhouseClient {
+    ClickhouseClient::new(get_sink_config())
 }
 
 fn get_sink_config() -> ClickhouseSinkConfig {
     ClickhouseSinkConfig {
         source_table_name: "source_table".to_string(),
         sink_table_name: "sink_table".to_string(),
+        scheme: "tcp".to_string(),
         create_table_options: None,
-        primary_keys: Some(vec!["id".to_string()]),
         user: "default".to_string(),
-        password: Some("default".to_string()),
+        password: None,
         database: "default".to_string(),
-        database_url: "http://localhost:8123".to_string(),
+        host: "localhost".to_string(),
+        port: 9000,
     }
 }
 
-fn get_dozer_schema() -> Schema {
+fn _get_dozer_schema() -> Schema {
     Schema {
         fields: vec![
             FieldDefinition {
@@ -46,16 +43,14 @@ fn get_dozer_schema() -> Schema {
 }
 
 async fn create_table(table_name: &str) {
-    let client = get_client();
+    let mut client = get_client().get_client_handle().await.unwrap();
     client
-        .query(&format!("DROP TABLE IF EXISTS {table_name}"))
-        .execute()
+        .execute(&format!("DROP TABLE IF EXISTS {table_name}"))
         .await
        .unwrap();
 
     client
-        .query(&format!("CREATE TABLE {table_name}(id UInt64, data String, PRIMARY KEY id) ENGINE = MergeTree ORDER BY id"))
-        .execute()
+        .execute(&format!("CREATE TABLE {table_name}(id UInt64, data String, sign Int8, PRIMARY KEY id) ENGINE = CollapsingMergeTree(sign) ORDER BY id"))
         .await
         .unwrap();
 }
@@ -66,29 +61,8 @@ async fn test_get_clickhouse_table() {
     let client = get_client();
     let sink_config = get_sink_config();
     create_table(&sink_config.sink_table_name).await;
-    let schema = get_dozer_schema();
-
-    let clickhouse_table = ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema)
+    let clickhouse_table = ClickhouseSchema::get_clickhouse_table(client, &sink_config)
         .await
         .unwrap();
     assert_eq!(clickhouse_table.name, sink_config.sink_table_name);
 }
-
-#[tokio::test]
-#[ignore]
-async fn test_get_not_existing_clickhouse_table() {
-    let client = get_client();
-    let mut sink_config = get_sink_config();
-    create_table("not_existing").await;
-    let schema = get_dozer_schema();
-
-    sink_config.create_table_options = None;
-
-    let clickhouse_table =
-        ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema).await;
-    eprintln!("CT {:?}", clickhouse_table);
-    assert!(matches!(
-        clickhouse_table,
-        Err(ClickhouseSinkError::SinkTableDoesNotExist)
-    ));
-}
diff --git a/dozer-sink-clickhouse/src/types.rs b/dozer-sink-clickhouse/src/types.rs
new file mode 100644
index 0000000000..56ade44a41
--- /dev/null
+++ b/dozer-sink-clickhouse/src/types.rs
@@ -0,0 +1,327 @@
+#![allow(clippy::redundant_closure_call)]
+use crate::errors::QueryError;
+
+use chrono_tz::{Tz, UTC};
+use clickhouse_rs::{Block, ClientHandle};
+use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Offset, TimeZone};
+use dozer_types::json_types::JsonValue;
+use dozer_types::ordered_float::OrderedFloat;
+use dozer_types::rust_decimal::prelude::ToPrimitive;
+use dozer_types::rust_decimal::{self};
+use dozer_types::serde_json;
+use dozer_types::types::{Field, FieldDefinition, FieldType};
+use either::Either;
+
+use clickhouse_rs::types::{FromSql, Simple, Value, ValueRef};
+
+pub const DECIMAL_SCALE: u8 = 4;
+pub struct ValueWrapper(pub Value);
+
+impl<'a> FromSql<'a> for ValueWrapper {
+    fn from_sql(value: ValueRef<'a>) -> clickhouse_rs::errors::Result<Self> {
+        let v = Value::from(value);
+        Ok(ValueWrapper(v))
+    }
+}
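
`ValueWrapper` exists because clickhouse-rs drives `Row::get` through a `FromSql` implementation; wrapping the raw `Value` lets every cell be fetched untyped and mapped to a dozer `Field` in a second step, as `map_value_wrapper_to_field` does next. A standalone sketch of the same newtype-deferral pattern, with a trait and names invented purely for illustration:

```rust
// Hypothetical stand-in for clickhouse-rs's FromSql: fetching is generic over
// a conversion trait, so a newtype can capture the raw value and defer typing.
trait FromCell: Sized {
    fn from_cell(raw: &str) -> Result<Self, String>;
}

struct CellWrapper(String);

impl FromCell for CellWrapper {
    fn from_cell(raw: &str) -> Result<Self, String> {
        // Keep the raw value; decide the concrete field type in a later pass.
        Ok(CellWrapper(raw.to_string()))
    }
}

fn get<T: FromCell>(raw: &str) -> Result<T, String> {
    T::from_cell(raw)
}

fn main() {
    let cell: CellWrapper = get("42").unwrap();
    assert_eq!(cell.0, "42");
}
```
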
+pub fn map_value_wrapper_to_field(
+    value: ValueWrapper,
+    field: FieldDefinition,
+) -> Result<Field, QueryError> {
+    if field.nullable {
+        if let clickhouse_rs::types::Value::Nullable(v) = value.0 {
+            match v {
+                Either::Left(_) => Ok(Field::Null),
+                Either::Right(data) => {
+                    let mut fd = field.clone();
+                    fd.nullable = false;
+                    map_value_wrapper_to_field(ValueWrapper(*data), fd)
+                }
+            }
+        } else {
+            Err(QueryError::CustomError(format!(
+                "Field is marked as nullable in Schema but not as per the database {0:?}",
+                field.name
+            )))
+        }
+    } else {
+        let value = value.0;
+        match field.typ {
+            FieldType::UInt => match value {
+                clickhouse_rs::types::Value::UInt8(val) => Ok(Field::UInt(val.into())),
+                clickhouse_rs::types::Value::UInt16(val) => Ok(Field::UInt(val.into())),
+                clickhouse_rs::types::Value::UInt32(val) => Ok(Field::UInt(val.into())),
+                clickhouse_rs::types::Value::UInt64(val) => Ok(Field::UInt(val)),
+                _ => Err(QueryError::CustomError("Invalid UInt value".to_string())),
+            },
+            FieldType::U128 => match value {
+                clickhouse_rs::types::Value::UInt128(val) => Ok(Field::U128(val)),
+                _ => Err(QueryError::CustomError("Invalid U128 value".to_string())),
+            },
+            FieldType::Int => match value {
+                clickhouse_rs::types::Value::Int8(val) => Ok(Field::Int(val.into())),
+                clickhouse_rs::types::Value::Int16(val) => Ok(Field::Int(val.into())),
+                clickhouse_rs::types::Value::Int32(val) => Ok(Field::Int(val.into())),
+                clickhouse_rs::types::Value::Int64(val) => Ok(Field::Int(val)),
+                _ => Err(QueryError::CustomError("Invalid Int value".to_string())),
+            },
+            FieldType::I128 => match value {
+                clickhouse_rs::types::Value::Int128(val) => Ok(Field::I128(val)),
+                _ => Err(QueryError::CustomError("Invalid I128 value".to_string())),
+            },
+            FieldType::Float => match value {
+                clickhouse_rs::types::Value::Float64(val) => Ok(Field::Float(OrderedFloat(val))),
+                _ => Err(QueryError::CustomError("Invalid Float value".to_string())),
+            },
+            FieldType::Boolean => match value {
+                clickhouse_rs::types::Value::UInt8(val) => Ok(Field::Boolean(val != 0)),
+                _ => Err(QueryError::CustomError("Invalid Boolean value".to_string())),
+            },
+            FieldType::String => match value {
+                clickhouse_rs::types::Value::String(_) => Ok(Field::String(value.to_string())),
+                _ => Err(QueryError::CustomError("Invalid String value".to_string())),
+            },
+            FieldType::Text => match value {
+                clickhouse_rs::types::Value::String(_) => Ok(Field::String(value.to_string())),
+                _ => Err(QueryError::CustomError("Invalid String value".to_string())),
+            },
+            FieldType::Binary => match value {
+                clickhouse_rs::types::Value::String(val) => {
+                    let val = (*val).clone();
+                    Ok(Field::Binary(val))
+                }
+                _ => Err(QueryError::CustomError("Invalid Binary value".to_string())),
+            },
+            FieldType::Decimal => match value {
+                clickhouse_rs::types::Value::Decimal(v) => Ok(Field::Decimal(
+                    rust_decimal::Decimal::new(v.internal(), v.scale() as u32),
+                )),
+                _ => Err(QueryError::CustomError("Invalid Decimal value".to_string())),
+            },
+            FieldType::Timestamp => {
+                let v: DateTime<Tz> = value.into();
+                let dt = convert_to_fixed_offset(v);
+                match dt {
+                    Some(dt) => Ok(Field::Timestamp(dt)),
+                    None => Err(QueryError::CustomError(
+                        "Invalid Timestamp value".to_string(),
+                    )),
+                }
+            }
+            FieldType::Date => Ok(Field::Date(value.into())),
+            FieldType::Json => match value {
+                clickhouse_rs::types::Value::String(_) => {
+                    let json = value.to_string();
+                    let json = serde_json::from_str(&json);
+                    json.map(Field::Json)
+                        .map_err(|e| QueryError::CustomError(e.to_string()))
+                }
+                _ => Err(QueryError::CustomError("Invalid Json value".to_string())),
+            },
+            x => Err(QueryError::CustomError(format!(
+                "Unsupported type {0:?}",
+                x
+            ))),
+        }
+    }
+}
+
+fn convert_to_fixed_offset(datetime_tz: DateTime<Tz>) -> Option<DateTime<FixedOffset>> {
+    // Get the offset from UTC in seconds for the specific datetime
+    let offset_seconds = datetime_tz
+        .timezone()
+        .offset_from_utc_datetime(&datetime_tz.naive_utc())
+        .fix()
+        .local_minus_utc();
+
+    // Create a FixedOffset timezone from the offset in seconds using east_opt()
+    FixedOffset::east_opt(offset_seconds)
+        .map(|fixed_offset| fixed_offset.from_utc_datetime(&datetime_tz.naive_utc()))
+}
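
A quick check of what `convert_to_fixed_offset` computes: the zone's offset at that instant is frozen into a `FixedOffset` and reapplied to the UTC wall time, so the instant itself is preserved. A hedged sketch, assuming only the chrono and chrono-tz crates already in this sink's dependency set:

```rust
// Illustrative check of the Tz -> FixedOffset conversion above.
use chrono::{FixedOffset, Offset, TimeZone};
use chrono_tz::Tz;

fn main() {
    let tz: Tz = "Asia/Kolkata".parse().unwrap(); // UTC+05:30, no DST
    let dt = tz.with_ymd_and_hms(2024, 3, 9, 12, 0, 0).unwrap();

    // Freeze the zone's offset at this instant, then reapply it to UTC time.
    let seconds = dt.offset().fix().local_minus_utc();
    let fixed = FixedOffset::east_opt(seconds)
        .map(|off| off.from_utc_datetime(&dt.naive_utc()))
        .unwrap();

    assert_eq!(fixed.timestamp(), dt.timestamp()); // same instant
    println!("{fixed}");
}
```
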
+fn type_mismatch_error(expected_type: &str, field_name: &str) -> QueryError {
+    QueryError::TypeMismatch(expected_type.to_string(), field_name.to_string())
+}
+
+macro_rules! handle_type {
+    ($nullable: expr, $b: expr, $field_type:ident, $rust_type:ty, $column_values:expr, $n:expr) => {{
+        if $nullable {
+            let column_data: Vec<Option<&Field>> =
+                $column_values.iter().map(|f| Some(*f)).collect();
+            let mut col: Vec<Option<$rust_type>> = vec![];
+            for f in column_data {
+                let v = match f {
+                    Some(Field::$field_type(v)) => Ok(Some(*v)),
+                    None => Ok(None),
+                    _ => Err(type_mismatch_error(stringify!($field_type), $n)),
+                }?;
+                col.push(v);
+            }
+            $b = $b.column($n, col);
+        } else {
+            let mut col: Vec<$rust_type> = vec![];
+            for f in $column_values {
+                let v = match f {
+                    Field::$field_type(v) => Ok(*v),
+                    _ => Err(type_mismatch_error(stringify!($field_type), $n)),
+                }?;
+                col.push(v);
+            }
+            $b = $b.column($n, col);
+        }
+    }};
+}
+
+macro_rules! handle_complex_type {
+    ($nullable: expr, $b: expr, $field_type:ident, $rust_type:ty, $column_values:expr, $n:expr, $complex_expr:expr) => {{
+        if $nullable {
+            let column_data: Vec<Option<&Field>> =
+                $column_values.iter().map(|f| Some(*f)).collect();
+            let mut col: Vec<Option<$rust_type>> = vec![];
+            for f in column_data {
+                let v = match f {
+                    Some(Field::$field_type(v)) => {
+                        let v = $complex_expr(v);
+                        Ok(v)
+                    }
+                    None => Ok(None),
+                    _ => Err(type_mismatch_error(stringify!($field_type), $n)),
+                }?;
+                col.push(v);
+            }
+            $b = $b.column($n, col);
+        } else {
+            let mut col: Vec<$rust_type> = vec![];
+            for f in $column_values {
+                let v = match f {
+                    Field::$field_type(v) => {
+                        let v = $complex_expr(v);
+                        match v {
+                            Some(v) => Ok(v),
+                            None => Err(type_mismatch_error(stringify!($field_type), $n)),
+                        }
+                    }
+                    _ => Err(type_mismatch_error(stringify!($field_type), $n)),
+                }?;
+                col.push(v);
+            }
+            $b = $b.column($n, col);
+        }
+    }};
+}
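
Both macros expect the caller to have already pivoted the row-major `Vec<Vec<Field>>` into one `Vec<&Field>` per column, which is exactly what the loop in `insert_multi` below does with `values.iter().map(|row| &row[field_index])`. A dependency-free sketch of that pivot:

```rust
// The blocks are built column by column, so row-major data is pivoted first.
fn column<'a, T>(rows: &'a [Vec<T>], idx: usize) -> Vec<&'a T> {
    rows.iter().map(|row| &row[idx]).collect()
}

fn main() {
    // Two rows of (id, qty); same shape as `values: &[Vec<Field>]`.
    let rows = vec![vec![1, 10], vec![2, 20]];
    assert_eq!(column(&rows, 0), vec![&1, &2]); // the "id" column
    assert_eq!(column(&rows, 1), vec![&10, &20]); // the "qty" column
}
```
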
+
+pub async fn insert_multi(
+    mut client: ClientHandle,
+    table_name: &str,
+    fields: &[FieldDefinition],
+    values: &[Vec<Field>], // Row-major: one Vec<Field> per record
+) -> Result<(), QueryError> {
+    let mut b = Block::<Simple>::new();
+
+    for (field_index, fd) in fields.iter().enumerate() {
+        let column_values: Vec<_> = values.iter().map(|row| &row[field_index]).collect();
+
+        let n = &fd.name;
+        let nullable = fd.nullable;
+        match fd.typ {
+            FieldType::UInt => handle_type!(nullable, b, UInt, u64, column_values, n),
+            FieldType::U128 => handle_type!(nullable, b, U128, u128, column_values, n),
+            FieldType::Int => handle_type!(nullable, b, Int, i64, column_values, n),
+            FieldType::I128 => handle_type!(nullable, b, I128, i128, column_values, n),
+            FieldType::Boolean => handle_type!(nullable, b, Boolean, bool, column_values, n),
+            FieldType::Float => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Float,
+                    f64,
+                    column_values,
+                    n,
+                    |f: &OrderedFloat<f64>| -> Option<f64> { f.to_f64() }
+                )
+            }
+            FieldType::String | FieldType::Text => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    String,
+                    String,
+                    column_values,
+                    n,
+                    |f: &String| -> Option<String> { Some(f.to_string()) }
+                )
+            }
+            FieldType::Binary => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Binary,
+                    Vec<u8>,
+                    column_values,
+                    n,
+                    |f: &Vec<u8>| -> Option<Vec<u8>> { Some(f.clone()) }
+                )
+            }
+            FieldType::Decimal => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Decimal,
+                    clickhouse_rs::types::Decimal,
+                    column_values,
+                    n,
+                    |f: &rust_decimal::Decimal| -> Option<clickhouse_rs::types::Decimal> {
+                        f.to_f64()
+                            .map(|f| clickhouse_rs::types::Decimal::of(f, DECIMAL_SCALE))
+                    }
+                )
+            }
+            FieldType::Timestamp => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Timestamp,
+                    DateTime<Tz>,
+                    column_values,
+                    n,
+                    |dt: &DateTime<FixedOffset>| -> Option<DateTime<Tz>> {
+                        Some(dt.with_timezone(&UTC))
+                    }
+                )
+            }
+            FieldType::Date => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Date,
+                    NaiveDate,
+                    column_values,
+                    n,
+                    |f: &NaiveDate| -> Option<NaiveDate> { Some(*f) }
+                )
+            }
+            FieldType::Json => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Json,
+                    Vec<u8>,
+                    column_values,
+                    n,
+                    |f: &JsonValue| -> Option<Vec<u8>> {
+                        Some(dozer_types::json_types::json_to_bytes(f))
+                    }
+                )
+            }
+            ft => {
+                return Err(QueryError::CustomError(format!(
+                    "Unsupported field_type {} for {}",
+                    ft, n
+                )));
+            }
+        }
+    }
+
+    // Insert the block into the table
+    client.insert(table_name, b).await?;
+
+    Ok(())
+}
diff --git a/dozer-tracing/src/telemetry.rs b/dozer-tracing/src/telemetry.rs
index 2b3deeb139..4caab836e5 100644
--- a/dozer-tracing/src/telemetry.rs
+++ b/dozer-tracing/src/telemetry.rs
@@ -63,7 +63,7 @@ fn create_subscriber(
     let app_name = app_name.unwrap_or("dozer");
 
     let fmt_filter = EnvFilter::try_from_default_env()
-        .or_else(|_| EnvFilter::try_new("info"))
+        .or_else(|_| EnvFilter::try_new("info,clickhouse_rs=error"))
         .unwrap();
 
     // `console_subscriber` can only be added once.
@@ -78,6 +78,7 @@ fn create_subscriber(
     let trace_filter = EnvFilter::try_from_env("DOZER_TRACE_FILTER")
         .unwrap_or_else(|_| EnvFilter::try_new("dozer=trace").unwrap());
+
     let layers = match &telemetry_config.trace {
         None => (None, None),
         Some(TelemetryTraceConfig::Dozer(config)) => (
diff --git a/dozer-types/src/json_types.rs b/dozer-types/src/json_types.rs
index 96024902f8..e556697190 100644
--- a/dozer-types/src/json_types.rs
+++ b/dozer-types/src/json_types.rs
@@ -36,7 +36,7 @@ pub fn json_to_string(value: &JsonValue) -> String {
     serde_value.to_string()
 }
 
-pub(crate) fn json_to_bytes(value: &JsonValue) -> Vec<u8> {
+pub fn json_to_bytes(value: &JsonValue) -> Vec<u8> {
     rmp_serde::to_vec(value).unwrap()
 }
 
diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs
index bfa94f2639..51809482e3 100644
--- a/dozer-types/src/models/sink.rs
+++ b/dozer-types/src/models/sink.rs
@@ -185,24 +185,49 @@ pub struct AerospikeSinkConfig {
     pub metadata_set: Option<String>,
 }
 
-#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)]
+#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone, Default)]
 #[serde(deny_unknown_fields)]
 pub struct ClickhouseSinkConfig {
-    pub database_url: String,
+    #[serde(default = "ClickhouseSinkConfig::default_host")]
+    pub host: String,
+    #[serde(default = "ClickhouseSinkConfig::default_port")]
+    pub port: u16,
+    #[serde(default = "ClickhouseSinkConfig::default_user")]
     pub user: String,
     #[serde(default)]
     pub password: Option<String>,
+    #[serde(default = "ClickhouseSinkConfig::default_scheme")]
+    pub scheme: String,
+    #[serde(default = "ClickhouseSinkConfig::default_database")]
    pub database: String,
     pub source_table_name: String,
     pub sink_table_name: String,
-    pub primary_keys: Option<Vec<String>>,
-    pub create_table_options: Option<ClickhouseSinkTableOptions>,
+    pub create_table_options: Option<ClickhouseTableOptions>,
+}
+
+impl ClickhouseSinkConfig {
+    fn default_database() -> String {
+        "default".to_string()
+    }
+    fn default_scheme() -> String {
+        "tcp".to_string()
+    }
+    fn default_host() -> String {
+        "0.0.0.0".to_string()
+    }
+    fn default_port() -> u16 {
+        9000
+    }
+    fn default_user() -> String {
+        "default".to_string()
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)]
 #[serde(deny_unknown_fields)]
-pub struct ClickhouseSinkTableOptions {
+pub struct ClickhouseTableOptions {
     pub engine: Option<String>,
+    pub primary_keys: Option<Vec<String>>,
     pub partition_by: Option<String>,
     pub sample_by: Option<String>,
     pub order_by: Option<Vec<String>>,
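
The net effect of the serde attributes above: only the two table names are required, while host, port, user, scheme, and database fall back to their defaults, which is also what the json_schemas change below encodes. A standalone sketch of the same default mechanism, assuming serde and serde_yaml are available; the struct mirrors only a subset of ClickhouseSinkConfig:

```rust
// Illustration of serde field defaults, not the real config struct.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct SinkConfig {
    #[serde(default = "default_host")]
    host: String,
    #[serde(default = "default_port")]
    port: u16,
    source_table_name: String,
    sink_table_name: String,
}

fn default_host() -> String {
    "0.0.0.0".to_string()
}

fn default_port() -> u16 {
    9000
}

fn main() {
    // Only the required fields are supplied; the rest are defaulted.
    let cfg: SinkConfig = serde_yaml::from_str(
        "source_table_name: trips\nsink_table_name: trips",
    )
    .unwrap();
    println!("{cfg:?}");
    assert_eq!((cfg.host.as_str(), cfg.port), ("0.0.0.0", 9000));
}
```
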
diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json
index a1e9d770aa..82c69b1431 100644
--- a/json_schemas/dozer.json
+++ b/json_schemas/dozer.json
@@ -448,17 +448,14 @@
     "ClickhouseSinkConfig": {
       "type": "object",
       "required": [
-        "database",
-        "database_url",
         "sink_table_name",
-        "source_table_name",
-        "user"
+        "source_table_name"
       ],
       "properties": {
         "create_table_options": {
           "anyOf": [
             {
-              "$ref": "#/definitions/ClickhouseSinkTableOptions"
+              "$ref": "#/definitions/ClickhouseTableOptions"
             },
             {
               "type": "null"
@@ -466,9 +463,11 @@
           ]
         },
         "database": {
+          "default": "default",
           "type": "string"
         },
-        "database_url": {
+        "host": {
+          "default": "0.0.0.0",
           "type": "string"
         },
         "password": {
@@ -478,14 +477,15 @@
             "null"
           ]
         },
-        "primary_keys": {
-          "type": [
-            "array",
-            "null"
-          ],
-          "items": {
-            "type": "string"
-          }
+        "port": {
+          "default": 9000,
+          "type": "integer",
+          "format": "uint16",
+          "minimum": 0.0
+        },
+        "scheme": {
+          "default": "tcp",
+          "type": "string"
         },
         "sink_table_name": {
           "type": "string"
         },
         "source_table_name": {
           "type": "string"
         },
         "user": {
+          "default": "default",
           "type": "string"
         }
       },
       "additionalProperties": false
     },
-    "ClickhouseSinkTableOptions": {
+    "ClickhouseTableOptions": {
       "type": "object",
       "properties": {
         "cluster": {
@@ -529,6 +530,15 @@
             "null"
           ]
         },
+        "primary_keys": {
+          "type": [
+            "array",
+            "null"
+          ],
+          "items": {
+            "type": "string"
+          }
+        },
         "sample_by": {
           "type": [
             "string",