From b0fb1a09c2e1ceb66a8beab27739c9edfd19daef Mon Sep 17 00:00:00 2001 From: VG Date: Thu, 7 Mar 2024 18:35:20 +0800 Subject: [PATCH 01/13] feat: refactor type mapping --- Cargo.lock | 254 ++++++--------- Cargo.toml | 1 - dozer-sink-clickhouse/Cargo.toml | 5 +- dozer-sink-clickhouse/src/client.rs | 190 +++++++++++ dozer-sink-clickhouse/src/ddl.rs | 163 +++++----- dozer-sink-clickhouse/src/errors.rs | 54 ++++ dozer-sink-clickhouse/src/lib.rs | 468 +--------------------------- dozer-sink-clickhouse/src/schema.rs | 144 +++++---- dozer-sink-clickhouse/src/sink.rs | 224 +++++++++++++ dozer-sink-clickhouse/src/tests.rs | 168 +++++----- dozer-sink-clickhouse/src/types.rs | 325 +++++++++++++++++++ dozer-types/src/json_types.rs | 2 +- 12 files changed, 1140 insertions(+), 858 deletions(-) create mode 100644 dozer-sink-clickhouse/src/client.rs create mode 100644 dozer-sink-clickhouse/src/errors.rs create mode 100644 dozer-sink-clickhouse/src/sink.rs create mode 100644 dozer-sink-clickhouse/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index e342c8e277..7de91cb468 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,7 +98,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -211,7 +211,7 @@ dependencies = [ "actix-router", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -757,7 +757,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -787,7 +787,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -809,7 +809,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -820,7 +820,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -1020,7 +1020,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.39", + "syn 2.0.52", "which", ] @@ -1041,7 +1041,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -1171,7 +1171,7 @@ dependencies = [ "proc-macro-crate 2.0.0", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", "syn_derive", ] @@ -1217,15 +1217,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "bstr" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "542f33a8835a0884b006a0c3df3dadd99c0c3f296ed26c2fdc8028e01ad6230c" -dependencies = [ - "memchr", -] - [[package]] name = "bumpalo" version = "3.14.0" @@ -1390,9 +1381,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.8.4" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e23185c0e21df6ed832a12e2bda87c7d1def6842881fb634a8511ced741b0d76" +checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" dependencies = [ "chrono", "chrono-tz-build", @@ -1501,7 +1492,7 @@ dependencies = [ "heck", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -1520,43 +1511,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" [[package]] -name = "clickhouse" -version = "0.11.6" -source = "git+https://github.com/getdozer/clickhouse.rs.git#4c7d6f03caa933aeda5af7bff31536fccb97fba1" +name = "clickhouse-rs" +version = "1.1.0-alpha.1" +source = "git+https://github.com/suharev7/clickhouse-rs#afd8ce54e3882fec943fac42ab8aa4425e2f40e9" dependencies = [ - "bstr", - "bytes", - "clickhouse-derive", + "byteorder", + "cfg-if", + "chrono", + "chrono-tz", "clickhouse-rs-cityhash-sys", - "futures", - "hyper 0.14.27", - "hyper-tls", + "combine", + "crossbeam", + "either", + "futures-core", + "futures-sink", + "futures-util", + "hostname", + "lazy_static", + "log", "lz4", - "quanta 0.12.2", - "sealed", - "serde", - "static_assertions", + "percent-encoding", + "pin-project", "thiserror", "tokio", "url", -] - -[[package]] -name = "clickhouse-derive" -version = "0.1.1" -source = "git+https://github.com/getdozer/clickhouse.rs.git#4c7d6f03caa933aeda5af7bff31536fccb97fba1" -dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "serde_derive_internals 0.29.0", - "syn 2.0.39", + "uuid", ] [[package]] name = "clickhouse-rs-cityhash-sys" version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4baf9d4700a28d6cb600e17ed6ae2b43298a5245f1f76b4eab63027ebfd592b9" +source = "git+https://github.com/suharev7/clickhouse-rs#afd8ce54e3882fec943fac42ab8aa4425e2f40e9" dependencies = [ "cc", ] @@ -1974,7 +1959,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2031,7 +2016,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "strsim", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2053,7 +2038,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2376,7 +2361,7 @@ checksum = "3c65c2ffdafc1564565200967edc4851c7b55422d3913466688907efd05ea26f" dependencies = [ "deno-proc-macro-rules-macros", "proc-macro2 1.0.78", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2388,7 +2373,7 @@ dependencies = [ "once_cell", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -2800,7 +2785,7 @@ dependencies = [ "regex", "strum 0.25.0", "strum_macros 0.25.3", - "syn 2.0.39", + "syn 2.0.52", "thiserror", ] @@ -3028,7 +3013,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -3103,7 +3088,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -3412,9 +3397,12 @@ dependencies = [ name = "dozer-sink-clickhouse" version = "0.1.0" dependencies = [ - "clickhouse", + "chrono-tz", + "clickhouse-rs", "dozer-core", "dozer-types", + "either", + "serde", ] [[package]] @@ -3679,9 +3667,9 @@ dependencies = [ [[package]] name = "either" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" [[package]] name = "elliptic-curve" @@ -3787,7 +3775,7 @@ dependencies = [ "num-traits", "proc-macro2 1.0.78", 
"quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -3799,7 +3787,7 @@ dependencies = [ "once_cell", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4141,7 +4129,7 @@ dependencies = [ "pmutil", "proc-macro2 1.0.78", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4169,7 +4157,7 @@ checksum = "b0fa992f1656e1707946bbba340ad244f0814009ef8c0118eb7b658395f19a2e" dependencies = [ "frunk_proc_macro_helpers", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4181,7 +4169,7 @@ dependencies = [ "frunk_core", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4193,7 +4181,7 @@ dependencies = [ "frunk_core", "frunk_proc_macro_helpers", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -4309,7 +4297,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5146,7 +5134,7 @@ dependencies = [ "pmutil", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5305,7 +5293,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "regex", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5565,6 +5553,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "serde", +] [[package]] name = "logos" @@ -5586,7 +5577,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "regex-syntax 0.6.29", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5792,7 +5783,7 @@ dependencies = [ "ipnet", "metrics", "metrics-util", - "quanta 0.11.1", + "quanta", "thiserror", "tokio", "tracing", @@ -5806,7 +5797,7 @@ checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -5820,7 +5811,7 @@ dependencies = [ "hashbrown 0.13.1", "metrics", "num_cpus", - "quanta 0.11.1", + "quanta", "sketches-ddsketch", ] @@ -5942,7 +5933,7 @@ dependencies = [ "proc-macro-error 1.0.4", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", "termcolor", "thiserror", ] @@ -6436,7 +6427,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -6914,7 +6905,7 @@ dependencies = [ "pest_meta", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7048,7 +7039,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7143,7 +7134,7 @@ checksum = "52a40bc70c2c58040d2d8b167ba9a5ff59fc9dab7ad44771cfde3dcfde7a09c6" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7244,7 +7235,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2 1.0.78", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7477,7 +7468,7 @@ dependencies = [ "prost 0.12.3", "prost-types 0.12.3", "regex", - "syn 2.0.39", + "syn 2.0.52", "tempfile", "which", ] @@ -7505,7 +7496,7 @@ dependencies = [ "itertools 0.11.0", "proc-macro2 1.0.78", "quote 
1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -7662,22 +7653,7 @@ dependencies = [ "libc", "mach2", "once_cell", - "raw-cpuid 10.7.0", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi", -] - -[[package]] -name = "quanta" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid 11.0.1", + "raw-cpuid", "wasi 0.11.0+wasi-snapshot-preview1", "web-sys", "winapi", @@ -7826,15 +7802,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "raw-cpuid" -version = "11.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" -dependencies = [ - "bitflags 2.4.1", -] - [[package]] name = "raw-window-handle" version = "0.5.2" @@ -8438,7 +8405,7 @@ checksum = "5a32af5427251d2e4be14fc151eabe18abb4a7aad5efee7044da9f096c906a43" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -8515,7 +8482,7 @@ checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "serde_derive_internals 0.26.0", + "serde_derive_internals", "syn 1.0.109", ] @@ -8565,18 +8532,6 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" -[[package]] -name = "sealed" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" -dependencies = [ - "heck", - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.39", -] - [[package]] name = "sec1" version = "0.3.0" @@ -8694,9 +8649,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.193" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] @@ -8722,13 +8677,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -8742,17 +8697,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "serde_derive_internals" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "330f01ce65a3a5fe59a60c82f3c9a024b573b8a6e875bd233fe5f934e71d54e3" -dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.39", -] - [[package]] name = "serde_json" version = "1.0.108" @@ -8875,7 +8819,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9290,7 +9234,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9348,7 +9292,7 @@ dependencies = [ "proc-macro2 1.0.78", 
"quote 1.0.35", "rustversion", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9440,7 +9384,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9489,7 +9433,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9572,7 +9516,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9676,7 +9620,7 @@ dependencies = [ "pmutil", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9688,7 +9632,7 @@ dependencies = [ "pmutil", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9712,7 +9656,7 @@ dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", "swc_macros_common", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9739,9 +9683,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", @@ -9768,7 +9712,7 @@ dependencies = [ "proc-macro-error 1.0.4", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -9914,7 +9858,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -10059,7 +10003,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -10291,7 +10235,7 @@ dependencies = [ "proc-macro2 1.0.78", "prost-build 0.12.3", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -10409,7 +10353,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -10997,7 +10941,7 @@ dependencies = [ "once_cell", "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", "wasm-bindgen-shared", ] @@ -11031,7 +10975,7 @@ checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -11490,7 +11434,7 @@ checksum = "855e0f6af9cd72b87d8a6c586f3cb583f5cdcc62c2c80869d8cd7e96fdf7ee20" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -11501,7 +11445,7 @@ checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] @@ -11521,7 +11465,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", - "syn 2.0.39", + "syn 2.0.52", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fac522e8f5..4805745e66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,6 @@ resolver = "2" [workspace.dependencies] bincode = { version = "2.0.0-rc.3", features = ["derive"] } datafusion = { version = "33.0.0" } -datafusion-expr = { version = "33.0.0" } [patch.crates-io] postgres = { git = 
"https://github.com/getdozer/rust-postgres" } diff --git a/dozer-sink-clickhouse/Cargo.toml b/dozer-sink-clickhouse/Cargo.toml index ce1851ad8e..4f26da67a2 100644 --- a/dozer-sink-clickhouse/Cargo.toml +++ b/dozer-sink-clickhouse/Cargo.toml @@ -8,4 +8,7 @@ edition = "2021" [dependencies] dozer-core = { path = "../dozer-core" } dozer-types = { path = "../dozer-types" } -clickhouse = { git = "https://github.com/getdozer/clickhouse.rs.git" } +clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs" } +either = "1.10.0" +chrono-tz = "0.8.6" +serde = "1.0.197" diff --git a/dozer-sink-clickhouse/src/client.rs b/dozer-sink-clickhouse/src/client.rs new file mode 100644 index 0000000000..edb9d28a3a --- /dev/null +++ b/dozer-sink-clickhouse/src/client.rs @@ -0,0 +1,190 @@ +#![allow(dead_code)] +use super::ddl::get_create_table_query; +use super::types::ValueWrapper; +use crate::errors::QueryError; +use crate::types::{insert_multi, map_value_wrapper_to_field}; +use clickhouse_rs::{ClientHandle, Pool}; +use dozer_types::types::{Field, FieldDefinition}; +use serde::Serialize; + +pub struct SqlResult { + pub rows: Vec>, +} + +#[derive(Clone)] +pub struct ClickhouseClient { + pool: Pool, +} +#[derive(Debug, Clone, Serialize)] +pub struct QueryId(pub String); + +#[derive(Debug, Clone, Serialize)] +#[serde(crate = "dozer_types::serde")] + +pub struct QueryLog { + pub query_duration_ms: u64, + pub read_rows: u64, + pub read_bytes: u64, + pub written_rows: u64, + pub written_bytes: u64, + pub result_rows: u64, + pub result_bytes: u64, + pub memory_usage: u64, +} + +impl ClickhouseClient { + pub fn root() -> Self { + let pool = Pool::new("tcp://default@localhost:9000/default"); + + Self { pool } + } + + pub fn new(app_id: i32) -> Self { + let pool = Pool::new( + format!("tcp://default@localhost:9000/db_{app_id}", app_id = app_id).as_str(), + ); + Self { pool } + } + + pub fn get_log_query(log_comment: &str) -> String { + format!( + r#" + SELECT + query_duration_ms, + read_rows, + read_bytes, + written_rows, + written_bytes, + result_rows, + result_bytes, + memory_usage + FROM system.query_log WHERE + log_comment = '{}' + "#, + log_comment + ) + } + pub async fn get_client_handle(&self) -> Result { + let client = self.pool.get_handle().await?; + Ok(client) + } + + pub async fn drop_table(&self, datasource_name: &str) -> Result<(), QueryError> { + let mut client = self.pool.get_handle().await?; + let ddl = format!("DROP TABLE IF EXISTS {}", datasource_name); + println!("#{ddl}"); + client.execute(ddl).await?; + Ok(()) + } + + pub async fn create_table( + &self, + datasource_name: &str, + fields: &[FieldDefinition], + ) -> Result<(), QueryError> { + let mut client = self.pool.get_handle().await?; + let ddl = get_create_table_query(datasource_name, fields, None, None); + println!("#{ddl}"); + client.execute(ddl).await?; + Ok(()) + } + + pub async fn fetch_all( + &self, + query: &str, + schema: Vec, + query_id: Option, + ) -> Result { + let mut client = self.pool.get_handle().await?; + // TODO: query_id doesnt work + // https://github.com/suharev7/clickhouse-rs/issues/176 + // let query = Query::new(sql).id(query_id.to_string()) + let query = query_id.map_or(query.to_string(), |id| { + format!("{0} settings log_comment = '{1}'", query, id) + }); + + let block = client.query(&query).fetch_all().await?; + + let mut rows: Vec> = vec![]; + for row in block.rows() { + let mut row_data = vec![]; + for (idx, field) in schema.clone().into_iter().enumerate() { + let v: ValueWrapper = row.get(idx)?; + 
row_data.push(map_value_wrapper_to_field(v, field)?); + } + rows.push(row_data); + } + + Ok(SqlResult { rows }) + } + + pub async fn _fetch_query_log(&self, query_id: String) -> Result { + let mut client = self.pool.get_handle().await?; + let query = Self::get_log_query(&query_id); + let block = client.query(query).fetch_all().await?; + let first_row = block.rows().next(); + + if let Some(row) = first_row { + let query_log = QueryLog { + query_duration_ms: row.get("query_duration_ms")?, + read_rows: row.get("read_rows")?, + read_bytes: row.get("read_bytes")?, + written_rows: row.get("written_rows")?, + written_bytes: row.get("written_bytes")?, + result_rows: row.get("result_rows")?, + result_bytes: row.get("result_bytes")?, + memory_usage: row.get("memory_usage")?, + }; + Ok(query_log) + } else { + Err(QueryError::CustomError(format!( + "No query log found for {0}", + query_id + ))) + } + } + + pub async fn check_table(&self, table_name: &str) -> Result { + let mut client = self.pool.get_handle().await?; + let query = format!("CHECK TABLE {}", table_name); + client.query(query).fetch_all().await?; + + // if error not found, table exists + Ok(true) + } + + pub async fn create_materialized_view( + &self, + name: &str, + target_table: &str, + query: &str, + ) -> Result<(), QueryError> { + let mut client = self.pool.get_handle().await?; + let ddl = format!( + "CREATE MATERIALIZED VIEW {} TO {} AS {}", + name, target_table, query + ); + client.execute(ddl).await?; + Ok(()) + } + + pub async fn insert( + &self, + table_name: &str, + fields: &[FieldDefinition], + values: &[Field], + ) -> Result<(), QueryError> { + let client = self.pool.get_handle().await?; + insert_multi(client, table_name, fields, &[values.to_vec()]).await + } + + pub async fn insert_multi( + &self, + table_name: &str, + fields: &[FieldDefinition], + values: &[Vec], + ) -> Result<(), QueryError> { + let client = self.pool.get_handle().await?; + insert_multi(client, table_name, fields, values).await + } +} diff --git a/dozer-sink-clickhouse/src/ddl.rs b/dozer-sink-clickhouse/src/ddl.rs index 94fbbddd05..9b2e33b51f 100644 --- a/dozer-sink-clickhouse/src/ddl.rs +++ b/dozer-sink-clickhouse/src/ddl.rs @@ -1,70 +1,66 @@ use dozer_types::log::warn; use dozer_types::models::sink::ClickhouseSinkTableOptions; -use dozer_types::types::{FieldDefinition, FieldType, Schema}; - -pub struct ClickhouseDDL {} +use dozer_types::types::{FieldDefinition, FieldType}; const DEFAULT_TABLE_ENGINE: &str = "MergeTree()"; -impl ClickhouseDDL { - pub fn get_create_table_query( - table_name: String, - schema: Schema, - sink_options: Option, - primary_keys: Option>, - ) -> String { - let mut parts = schema - .fields - .iter() - .map(|field| { - let typ = Self::map_field_to_type(field); - format!("{} {}", field.name, typ) - }) - .collect::>(); +pub fn get_create_table_query( + table_name: &str, + fields: &[FieldDefinition], + sink_options: Option, + primary_keys: Option>, +) -> String { + let mut parts = fields + .iter() + .map(|field| { + let typ = map_field_to_type(field); + format!("{} {}", field.name, typ) + }) + .collect::>(); - let engine = sink_options - .as_ref() - .map(|options| { - options - .engine - .clone() - .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()) - }) - .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()); + let engine = sink_options + .as_ref() + .map(|options| { + options + .engine + .clone() + .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()) + }) + .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()); - if let Some(pk) = 
-            parts.push(format!("PRIMARY KEY ({})", pk.join(", ")));
-        }
+    if let Some(pk) = primary_keys {
+        parts.push(format!("PRIMARY KEY ({})", pk.join(", ")));
+    }
 
-        let query = parts.join(",\n");
+    let query = parts.join(",\n");
 
-        let partition_by = sink_options
-            .as_ref()
-            .and_then(|options| options.partition_by.clone())
-            .map_or("".to_string(), |partition_by| {
-                format!("PARTITION BY {}\n", partition_by)
-            });
-        let sample_by = sink_options
-            .as_ref()
-            .and_then(|options| options.sample_by.clone())
-            .map_or("".to_string(), |partition_by| {
-                format!("SAMPLE BY {}\n", partition_by)
-            });
-        let order_by = sink_options
-            .as_ref()
-            .and_then(|options| options.order_by.clone())
-            .map_or("".to_string(), |order_by| {
-                format!("ORDER BY ({})\n", order_by.join(", "))
-            });
-        let cluster = sink_options
-            .as_ref()
-            .and_then(|options| options.cluster.clone())
-            .map_or("".to_string(), |cluster| {
-                format!("ON CLUSTER {}\n", cluster)
-            });
+    let partition_by = sink_options
+        .as_ref()
+        .and_then(|options| options.partition_by.clone())
+        .map_or("".to_string(), |partition_by| {
+            format!("PARTITION BY {}\n", partition_by)
+        });
+    let sample_by = sink_options
+        .as_ref()
+        .and_then(|options| options.sample_by.clone())
+        .map_or("".to_string(), |sample_by| {
+            format!("SAMPLE BY {}\n", sample_by)
+        });
+    let order_by = sink_options
+        .as_ref()
+        .and_then(|options| options.order_by.clone())
+        .map_or("".to_string(), |order_by| {
+            format!("ORDER BY ({})\n", order_by.join(", "))
+        });
+    let cluster = sink_options
+        .as_ref()
+        .and_then(|options| options.cluster.clone())
+        .map_or("".to_string(), |cluster| {
+            format!("ON CLUSTER {}\n", cluster)
+        });
 
-        format!(
-            "CREATE TABLE IF NOT EXISTS {table_name} {cluster} (
+    format!(
+        "CREATE TABLE IF NOT EXISTS {table_name} {cluster} (
 {query}
 )
 ENGINE = {engine}
@@ -72,37 +68,36 @@
 {partition_by}
 {sample_by}
 ",
-        )
-    }
+    )
+}
 
-    pub fn map_field_to_type(field: &FieldDefinition) -> String {
-        let typ = match field.typ {
-            FieldType::UInt => "UInt64",
-            FieldType::U128 => "UInt128",
-            FieldType::Int => "Int64",
-            FieldType::I128 => "Int128",
-            FieldType::Float => "Float64",
-            FieldType::Boolean => "Boolean",
-            FieldType::String => "String",
-            FieldType::Text => "String",
-            FieldType::Binary => "Array(UInt8)",
-            FieldType::Decimal => "Decimal",
-            FieldType::Timestamp => "DateTime64(3)",
-            FieldType::Date => "Date",
-            FieldType::Json => "JSON",
-            FieldType::Point => "Point",
-            FieldType::Duration => unimplemented!(),
-        };
+pub fn map_field_to_type(field: &FieldDefinition) -> String {
+    let typ = match field.typ {
+        FieldType::UInt => "UInt64",
+        FieldType::U128 => "UInt128",
+        FieldType::Int => "Int64",
+        FieldType::I128 => "Int128",
+        FieldType::Float => "Float64",
+        FieldType::Boolean => "Boolean",
+        FieldType::String => "String",
+        FieldType::Text => "String",
+        FieldType::Binary => "Array(UInt8)",
+        FieldType::Decimal => "Decimal",
+        FieldType::Timestamp => "DateTime64(3)",
+        FieldType::Date => "Date",
+        FieldType::Json => "JSON",
+        FieldType::Point => "Point",
+        FieldType::Duration => unimplemented!(),
+    };
 
-        if field.nullable {
-            if field.typ != FieldType::Binary {
-                format!("Nullable({})", typ)
-            } else {
-                warn!("Binary field cannot be nullable, ignoring nullable flag");
-                typ.to_string()
-            }
+    if field.nullable {
+        if field.typ != FieldType::Binary {
+            format!("Nullable({})", typ)
         } else {
+            warn!("Binary field cannot be nullable, ignoring nullable flag");
+            typ.to_string()
+        }
+    } else {
+        typ.to_string()
     }
 }
diff --git a/dozer-sink-clickhouse/src/errors.rs b/dozer-sink-clickhouse/src/errors.rs
new file mode 100644
index 0000000000..02b8d8e7f1
--- /dev/null
+++ b/dozer-sink-clickhouse/src/errors.rs
@@ -0,0 +1,54 @@
+use clickhouse_rs::types::SqlType;
+use dozer_types::{
+    thiserror::{self, Error},
+    types::FieldDefinition,
+};
+pub const BATCH_SIZE: usize = 100;
+
+#[derive(Error, Debug)]
+pub enum ClickhouseSinkError {
+    #[error("Only MergeTree engine is supported for delete operation")]
+    UnsupportedOperation,
+
+    #[error("Column {0} not found in sink table")]
+    ColumnNotFound(String),
+
+    #[error("Column {0} has type {1} in dozer schema but type {2} in sink table")]
+    ColumnTypeMismatch(String, String, String),
+
+    #[error("Clickhouse error: {0:?}")]
+    ClickhouseError(#[from] clickhouse_rs::errors::Error),
+
+    #[error("Primary key not found")]
+    PrimaryKeyNotFound,
+
+    #[error("Type {1} is not supported for column {0}")]
+    TypeNotSupported(String, String),
+
+    #[error("Sink table does not exist and create_table_options is not set")]
+    SinkTableDoesNotExist,
+
+    #[error("Expected primary key {0:?} but got {1:?}")]
+    PrimaryKeyMismatch(Vec<String>, Vec<String>),
+
+    #[error("Schema field not found by index {0}")]
+    SchemaFieldNotFoundByIndex(usize),
+
+    #[error("QueryError: {0:?}")]
+    QueryError(#[from] QueryError),
+}
+
+#[derive(Error, Debug)]
+pub enum QueryError {
+    #[error("Clickhouse error: {0:?}")]
+    DataFetchError(#[from] clickhouse_rs::errors::Error),
+
+    #[error("Unsupported type: {0:?}")]
+    UnsupportedType(SqlType),
+
+    #[error("Schema has type {0:?} but value is of type {1:?}")]
+    TypeMismatch(FieldDefinition, SqlType),
+
+    #[error("{0:?}")]
+    CustomError(String),
+}
diff --git a/dozer-sink-clickhouse/src/lib.rs b/dozer-sink-clickhouse/src/lib.rs
index b0c9051b3b..75f98504ca 100644
--- a/dozer-sink-clickhouse/src/lib.rs
+++ b/dozer-sink-clickhouse/src/lib.rs
@@ -1,467 +1,9 @@
+mod client;
 mod ddl;
+mod errors;
 mod schema;
+mod sink;
+pub use sink::ClickhouseSinkFactory;
 #[cfg(test)]
 mod tests;
-
-use crate::ClickhouseSinkError::{
-    PrimaryKeyNotFound, SchemaFieldNotFoundByIndex, UnsupportedOperation,
-};
-use clickhouse::inserter::Inserter;
-use clickhouse::Client;
-use dozer_core::epoch::Epoch;
-use dozer_core::event::EventHub;
-use dozer_core::node::{PortHandle, Sink, SinkFactory};
-use dozer_core::tokio::runtime::Runtime;
-use dozer_core::DEFAULT_PORT_HANDLE;
-use dozer_types::errors::internal::BoxedError;
-use dozer_types::log::debug;
-use dozer_types::models::sink::ClickhouseSinkConfig;
-use dozer_types::node::OpIdentifier;
-use dozer_types::serde::Serialize;
-use dozer_types::tonic::async_trait;
-use dozer_types::types::{
-    DozerDuration, DozerPoint, Field, FieldType, Operation, Record, Schema, TableOperation,
-};
-use std::collections::HashMap;
-use std::fmt::Debug;
-use std::sync::Arc;
-
-use crate::schema::{ClickhouseSchema, ClickhouseTable};
-use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate};
-use dozer_types::json_types::JsonValue;
-use dozer_types::ordered_float::OrderedFloat;
-use dozer_types::rust_decimal::Decimal;
-use dozer_types::serde_bytes;
-use dozer_types::thiserror::{self, Error};
-
-pub const BATCH_SIZE: usize = 100;
-
-#[derive(Error, Debug)]
-enum ClickhouseSinkError {
-    #[error("Only MergeTree engine is supported for delete operation")]
-    UnsupportedOperation,
-
-    #[error("Column {0} not found in sink table")]
-    ColumnNotFound(String),
-
-    #[error("Column {0} has type {1} in dozer schema but type {2} in sink table")]
-    ColumnTypeMismatch(String, String, String),
-
-    #[error("Clickhouse query error: {0}")]
-    ClickhouseQueryError(#[from] clickhouse::error::Error),
-
-    #[error("Primary key not found")]
-    PrimaryKeyNotFound,
-
-    #[error("Type {1} is not supported for column {0}")]
-    TypeNotSupported(String, String),
-
-    #[error("Sink table does not exist and create_table_options is not set")]
-    SinkTableDoesNotExist,
-
-    #[error("Expected primary key {0:?} but got {1:?}")]
-    PrimaryKeyMismatch(Vec<String>, Vec<String>),
-
-    #[error("Schema field not found by index {0}")]
-    SchemaFieldNotFoundByIndex(usize),
-}
-
-#[derive(Debug)]
-pub struct ClickhouseSinkFactory {
-    runtime: Arc<Runtime>,
-    config: ClickhouseSinkConfig,
-}
-
-#[derive(Debug, Serialize)]
-#[serde(crate = "dozer_types::serde", untagged)]
-pub enum FieldWrapper {
-    UInt(u64),
-    U128(u128),
-    Int(i64),
-    I128(i128),
-    Float(#[cfg_attr(feature= "arbitrary", arbitrary(with = arbitrary_float))] OrderedFloat<f64>),
-    Boolean(bool),
-    String(String),
-    Text(String),
-    Binary(#[serde(with = "serde_bytes")] Vec<u8>),
-    Decimal(Decimal),
-    Timestamp(DateTime<FixedOffset>),
-    Date(NaiveDate),
-    Json(#[cfg_attr(feature= "arbitrary", arbitrary(with = arb_json::arbitrary_json))] JsonValue),
-    Point(DozerPoint),
-    Duration(DozerDuration),
-    OptionalUInt(Option<u64>),
-    OptionalU128(Option<u128>),
-    OptionalInt(Option<i64>),
-    OptionalI128(Option<i128>),
-    OptionalFloat(
-        #[cfg_attr(feature= "arbitrary", arbitrary(with = arbitrary_float))]
-        Option<OrderedFloat<f64>>,
-    ),
-    OptionalBoolean(Option<bool>),
-    OptionalString(Option<String>),
-    OptionalText(Option<String>),
-    OptionalDecimal(Option<Decimal>),
-    OptionalTimestamp(Option<DateTime<FixedOffset>>),
-    OptionalDate(Option<NaiveDate>),
-    OptionalJson(
-        #[cfg_attr(feature= "arbitrary", arbitrary(with = arb_json::arbitrary_json))]
-        Option<JsonValue>,
-    ),
-    OptionalPoint(Option<DozerPoint>),
-    OptionalDuration(Option<DozerDuration>),
-    Null(Option<()>),
-}
-
-fn convert_field_to_ff(field: Field, nullable: bool) -> FieldWrapper {
-    if nullable {
-        match field {
-            Field::UInt(v) => FieldWrapper::OptionalUInt(Some(v)),
-            Field::U128(v) => FieldWrapper::OptionalU128(Some(v)),
-            Field::Int(v) => FieldWrapper::OptionalInt(Some(v)),
-            Field::I128(v) => FieldWrapper::OptionalI128(Some(v)),
-            Field::Float(v) => FieldWrapper::OptionalFloat(Some(v)),
-            Field::Boolean(v) => FieldWrapper::OptionalBoolean(Some(v)),
-            Field::String(v) => FieldWrapper::OptionalString(Some(v)),
-            Field::Text(v) => FieldWrapper::OptionalText(Some(v)),
-            Field::Binary(v) => FieldWrapper::Binary(v),
-            Field::Decimal(v) => FieldWrapper::OptionalDecimal(Some(v)),
-            Field::Timestamp(v) => FieldWrapper::OptionalTimestamp(Some(v)),
-            Field::Date(v) => FieldWrapper::OptionalDate(Some(v)),
-            Field::Json(v) => FieldWrapper::OptionalJson(Some(v)),
-            Field::Point(v) => FieldWrapper::OptionalPoint(Some(v)),
-            Field::Duration(v) => FieldWrapper::OptionalDuration(Some(v)),
-            Field::Null => FieldWrapper::Null(None),
-        }
-    } else {
-        match field {
-            Field::UInt(v) => FieldWrapper::UInt(v),
-            Field::U128(v) => FieldWrapper::U128(v),
-            Field::Int(v) => FieldWrapper::Int(v),
-            Field::I128(v) => FieldWrapper::I128(v),
-            Field::Float(v) => FieldWrapper::Float(v),
-            Field::Boolean(v) => FieldWrapper::Boolean(v),
-            Field::String(v) => FieldWrapper::String(v),
-            Field::Text(v) => FieldWrapper::Text(v),
-            Field::Binary(v) => FieldWrapper::Binary(v),
-            Field::Decimal(v) => FieldWrapper::Decimal(v),
-            Field::Timestamp(v) => FieldWrapper::Timestamp(v),
-            Field::Date(v) => FieldWrapper::Date(v),
-            Field::Json(v) => FieldWrapper::Json(v),
-            Field::Point(v) => FieldWrapper::Point(v),
-            Field::Duration(v) => FieldWrapper::Duration(v),
-            Field::Null => FieldWrapper::Null(None),
-        }
-    }
-}
-impl ClickhouseSinkFactory {
-    pub fn new(config: ClickhouseSinkConfig, runtime: Arc<Runtime>) -> Self {
-        Self { config, runtime }
-    }
-}
-
-#[async_trait]
-impl SinkFactory for ClickhouseSinkFactory {
-    fn type_name(&self) -> String {
-        "clickhouse".to_string()
-    }
-
-    fn get_input_ports(&self) -> Vec<PortHandle> {
-        vec![DEFAULT_PORT_HANDLE]
-    }
-
-    fn get_input_port_name(&self, _port: &PortHandle) -> String {
-        self.config.source_table_name.clone()
-    }
-
-    fn prepare(&self, input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
-        debug_assert!(input_schemas.len() == 1);
-        Ok(())
-    }
-
-    async fn build(
-        &self,
-        mut input_schemas: HashMap<PortHandle, Schema>,
-        _event_hub: EventHub,
-    ) -> Result<Box<dyn Sink>, BoxedError> {
-        let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();
-        let mut client = Client::default()
-            .with_url(self.config.database_url.clone())
-            .with_user(self.config.user.clone())
-            .with_database(self.config.database.clone());
-
-        if let Some(password) = self.config.password.clone() {
-            client = client.with_password(password);
-        }
-
-        let table = ClickhouseSchema::get_clickhouse_table(&client, &self.config, &schema).await?;
-        let primary_key_field_names =
-            ClickhouseSchema::get_primary_keys(&client, &self.config).await?;
-
-        let primary_key_fields_indexes: Result<Vec<usize>, ClickhouseSinkError> =
-            primary_key_field_names
-                .iter()
-                .map(|primary_key| {
-                    schema
-                        .fields
-                        .iter()
-                        .position(|field| field.name == *primary_key)
-                        .ok_or(PrimaryKeyNotFound)
-                })
-                .collect();
-
-        let sink = ClickhouseSink::new(
-            client,
-            self.config.clone(),
-            schema,
-            self.runtime.clone(),
-            table,
-            primary_key_fields_indexes?,
-        );
-
-        Ok(Box::new(sink))
-    }
-}
-
-pub(crate) struct ClickhouseSink {
-    pub(crate) client: Client,
-    pub(crate) runtime: Arc<Runtime>,
-    pub(crate) schema: Schema,
-    pub(crate) inserter: Inserter<FieldWrapper>,
-    pub(crate) sink_table_name: String,
-    pub(crate) table: ClickhouseTable,
-    pub(crate) primary_key_fields_indexes: Vec<usize>,
-}
-
-impl Debug for ClickhouseSink {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("ClickhouseSink")
-            .field("sink_table_name", &self.sink_table_name)
-            .field(
-                "primary_key_fields_indexes",
-                &self.primary_key_fields_indexes,
-            )
-            .field("table", &self.table)
-            .field("schema", &self.schema)
-            .finish()
-    }
-}
-
-impl ClickhouseSink {
-    pub fn new(
-        client: Client,
-        config: ClickhouseSinkConfig,
-        schema: Schema,
-        runtime: Arc<Runtime>,
-        table: ClickhouseTable,
-        primary_key_fields_indexes: Vec<usize>,
-    ) -> Self {
-        let fields_list = schema
-            .fields
-            .iter()
-            .map(|field| field.name.as_str())
-            .collect::<Vec<_>>();
-
-        let inserter = client
-            .inserter(&config.sink_table_name, fields_list.as_slice())
-            .unwrap()
-            .with_max_rows((BATCH_SIZE * schema.fields.len()).try_into().unwrap());
-
-        Self {
-            client,
-            runtime,
-            schema,
-            inserter,
-            sink_table_name: config.sink_table_name,
-            table,
-            primary_key_fields_indexes,
-        }
-    }
-
-    pub fn commit_insert(&mut self) -> Result<(), BoxedError> {
-        self.runtime.block_on(async {
-            let stats = self.inserter.commit().await?;
-            if stats.rows > 0 {
-                debug!(
-                    "{} bytes, {} rows, {} transactions have been inserted",
-                    stats.bytes, stats.rows, stats.transactions,
-                );
-            }
-
-            Ok::<(), BoxedError>(())
-        })
-    }
-
-    fn map_fields(&self, record: Record) -> Result<Vec<FieldWrapper>, ClickhouseSinkError> {
-        record
-            .values
-            .into_iter()
-            .enumerate()
-            .map(|(index, mut field)| match self.schema.fields.get(index) {
-                Some(schema_field) => {
-                    if schema_field.r#typ == FieldType::Binary && Field::Null == field {
-                        field = Field::Binary(vec![]);
-                    }
-
-                    Ok(convert_field_to_ff(field.clone(), schema_field.nullable))
-                }
-                None => Err(SchemaFieldNotFoundByIndex(index)),
-            })
-            .collect()
-    }
-}
-
-impl Sink for ClickhouseSink {
-    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
-        self.commit_insert()
-    }
-
-    fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> {
-        match op.op {
-            Operation::Insert { new } => {
-                let values = self.map_fields(new)?;
-                self.runtime.block_on(async {
-                    for value in values {
-                        self.inserter.write(&value)?;
-                    }
-
-                    Ok::<(), BoxedError>(())
-                })?;
-
-                self.commit_insert()?;
-            }
-            Operation::Delete { old } => {
-                if self.table.engine != "MergeTree" {
-                    return Err(BoxedError::from(UnsupportedOperation));
-                }
-                let mut conditions = vec![];
-
-                for (index, field) in old.values.iter().enumerate() {
-                    if *field != Field::Null {
-                        let field_name = self.schema.fields.get(index).unwrap().name.clone();
-                        conditions.push(format!("{field_name} = ?"));
-                    }
-                }
-
-                let mut query = self.client.query(&format!(
-                    "DELETE FROM {table_name} WHERE {condition}",
-                    condition = conditions.join(" AND "),
-                    table_name = self.sink_table_name
-                ));
-
-                for field in old.values.iter() {
-                    if *field != Field::Null {
-                        query = query.bind(field);
-                    }
-                }
-
-                self.runtime.block_on(async {
-                    query.execute().await.unwrap();
-
-                    Ok::<(), BoxedError>(())
-                })?;
-            }
-            Operation::Update { new, old } => {
-                let mut updates = vec![];
-
-                for (index, field) in new.values.iter().enumerate() {
-                    if self.primary_key_fields_indexes.contains(&index) {
-                        continue;
-                    }
-                    let schema_field = self.schema.fields.get(index).unwrap();
-                    let field_name = schema_field.name.clone();
-                    if *field == Field::Null {
-                        updates.push(format!("{field_name} = NULL"));
-                    } else {
-                        updates.push(format!("{field_name} = ?"));
-                    }
-                }
-
-                let mut conditions = vec![];
-
-                for (index, field) in old.values.iter().enumerate() {
-                    if !self.primary_key_fields_indexes.contains(&index) {
-                        continue;
-                    }
-
-                    if *field != Field::Null {
-                        let field_name = self.schema.fields.get(index).unwrap().name.clone();
-                        conditions.push(format!("{field_name} = ?"));
-                    }
-                }
-
-                let mut query = self.client.query(&format!(
-                    "ALTER TABLE {table_name} UPDATE {updates} WHERE {condition}",
-                    condition = conditions.join(" AND "),
-                    updates = updates.join(", "),
-                    table_name = self.sink_table_name
-                ));
-
-                for (index, field) in new.values.iter().enumerate() {
-                    if *field == Field::Null || self.primary_key_fields_indexes.contains(&index) {
-                        continue;
-                    }
-
-                    query = query.bind(field);
-                }
-
-                for (index, field) in old.values.iter().enumerate() {
-                    if !self.primary_key_fields_indexes.contains(&index) {
-                        continue;
-                    }
-
-                    if *field != Field::Null {
-                        query = query.bind(field);
-                    }
-                }
-
-                self.runtime.block_on(async {
-                    query.execute().await.unwrap();
-
-                    Ok::<(), BoxedError>(())
-                })?;
-            }
-            Operation::BatchInsert { new } => {
-                for record in new {
-                    for f in self.map_fields(record)? {
-                        self.runtime.block_on(async {
-                            self.inserter.write(&f)?;
-                            Ok::<(), BoxedError>(())
-                        })?;
-                    }
-                }
-
-                self.commit_insert()?;
-            }
-        }
-
-        Ok(())
-    }
-
-    fn on_source_snapshotting_started(
-        &mut self,
-        _connection_name: String,
-    ) -> Result<(), BoxedError> {
-        Ok(())
-    }
-
-    fn on_source_snapshotting_done(
-        &mut self,
-        _connection_name: String,
-        _id: Option<OpIdentifier>,
-    ) -> Result<(), BoxedError> {
-        Ok(())
-    }
-
-    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
-        Ok(())
-    }
-
-    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
-        Ok(None)
-    }
-
-    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
-        Ok(None)
-    }
-}
+mod types;
diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs
index 56488af879..37faae2758 100644
--- a/dozer-sink-clickhouse/src/schema.rs
+++ b/dozer-sink-clickhouse/src/schema.rs
@@ -1,24 +1,21 @@
-use crate::ClickhouseSinkError::SinkTableDoesNotExist;
-use crate::{ddl, ClickhouseSinkError};
-use clickhouse::{Client, Row};
+use crate::client::ClickhouseClient;
+use crate::ddl;
+use crate::errors::ClickhouseSinkError::{self, SinkTableDoesNotExist};
+use clickhouse_rs::types::Complex;
+use clickhouse_rs::{Block, ClientHandle};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::sink::ClickhouseSinkConfig;
 use dozer_types::serde::{Deserialize, Serialize};
 use dozer_types::types::{FieldType, Schema};
 
-#[derive(Debug, Row, Deserialize, Serialize)]
+#[derive(Debug, Deserialize, Serialize)]
 #[serde(crate = "dozer_types::serde")]
-pub(crate) struct ClickhouseSchemaColumn {
-    pub(crate) name: String,
-    pub(crate) r#type: String,
-    pub(crate) default_type: String,
-    pub(crate) default_expression: String,
-    pub(crate) comment: String,
-    pub(crate) codec_expression: String,
-    pub(crate) ttl_expression: String,
+pub struct ClickhouseSchemaColumn {
+    name: String,
+    type_: String,
 }
 
-#[derive(Debug, Row, Deserialize, Serialize, Clone)]
+#[derive(Debug, Deserialize, Serialize, Clone)]
 #[serde(crate = "dozer_types::serde")]
 pub(crate) struct ClickhouseTable {
     pub(crate) database: String,
@@ -27,7 +24,7 @@ pub(crate) struct ClickhouseTable {
     pub(crate) engine_full: String,
 }
 
-#[derive(Debug, Row, Deserialize, Serialize)]
+#[derive(Debug, Deserialize, Serialize)]
 #[serde(crate = "dozer_types::serde")]
 pub(crate) struct ClickhouseKeyColumnDef {
     pub(crate) column_name: Option<String>,
@@ -39,47 +36,27 @@ pub struct ClickhouseSchema {}
 
 impl ClickhouseSchema {
     pub async fn get_clickhouse_table(
-        client: &Client,
+        client: ClickhouseClient,
         config: &ClickhouseSinkConfig,
-        dozer_schema: &Schema,
     ) -> Result<ClickhouseTable, ClickhouseSinkError> {
-        match Self::fetch_sink_table_info(client, &config.sink_table_name).await {
-            Ok(table) => {
-                ClickhouseSchema::compare_with_dozer_schema(
-                    client,
-                    dozer_schema.clone(),
-                    table.clone(),
-                )
-                .await?;
-                Ok(table)
-            }
-            Err(ClickhouseSinkError::ClickhouseQueryError(
-                clickhouse::error::Error::RowNotFound,
-            )) => {
-                if config.create_table_options.is_none() {
-                    Err(SinkTableDoesNotExist)
-                } else {
-                    let create_table_query = ddl::ClickhouseDDL::get_create_table_query(
-                        config.sink_table_name.clone(),
-                        dozer_schema.clone(),
-                        config.create_table_options.clone(),
-                        config.primary_keys.clone(),
-                    );
-
-                    client.query(&create_table_query).execute().await?;
-                    Self::fetch_sink_table_info(client, &config.sink_table_name).await
-                }
-            }
-            Err(e) => Err(e),
+        let mut client = client.get_client_handle().await?;
+        let query = format!("DESCRIBE TABLE {}", config.sink_table_name);
+        let block: Block<Complex> = client.query(&query).fetch_all().await?;
+
+        if block.row_count() == 0 {
+            Err(SinkTableDoesNotExist)
+        } else {
+            Self::fetch_sink_table_info(client, &config.sink_table_name).await
         }
     }
 
     pub async fn get_primary_keys(
-        client: &Client,
+        client: ClickhouseClient,
         config: &ClickhouseSinkConfig,
     ) -> Result<Vec<String>, BoxedError> {
+        let handle = client.get_client_handle().await?;
         let existing_pk =
-            Self::fetch_primary_keys(client, &config.sink_table_name, &config.database).await?;
+            Self::fetch_primary_keys(handle, &config.sink_table_name, &config.database).await?;
 
         if let Some(expected_pk) = &config.primary_keys {
             if expected_pk.len() != existing_pk.len() {
@@ -104,19 +81,31 @@ impl ClickhouseSchema {
         Ok(existing_pk)
     }
 
     async fn compare_with_dozer_schema(
-        client: &Client,
+        mut client: ClientHandle,
         schema: Schema,
         table: ClickhouseTable,
     ) -> Result<(), ClickhouseSinkError> {
-        let columns: Vec<ClickhouseSchemaColumn> = client
+        let block: Block<Complex> = client
             .query(&format!(
                 "DESCRIBE TABLE {database}.{table_name}",
                 table_name = table.name,
                 database = table.database
            ))
-            .fetch_all::<ClickhouseSchemaColumn>()
+            .fetch_all()
             .await?;
 
+        let columns: Vec<ClickhouseSchemaColumn> = block
+            .rows()
+            .map(|row| {
+                let column_name: String = row.get("name").unwrap();
+                let column_type: String = row.get("type").unwrap();
+                ClickhouseSchemaColumn {
+                    name: column_name,
+                    type_: column_type,
+                }
+            })
+            .collect();
+
         for field in schema.fields {
             let Some(column) = columns.iter().find(|column| column.name == field.name) else {
                 return Err(ClickhouseSinkError::ColumnNotFound(field.name));
             };
@@ -153,7 +142,7 @@ impl ClickhouseSchema {
                 expected_type = format!("Array({expected_type})");
             }
 
-            let column_type = column.r#type.clone();
+            let column_type = column.type_.clone();
             if expected_type != column_type {
                 return Err(ClickhouseSinkError::ColumnTypeMismatch(
                     field.name,
                     expected_type,
                     column_type,
                ));
             }
         }
 
         Ok(())
     }
 
     async fn fetch_sink_table_info(
-        client: &Client,
+        mut handle: ClientHandle,
         sink_table_name: &str,
     ) -> Result<ClickhouseTable, ClickhouseSinkError> {
-        Ok(client
-            .query("SELECT database, name, engine, engine_full FROM system.tables WHERE name = ?")
-            .bind(sink_table_name)
-            .fetch_one::<ClickhouseTable>()
-            .await?)
+        let block = handle
+            .query(format!(
+                // The table name is compared as a string literal, so it must be single-quoted.
+                "SELECT database, name, engine, engine_full FROM system.tables WHERE name = '{}'",
+                sink_table_name
+            ))
+            .fetch_all()
+            .await?;
+        let row = block.rows().next().unwrap();
+        Ok(ClickhouseTable {
+            database: row.get(0)?,
+            name: row.get(1)?,
+            engine: row.get(2)?,
+            engine_full: row.get(3)?,
+        })
     }
 
     async fn fetch_primary_keys(
-        client: &Client,
+        mut handle: ClientHandle,
         sink_table_name: &str,
         schema: &str,
     ) -> Result<Vec<String>, ClickhouseSinkError> {
-        Ok(client
-            .query("SELECT ?fields FROM INFORMATION_SCHEMA.key_column_usage WHERE table_name = ? AND constraint_schema = ?")
-            .bind(sink_table_name)
-            .bind(schema)
-            .fetch_all::<ClickhouseKeyColumnDef>()
-            .await?
-            .iter()
-            .filter_map(|key_column_def| {
-                key_column_def.column_name.clone()
-            })
-            .collect()
-        )
+        let block = handle
+            .query(format!(
+                r#"
+                SELECT column_name, constraint_name, constraint_schema
+                FROM INFORMATION_SCHEMA.key_column_usage
+                WHERE table_name = '{}' AND constraint_schema = '{}' AND column_name IS NOT NULL"#,
+                sink_table_name, schema
+            ))
+            .fetch_all()
+            .await?;
+
+        let mut keys = vec![];
+        for r in block.rows() {
+            let name: Option<String> = r.get(0)?;
+            if let Some(name) = name {
+                keys.push(name);
+            }
+        }
+
+        Ok(keys)
     }
 }
diff --git a/dozer-sink-clickhouse/src/sink.rs b/dozer-sink-clickhouse/src/sink.rs
new file mode 100644
index 0000000000..c46d586d1a
--- /dev/null
+++ b/dozer-sink-clickhouse/src/sink.rs
@@ -0,0 +1,224 @@
+use dozer_core::epoch::Epoch;
+use dozer_core::event::EventHub;
+use dozer_core::node::{PortHandle, Sink, SinkFactory};
+use dozer_core::tokio::runtime::Runtime;
+use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_types::errors::internal::BoxedError;
+
+use dozer_types::models::sink::ClickhouseSinkConfig;
+use dozer_types::node::OpIdentifier;
+
+use crate::client::ClickhouseClient;
+use crate::errors::ClickhouseSinkError;
+use crate::schema::{ClickhouseSchema, ClickhouseTable};
+use dozer_types::tonic::async_trait;
+use dozer_types::types::{Field, Operation, Schema, TableOperation};
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct ClickhouseSinkFactory {
+    runtime: Arc<Runtime>,
+    config: ClickhouseSinkConfig,
+}
+
+impl ClickhouseSinkFactory {
+    pub fn new(config: ClickhouseSinkConfig, runtime: Arc<Runtime>) -> Self {
+        Self { config, runtime }
+    }
+}
+
+#[async_trait]
+impl SinkFactory for ClickhouseSinkFactory {
+    fn type_name(&self) -> String {
+        "clickhouse".to_string()
+    }
+
+    fn get_input_ports(&self) -> Vec<PortHandle> {
+        vec![DEFAULT_PORT_HANDLE]
+    }
+
+    fn get_input_port_name(&self, _port: &PortHandle) -> String {
+        self.config.source_table_name.clone()
+    }
+
+    fn prepare(&self, input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
+        debug_assert!(input_schemas.len() == 1);
+        Ok(())
+    }
+
+    async fn build(
+        &self,
+        mut input_schemas: HashMap<PortHandle, Schema>,
+        _event_hub: EventHub,
+    ) -> Result<Box<dyn Sink>, BoxedError> {
+        let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();
+
+        let client = ClickhouseClient::root();
+
+        let config = &self.config;
+        if self.config.create_table_options.is_some() {
+            client
+                .create_table(&config.sink_table_name, &schema.fields)
+                .await?;
+        }
+        let table = ClickhouseSchema::get_clickhouse_table(client.clone(), &self.config).await?;
+
+        let primary_key_field_names =
+            ClickhouseSchema::get_primary_keys(client.clone(), &self.config).await?;
+
+        let primary_key_fields_indexes: Result<Vec<usize>, ClickhouseSinkError> =
+            primary_key_field_names
+                .iter()
+                .map(|primary_key| {
+                    schema
+                        .fields
+                        .iter()
+                        .position(|field| field.name == *primary_key)
+                        .ok_or(ClickhouseSinkError::PrimaryKeyNotFound)
+                })
+                .collect();
+
+        let sink = ClickhouseSink::new(
+            client,
+            self.config.clone(),
+            schema,
+            self.runtime.clone(),
+            table,
+            primary_key_fields_indexes?,
+        );
+
+        Ok(Box::new(sink))
+    }
+}
+
+pub(crate) struct ClickhouseSink {
+    pub(crate) client: ClickhouseClient,
+    pub(crate) runtime: Arc<Runtime>,
+    pub(crate) schema: Schema,
+    pub(crate) sink_table_name: String,
+    pub(crate) table: ClickhouseTable,
+    pub(crate) primary_key_fields_indexes: Vec<usize>,
+}
+
+impl Debug for ClickhouseSink {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ClickhouseSink")
+            .field("sink_table_name", &self.sink_table_name)
&self.sink_table_name) + .field( + "primary_key_fields_indexes", + &self.primary_key_fields_indexes, + ) + .field("table", &self.table) + .field("schema", &self.schema) + .finish() + } +} + +impl ClickhouseSink { + pub fn new( + client: ClickhouseClient, + config: ClickhouseSinkConfig, + schema: Schema, + runtime: Arc, + table: ClickhouseTable, + primary_key_fields_indexes: Vec, + ) -> Self { + Self { + client, + runtime, + schema, + sink_table_name: config.sink_table_name, + table, + primary_key_fields_indexes, + } + } + + fn insert_values(&self, values: &[Field]) -> Result<(), BoxedError> { + self.runtime.block_on(async { + self.client + .insert(&self.sink_table_name, &self.schema.fields, values) + .await?; + Ok::<(), BoxedError>(()) + })?; + Ok(()) + } +} + +impl Sink for ClickhouseSink { + fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> { + Ok(()) + } + + fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> { + match op.op { + Operation::Insert { new } => { + if self.table.engine == "CollapsingMergeTree" { + let mut values = new.values; + values.push(Field::Int(1)); + + self.insert_values(&values)?; + } else { + self.insert_values(&new.values)?; + } + } + Operation::Delete { old } => { + if self.table.engine != "CollapsingMergeTree" { + return Err(BoxedError::from(ClickhouseSinkError::UnsupportedOperation)); + } + let mut values = old.values; + values.push(Field::Int(-1)); + self.insert_values(&values)?; + } + Operation::Update { new, old } => { + if self.table.engine != "CollapsingMergeTree" { + return Err(BoxedError::from(ClickhouseSinkError::UnsupportedOperation)); + } + let mut values = old.values; + values.push(Field::Int(-1)); + self.insert_values(&values)?; + + let mut values = new.values; + values.push(Field::Int(1)); + self.insert_values(&values)?; + } + Operation::BatchInsert { new } => { + for record in new { + let mut values = record.values; + values.push(Field::Int(1)); + self.insert_values(&values)?; + } + } + } + + Ok(()) + } + + fn on_source_snapshotting_started( + &mut self, + _connection_name: String, + ) -> Result<(), BoxedError> { + Ok(()) + } + + fn on_source_snapshotting_done( + &mut self, + _connection_name: String, + _id: Option, + ) -> Result<(), BoxedError> { + Ok(()) + } + + fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> { + Ok(()) + } + + fn get_source_state(&mut self) -> Result>, BoxedError> { + Ok(None) + } + + fn get_latest_op_id(&mut self) -> Result, BoxedError> { + Ok(None) + } +} diff --git a/dozer-sink-clickhouse/src/tests.rs b/dozer-sink-clickhouse/src/tests.rs index 894a747650..1fcee6cdaa 100644 --- a/dozer-sink-clickhouse/src/tests.rs +++ b/dozer-sink-clickhouse/src/tests.rs @@ -1,94 +1,94 @@ -use crate::schema::ClickhouseSchema; -use crate::ClickhouseSinkError; -use clickhouse::Client; -use dozer_core::tokio; -use dozer_types::models::sink::ClickhouseSinkConfig; -use dozer_types::types::{FieldDefinition, FieldType, Schema}; +// use crate::errors::ClickhouseSinkError; +// use crate::schema::ClickhouseSchema; +// use clickhouse::Client; +// use dozer_core::tokio; +// use dozer_types::models::sink::ClickhouseSinkConfig; +// use dozer_types::types::{FieldDefinition, FieldType, Schema}; -fn get_client() -> Client { - Client::default() - .with_url("http://localhost:8123") - .with_user("default") - .with_database("default") -} +// fn get_client() -> Client { +// Client::default() +// .with_url("http://localhost:8123") +// .with_user("default") +// .with_database("default") +// } -fn 
get_sink_config() -> ClickhouseSinkConfig { - ClickhouseSinkConfig { - source_table_name: "source_table".to_string(), - sink_table_name: "sink_table".to_string(), - create_table_options: None, - primary_keys: Some(vec!["id".to_string()]), - user: "default".to_string(), - password: Some("default".to_string()), - database: "default".to_string(), - database_url: "http://localhost:8123".to_string(), - } -} +// fn get_sink_config() -> ClickhouseSinkConfig { +// ClickhouseSinkConfig { +// source_table_name: "source_table".to_string(), +// sink_table_name: "sink_table".to_string(), +// create_table_options: None, +// primary_keys: Some(vec!["id".to_string()]), +// user: "default".to_string(), +// password: Some("default".to_string()), +// database: "default".to_string(), +// database_url: "http://localhost:8123".to_string(), +// } +// } -fn get_dozer_schema() -> Schema { - Schema { - fields: vec![ - FieldDefinition { - name: "id".to_string(), - typ: FieldType::UInt, - nullable: false, - source: Default::default(), - }, - FieldDefinition { - name: "data".to_string(), - typ: FieldType::String, - nullable: false, - source: Default::default(), - }, - ], - primary_index: vec![0], - } -} +// fn get_dozer_schema() -> Schema { +// Schema { +// fields: vec![ +// FieldDefinition { +// name: "id".to_string(), +// typ: FieldType::UInt, +// nullable: false, +// source: Default::default(), +// }, +// FieldDefinition { +// name: "data".to_string(), +// typ: FieldType::String, +// nullable: false, +// source: Default::default(), +// }, +// ], +// primary_index: vec![0], +// } +// } -async fn create_table(table_name: &str) { - let client = get_client(); - client - .query(&format!("DROP TABLE IF EXISTS {table_name}")) - .execute() - .await - .unwrap(); +// async fn create_table(table_name: &str) { +// let client = get_client(); +// client +// .query(&format!("DROP TABLE IF EXISTS {table_name}")) +// .execute() +// .await +// .unwrap(); - client - .query(&format!("CREATE TABLE {table_name}(id UInt64, data String, PRIMARY KEY id) ENGINE = MergeTree ORDER BY id")) - .execute() - .await - .unwrap(); -} +// client +// .query(&format!("CREATE TABLE {table_name}(id UInt64, data String, PRIMARY KEY id) ENGINE = MergeTree ORDER BY id")) +// .execute() +// .await +// .unwrap(); +// } -#[tokio::test] -#[ignore] -async fn test_get_clickhouse_table() { - let client = get_client(); - let sink_config = get_sink_config(); - create_table(&sink_config.sink_table_name).await; - let schema = get_dozer_schema(); +// #[tokio::test] +// #[ignore] +// async fn test_get_clickhouse_table() { +// let client = get_client(); +// let sink_config = get_sink_config(); +// create_table(&sink_config.sink_table_name).await; +// let schema = get_dozer_schema(); - let clickhouse_table = ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema) - .await - .unwrap(); - assert_eq!(clickhouse_table.name, sink_config.sink_table_name); -} +// let clickhouse_table = ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema) +// .await +// .unwrap(); +// assert_eq!(clickhouse_table.name, sink_config.sink_table_name); +// } -#[tokio::test] -#[ignore] -async fn test_get_not_existing_clickhouse_table() { - let client = get_client(); - let mut sink_config = get_sink_config(); - create_table("not_existing").await; - let schema = get_dozer_schema(); +// #[tokio::test] +// #[ignore] +// async fn test_get_not_existing_clickhouse_table() { +// let client = get_client(); +// let mut sink_config = get_sink_config(); +// 
create_table("not_existing").await; +// let schema = get_dozer_schema(); - sink_config.create_table_options = None; +// sink_config.create_table_options = None; - let clickhouse_table = - ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema).await; - eprintln!("CT {:?}", clickhouse_table); - assert!(matches!( - clickhouse_table, - Err(ClickhouseSinkError::SinkTableDoesNotExist) - )); -} +// let clickhouse_table = +// ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema).await; +// eprintln!("CT {:?}", clickhouse_table); +// assert!(matches!( +// clickhouse_table, +// Err(ClickhouseSinkError::SinkTableDoesNotExist) +// )); +// } diff --git a/dozer-sink-clickhouse/src/types.rs b/dozer-sink-clickhouse/src/types.rs new file mode 100644 index 0000000000..48fc8be2d7 --- /dev/null +++ b/dozer-sink-clickhouse/src/types.rs @@ -0,0 +1,325 @@ +use crate::errors::QueryError; + +use chrono_tz::{Tz, UTC}; +use clickhouse_rs::{Block, ClientHandle}; +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, Offset, TimeZone}; +use dozer_types::json_types::JsonValue; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::prelude::ToPrimitive; +use dozer_types::rust_decimal::{self}; +use dozer_types::serde_json; +use dozer_types::types::{Field, FieldDefinition, FieldType}; +use either::Either; + +use clickhouse_rs::types::{FromSql, Value, ValueRef}; + +pub struct ValueWrapper(pub Value); + +impl<'a> FromSql<'a> for ValueWrapper { + fn from_sql(value: ValueRef<'a>) -> clickhouse_rs::errors::Result { + let v = Value::from(value); + Ok(ValueWrapper(v)) + } +} + +pub fn map_value_wrapper_to_field( + value: ValueWrapper, + field: FieldDefinition, +) -> Result { + if field.nullable { + if let clickhouse_rs::types::Value::Nullable(v) = value.0 { + match v { + Either::Left(_) => Ok(Field::Null), + Either::Right(data) => { + let mut fd = field.clone(); + fd.nullable = false; + map_value_wrapper_to_field(ValueWrapper(*data), fd) + } + } + } else { + Err(QueryError::CustomError(format!( + "Field is marked as nullable in Schema but not as per the database {0:?}", + field.name + ))) + } + } else { + let value = value.0; + match field.typ { + FieldType::UInt => match value { + clickhouse_rs::types::Value::UInt8(val) => Ok(Field::UInt(val.into())), + clickhouse_rs::types::Value::UInt16(val) => Ok(Field::UInt(val.into())), + clickhouse_rs::types::Value::UInt32(val) => Ok(Field::UInt(val.into())), + clickhouse_rs::types::Value::UInt64(val) => Ok(Field::UInt(val)), + _ => Err(QueryError::CustomError("Invalid UInt value".to_string())), + }, + FieldType::U128 => match value { + clickhouse_rs::types::Value::UInt128(val) => Ok(Field::U128(val)), + _ => Err(QueryError::CustomError("Invalid U128 value".to_string())), + }, + FieldType::Int => match value { + clickhouse_rs::types::Value::Int8(val) => Ok(Field::Int(val.into())), + clickhouse_rs::types::Value::Int16(val) => Ok(Field::Int(val.into())), + clickhouse_rs::types::Value::Int32(val) => Ok(Field::Int(val.into())), + clickhouse_rs::types::Value::Int64(val) => Ok(Field::Int(val)), + _ => Err(QueryError::CustomError("Invalid Int value".to_string())), + }, + FieldType::I128 => match value { + clickhouse_rs::types::Value::Int128(val) => Ok(Field::I128(val)), + _ => Err(QueryError::CustomError("Invalid I128 value".to_string())), + }, + FieldType::Float => match value { + clickhouse_rs::types::Value::Float64(val) => Ok(Field::Float(OrderedFloat(val))), + _ => Err(QueryError::CustomError("Invalid Float value".to_string())), 
+            },
+            FieldType::Boolean => match value {
+                clickhouse_rs::types::Value::UInt8(val) => Ok(Field::Boolean(val != 0)),
+                _ => Err(QueryError::CustomError("Invalid Boolean value".to_string())),
+            },
+            FieldType::String => match value {
+                clickhouse_rs::types::Value::String(_) => Ok(Field::String(value.to_string())),
+                _ => Err(QueryError::CustomError("Invalid String value".to_string())),
+            },
+            FieldType::Text => match value {
+                clickhouse_rs::types::Value::String(_) => Ok(Field::String(value.to_string())),
+                _ => Err(QueryError::CustomError("Invalid String value".to_string())),
+            },
+            FieldType::Binary => match value {
+                clickhouse_rs::types::Value::String(val) => {
+                    let val = (*val).clone();
+                    Ok(Field::Binary(val))
+                }
+                _ => Err(QueryError::CustomError("Invalid Binary value".to_string())),
+            },
+            FieldType::Decimal => match value {
+                clickhouse_rs::types::Value::Decimal(v) => Ok(Field::Decimal(
+                    rust_decimal::Decimal::new(v.internal(), v.scale() as u32),
+                )),
+                _ => Err(QueryError::CustomError("Invalid Decimal value".to_string())),
+            },
+            FieldType::Timestamp => {
+                let v: DateTime<Tz> = value.into();
+                let dt = convert_to_fixed_offset(v);
+                match dt {
+                    Some(dt) => Ok(Field::Timestamp(dt)),
+                    None => Err(QueryError::CustomError(
+                        "Invalid Timestamp value".to_string(),
+                    )),
+                }
+            }
+            FieldType::Date => Ok(Field::Date(value.into())),
+            FieldType::Json => match value {
+                clickhouse_rs::types::Value::String(_) => {
+                    let json = value.to_string();
+                    let json = serde_json::from_str(&json);
+                    json.map(Field::Json)
+                        .map_err(|e| QueryError::CustomError(e.to_string()))
+                }
+                _ => Err(QueryError::CustomError("Invalid Json value".to_string())),
+            },
+            x => Err(QueryError::CustomError(format!(
+                "Unsupported type {0:?}",
+                x
+            ))),
+        }
+    }
+}
+
+fn convert_to_fixed_offset(datetime_tz: DateTime<Tz>) -> Option<DateTime<FixedOffset>> {
+    // Get the offset from UTC in seconds for the specific datetime
+    let offset_seconds = datetime_tz
+        .timezone()
+        .offset_from_utc_datetime(&datetime_tz.naive_utc())
+        .fix()
+        .local_minus_utc();
+
+    // Create a FixedOffset timezone from the offset in seconds using east_opt()
+    FixedOffset::east_opt(offset_seconds)
+        .map(|fixed_offset| fixed_offset.from_utc_datetime(&datetime_tz.naive_utc()))
+}
+
+fn type_mismatch_error(expected_type: &str, field_name: &str) -> QueryError {
+    QueryError::CustomError(format!(
+        "Unexpected field type for {}, expected {}",
+        field_name, expected_type
+    ))
+}
+
+macro_rules! handle_type {
+    ($nullable: expr, $b: expr, $field_type:ident, $rust_type:ty, $column_values:expr, $n:expr) => {{
+        if $nullable {
+            let column_data: Vec<Option<&&Field>> = $column_values.iter().map(Some).collect();
+            let mut col: Vec<Option<$rust_type>> = vec![];
+            for f in column_data {
+                let v = match f {
+                    Some(Field::$field_type(v)) => Ok(Some(*v)),
+                    None => Ok(None),
+                    _ => Err(type_mismatch_error(stringify!($field_type), $n)),
+                }?;
+                col.push(v);
+            }
+            $b = $b.column($n, col);
+        } else {
+            let mut col: Vec<$rust_type> = vec![];
+            for f in $column_values {
+                let v = match f {
+                    Field::$field_type(v) => Ok(*v),
+                    _ => Err(type_mismatch_error(stringify!($field_type), $n)),
+                }?;
+                col.push(v);
+            }
+            $b = $b.column($n, col);
+        }
+    }};
+}
+
+macro_rules! 
handle_complex_type {
+    ($nullable: expr, $b: expr, $field_type:ident, $rust_type:ty, $column_values:expr, $n:expr, $complex_expr:expr) => {{
+        if $nullable {
+            let column_data: Vec<Option<&&Field>> = $column_values.iter().map(Some).collect();
+            let mut col: Vec<Option<$rust_type>> = vec![];
+            for f in column_data {
+                let v = match f {
+                    Some(Field::$field_type(v)) => {
+                        let v = $complex_expr(v);
+                        Ok(v)
+                    }
+                    None => Ok(None),
+                    _ => Err(type_mismatch_error(stringify!($field_type), $n)),
+                }?;
+                col.push(v);
+            }
+            $b = $b.column($n, col);
+        } else {
+            let mut col: Vec<$rust_type> = vec![];
+            for f in $column_values {
+                let v = match f {
+                    Field::$field_type(v) => {
+                        let v = $complex_expr(v);
+                        match v {
+                            Some(v) => Ok(v),
+                            None => Err(type_mismatch_error(stringify!($field_type), $n)),
+                        }
+                    }
+                    _ => Err(type_mismatch_error(stringify!($field_type), $n)),
+                }?;
+                col.push(v);
+            }
+            $b = $b.column($n, col);
+        }
+    }};
+}
+
+pub async fn insert_multi(
+    mut client: ClientHandle,
+    table_name: &str,
+    fields: &[FieldDefinition],
+    values: &[Vec<Field>], // Each inner Vec<Field> is one row
+) -> Result<(), QueryError> {
+    let mut b = Block::<Simple>::new();
+
+    for (field_index, fd) in fields.iter().enumerate() {
+        let column_values: Vec<_> = values.iter().map(|row| &row[field_index]).collect();
+
+        let n = &fd.name;
+        let nullable = fd.nullable;
+        match fd.typ {
+            FieldType::UInt => handle_type!(nullable, b, UInt, u64, column_values, n),
+            FieldType::U128 => handle_type!(nullable, b, U128, u128, column_values, n),
+            FieldType::Int => handle_type!(nullable, b, Int, i64, column_values, n),
+            FieldType::I128 => handle_type!(nullable, b, I128, i128, column_values, n),
+            FieldType::Boolean => handle_type!(nullable, b, Boolean, bool, column_values, n),
+            FieldType::Float => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Float,
+                    f64,
+                    column_values,
+                    n,
+                    |f: &OrderedFloat<f64>| -> Option<f64> { f.to_f64() }
+                )
+            }
+            FieldType::String | FieldType::Text => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    String,
+                    String,
+                    column_values,
+                    n,
+                    |f: &String| -> Option<String> { Some(f.to_string()) }
+                )
+            }
+            FieldType::Binary => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Binary,
+                    Vec<u8>,
+                    column_values,
+                    n,
+                    |f: &Vec<u8>| -> Option<Vec<u8>> { Some(f.clone()) }
+                )
+            }
+            FieldType::Decimal => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Decimal,
+                    f64,
+                    column_values,
+                    n,
+                    |f: &rust_decimal::Decimal| -> Option<f64> { f.to_f64() }
+                )
+            }
+            FieldType::Timestamp => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Timestamp,
+                    DateTime<Tz>,
+                    column_values,
+                    n,
+                    |dt: &DateTime<FixedOffset>| -> Option<DateTime<Tz>> {
+                        Some(dt.with_timezone(&UTC))
+                    }
+                )
+            }
+            FieldType::Date => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Date,
+                    NaiveDate,
+                    column_values,
+                    n,
+                    |f: &NaiveDate| -> Option<NaiveDate> { Some(f.clone()) }
+                )
+            }
+            FieldType::Json => {
+                handle_complex_type!(
+                    nullable,
+                    b,
+                    Json,
+                    Vec<u8>,
+                    column_values,
+                    n,
+                    |f: &JsonValue| -> Option<Vec<u8>> {
+                        Some(dozer_types::json_types::json_to_bytes(f))
+                    }
+                )
+            }
+            ft => {
+                return Err(QueryError::CustomError(format!(
+                    "Unsupported field_type {} for {}",
+                    ft, n
+                )));
+            }
+        }
+    }
+
+    // Insert the block into the table
+    client.insert(table_name, b).await?;
+
+    Ok(())
+}
diff --git a/dozer-types/src/json_types.rs b/dozer-types/src/json_types.rs
index 96024902f8..e556697190 100644
--- a/dozer-types/src/json_types.rs
+++ b/dozer-types/src/json_types.rs
@@ -36,7 +36,7 @@ pub fn json_to_string(value: &JsonValue) -> String {
     serde_value.to_string()
 }

-pub(crate) fn json_to_bytes(value: &JsonValue) -> Vec<u8> {
+pub fn json_to_bytes(value: &JsonValue) -> Vec<u8> {
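+    // Encodes the JsonValue as MessagePack bytes via rmp_serde; insert_multi
+    // above ships Json fields to ClickHouse as these raw bytes.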
rmp_serde::to_vec(value).unwrap()
 }

From e6bc57720202950058fa3f19c9d6121176c282b5 Mon Sep 17 00:00:00 2001
From: VG
Date: Thu, 7 Mar 2024 19:52:42 +0800
Subject: [PATCH 02/13] chore: use sink config

---
 dozer-sink-clickhouse/src/client.rs | 22 ++++++++++++++--------
 dozer-sink-clickhouse/src/sink.rs   |  2 +-
 dozer-types/src/models/sink.rs      | 13 +++++++++++++
 json_schemas/dozer.json             |  6 +++++-
 4 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/dozer-sink-clickhouse/src/client.rs b/dozer-sink-clickhouse/src/client.rs
index edb9d28a3a..35e54a12eb 100644
--- a/dozer-sink-clickhouse/src/client.rs
+++ b/dozer-sink-clickhouse/src/client.rs
@@ -4,6 +4,7 @@ use super::types::ValueWrapper;
 use crate::errors::QueryError;
 use crate::types::{insert_multi, map_value_wrapper_to_field};
 use clickhouse_rs::{ClientHandle, Pool};
+use dozer_types::models::sink::ClickhouseSinkConfig;
 use dozer_types::types::{Field, FieldDefinition};
 use serde::Serialize;

@@ -31,19 +32,24 @@ pub struct QueryLog {
     pub result_bytes: u64,
     pub memory_usage: u64,
 }
+pub struct ClickhouseOptionsWrapper(ClickhouseSinkConfig);

 impl ClickhouseClient {
-    pub fn root() -> Self {
-        let pool = Pool::new("tcp://default@localhost:9000/default");
-
+    pub fn new(config: ClickhouseSinkConfig) -> Self {
+        let url = Self::construct_url(config);
+        let pool = Pool::new(url);
         Self { pool }
     }

-    pub fn new(app_id: i32) -> Self {
-        let pool = Pool::new(
-            format!("tcp://default@localhost:9000/db_{app_id}", app_id = app_id).as_str(),
-        );
-        Self { pool }
+    pub fn construct_url(config: ClickhouseSinkConfig) -> String {
+        format!(
+            "{}://{}:{}@{}/{}",
+            config.scheme,
+            config.user,
+            config.password.as_deref().unwrap_or(""),
+            config.database_url,
+            config.database
+        )
     }

     pub fn get_log_query(log_comment: &str) -> String {
diff --git a/dozer-sink-clickhouse/src/sink.rs b/dozer-sink-clickhouse/src/sink.rs
index c46d586d1a..cbfba66535 100644
--- a/dozer-sink-clickhouse/src/sink.rs
+++ b/dozer-sink-clickhouse/src/sink.rs
@@ -55,7 +55,7 @@ impl SinkFactory for ClickhouseSinkFactory {
     ) -> Result<Box<dyn Sink>, BoxedError> {
         let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap();

-        let client = ClickhouseClient::root();
+        let client = ClickhouseClient::new(self.config.clone());
         let config = &self.config;

         if self.config.create_table_options.is_some() {
diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs
index 766d870db8..f271e0c526 100644
--- a/dozer-types/src/models/sink.rs
+++ b/dozer-types/src/models/sink.rs
@@ -161,6 +161,9 @@ pub struct ClickhouseSinkConfig {
     pub user: String,
     #[serde(default)]
     pub password: Option<String>,
+    #[serde(default = "ClickhouseSinkConfig::default_scheme")]
+    pub scheme: String,
+    #[serde(default = "ClickhouseSinkConfig::default_database")]
     pub database: String,
     pub source_table_name: String,
     pub sink_table_name: String,
@@ -168,6 +171,16 @@ pub struct ClickhouseSinkConfig {
     pub create_table_options: Option<ClickhouseSinkTableOptions>,
 }

+impl ClickhouseSinkConfig {
+    fn default_database() -> String {
+        "default".to_string()
+    }
+
+    fn default_scheme() -> String {
+        "tcp".to_string()
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)]
 #[serde(deny_unknown_fields)]
 pub struct ClickhouseSinkTableOptions {
diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json
index a6a8ee2d93..0869958c6a 100644
--- a/json_schemas/dozer.json
+++ b/json_schemas/dozer.json
@@ -405,7 +405,6 @@
     "ClickhouseSinkConfig": {
       "type": "object",
       "required": [
-        "database",
         "database_url",
         "sink_table_name",
"source_table_name", @@ -423,6 +422,7 @@ ] }, "database": { + "default": "default", "type": "string" }, "database_url": { @@ -444,6 +444,10 @@ "type": "string" } }, + "scheme": { + "default": "tcp", + "type": "string" + }, "sink_table_name": { "type": "string" }, From f21c523817e5b6870383e6d7044ff23a38e3c34b Mon Sep 17 00:00:00 2001 From: VG Date: Thu, 7 Mar 2024 21:37:16 +0800 Subject: [PATCH 03/13] chore: fix clickhouse sink --- dozer-cli/src/main.rs | 17 +++++------------ dozer-sink-clickhouse/src/client.rs | 23 ++++++++++++++--------- dozer-sink-clickhouse/src/errors.rs | 17 +++-------------- dozer-sink-clickhouse/src/schema.rs | 23 ++++++++++++----------- dozer-sink-clickhouse/src/sink.rs | 2 ++ dozer-sink-clickhouse/src/types.rs | 5 +---- dozer-tracing/src/telemetry.rs | 3 ++- dozer-types/src/models/sink.rs | 16 ++++++++++++++-- json_schemas/dozer.json | 14 ++++++++++---- 9 files changed, 63 insertions(+), 57 deletions(-) diff --git a/dozer-cli/src/main.rs b/dozer-cli/src/main.rs index 865d28eded..bdd917f6cd 100644 --- a/dozer-cli/src/main.rs +++ b/dozer-cli/src/main.rs @@ -9,7 +9,7 @@ use dozer_cli::{set_ctrl_handler, set_panic_hook}; use dozer_core::shutdown; use dozer_tracing::LabelsAndProgress; use dozer_types::models::config::Config; -use dozer_types::models::telemetry::{TelemetryConfig, TelemetryMetricsConfig}; +use dozer_types::models::telemetry::TelemetryConfig; use dozer_types::tracing::{error, error_span, info}; use futures::TryFutureExt; use std::process; @@ -45,17 +45,10 @@ fn run() -> Result<(), OrchestrationError> { .map(|(c, _)| c.cloud.app_id.as_deref().unwrap_or(&c.app_name)) .ok(); - let telemetry_config = if matches!(cli.cmd, Commands::Run) { - TelemetryConfig { - trace: None, - metrics: Some(TelemetryMetricsConfig::Prometheus), - } - } else { - config_res - .as_ref() - .map(|(c, _)| c.telemetry.clone()) - .unwrap_or_default() - }; + let telemetry_config = config_res + .as_ref() + .map(|(c, _)| c.telemetry.clone()) + .unwrap_or_default(); let _telemetry = runtime.block_on(async { Telemetry::new(app_id, &telemetry_config) }); diff --git a/dozer-sink-clickhouse/src/client.rs b/dozer-sink-clickhouse/src/client.rs index 35e54a12eb..b34b772780 100644 --- a/dozer-sink-clickhouse/src/client.rs +++ b/dozer-sink-clickhouse/src/client.rs @@ -4,6 +4,7 @@ use super::types::ValueWrapper; use crate::errors::QueryError; use crate::types::{insert_multi, map_value_wrapper_to_field}; use clickhouse_rs::{ClientHandle, Pool}; +use dozer_types::log::{debug, info}; use dozer_types::models::sink::ClickhouseSinkConfig; use dozer_types::types::{Field, FieldDefinition}; use serde::Serialize; @@ -42,14 +43,17 @@ impl ClickhouseClient { } pub fn construct_url(config: ClickhouseSinkConfig) -> String { - format!( - "{}://{}:{}@{}/{}", - config.scheme, - config.user, - config.password.as_deref().unwrap_or(""), - config.database_url, - config.database - ) + let user_password = match &config.password { + Some(password) => format!("{}:{}", config.user, password), + None => config.user, + }; + + let url = format!( + "{}://{}@{}:{}/{}", + config.scheme, user_password, config.host, config.port, config.database + ); + debug!("{url}"); + url } pub fn get_log_query(log_comment: &str) -> String { @@ -90,7 +94,8 @@ impl ClickhouseClient { ) -> Result<(), QueryError> { let mut client = self.pool.get_handle().await?; let ddl = get_create_table_query(datasource_name, fields, None, None); - println!("#{ddl}"); + info!("Creating Clickhouse Sink Table"); + info!("{ddl}"); client.execute(ddl).await?; Ok(()) } 
diff --git a/dozer-sink-clickhouse/src/errors.rs b/dozer-sink-clickhouse/src/errors.rs
index 02b8d8e7f1..e056181d07 100644
--- a/dozer-sink-clickhouse/src/errors.rs
+++ b/dozer-sink-clickhouse/src/errors.rs
@@ -1,9 +1,4 @@
-use clickhouse_rs::types::SqlType;
-use dozer_types::{
-    thiserror::{self, Error},
-    types::FieldDefinition,
-};
-pub const BATCH_SIZE: usize = 100;
+use dozer_types::thiserror::{self, Error};

 #[derive(Error, Debug)]
 pub enum ClickhouseSinkError {
@@ -31,9 +26,6 @@ pub enum ClickhouseSinkError {
     #[error("Expected primary key {0:?} but got {1:?}")]
     PrimaryKeyMismatch(Vec<String>, Vec<String>),

-    #[error("Schema field not found by index {0}")]
-    SchemaFieldNotFoundByIndex(usize),
-
     #[error("QueryError: {0:?}")]
     QueryError(#[from] QueryError),
 }
@@ -43,11 +35,8 @@ pub enum QueryError {
     #[error("Clickhouse error: {0:?}")]
     DataFetchError(#[from] clickhouse_rs::errors::Error),

-    #[error("Unsupported type: {0:?}")]
-    UnsupportedType(SqlType),
-
-    #[error("Schema has type {0:?} but value is of type {1:?}")]
-    TypeMismatch(FieldDefinition, SqlType),
+    #[error("Unexpected field type for {1}, expected {0}")]
+    TypeMismatch(String, String),

     #[error("{0:?}")]
     CustomError(String),
diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs
index 37faae2758..e6192da7fe 100644
--- a/dozer-sink-clickhouse/src/schema.rs
+++ b/dozer-sink-clickhouse/src/schema.rs
@@ -1,5 +1,4 @@
 use crate::client::ClickhouseClient;
-use crate::ddl;
 use crate::errors::ClickhouseSinkError::{self, SinkTableDoesNotExist};
 use clickhouse_rs::types::Complex;
 use clickhouse_rs::{Block, ClientHandle};
@@ -80,11 +79,13 @@ impl ClickhouseSchema {
         Ok(existing_pk)
     }

-    async fn compare_with_dozer_schema(
-        mut client: ClientHandle,
-        schema: Schema,
-        table: ClickhouseTable,
+
+    pub async fn compare_with_dozer_schema(
+        client: ClickhouseClient,
+        schema: &Schema,
+        table: &ClickhouseTable,
     ) -> Result<(), ClickhouseSinkError> {
+        let mut client = client.get_client_handle().await?;
         let block: Block<Complex> = client
             .query(&format!(
                 "DESCRIBE TABLE {database}.{table_name}",
@@ -106,9 +107,9 @@ impl ClickhouseSchema {
             })
             .collect();

-        for field in schema.fields {
+        for field in &schema.fields {
             let Some(column) = columns.iter().find(|column| column.name == field.name) else {
-                return Err(ClickhouseSinkError::ColumnNotFound(field.name));
+                return Err(ClickhouseSinkError::ColumnNotFound(field.name.clone()));
             };

             let mut expected_type = match field.typ {
@@ -127,7 +128,7 @@ impl ClickhouseSchema {
                 FieldType::Point => "Point",
                 FieldType::Duration => {
                     return Err(ClickhouseSinkError::TypeNotSupported(
-                        field.name,
+                        field.name.clone(),
                         "Duration".to_string(),
                     ))
                 }
@@ -145,7 +146,7 @@ impl ClickhouseSchema {
             let column_type = column.type_.clone();
             if expected_type != column_type {
                 return Err(ClickhouseSinkError::ColumnTypeMismatch(
-                    field.name,
+                    field.name.clone(),
                     expected_type.to_string(),
                     column_type.to_string(),
                 ));
@@ -161,7 +162,7 @@ impl ClickhouseSchema {
     ) -> Result<ClickhouseTable, ClickhouseSinkError> {
         let block = handle
             .query(format!(
-                "SELECT database, name, engine, engine_full FROM system.tables WHERE name = {}",
+                "SELECT database, name, engine, engine_full FROM system.tables WHERE name = '{}'",
                 sink_table_name
             ))
             .fetch_all()
@@ -185,7 +186,7 @@ impl ClickhouseSchema {
             r#"
        SELECT column_name, constraint_name, constraint_schema
        FROM INFORMATION_SCHEMA.key_column_usage
-       WHERE table_name = {} AND constraint_schema = {} and column_name is NOT NULL"#,
+       WHERE table_name = '{}' AND constraint_schema = '{}' and column_name is NOT NULL"#,
sink_table_name, schema )) .fetch_all() diff --git a/dozer-sink-clickhouse/src/sink.rs b/dozer-sink-clickhouse/src/sink.rs index cbfba66535..56c2f10e78 100644 --- a/dozer-sink-clickhouse/src/sink.rs +++ b/dozer-sink-clickhouse/src/sink.rs @@ -65,6 +65,8 @@ impl SinkFactory for ClickhouseSinkFactory { } let table = ClickhouseSchema::get_clickhouse_table(client.clone(), &self.config).await?; + ClickhouseSchema::compare_with_dozer_schema(client.clone(), &schema, &table).await?; + let primary_key_field_names = ClickhouseSchema::get_primary_keys(client.clone(), &self.config).await?; diff --git a/dozer-sink-clickhouse/src/types.rs b/dozer-sink-clickhouse/src/types.rs index 48fc8be2d7..75c10df85e 100644 --- a/dozer-sink-clickhouse/src/types.rs +++ b/dozer-sink-clickhouse/src/types.rs @@ -138,10 +138,7 @@ fn convert_to_fixed_offset(datetime_tz: DateTime) -> Option QueryError { - QueryError::CustomError(format!( - "Unexpected field type for {}, expected {}", - field_name, expected_type - )) + QueryError::TypeMismatch(expected_type.to_string(), field_name.to_string()) } macro_rules! handle_type { diff --git a/dozer-tracing/src/telemetry.rs b/dozer-tracing/src/telemetry.rs index 2b3deeb139..4caab836e5 100644 --- a/dozer-tracing/src/telemetry.rs +++ b/dozer-tracing/src/telemetry.rs @@ -63,7 +63,7 @@ fn create_subscriber( let app_name = app_name.unwrap_or("dozer"); let fmt_filter = EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new("info")) + .or_else(|_| EnvFilter::try_new("info,clickhouse_rs=error")) .unwrap(); // `console_subscriber` can only be added once. @@ -78,6 +78,7 @@ fn create_subscriber( let trace_filter = EnvFilter::try_from_env("DOZER_TRACE_FILTER") .unwrap_or_else(|_| EnvFilter::try_new("dozer=trace").unwrap()); + let layers = match &telemetry_config.trace { None => (None, None), Some(TelemetryTraceConfig::Dozer(config)) => ( diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs index f271e0c526..96faef6281 100644 --- a/dozer-types/src/models/sink.rs +++ b/dozer-types/src/models/sink.rs @@ -157,7 +157,11 @@ pub struct AerospikeSinkConfig { #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] #[serde(deny_unknown_fields)] pub struct ClickhouseSinkConfig { - pub database_url: String, + #[serde(default = "ClickhouseSinkConfig::default_host")] + pub host: String, + #[serde(default = "ClickhouseSinkConfig::default_port")] + pub port: u16, + #[serde(default = "ClickhouseSinkConfig::default_user")] pub user: String, #[serde(default)] pub password: Option, @@ -175,10 +179,18 @@ impl ClickhouseSinkConfig { fn default_database() -> String { "default".to_string() } - fn default_scheme() -> String { "tcp".to_string() } + fn default_host() -> String { + "0.0.0.0".to_string() + } + fn default_port() -> u16 { + 9000 + } + fn default_user() -> String { + "default".to_string() + } } #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index 0869958c6a..7df2e5f128 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -405,10 +405,8 @@ "ClickhouseSinkConfig": { "type": "object", "required": [ - "database_url", "sink_table_name", - "source_table_name", - "user" + "source_table_name" ], "properties": { "create_table_options": { @@ -425,7 +423,8 @@ "default": "default", "type": "string" }, - "database_url": { + "host": { + "default": "0.0.0.0", "type": "string" }, "password": { @@ -435,6 +434,12 @@ "null" ] }, + "port": { + "default": 9000, + 
"type": "integer", + "format": "uint16", + "minimum": 0.0 + }, "primary_keys": { "type": [ "array", @@ -455,6 +460,7 @@ "type": "string" }, "user": { + "default": "default", "type": "string" } }, From b43c6125b90f60d1f4cb60ee39fe47523462461a Mon Sep 17 00:00:00 2001 From: VG Date: Thu, 7 Mar 2024 21:55:37 +0800 Subject: [PATCH 04/13] feat: implement batching --- dozer-sink-clickhouse/src/sink.rs | 21 +++- dozer-sink-clickhouse/src/tests.rs | 151 ++++++++++++----------------- dozer-types/src/models/sink.rs | 2 +- 3 files changed, 83 insertions(+), 91 deletions(-) diff --git a/dozer-sink-clickhouse/src/sink.rs b/dozer-sink-clickhouse/src/sink.rs index 56c2f10e78..5ffacd2722 100644 --- a/dozer-sink-clickhouse/src/sink.rs +++ b/dozer-sink-clickhouse/src/sink.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; +const BATCH_SIZE: usize = 100; #[derive(Debug)] pub struct ClickhouseSinkFactory { runtime: Arc, @@ -102,6 +103,7 @@ pub(crate) struct ClickhouseSink { pub(crate) sink_table_name: String, pub(crate) table: ClickhouseTable, pub(crate) primary_key_fields_indexes: Vec, + batch: Vec>, } impl Debug for ClickhouseSink { @@ -134,22 +136,32 @@ impl ClickhouseSink { sink_table_name: config.sink_table_name, table, primary_key_fields_indexes, + batch: Vec::new(), } } - fn insert_values(&self, values: &[Field]) -> Result<(), BoxedError> { + fn insert_values(&mut self, values: &[Field]) -> Result<(), BoxedError> { + // add values to batch instead of inserting immediately + self.batch.push(values.to_vec()); + Ok(()) + } + + fn commit_batch(&mut self) -> Result<(), BoxedError> { self.runtime.block_on(async { self.client - .insert(&self.sink_table_name, &self.schema.fields, values) + .insert_multi(&self.sink_table_name, &self.schema.fields, &self.batch) .await?; + Ok::<(), BoxedError>(()) })?; + self.batch.clear(); Ok(()) } } impl Sink for ClickhouseSink { fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> { + self.commit_batch()?; Ok(()) } @@ -164,6 +176,10 @@ impl Sink for ClickhouseSink { } else { self.insert_values(&new.values)?; } + + if self.batch.len() > BATCH_SIZE - 1 { + self.commit_batch()?; + } } Operation::Delete { old } => { if self.table.engine != "CollapsingMergeTree" { @@ -191,6 +207,7 @@ impl Sink for ClickhouseSink { values.push(Field::Int(1)); self.insert_values(&values)?; } + self.commit_batch()?; } } diff --git a/dozer-sink-clickhouse/src/tests.rs b/dozer-sink-clickhouse/src/tests.rs index 1fcee6cdaa..46639f4bca 100644 --- a/dozer-sink-clickhouse/src/tests.rs +++ b/dozer-sink-clickhouse/src/tests.rs @@ -1,94 +1,69 @@ -// use crate::errors::ClickhouseSinkError; -// use crate::schema::ClickhouseSchema; -// use clickhouse::Client; -// use dozer_core::tokio; -// use dozer_types::models::sink::ClickhouseSinkConfig; -// use dozer_types::types::{FieldDefinition, FieldType, Schema}; +use crate::client::ClickhouseClient; +use crate::schema::ClickhouseSchema; +use dozer_core::tokio; +use dozer_types::models::sink::ClickhouseSinkConfig; +use dozer_types::types::{FieldDefinition, FieldType, Schema}; -// fn get_client() -> Client { -// Client::default() -// .with_url("http://localhost:8123") -// .with_user("default") -// .with_database("default") -// } +fn get_client() -> ClickhouseClient { + ClickhouseClient::new(get_sink_config()) +} -// fn get_sink_config() -> ClickhouseSinkConfig { -// ClickhouseSinkConfig { -// source_table_name: "source_table".to_string(), -// sink_table_name: "sink_table".to_string(), -// create_table_options: 
None, -// primary_keys: Some(vec!["id".to_string()]), -// user: "default".to_string(), -// password: Some("default".to_string()), -// database: "default".to_string(), -// database_url: "http://localhost:8123".to_string(), -// } -// } +fn get_sink_config() -> ClickhouseSinkConfig { + ClickhouseSinkConfig { + source_table_name: "source_table".to_string(), + sink_table_name: "sink_table".to_string(), + scheme: "tcp".to_string(), + create_table_options: None, + primary_keys: Some(vec!["id".to_string()]), + user: "default".to_string(), + password: None, + database: "default".to_string(), + host: "localhost".to_string(), + port: 9000, + } +} -// fn get_dozer_schema() -> Schema { -// Schema { -// fields: vec![ -// FieldDefinition { -// name: "id".to_string(), -// typ: FieldType::UInt, -// nullable: false, -// source: Default::default(), -// }, -// FieldDefinition { -// name: "data".to_string(), -// typ: FieldType::String, -// nullable: false, -// source: Default::default(), -// }, -// ], -// primary_index: vec![0], -// } -// } +fn _get_dozer_schema() -> Schema { + Schema { + fields: vec![ + FieldDefinition { + name: "id".to_string(), + typ: FieldType::UInt, + nullable: false, + source: Default::default(), + }, + FieldDefinition { + name: "data".to_string(), + typ: FieldType::String, + nullable: false, + source: Default::default(), + }, + ], + primary_index: vec![0], + } +} -// async fn create_table(table_name: &str) { -// let client = get_client(); -// client -// .query(&format!("DROP TABLE IF EXISTS {table_name}")) -// .execute() -// .await -// .unwrap(); +async fn create_table(table_name: &str) { + let mut client = get_client().get_client_handle().await.unwrap(); + client + .execute(&format!("DROP TABLE IF EXISTS {table_name}")) + .await + .unwrap(); -// client -// .query(&format!("CREATE TABLE {table_name}(id UInt64, data String, PRIMARY KEY id) ENGINE = MergeTree ORDER BY id")) -// .execute() -// .await -// .unwrap(); -// } + client + .execute(&format!("CREATE TABLE {table_name}(id UInt64, data String, PRIMARY KEY id) ENGINE = CollapsingMergeTree ORDER BY id")) + .await + .unwrap(); +} -// #[tokio::test] -// #[ignore] -// async fn test_get_clickhouse_table() { -// let client = get_client(); -// let sink_config = get_sink_config(); -// create_table(&sink_config.sink_table_name).await; -// let schema = get_dozer_schema(); - -// let clickhouse_table = ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema) -// .await -// .unwrap(); -// assert_eq!(clickhouse_table.name, sink_config.sink_table_name); -// } - -// #[tokio::test] -// #[ignore] -// async fn test_get_not_existing_clickhouse_table() { -// let client = get_client(); -// let mut sink_config = get_sink_config(); -// create_table("not_existing").await; -// let schema = get_dozer_schema(); - -// sink_config.create_table_options = None; - -// let clickhouse_table = -// ClickhouseSchema::get_clickhouse_table(&client, &sink_config, &schema).await; -// eprintln!("CT {:?}", clickhouse_table); -// assert!(matches!( -// clickhouse_table, -// Err(ClickhouseSinkError::SinkTableDoesNotExist) -// )); -// } +#[tokio::test] +#[ignore] +async fn test_get_clickhouse_table() { + let client = get_client(); + let sink_config = get_sink_config(); + create_table(&sink_config.sink_table_name).await; + let clickhouse_table = ClickhouseSchema::get_clickhouse_table(client, &sink_config) + .await + .unwrap(); + assert_eq!(clickhouse_table.name, sink_config.sink_table_name); +} diff --git a/dozer-types/src/models/sink.rs 
b/dozer-types/src/models/sink.rs index 96faef6281..6e5aa6acd8 100644 --- a/dozer-types/src/models/sink.rs +++ b/dozer-types/src/models/sink.rs @@ -154,7 +154,7 @@ pub struct AerospikeSinkConfig { pub preferred_batch_size: Option, } -#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone, Default)] #[serde(deny_unknown_fields)] pub struct ClickhouseSinkConfig { #[serde(default = "ClickhouseSinkConfig::default_host")] From 622ef3587fc75fb92344f549479c67ac8d387502 Mon Sep 17 00:00:00 2001 From: VG Date: Thu, 7 Mar 2024 21:59:38 +0800 Subject: [PATCH 05/13] chore: fix decimal --- dozer-sink-clickhouse/src/ddl.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dozer-sink-clickhouse/src/ddl.rs b/dozer-sink-clickhouse/src/ddl.rs index 9b2e33b51f..680c8c8179 100644 --- a/dozer-sink-clickhouse/src/ddl.rs +++ b/dozer-sink-clickhouse/src/ddl.rs @@ -82,7 +82,7 @@ pub fn map_field_to_type(field: &FieldDefinition) -> String { FieldType::String => "String", FieldType::Text => "String", FieldType::Binary => "Array(UInt8)", - FieldType::Decimal => "Decimal", + FieldType::Decimal => "Decimal64", FieldType::Timestamp => "DateTime64(3)", FieldType::Date => "Date", FieldType::Json => "JSON", From 9129a90ea17f1e2058b2fac62242048bdd4c78b6 Mon Sep 17 00:00:00 2001 From: VG Date: Thu, 7 Mar 2024 22:04:47 +0800 Subject: [PATCH 06/13] chore: use a common format --- dozer-sink-clickhouse/src/ddl.rs | 36 ++---------------- dozer-sink-clickhouse/src/schema.rs | 57 +++++++++++++++++------------ 2 files changed, 37 insertions(+), 56 deletions(-) diff --git a/dozer-sink-clickhouse/src/ddl.rs b/dozer-sink-clickhouse/src/ddl.rs index 680c8c8179..07934b2bc0 100644 --- a/dozer-sink-clickhouse/src/ddl.rs +++ b/dozer-sink-clickhouse/src/ddl.rs @@ -1,6 +1,7 @@ -use dozer_types::log::warn; use dozer_types::models::sink::ClickhouseSinkTableOptions; -use dozer_types::types::{FieldDefinition, FieldType}; +use dozer_types::types::FieldDefinition; + +use crate::schema::map_field_to_type; const DEFAULT_TABLE_ENGINE: &str = "MergeTree()"; @@ -70,34 +71,3 @@ pub fn get_create_table_query( ", ) } - -pub fn map_field_to_type(field: &FieldDefinition) -> String { - let typ = match field.typ { - FieldType::UInt => "UInt64", - FieldType::U128 => "UInt128", - FieldType::Int => "Int64", - FieldType::I128 => "Int128", - FieldType::Float => "Float64", - FieldType::Boolean => "Boolean", - FieldType::String => "String", - FieldType::Text => "String", - FieldType::Binary => "Array(UInt8)", - FieldType::Decimal => "Decimal64", - FieldType::Timestamp => "DateTime64(3)", - FieldType::Date => "Date", - FieldType::Json => "JSON", - FieldType::Point => "Point", - FieldType::Duration => unimplemented!(), - }; - - if field.nullable { - if field.typ != FieldType::Binary { - format!("Nullable({})", typ) - } else { - warn!("Binary field cannot be nullable, ignoring nullable flag"); - typ.to_string() - } - } else { - typ.to_string() - } -} diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs index e6192da7fe..0f7088c8cc 100644 --- a/dozer-sink-clickhouse/src/schema.rs +++ b/dozer-sink-clickhouse/src/schema.rs @@ -3,9 +3,10 @@ use crate::errors::ClickhouseSinkError::{self, SinkTableDoesNotExist}; use clickhouse_rs::types::Complex; use clickhouse_rs::{Block, ClientHandle}; use dozer_types::errors::internal::BoxedError; +use dozer_types::log::warn; use dozer_types::models::sink::ClickhouseSinkConfig; use 
dozer_types::serde::{Deserialize, Serialize}; -use dozer_types::types::{FieldType, Schema}; +use dozer_types::types::{FieldDefinition, FieldType, Schema}; #[derive(Debug, Deserialize, Serialize)] #[serde(crate = "dozer_types::serde")] @@ -112,28 +113,7 @@ impl ClickhouseSchema { return Err(ClickhouseSinkError::ColumnNotFound(field.name.clone())); }; - let mut expected_type = match field.typ { - FieldType::UInt => "UInt64", - FieldType::U128 => "Uint128", - FieldType::Int => "Int64", - FieldType::I128 => "Int128", - FieldType::Float => "Float64", - FieldType::Boolean => "Bool", - FieldType::String | FieldType::Text => "String", - FieldType::Binary => "UInt8", - FieldType::Decimal => "Decimal64", - FieldType::Timestamp => "DateTime64(9)", - FieldType::Date => "Date", - FieldType::Json => "Json", - FieldType::Point => "Point", - FieldType::Duration => { - return Err(ClickhouseSinkError::TypeNotSupported( - field.name.clone(), - "Duration".to_string(), - )) - } - } - .to_string(); + let mut expected_type = map_field_to_type(field); if field.nullable { expected_type = format!("Nullable({expected_type})"); @@ -203,3 +183,34 @@ impl ClickhouseSchema { Ok(keys) } } + +pub fn map_field_to_type(field: &FieldDefinition) -> String { + let typ = match field.typ { + FieldType::UInt => "UInt64", + FieldType::U128 => "UInt128", + FieldType::Int => "Int64", + FieldType::I128 => "Int128", + FieldType::Float => "Float64", + FieldType::Boolean => "Boolean", + FieldType::String => "String", + FieldType::Text => "String", + FieldType::Binary => "Array(UInt8)", + FieldType::Decimal => "Decimal", + FieldType::Timestamp => "DateTime64(3)", + FieldType::Date => "Date", + FieldType::Json => "JSON", + FieldType::Point => "Point", + FieldType::Duration => unimplemented!(), + }; + + if field.nullable { + if field.typ != FieldType::Binary { + format!("Nullable({})", typ) + } else { + warn!("Binary field cannot be nullable, ignoring nullable flag"); + typ.to_string() + } + } else { + typ.to_string() + } +} From 0100965dd289940a1b2d30bee1971d5494405b7c Mon Sep 17 00:00:00 2001 From: VG Date: Thu, 7 Mar 2024 23:25:41 +0800 Subject: [PATCH 07/13] chore: fix table creation --- dozer-sink-clickhouse/src/client.rs | 11 ++++++----- dozer-sink-clickhouse/src/ddl.rs | 29 ++++++++++++++--------------- dozer-sink-clickhouse/src/errors.rs | 3 --- dozer-sink-clickhouse/src/schema.rs | 16 +++------------- 4 files changed, 23 insertions(+), 36 deletions(-) diff --git a/dozer-sink-clickhouse/src/client.rs b/dozer-sink-clickhouse/src/client.rs index b34b772780..eca3c7ae1a 100644 --- a/dozer-sink-clickhouse/src/client.rs +++ b/dozer-sink-clickhouse/src/client.rs @@ -16,6 +16,7 @@ pub struct SqlResult { #[derive(Clone)] pub struct ClickhouseClient { pool: Pool, + config: ClickhouseSinkConfig, } #[derive(Debug, Clone, Serialize)] pub struct QueryId(pub String); @@ -37,15 +38,15 @@ pub struct ClickhouseOptionsWrapper(ClickhouseSinkConfig); impl ClickhouseClient { pub fn new(config: ClickhouseSinkConfig) -> Self { - let url = Self::construct_url(config); + let url = Self::construct_url(&config); let pool = Pool::new(url); - Self { pool } + Self { pool, config } } - pub fn construct_url(config: ClickhouseSinkConfig) -> String { + pub fn construct_url(config: &ClickhouseSinkConfig) -> String { let user_password = match &config.password { Some(password) => format!("{}:{}", config.user, password), - None => config.user, + None => config.user.to_string(), }; let url = format!( @@ -93,7 +94,7 @@ impl ClickhouseClient { fields: &[FieldDefinition], ) 
-> Result<(), QueryError> { let mut client = self.pool.get_handle().await?; - let ddl = get_create_table_query(datasource_name, fields, None, None); + let ddl = get_create_table_query(datasource_name, fields, self.config.clone()); info!("Creating Clickhouse Sink Table"); info!("{ddl}"); client.execute(ddl).await?; diff --git a/dozer-sink-clickhouse/src/ddl.rs b/dozer-sink-clickhouse/src/ddl.rs index 07934b2bc0..1928e1aa82 100644 --- a/dozer-sink-clickhouse/src/ddl.rs +++ b/dozer-sink-clickhouse/src/ddl.rs @@ -1,4 +1,4 @@ -use dozer_types::models::sink::ClickhouseSinkTableOptions; +use dozer_types::models::sink::ClickhouseSinkConfig; use dozer_types::types::FieldDefinition; use crate::schema::map_field_to_type; @@ -8,8 +8,7 @@ const DEFAULT_TABLE_ENGINE: &str = "MergeTree()"; pub fn get_create_table_query( table_name: &str, fields: &[FieldDefinition], - sink_options: Option, - primary_keys: Option>, + config: ClickhouseSinkConfig, ) -> String { let mut parts = fields .iter() @@ -19,41 +18,41 @@ pub fn get_create_table_query( }) .collect::>(); - let engine = sink_options + let engine = config + .create_table_options .as_ref() - .map(|options| { - options - .engine - .clone() - .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()) - }) + .and_then(|c| c.engine.clone()) .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()); - if let Some(pk) = primary_keys { + if let Some(pk) = config.primary_keys { parts.push(format!("PRIMARY KEY ({})", pk.join(", "))); } let query = parts.join(",\n"); - let partition_by = sink_options + let partition_by = config + .create_table_options .as_ref() .and_then(|options| options.partition_by.clone()) .map_or("".to_string(), |partition_by| { format!("PARTITION BY {}\n", partition_by) }); - let sample_by = sink_options + let sample_by = config + .create_table_options .as_ref() .and_then(|options| options.sample_by.clone()) .map_or("".to_string(), |partition_by| { format!("SAMPLE BY {}\n", partition_by) }); - let order_by = sink_options + let order_by = config + .create_table_options .as_ref() .and_then(|options| options.order_by.clone()) .map_or("".to_string(), |order_by| { format!("ORDER BY ({})\n", order_by.join(", ")) }); - let cluster = sink_options + let cluster = config + .create_table_options .as_ref() .and_then(|options| options.cluster.clone()) .map_or("".to_string(), |cluster| { diff --git a/dozer-sink-clickhouse/src/errors.rs b/dozer-sink-clickhouse/src/errors.rs index e056181d07..d3fc752c5c 100644 --- a/dozer-sink-clickhouse/src/errors.rs +++ b/dozer-sink-clickhouse/src/errors.rs @@ -17,9 +17,6 @@ pub enum ClickhouseSinkError { #[error("Primary key not found")] PrimaryKeyNotFound, - #[error("Type {1} is not supported for column {0}")] - TypeNotSupported(String, String), - #[error("Sink table does not exist and create_table_options is not set")] SinkTableDoesNotExist, diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs index 0f7088c8cc..f3c617512c 100644 --- a/dozer-sink-clickhouse/src/schema.rs +++ b/dozer-sink-clickhouse/src/schema.rs @@ -112,17 +112,7 @@ impl ClickhouseSchema { let Some(column) = columns.iter().find(|column| column.name == field.name) else { return Err(ClickhouseSinkError::ColumnNotFound(field.name.clone())); }; - - let mut expected_type = map_field_to_type(field); - - if field.nullable { - expected_type = format!("Nullable({expected_type})"); - } - - if field.typ == FieldType::Binary { - expected_type = format!("Array({expected_type})"); - } - + let expected_type = map_field_to_type(field); let 
column_type = column.type_.clone();
             if expected_type != column_type {
                 return Err(ClickhouseSinkError::ColumnTypeMismatch(
@@ -185,7 +175,7 @@ impl ClickhouseSchema {
 }

 pub fn map_field_to_type(field: &FieldDefinition) -> String {
-    let typ = match field.typ {
+    let typ: &str = match field.typ {
         FieldType::UInt => "UInt64",
@@ -195,7 +185,7 @@ pub fn map_field_to_type(field: &FieldDefinition) -> String {
         FieldType::String => "String",
         FieldType::Text => "String",
         FieldType::Binary => "Array(UInt8)",
-        FieldType::Decimal => "Decimal",
+        FieldType::Decimal => "Decimal(10, 0)",
         FieldType::Timestamp => "DateTime64(3)",
         FieldType::Date => "Date",
         FieldType::Json => "JSON",

From e78bd2bb9e17d5981df08a85abd9c8591de0c8bd Mon Sep 17 00:00:00 2001
From: VG
Date: Fri, 8 Mar 2024 00:18:43 +0800
Subject: [PATCH 08/13] chore: fix decimal

---
 dozer-sink-clickhouse/src/schema.rs | 4 +++-
 dozer-sink-clickhouse/src/types.rs  | 8 ++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs
index f3c617512c..0d99871e17 100644
--- a/dozer-sink-clickhouse/src/schema.rs
+++ b/dozer-sink-clickhouse/src/schema.rs
@@ -1,5 +1,6 @@
 use crate::client::ClickhouseClient;
 use crate::errors::ClickhouseSinkError::{self, SinkTableDoesNotExist};
+use crate::types::DECIMAL_SCALE;
 use clickhouse_rs::types::Complex;
 use clickhouse_rs::{Block, ClientHandle};
 use dozer_types::errors::internal::BoxedError;
@@ -175,6 +176,7 @@ impl ClickhouseSchema {
 }

 pub fn map_field_to_type(field: &FieldDefinition) -> String {
+    let decimal = format!("Decimal(10, {})", DECIMAL_SCALE);
     let typ: &str = match field.typ {
@@ -185,7 +187,7 @@ pub fn map_field_to_type(field: &FieldDefinition) -> String {
         FieldType::String => "String",
         FieldType::Text => "String",
         FieldType::Binary => "Array(UInt8)",
-        FieldType::Decimal => "Decimal(10, 0)",
+        FieldType::Decimal => &decimal,
         FieldType::Timestamp => "DateTime64(3)",
         FieldType::Date => "Date",
         FieldType::Json => "JSON",
diff --git a/dozer-sink-clickhouse/src/types.rs b/dozer-sink-clickhouse/src/types.rs
index 75c10df85e..c2cf3b831b 100644
--- a/dozer-sink-clickhouse/src/types.rs
+++ b/dozer-sink-clickhouse/src/types.rs
@@ -13,6 +13,7 @@ use either::Either;

 use clickhouse_rs::types::{FromSql, Simple, Value, ValueRef};

+pub const DECIMAL_SCALE: u8 = 4;
 pub struct ValueWrapper(pub Value);

 impl<'a> FromSql<'a> for ValueWrapper {
@@ -263,10 +264,13 @@ pub async fn insert_multi(
                     nullable,
                     b,
                     Decimal,
-                    f64,
+                    clickhouse_rs::types::Decimal,
                     column_values,
                     n,
-                    |f: &rust_decimal::Decimal| -> Option<f64> { f.to_f64() }
+                    |f: &rust_decimal::Decimal| -> Option<clickhouse_rs::types::Decimal> {
+                        f.to_f64()
+                            .map(|f| clickhouse_rs::types::Decimal::of(f, DECIMAL_SCALE))
+                    }
                 )
             }
             FieldType::Timestamp => {

From 9179dae0d708249e674d1c013d419b92bd5525fa Mon Sep 17 00:00:00 2001
From: VG
Date: Fri, 8 Mar 2024 00:28:23 +0800
Subject: [PATCH 09/13] chore: fix clippy

---
 dozer-sink-clickhouse/src/types.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dozer-sink-clickhouse/src/types.rs b/dozer-sink-clickhouse/src/types.rs
index c2cf3b831b..56ade44a41 100644
--- a/dozer-sink-clickhouse/src/types.rs
+++ b/dozer-sink-clickhouse/src/types.rs
@@ -1,3 +1,4 @@
+#![allow(clippy::redundant_closure_call)]
 use crate::errors::QueryError;

 use chrono_tz::{Tz, UTC};
@@ -294,7 +295,7 @@ pub async fn insert_multi(
                     NaiveDate,
                     column_values,
                     n,
-                    |f: &NaiveDate| -> Option<NaiveDate> 
{ Some(f.clone()) } + |f: &NaiveDate| -> Option { Some(*f) } ) } FieldType::Json => { From 24863243317cc8e939f7b587a0783459894607a0 Mon Sep 17 00:00:00 2001 From: VG Date: Fri, 8 Mar 2024 01:27:52 +0800 Subject: [PATCH 10/13] chore: update client --- dozer-sink-clickhouse/src/client.rs | 88 ++--------------------------- dozer-sink-clickhouse/src/ddl.rs | 30 +++++----- dozer-sink-clickhouse/src/lib.rs | 10 ++-- dozer-sink-clickhouse/src/schema.rs | 32 ----------- dozer-sink-clickhouse/src/sink.rs | 29 ++-------- dozer-sink-clickhouse/src/tests.rs | 1 - dozer-types/src/models/sink.rs | 6 +- json_schemas/dozer.json | 22 ++++---- 8 files changed, 44 insertions(+), 174 deletions(-) diff --git a/dozer-sink-clickhouse/src/client.rs b/dozer-sink-clickhouse/src/client.rs index eca3c7ae1a..6f94c2a77a 100644 --- a/dozer-sink-clickhouse/src/client.rs +++ b/dozer-sink-clickhouse/src/client.rs @@ -5,10 +5,8 @@ use crate::errors::QueryError; use crate::types::{insert_multi, map_value_wrapper_to_field}; use clickhouse_rs::{ClientHandle, Pool}; use dozer_types::log::{debug, info}; -use dozer_types::models::sink::ClickhouseSinkConfig; +use dozer_types::models::sink::{ClickhouseSinkConfig, ClickhouseTableOptions}; use dozer_types::types::{Field, FieldDefinition}; -use serde::Serialize; - pub struct SqlResult { pub rows: Vec>, } @@ -16,31 +14,13 @@ pub struct SqlResult { #[derive(Clone)] pub struct ClickhouseClient { pool: Pool, - config: ClickhouseSinkConfig, } -#[derive(Debug, Clone, Serialize)] -pub struct QueryId(pub String); - -#[derive(Debug, Clone, Serialize)] -#[serde(crate = "dozer_types::serde")] - -pub struct QueryLog { - pub query_duration_ms: u64, - pub read_rows: u64, - pub read_bytes: u64, - pub written_rows: u64, - pub written_bytes: u64, - pub result_rows: u64, - pub result_bytes: u64, - pub memory_usage: u64, -} -pub struct ClickhouseOptionsWrapper(ClickhouseSinkConfig); impl ClickhouseClient { pub fn new(config: ClickhouseSinkConfig) -> Self { let url = Self::construct_url(&config); let pool = Pool::new(url); - Self { pool, config } + Self { pool } } pub fn construct_url(config: &ClickhouseSinkConfig) -> String { @@ -57,24 +37,6 @@ impl ClickhouseClient { url } - pub fn get_log_query(log_comment: &str) -> String { - format!( - r#" - SELECT - query_duration_ms, - read_rows, - read_bytes, - written_rows, - written_bytes, - result_rows, - result_bytes, - memory_usage - FROM system.query_log WHERE - log_comment = '{}' - "#, - log_comment - ) - } pub async fn get_client_handle(&self) -> Result { let client = self.pool.get_handle().await?; Ok(client) @@ -92,10 +54,11 @@ impl ClickhouseClient { &self, datasource_name: &str, fields: &[FieldDefinition], + table_options: Option, ) -> Result<(), QueryError> { let mut client = self.pool.get_handle().await?; - let ddl = get_create_table_query(datasource_name, fields, self.config.clone()); - info!("Creating Clickhouse Sink Table"); + let ddl = get_create_table_query(datasource_name, fields, table_options); + info!("Creating Clickhouse Table"); info!("{ddl}"); client.execute(ddl).await?; Ok(()) @@ -130,32 +93,6 @@ impl ClickhouseClient { Ok(SqlResult { rows }) } - pub async fn _fetch_query_log(&self, query_id: String) -> Result { - let mut client = self.pool.get_handle().await?; - let query = Self::get_log_query(&query_id); - let block = client.query(query).fetch_all().await?; - let first_row = block.rows().next(); - - if let Some(row) = first_row { - let query_log = QueryLog { - query_duration_ms: row.get("query_duration_ms")?, - read_rows: 
row.get("read_rows")?, - read_bytes: row.get("read_bytes")?, - written_rows: row.get("written_rows")?, - written_bytes: row.get("written_bytes")?, - result_rows: row.get("result_rows")?, - result_bytes: row.get("result_bytes")?, - memory_usage: row.get("memory_usage")?, - }; - Ok(query_log) - } else { - Err(QueryError::CustomError(format!( - "No query log found for {0}", - query_id - ))) - } - } - pub async fn check_table(&self, table_name: &str) -> Result { let mut client = self.pool.get_handle().await?; let query = format!("CHECK TABLE {}", table_name); @@ -165,21 +102,6 @@ impl ClickhouseClient { Ok(true) } - pub async fn create_materialized_view( - &self, - name: &str, - target_table: &str, - query: &str, - ) -> Result<(), QueryError> { - let mut client = self.pool.get_handle().await?; - let ddl = format!( - "CREATE MATERIALIZED VIEW {} TO {} AS {}", - name, target_table, query - ); - client.execute(ddl).await?; - Ok(()) - } - pub async fn insert( &self, table_name: &str, diff --git a/dozer-sink-clickhouse/src/ddl.rs b/dozer-sink-clickhouse/src/ddl.rs index 1928e1aa82..0610b1e41a 100644 --- a/dozer-sink-clickhouse/src/ddl.rs +++ b/dozer-sink-clickhouse/src/ddl.rs @@ -1,4 +1,4 @@ -use dozer_types::models::sink::ClickhouseSinkConfig; +use dozer_types::models::sink::ClickhouseTableOptions; use dozer_types::types::FieldDefinition; use crate::schema::map_field_to_type; @@ -8,7 +8,7 @@ const DEFAULT_TABLE_ENGINE: &str = "MergeTree()"; pub fn get_create_table_query( table_name: &str, fields: &[FieldDefinition], - config: ClickhouseSinkConfig, + table_options: Option, ) -> String { let mut parts = fields .iter() @@ -18,41 +18,41 @@ pub fn get_create_table_query( }) .collect::>(); - let engine = config - .create_table_options + let engine = table_options .as_ref() .and_then(|c| c.engine.clone()) .unwrap_or_else(|| DEFAULT_TABLE_ENGINE.to_string()); - if let Some(pk) = config.primary_keys { - parts.push(format!("PRIMARY KEY ({})", pk.join(", "))); - } + parts.push( + table_options + .as_ref() + .and_then(|options| options.primary_keys.clone()) + .map_or("".to_string(), |pk| { + format!("PRIMARY KEY ({})", pk.join(", ")) + }), + ); let query = parts.join(",\n"); - let partition_by = config - .create_table_options + let partition_by = table_options .as_ref() .and_then(|options| options.partition_by.clone()) .map_or("".to_string(), |partition_by| { format!("PARTITION BY {}\n", partition_by) }); - let sample_by = config - .create_table_options + let sample_by = table_options .as_ref() .and_then(|options| options.sample_by.clone()) .map_or("".to_string(), |partition_by| { format!("SAMPLE BY {}\n", partition_by) }); - let order_by = config - .create_table_options + let order_by = table_options .as_ref() .and_then(|options| options.order_by.clone()) .map_or("".to_string(), |order_by| { format!("ORDER BY ({})\n", order_by.join(", ")) }); - let cluster = config - .create_table_options + let cluster = table_options .as_ref() .and_then(|options| options.cluster.clone()) .map_or("".to_string(), |cluster| { diff --git a/dozer-sink-clickhouse/src/lib.rs b/dozer-sink-clickhouse/src/lib.rs index 75f98504ca..01c4008d0c 100644 --- a/dozer-sink-clickhouse/src/lib.rs +++ b/dozer-sink-clickhouse/src/lib.rs @@ -1,9 +1,9 @@ -mod client; -mod ddl; -mod errors; -mod schema; +pub mod client; +pub mod ddl; +pub mod errors; +pub mod schema; mod sink; pub use sink::ClickhouseSinkFactory; #[cfg(test)] mod tests; -mod types; +pub mod types; diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs 
index 0d99871e17..d0fb2369f5 100644 --- a/dozer-sink-clickhouse/src/schema.rs +++ b/dozer-sink-clickhouse/src/schema.rs @@ -3,7 +3,6 @@ use crate::errors::ClickhouseSinkError::{self, SinkTableDoesNotExist}; use crate::types::DECIMAL_SCALE; use clickhouse_rs::types::Complex; use clickhouse_rs::{Block, ClientHandle}; -use dozer_types::errors::internal::BoxedError; use dozer_types::log::warn; use dozer_types::models::sink::ClickhouseSinkConfig; use dozer_types::serde::{Deserialize, Serialize}; @@ -51,37 +50,6 @@ impl ClickhouseSchema { } } - pub async fn get_primary_keys( - client: ClickhouseClient, - config: &ClickhouseSinkConfig, - ) -> Result, BoxedError> { - let handle = client.get_client_handle().await?; - let existing_pk = - Self::fetch_primary_keys(handle, &config.sink_table_name, &config.database).await?; - - if let Some(expected_pk) = &config.primary_keys { - if expected_pk.len() != existing_pk.len() { - return Err(ClickhouseSinkError::PrimaryKeyMismatch( - expected_pk.clone(), - existing_pk.clone(), - ) - .into()); - } - - for pk in expected_pk { - if !existing_pk.iter().any(|existing_pk| existing_pk == pk) { - return Err(ClickhouseSinkError::PrimaryKeyMismatch( - expected_pk.clone(), - existing_pk.clone(), - ) - .into()); - } - } - } - - Ok(existing_pk) - } - pub async fn compare_with_dozer_schema( client: ClickhouseClient, schema: &Schema, diff --git a/dozer-sink-clickhouse/src/sink.rs b/dozer-sink-clickhouse/src/sink.rs index 5ffacd2722..5c1b253017 100644 --- a/dozer-sink-clickhouse/src/sink.rs +++ b/dozer-sink-clickhouse/src/sink.rs @@ -61,35 +61,23 @@ impl SinkFactory for ClickhouseSinkFactory { let config = &self.config; if self.config.create_table_options.is_some() { client - .create_table(&config.sink_table_name, &schema.fields) + .create_table( + &config.sink_table_name, + &schema.fields, + self.config.create_table_options.clone(), + ) .await?; } let table = ClickhouseSchema::get_clickhouse_table(client.clone(), &self.config).await?; ClickhouseSchema::compare_with_dozer_schema(client.clone(), &schema, &table).await?; - let primary_key_field_names = - ClickhouseSchema::get_primary_keys(client.clone(), &self.config).await?; - - let primary_key_fields_indexes: Result, ClickhouseSinkError> = - primary_key_field_names - .iter() - .map(|primary_key| { - schema - .fields - .iter() - .position(|field| field.name == *primary_key) - .ok_or(ClickhouseSinkError::PrimaryKeyNotFound) - }) - .collect(); - let sink = ClickhouseSink::new( client, self.config.clone(), schema, self.runtime.clone(), table, - primary_key_fields_indexes?, ); Ok(Box::new(sink)) @@ -102,7 +90,6 @@ pub(crate) struct ClickhouseSink { pub(crate) schema: Schema, pub(crate) sink_table_name: String, pub(crate) table: ClickhouseTable, - pub(crate) primary_key_fields_indexes: Vec, batch: Vec>, } @@ -110,10 +97,6 @@ impl Debug for ClickhouseSink { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ClickhouseSink") .field("sink_table_name", &self.sink_table_name) - .field( - "primary_key_fields_indexes", - &self.primary_key_fields_indexes, - ) .field("table", &self.table) .field("schema", &self.schema) .finish() @@ -127,7 +110,6 @@ impl ClickhouseSink { schema: Schema, runtime: Arc, table: ClickhouseTable, - primary_key_fields_indexes: Vec, ) -> Self { Self { client, @@ -135,7 +117,6 @@ impl ClickhouseSink { schema, sink_table_name: config.sink_table_name, table, - primary_key_fields_indexes, batch: Vec::new(), } } diff --git a/dozer-sink-clickhouse/src/tests.rs 
b/dozer-sink-clickhouse/src/tests.rs
index 46639f4bca..3f7102fd42 100644
--- a/dozer-sink-clickhouse/src/tests.rs
+++ b/dozer-sink-clickhouse/src/tests.rs
@@ -14,7 +14,6 @@ fn get_sink_config() -> ClickhouseSinkConfig {
         sink_table_name: "sink_table".to_string(),
         scheme: "tcp".to_string(),
         create_table_options: None,
-        primary_keys: Some(vec!["id".to_string()]),
         user: "default".to_string(),
         password: None,
         database: "default".to_string(),
diff --git a/dozer-types/src/models/sink.rs b/dozer-types/src/models/sink.rs
index 6e5aa6acd8..209b38919e 100644
--- a/dozer-types/src/models/sink.rs
+++ b/dozer-types/src/models/sink.rs
@@ -171,8 +171,7 @@ pub struct ClickhouseSinkConfig {
     pub database: String,
     pub source_table_name: String,
     pub sink_table_name: String,
-    pub primary_keys: Option<Vec<String>>,
-    pub create_table_options: Option<ClickhouseSinkTableOptions>,
+    pub create_table_options: Option<ClickhouseTableOptions>,
 }

 impl ClickhouseSinkConfig {
@@ -195,8 +194,9 @@ impl ClickhouseSinkConfig {

 #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)]
 #[serde(deny_unknown_fields)]
-pub struct ClickhouseSinkTableOptions {
+pub struct ClickhouseTableOptions {
     pub engine: Option<String>,
+    pub primary_keys: Option<Vec<String>>,
     pub partition_by: Option<String>,
     pub sample_by: Option<String>,
     pub order_by: Option<Vec<String>>,
diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json
index 7df2e5f128..7e8d73da5b 100644
--- a/json_schemas/dozer.json
+++ b/json_schemas/dozer.json
@@ -412,7 +412,7 @@
         "create_table_options": {
           "anyOf": [
             {
-              "$ref": "#/definitions/ClickhouseSinkTableOptions"
+              "$ref": "#/definitions/ClickhouseTableOptions"
             },
             {
               "type": "null"
@@ -440,15 +440,6 @@
           "format": "uint16",
           "minimum": 0.0
         },
-        "primary_keys": {
-          "type": [
-            "array",
-            "null"
-          ],
-          "items": {
-            "type": "string"
-          }
-        },
         "scheme": {
           "default": "tcp",
           "type": "string"
@@ -466,7 +457,7 @@
         },
         "additionalProperties": false
       },
-      "ClickhouseSinkTableOptions": {
+      "ClickhouseTableOptions": {
         "type": "object",
         "properties": {
           "cluster": {
@@ -496,6 +487,15 @@
               "null"
             ]
           },
+          "primary_keys": {
+            "type": [
+              "array",
+              "null"
+            ],
+            "items": {
+              "type": "string"
+            }
+          },
           "sample_by": {
             "type": [
               "string",

From b29667c1a949f4d418aeed7f3ad0059347d4ee0b Mon Sep 17 00:00:00 2001
From: VG
Date: Fri, 8 Mar 2024 01:34:48 +0800
Subject: [PATCH 11/13] chore: update pub crate

---
 dozer-sink-clickhouse/src/schema.rs | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/dozer-sink-clickhouse/src/schema.rs b/dozer-sink-clickhouse/src/schema.rs
index d0fb2369f5..ca920e8562 100644
--- a/dozer-sink-clickhouse/src/schema.rs
+++ b/dozer-sink-clickhouse/src/schema.rs
@@ -17,19 +17,19 @@ pub struct ClickhouseSchemaColumn {

 #[derive(Debug, Deserialize, Serialize, Clone)]
 #[serde(crate = "dozer_types::serde")]
-pub(crate) struct ClickhouseTable {
-    pub(crate) database: String,
-    pub(crate) name: String,
-    pub(crate) engine: String,
-    pub(crate) engine_full: String,
+pub struct ClickhouseTable {
+    pub database: String,
+    pub name: String,
+    pub engine: String,
+    pub engine_full: String,
 }

 #[derive(Debug, Deserialize, Serialize)]
 #[serde(crate = "dozer_types::serde")]
-pub(crate) struct ClickhouseKeyColumnDef {
-    pub(crate) column_name: Option<String>,
-    pub(crate) constraint_name: Option<String>,
-    pub(crate) constraint_schema: String,
+pub struct ClickhouseKeyColumnDef {
+    pub column_name: Option<String>,
+    pub constraint_name: Option<String>,
+    pub constraint_schema: String,
 }

 pub struct ClickhouseSchema {}
@@ -115,6 +115,7 @@
     }

+    #[allow(dead_code)]
     async fn fetch_primary_keys(
         mut handle: 
        sink_table_name: &str,

From ce0fde33f63ebbbd3a3a43507ab1810f2338a0f4 Mon Sep 17 00:00:00 2001
From: VG
Date: Fri, 8 Mar 2024 02:00:19 +0800
Subject: [PATCH 12/13] remove println

Signed-off-by: VG
---
 dozer-sink-clickhouse/src/client.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dozer-sink-clickhouse/src/client.rs b/dozer-sink-clickhouse/src/client.rs
index 6f94c2a77a..b986ea0d49 100644
--- a/dozer-sink-clickhouse/src/client.rs
+++ b/dozer-sink-clickhouse/src/client.rs
@@ -45,7 +45,7 @@ impl ClickhouseClient {
     pub async fn drop_table(&self, datasource_name: &str) -> Result<(), QueryError> {
         let mut client = self.pool.get_handle().await?;
         let ddl = format!("DROP TABLE IF EXISTS {}", datasource_name);
-        println!("#{ddl}");
+        info!("#{ddl}");
         client.execute(ddl).await?;
         Ok(())
     }

From 2e3debd92e87d0663c2b9e2f17bfe083360a43f5 Mon Sep 17 00:00:00 2001
From: VG
Date: Fri, 8 Mar 2024 15:26:52 +0800
Subject: [PATCH 13/13] feat: implement clickhouse sink checkpointing

---
 dozer-sink-clickhouse/src/lib.rs      |   1 +
 dozer-sink-clickhouse/src/metadata.rs |  47 ++++++++++++
 dozer-sink-clickhouse/src/sink.rs     | 102 +++++++++++++++++++++++++-
 3 files changed, 148 insertions(+), 2 deletions(-)
 create mode 100644 dozer-sink-clickhouse/src/metadata.rs

diff --git a/dozer-sink-clickhouse/src/lib.rs b/dozer-sink-clickhouse/src/lib.rs
index 01c4008d0c..426b54ba6c 100644
--- a/dozer-sink-clickhouse/src/lib.rs
+++ b/dozer-sink-clickhouse/src/lib.rs
@@ -4,6 +4,7 @@ pub mod errors;
 pub mod schema;
 mod sink;
 pub use sink::ClickhouseSinkFactory;
+pub mod metadata;
 #[cfg(test)]
 mod tests;
 pub mod types;
diff --git a/dozer-sink-clickhouse/src/metadata.rs b/dozer-sink-clickhouse/src/metadata.rs
new file mode 100644
index 0000000000..4d3b409e02
--- /dev/null
+++ b/dozer-sink-clickhouse/src/metadata.rs
@@ -0,0 +1,47 @@
+use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
+
+// Replication Metadata Constants
+pub const REPLICA_METADATA_TABLE: &str = "__dozer_replication_metadata";
+pub const META_TABLE_COL: &str = "table";
+pub const META_TXN_ID_COL: &str = "txn_id";
+
+pub struct ReplicationMetadata {
+    pub schema: Schema,
+    pub table_name: String,
+}
+
+impl ReplicationMetadata {
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    pub fn get_primary_keys(&self) -> Vec<String> {
+        vec![META_TABLE_COL.to_string()]
+    }
+
+    pub fn get_metadata() -> ReplicationMetadata {
+        ReplicationMetadata {
+            table_name: REPLICA_METADATA_TABLE.to_string(),
+            schema: Schema::new()
+                .field(
+                    FieldDefinition {
+                        name: META_TABLE_COL.to_owned(),
+                        typ: FieldType::String,
+                        nullable: false,
+                        source: SourceDefinition::Dynamic,
+                    },
+                    true,
+                )
+                .field(
+                    FieldDefinition {
+                        name: META_TXN_ID_COL.to_owned(),
+                        typ: FieldType::UInt,
+                        nullable: false,
+                        source: SourceDefinition::Dynamic,
+                    },
+                    false,
+                )
+                .clone(),
+        }
+    }
+}
diff --git a/dozer-sink-clickhouse/src/sink.rs b/dozer-sink-clickhouse/src/sink.rs
index 5c1b253017..b03ab68f86 100644
--- a/dozer-sink-clickhouse/src/sink.rs
+++ b/dozer-sink-clickhouse/src/sink.rs
@@ -5,11 +5,15 @@ use dozer_core::tokio::runtime::Runtime;
 use dozer_core::DEFAULT_PORT_HANDLE;
 
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::models::sink::ClickhouseSinkConfig;
+use dozer_types::log::debug;
+use dozer_types::models::sink::{ClickhouseSinkConfig, ClickhouseTableOptions};
 use dozer_types::node::OpIdentifier;
 
 use crate::client::ClickhouseClient;
 use crate::errors::ClickhouseSinkError;
+use crate::metadata::{
+    ReplicationMetadata, META_TABLE_COL, META_TXN_ID_COL, REPLICA_METADATA_TABLE,
+};
 use crate::schema::{ClickhouseSchema, ClickhouseTable};
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{Field, Operation, Schema, TableOperation};
@@ -18,6 +22,7 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 const BATCH_SIZE: usize = 100;
+
 #[derive(Debug)]
 pub struct ClickhouseSinkFactory {
     runtime: Arc<Runtime>,
@@ -28,6 +33,36 @@ impl ClickhouseSinkFactory {
     pub fn new(config: ClickhouseSinkConfig, runtime: Arc<Runtime>) -> Self {
         Self { config, runtime }
     }
+
+    pub async fn create_replication_metadata_table(&self) -> Result<(), BoxedError> {
+        let client = ClickhouseClient::new(self.config.clone());
+        let repl_metadata = ReplicationMetadata::get_metadata();
+
+        let primary_keys = repl_metadata.get_primary_keys();
+        let partition_by = format!("({})", primary_keys.join(","));
+        let create_table_options = ClickhouseTableOptions {
+            engine: Some("ReplacingMergeTree".to_string()),
+            primary_keys: Some(repl_metadata.get_primary_keys()),
+            partition_by: Some(partition_by),
+            // Rows are replaced (deduplicated) using this key
+            order_by: Some(repl_metadata.get_primary_keys()),
+            cluster: self
+                .config
+                .create_table_options
+                .as_ref()
+                .and_then(|o| o.cluster.clone()),
+            sample_by: None,
+        };
+        client
+            .create_table(
+                &repl_metadata.table_name,
+                &repl_metadata.schema.fields,
+                Some(create_table_options),
+            )
+            .await?;
+
+        Ok(())
+    }
 }
 
 #[async_trait]
@@ -59,6 +94,11 @@ impl SinkFactory for ClickhouseSinkFactory {
         let client = ClickhouseClient::new(self.config.clone());
 
         let config = &self.config;
+
+        // Create the replication metadata table
+        self.create_replication_metadata_table().await?;
+
+        // Create the sink table
         if self.config.create_table_options.is_some() {
             client
                 .create_table(
@@ -91,6 +131,8 @@ pub(crate) struct ClickhouseSink {
     pub(crate) sink_table_name: String,
     pub(crate) table: ClickhouseTable,
     batch: Vec<Vec<Field>>,
+    metadata: ReplicationMetadata,
+    latest_txid: Option<u64>,
 }
 
 impl Debug for ClickhouseSink {
@@ -118,7 +160,30 @@ impl ClickhouseSink {
             sink_table_name: config.sink_table_name,
             table,
             batch: Vec::new(),
+            latest_txid: None,
+            metadata: ReplicationMetadata::get_metadata(),
+        }
+    }
+
+    pub async fn insert_metadata(&self) -> Result<(), BoxedError> {
+        debug!(
+            "[Sink] Inserting metadata record {:?} {}",
+            self.latest_txid,
+            self.sink_table_name.clone()
+        );
+        if let Some(txid) = self.latest_txid {
+            self.client
+                .insert(
+                    REPLICA_METADATA_TABLE,
+                    &self.metadata.schema.fields,
+                    &[
+                        Field::String(self.sink_table_name.clone()),
+                        Field::UInt(txid),
+                    ],
+                )
+                .await?;
         }
+        Ok(())
     }
 
     fn insert_values(&mut self, values: &[Field]) -> Result<(), BoxedError> {
@@ -129,24 +194,54 @@ impl ClickhouseSink {
 
     fn commit_batch(&mut self) -> Result<(), BoxedError> {
         self.runtime.block_on(async {
+            // Insert batch
             self.client
                 .insert_multi(&self.sink_table_name, &self.schema.fields, &self.batch)
                 .await?;
+            self.insert_metadata().await?;
             Ok::<(), BoxedError>(())
         })?;
+
        self.batch.clear();
 
         Ok(())
     }
+
+    fn _get_latest_op(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+        let op = self.runtime.block_on(async {
+            let mut client = self.client.get_client_handle().await?;
+            let table_name = self.sink_table_name.clone();
+            let query = format!("SELECT \"{META_TXN_ID_COL}\" FROM \"{REPLICA_METADATA_TABLE}\" WHERE \"{META_TABLE_COL}\" = '{table_name}' ORDER BY \"{META_TXN_ID_COL}\" DESC LIMIT 1");
+            let block = client
+                .query(query)
+                .fetch_all()
+                .await?;
+
+            let row = block.rows().next();
+            match row {
+                Some(row) => {
+                    let txid: u64 = row.get(META_TXN_ID_COL)?;
+                    Ok::<Option<OpIdentifier>, BoxedError>(Some(OpIdentifier { txid, seq_in_tx: 0 }))
+                },
+                None => Ok::<Option<OpIdentifier>, BoxedError>(None),
+            }
+        })?;
+        Ok(op)
+    }
 }
 
 impl Sink for ClickhouseSink {
     fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
+        Ok(())
+    }
+
+    fn flush_batch(&mut self) -> Result<(), BoxedError> {
         self.commit_batch()?;
         Ok(())
     }
 
     fn process(&mut self, op: TableOperation) -> Result<(), BoxedError> {
+        self.latest_txid = op.id.map(|id| id.txid);
         match op.op {
             Operation::Insert { new } => {
                 if self.table.engine == "CollapsingMergeTree" {
@@ -205,8 +300,10 @@ impl Sink for ClickhouseSink {
     fn on_source_snapshotting_done(
         &mut self,
         _connection_name: String,
-        _id: Option<OpIdentifier>,
+        id: Option<OpIdentifier>,
     ) -> Result<(), BoxedError> {
+        self.latest_txid = id.map(|opid| opid.txid);
+        self.commit_batch()?;
         Ok(())
     }
 
@@ -219,6 +316,7 @@ impl Sink for ClickhouseSink {
     }
 
     fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+        // self.get_latest_op()
        Ok(None)
     }
 }
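
Note on checkpoint recovery: with PATCH 13/13 applied, get_latest_op_id still
returns Ok(None), so a restarted pipeline re-snapshots instead of resuming from
__dozer_replication_metadata. A minimal sketch of the wiring the commented-out
call appears to intend, assuming _get_latest_op keeps its current name and is
considered safe to enable:

    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
        // Assumed wiring, not part of the patches: read the newest txn_id
        // recorded for this sink table back from __dozer_replication_metadata.
        // The metadata table is a ReplacingMergeTree ordered by "table", so
        // older checkpoint rows for the same table collapse after background
        // merges; ORDER BY txn_id DESC LIMIT 1 also returns the latest value
        // while duplicate rows still exist in unmerged parts.
        self._get_latest_op()
    }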