From 8aa49fca5c53a2acec40e729506381dcf4b4342b Mon Sep 17 00:00:00 2001 From: Antoine Pultier <45740+fungiboletus@users.noreply.github.com> Date: Sat, 20 Jan 2024 20:42:36 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8C=88=20Work=20in=20progress?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduction of more inference, broken type guessing, stupid exotic crypto, and bugs. --- Cargo.lock | 667 +++++++++++++++++- Cargo.toml | 11 +- src/bus/utils.rs | 60 ++ src/config.rs | 120 ++++ src/http/server.rs | 56 +- src/importers/csv.rs | 8 +- src/infer/columns.rs | 74 +- src/infer/datagrid.rs | 4 + src/infer/datetime_guesser.rs | 302 +++++++- src/infer/geo_guesser.rs | 241 +++++++ src/infer/is_header.rs | 41 ++ src/infer/mod.rs | 10 +- src/infer/{infer.rs => parsing.rs} | 73 +- src/infer/uuid.rs | 103 +++ src/main.rs | 32 +- src/name_to_uuid.rs | 101 +++ src/storage/duckdb/mod.rs | 0 src/storage/mod.rs | 1 + .../sqlite/migrations/20240110093153_init.sql | 45 +- src/storage/sqlite/sqlite.rs | 52 +- 20 files changed, 1926 insertions(+), 75 deletions(-) create mode 100644 src/config.rs create mode 100644 src/infer/datagrid.rs create mode 100644 src/infer/geo_guesser.rs create mode 100644 src/infer/is_header.rs rename src/infer/{infer.rs => parsing.rs} (89%) create mode 100644 src/infer/uuid.rs create mode 100644 src/name_to_uuid.rs create mode 100644 src/storage/duckdb/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 53cd773..db0f223 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,6 +35,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", + "const-random", "getrandom", "once_cell", "version_check", @@ -116,12 +117,110 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf7d0a018de4f6aa429b9d33d69edf69072b1c5b1cb8d3e4a5f7ef898fc3eb76" +[[package]] +name = "arrayref" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" + [[package]] name = "arrayvec" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "arrow" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bc25126d18a012146a888a0298f2c22e1150327bd2765fc76d710a556b2d614" +dependencies = [ + "ahash 0.8.7", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "num", +] + +[[package]] +name = "arrow-array" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d" +dependencies = [ + "ahash 0.8.7", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.14.3", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "01a0fc21915b00fc6c2667b069c1b64bdd920982f426079bc4a7cab86822886c" +dependencies = [ + "bytes", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "base64 0.21.7", + "chrono", + "comfy-table", + "half", + "lexical-core", + "num", +] + +[[package]] +name = "arrow-data" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + [[package]] name = "arrow-format" version = "0.8.1" @@ -132,6 +231,75 @@ dependencies = [ "serde", ] +[[package]] +name = "arrow-ord" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "half", + "num", +] + +[[package]] +name = "arrow-row" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a" +dependencies = [ + "ahash 0.8.7", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", + "hashbrown 0.14.3", +] + +[[package]] +name = "arrow-schema" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167" +dependencies = [ + "bitflags 2.4.1", +] + +[[package]] +name = "arrow-select" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036" +dependencies = [ + "ahash 0.8.7", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a48149c63c11c9ff571e50ab8f017d2a7cb71037a882b42f6354ed2da9acc7" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "num", + "regex", + "regex-syntax 0.8.2", +] + [[package]] name = "async-broadcast" version = "0.6.0" @@ -229,6 +397,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -280,6 +449,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -319,6 +494,19 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake3" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -390,6 +578,17 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" 
+[[package]] +name = "byte-unit" +version = "5.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbda27216be70d08546aa506cecabce0c5eb0d494aaaedbd7ec82c8ae1a60b46" +dependencies = [ + "rust_decimal", + "serde", + "utf8-width", +] + [[package]] name = "bytecheck" version = "0.6.11" @@ -480,6 +679,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a4f925191b4367301851c6d99b09890311d74b0d43f274c0b34c86d308a3663" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.0.83" @@ -538,12 +743,82 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "config" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23738e11972c7643e4ec947840fc463b6a571afcd3e735bdfce7d03c7a784aca" +dependencies = [ + "async-trait", + "json5", + "lazy_static", + "nom", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml", + "yaml-rust", +] + +[[package]] +name = "confique" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c37945ed2efccb10339a12eea282a5af9ebac77720d088723b2bbbdc44eca964" +dependencies = [ + "confique-macro", + "json5", + "serde", + "serde_yaml", + "toml", +] + +[[package]] +name = "confique-macro" +version = "0.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3821efdaaab3c5297054a90201cc3afa2061fc6ba2bc5d2fa558b850a7dabefe" +dependencies = [ + "heck 0.3.3", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "const-oid" version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-random" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaf16c9c2c612020bcfd042e170f6e32de9b9d75adb5277cdbbd2e2c8c8299a" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + +[[package]] +name = "constant_time_eq" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -654,6 +929,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -746,12 +1027,36 @@ dependencies = [ "subtle", ] +[[package]] +name = "dlv-list" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" + [[package]] name = "dotenvy" version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "duckdb" +version = "0.9.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "326e8f84acb4d57c4025637f77e89dc3eee0e25b3a79c21cfd8b72db5ecd3c97" +dependencies = [ + "arrow", + "cast", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libduckdb-sys", + "memchr", + "rust_decimal", + "smallvec", + "strum", +] + [[package]] name = "dyn-clone" version = "1.0.16" @@ -777,6 +1082,15 @@ dependencies = [ "serde", ] +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "enum_dispatch" version = "0.3.12" @@ -849,6 +1163,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fast-float" version = "0.2.0" @@ -861,6 +1187,18 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "filetime" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "windows-sys 0.52.0", +] + [[package]] name = "finl_unicode" version = "1.2.0" @@ -1113,6 +1451,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", +] + [[package]] name = "hash32" version = "0.2.1" @@ -1174,6 +1523,15 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "heck" version = "0.4.1" @@ -1455,6 +1813,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1473,6 +1842,8 @@ dependencies = [ "lexical-parse-float", "lexical-parse-integer", "lexical-util", + "lexical-write-float", + "lexical-write-integer", ] [[package]] @@ -1505,12 +1876,48 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +[[package]] +name = "libduckdb-sys" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "666dbbb7ac31fc49fc2535df5d6efe9ce09815783a0ad98eea3564d2f0223974" +dependencies = [ + "autocfg", + "flate2", + "pkg-config", + "serde", + "serde_json", + "tar", + "vcpkg", +] + [[package]] name = "libm" version = "0.2.8" @@ -1528,6 +1935,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.4.12" @@ -1652,6 +2065,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15d522be0a9c3e46fd2632e272d178f56387bdb5c9fbb3a36c649062e9b5219" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multiversion" version = "0.7.3" @@ -1723,6 +2154,31 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1740,6 +2196,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-complex" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" +dependencies = [ + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -1761,6 +2226,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.17" @@ -1796,6 +2273,16 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "ordered-multimap" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a" +dependencies = [ + "dlv-list", + "hashbrown 0.12.3", +] + [[package]] name = "overload" version = "0.1.1" @@ -1837,6 +2324,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" +[[package]] +name = "pathdiff" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -1852,6 +2345,51 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f200d8d83c44a45b21764d1916299752ca035d15ecd46faca3e9a2a2bf6ad06" +dependencies = [ + "memchr", + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcd6ab1236bbdb3a49027e920e693192ebfe8913f6d60e294de57463a493cfde" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a31940305ffc96863a735bef7c7994a00b325a7138fdbc5bda0f1a0476d3275" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "pest_meta" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7ff62f5259e53b78d1af898941cdcdccfae7385cf7d793a6e55de5d05bb4b7d" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -2454,6 +2992,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cbf4a6aa5f6d6888f39e980649f3ad6b666acdce1d78e95b8a2cb076e687ae30" +[[package]] +name = "ron" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a" +dependencies = [ + "base64 0.13.1", + "bitflags 1.3.2", + "serde", +] + [[package]] name = "rsa" version = "0.9.6" @@ -2485,6 +3034,16 @@ dependencies = [ "smallvec", ] +[[package]] +name = "rust-ini" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + [[package]] name = "rust_decimal" version = "1.33.1" @@ -2518,9 +3077,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.28" +version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" +checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ "bitflags 2.4.1", "errno", @@ -2567,16 +3126,25 @@ dependencies = [ "async-broadcast", "async-trait", "axum", + "blake3", + "byte-unit", "cached", + "config", + "confique", "csv-async", + "duckdb", "futures", "geo", + "hex", "hifitime", "iso8601", "nom", "num-traits", + "once_cell", "polars", + "regex", "rust_decimal", + "serde", "serde_json", "sindit-senml", "sqlx", @@ -2643,6 +3211,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" +dependencies = [ + "indexmap 2.1.0", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha1" 
version = "0.10.6" @@ -2705,7 +3286,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "36fd7adab1c6f40d6eda0cc8655f810c93e672efc7a4fcaa8700b50dd8becf3f" dependencies = [ - "base64", + "base64 0.21.7", "chrono", "once_cell", "regex", @@ -2882,7 +3463,7 @@ dependencies = [ "atomic-write-file", "dotenvy", "either", - "heck", + "heck 0.4.1", "hex", "once_cell", "proc-macro2", @@ -2906,7 +3487,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e37195395df71fd068f6e2082247891bc11e3289624bbc776a0cdfa1ca7f1ea4" dependencies = [ "atoi", - "base64", + "base64 0.21.7", "bitflags 2.4.1", "byteorder", "bytes", @@ -2948,7 +3529,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6ac0ac3b7ccd10cc96c7ab29791a7dd236bd94021f31eec7ba3d46a74aa1c24" dependencies = [ "atoi", - "base64", + "base64 0.21.7", "bitflags 2.4.1", "byteorder", "crc", @@ -3049,6 +3630,9 @@ name = "strum" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" +dependencies = [ + "strum_macros", +] [[package]] name = "strum_macros" @@ -3056,7 +3640,7 @@ version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", @@ -3129,6 +3713,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "target-features" version = "0.1.5" @@ -3178,6 +3773,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -3248,6 +3852,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" +dependencies = [ + "serde", +] + [[package]] name = "toml_datetime" version = "0.6.5" @@ -3293,7 +3906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09e12e6351354851911bdf8c2b8f2ab15050c567d70a8b9a37ae7b8301a4080d" dependencies = [ "async-compression", - "base64", + "base64 0.21.7", "bitflags 2.4.1", "bytes", "futures-util", @@ -3396,6 +4009,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + [[package]] name = "unicase" version = "2.7.0" @@ -3444,6 +4063,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = 
"unsafe-libyaml" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" + [[package]] name = "url" version = "2.5.0" @@ -3461,6 +4086,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf8-width" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86bd8d4e895da8537e5315b8254664e6b769c4ff3db18321b297a1e7004392e3" + [[package]] name = "uuid" version = "1.6.1" @@ -3755,12 +4386,32 @@ dependencies = [ "tap", ] +[[package]] +name = "xattr" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" +dependencies = [ + "libc", + "linux-raw-sys", + "rustix", +] + [[package]] name = "xxhash-rust" version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61" +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/Cargo.toml b/Cargo.toml index 535a6a5..7abaa99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" anyhow = "1.0" #async-stream = "0.3" async-trait = "0.1" -axum = { version = "0.7" } +axum = { version = "0.7", features = ["multipart"] } #axum-streams = { version = "0.12", features = ["json", "csv", "text"] } #bytes = "1.5" futures = "0.3" @@ -37,3 +37,12 @@ serde_json = "1.0" num-traits = "0.2.17" hifitime = "3.9.0" iso8601 = "0.6.1" +duckdb = "0.9.2" +config = "0.13.4" +serde = "1.0.195" +confique = "0.2.5" +once_cell = "1.19.0" +byte-unit = "5.1.3" +hex = "0.4.3" +blake3 = "1.5.0" +regex = "1.10.2" diff --git a/src/bus/utils.rs b/src/bus/utils.rs index 6c77cbb..3fcfd35 100644 --- a/src/bus/utils.rs +++ b/src/bus/utils.rs @@ -28,6 +28,7 @@ impl WaitForAll { let mut nb_started = self.nb_started.lock().await; *nb_started += 1; } + let nb_started_clone = self.nb_started.clone(); let nb_finished_clone = self.nb_finished.clone(); let finished_sender_clone = self.finished_sender.clone(); @@ -38,6 +39,13 @@ impl WaitForAll { *nb_finished += 1; } if !finished_sender_clone.is_closed() && finished_sender_clone.receiver_count() > 0 { + { + let nb_started = nb_started_clone.lock().await; + let nb_finished = nb_finished_clone.lock().await; + if *nb_started != *nb_finished { + return; + } + } let _ = finished_sender_clone.broadcast(()).await; } }); @@ -55,3 +63,55 @@ impl WaitForAll { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_wait_for_all() { + let mut wfa = WaitForAll::new(); + + let (s1, r1) = async_broadcast::broadcast(1); + let (s2, r2) = async_broadcast::broadcast(1); + + wfa.add(r1).await; + wfa.add(r2).await; + + let s2_clone = s2.clone(); + + tokio::spawn(async move { + println!("Waiting for s1"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + println!("Broadcasting s1"); + s1.broadcast(()).await.unwrap(); + }); + + tokio::spawn(async move { + println!("Waiting for s2"); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + println!("Broadcasting s2"); + 
s2.broadcast(()).await.unwrap(); + }); + + println!("Waiting for all"); + wfa.wait().await.unwrap(); + println!("done"); + + // Should return fast since it's already finished + wfa.wait().await.unwrap(); + + // What happens now ? + assert!(s2_clone.broadcast(()).await.is_err()); + } + + #[tokio::test] + async fn test_without_waiting() { + let mut wfa = WaitForAll::new(); + + let (s1, r1) = async_broadcast::broadcast(1); + wfa.add(r1).await; + s1.broadcast(()).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..7ddd0de --- /dev/null +++ b/src/config.rs @@ -0,0 +1,120 @@ +use anyhow::Error; +use confique::Config; +use once_cell::sync::OnceCell; +use std::{net::IpAddr, sync::Arc}; + +#[derive(Debug, Config)] +pub struct SensAppConfig { + #[config(env = "SENSAPP_PORT", default = 3000)] + pub port: u16, + #[config(env = "SENSAPP_ENDPOINT", default = "127.0.0.1")] + pub endpoint: IpAddr, + + #[config(env = "SENSAPP_HTTP_BODY_LIMIT", default = "10mb")] + pub http_body_limit: String, + + #[config(env = "SENSAPP_MAX_INFERENCES_ROWS", default = 128)] + pub max_inference_rows: usize, + + #[config(env = "SENSAPP_BATCH_SIZE", default = 8192)] + pub batch_size: usize, +} + +impl SensAppConfig { + pub fn load() -> Result<SensAppConfig, Error> { + let c = SensAppConfig::builder() + .env() + .file("settings.toml") + .load()?; + + Ok(c) + } + + pub fn parse_http_body_limit(&self) -> Result<usize, Error> { + let size = byte_unit::Byte::parse_str(self.http_body_limit.clone(), true)?.as_u64(); + if size > 128 * 1024 * 1024 * 1024 { + anyhow::bail!("Body size is too big: > 128GB"); + } + Ok(size as usize) + } +} + +static SENSAPP_CONFIG: OnceCell<Arc<SensAppConfig>> = OnceCell::new(); + +pub fn set(config: Arc<SensAppConfig>) -> Result<(), Error> { + match SENSAPP_CONFIG.set(config) { + Ok(_) => Ok(()), + Err(e) => Err(Error::msg(format!("Failed to set configuration: {:?}", e))), + } +} + +pub fn get() -> Result<Arc<SensAppConfig>, Error> { + SENSAPP_CONFIG.get().cloned().ok_or_else(|| { + Error::msg( + "Configuration not loaded. 
Please call load_configuration() before using the configuration", + ) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_load_config() { + let config = SensAppConfig::load().unwrap(); + + assert_eq!(config.port, 3000); + assert_eq!(config.endpoint, IpAddr::from([127, 0, 0, 1])); + + // set env PORT + std::env::set_var("SENSAPP_PORT", "8080"); + let config = SensAppConfig::load().unwrap(); + assert_eq!(config.port, 8080); + } + + #[test] + fn test_parse_http_body_limit() { + let config = SensAppConfig::load().unwrap(); + assert_eq!(config.parse_http_body_limit().unwrap(), 10000000); + + std::env::set_var("SENSAPP_HTTP_BODY_LIMIT", "12345"); + let config = SensAppConfig::load().unwrap(); + assert_eq!(config.parse_http_body_limit().unwrap(), 12345); + + std::env::set_var("SENSAPP_HTTP_BODY_LIMIT", "10m"); + let config = SensAppConfig::load().unwrap(); + assert_eq!(config.parse_http_body_limit().unwrap(), 10000000); + + std::env::set_var("SENSAPP_HTTP_BODY_LIMIT", "10mb"); + let config = SensAppConfig::load().unwrap(); + assert_eq!(config.parse_http_body_limit().unwrap(), 10000000); + + std::env::set_var("SENSAPP_HTTP_BODY_LIMIT", "10MiB"); + let config = SensAppConfig::load().unwrap(); + assert_eq!(config.parse_http_body_limit().unwrap(), 10485760); + + std::env::set_var("SENSAPP_HTTP_BODY_LIMIT", "1.5gb"); + let config = SensAppConfig::load().unwrap(); + assert_eq!(config.parse_http_body_limit().unwrap(), 1500000000); + + std::env::set_var("SENSAPP_HTTP_BODY_LIMIT", "1tb"); + let config = SensAppConfig::load().unwrap(); + assert!(config.parse_http_body_limit().is_err()); + + std::env::set_var("SENSAPP_HTTP_BODY_LIMIT", "-5mb"); + let config = SensAppConfig::load().unwrap(); + assert!(config.parse_http_body_limit().is_err()); + } + + #[test] + fn test_set_get() { + assert!(SENSAPP_CONFIG.get().is_none()); + let config = SensAppConfig::load().unwrap(); + set(Arc::new(config)).unwrap(); + assert!(SENSAPP_CONFIG.get().is_some()); + + let config = get().unwrap(); + assert_eq!(config.port, 3000); + } +} diff --git a/src/http/server.rs b/src/http/server.rs index a247730..be78224 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -1,5 +1,11 @@ +use super::state::HttpServerState; +use crate::config; +use crate::importers::csv::publish_csv_async; +use crate::name_to_uuid::name_to_uuid; use anyhow::Result; use axum::extract::DefaultBodyLimit; +use axum::extract::Multipart; +use axum::extract::Path; use axum::extract::State; use axum::http::header; use axum::http::StatusCode; @@ -28,9 +34,6 @@ use tower_http::trace; use tower_http::{timeout::TimeoutLayer, trace::TraceLayer, ServiceBuilderExt}; use tracing::Level; -use super::state::HttpServerState; -use crate::importers::csv::publish_csv_async; - // Anyhow error handling with axum // https://github.com/tokio-rs/axum/blob/d3112a40d55f123bc5e65f995e2068e245f12055/examples/anyhow-error-response/src/main.rs struct AppError(anyhow::Error); @@ -53,6 +56,8 @@ where } pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Result<()> { + let max_body_layer = DefaultBodyLimit::max(config::get()?.parse_http_body_limit()?); + // Initialize tracing tracing_subscriber::fmt() .with_target(false) @@ -80,10 +85,17 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res .route("/", get(handler)) .route( "/publish", - post(publish_handler).layer(DefaultBodyLimit::max(1024 * 1024 * 1024)), + post(publish_handler).layer(max_body_layer.clone()), ) //.route("/publish_stream", 
post(publish_stream_handler)) - .route("/publish_csv", post(publish_csv)) + .route( + "/sensors/:sensor_name_or_uuid/publish_csv", + post(publish_csv), + ) + .route( + "/sensors/:sensor_name_or_uuid/publish_multipart", + post(publish_multipart).layer(max_body_layer.clone()), + ) .route("/fail", get(test_fail)) .layer(middleware) .with_state(state); @@ -106,8 +118,11 @@ async fn handler(State(state): State<HttpServerState>) -> Result<Json<String>, A async fn publish_csv( State(state): State<HttpServerState>, + Path(sensor_name_or_uuid): Path<String>, body: axum::body::Body, ) -> Result { + // let uuid = name_to_uuid(sensor_name_or_uuid.as_str())?; + // Convert the body in a stream let stream = body.into_data_stream(); let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err)); let reader = stream.into_async_read(); @@ -155,3 +170,34 @@ async fn publish_handler(bytes: Bytes) -> Result<Json<String>, (StatusCode, Stri )), } } + +async fn publish_multipart(mut multipart: Multipart) -> Result<Json<String>, (StatusCode, String)> { + Ok(Json("ok".to_string())) +} + +#[cfg(test)] +mod tests { + use axum::{body::Body, http::Request}; + use tower::ServiceExt; + + use super::*; + use crate::bus::EventBus; + + #[tokio::test] + async fn test_handler() { + let state = HttpServerState { + name: "hello world".to_string(), + event_bus: Arc::new(EventBus::init("test".to_string())), + }; + let app = Router::new().route("/", get(handler)).with_state(state); + let request = Request::builder().uri("/").body(Body::empty()).unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + use axum::body::to_bytes; + let body_str = + String::from_utf8(to_bytes(response.into_body(), 128).await.unwrap().to_vec()).unwrap(); + assert_eq!(body_str, "\"hello world\""); + } +} diff --git a/src/importers/csv.rs b/src/importers/csv.rs index d44c499..01b1233 100644 --- a/src/importers/csv.rs +++ b/src/importers/csv.rs @@ -18,7 +18,7 @@ pub async fn publish_csv_async( let mut current_samples: Vec> = vec![]; - let mut toto = WaitForAll::new(); + let mut all_batches_waiter = WaitForAll::new(); let mut i = 0; @@ -43,7 +43,7 @@ pub async fn publish_csv_async( let sync_receiver = event_bus.publish(batch).await?; //sync_receiver.activate().recv().await?; current_samples = vec![]; - toto.add(sync_receiver.activate()).await; + all_batches_waiter.add(sync_receiver.activate()).await; } } @@ -54,11 +54,11 @@ pub async fn publish_csv_async( samples: Arc::new(TypedSamples::Integer(current_samples)), }; let sync_receiver = event_bus.publish(batch).await?; - toto.add(sync_receiver.activate()).await; + all_batches_waiter.add(sync_receiver.activate()).await; } // Wololo ?? 
- toto.wait().await?; + all_batches_waiter.wait().await?; println!("Done reading CSV"); Ok(()) diff --git a/src/infer/columns.rs b/src/infer/columns.rs index 7b7b1c3..9ca8e50 100644 --- a/src/infer/columns.rs +++ b/src/infer/columns.rs @@ -1,4 +1,4 @@ -use super::infer::*; +use super::parsing::*; use std::sync::Arc; #[derive(Debug, Clone, PartialEq)] @@ -8,8 +8,8 @@ pub enum InferedColumn { Float(Vec<f64>), String(Vec<String>), Boolean(Vec<bool>), - JSON(Vec<Arc<serde_json::Value>>), - //DateTime(Vec), + DateTime(Vec<hifitime::Epoch>), + Json(Vec<Arc<serde_json::Value>>), } pub fn infer_column(column: Vec<String>, trim: bool, numeric: bool) -> InferedColumn { @@ -35,8 +35,9 @@ pub fn infer_column(column: Vec<String>, trim: bool, numeric: bool) -> InferedCo let mut has_numeric = false; let mut has_floats = false; let mut has_string = false; - let mut has_boolean = false; let mut has_json = false; + let mut has_boolean = false; + let mut has_datetime = false; for infered_value in infered_column.iter() { match infered_value { @@ -44,8 +45,9 @@ pub fn infer_column(column: Vec<String>, trim: bool, numeric: bool) -> InferedCo Ok((_, InferedValue::Numeric(_))) => has_numeric = true, Ok((_, InferedValue::Float(_))) => has_floats = true, Ok((_, InferedValue::String(_))) => has_string = true, - Ok((_, InferedValue::JSON(_))) => has_json = true, + Ok((_, InferedValue::Json(_))) => has_json = true, Ok((_, InferedValue::Boolean(_))) => has_boolean = true, + Ok((_, InferedValue::DateTime(_))) => has_datetime = true, _ => panic!("Failed to infer column"), } } @@ -57,11 +59,11 @@ pub fn infer_column(column: Vec<String>, trim: bool, numeric: bool) -> InferedCo } if has_json { - return InferedColumn::JSON( + return InferedColumn::Json( infered_column .iter() .map(|value| match value { - Ok((_, InferedValue::JSON(value))) => value.clone(), + Ok((_, InferedValue::Json(value))) => value.clone(), // Convert the other types to JSON, to be nice Ok((_, InferedValue::Integer(value))) => { Arc::new(serde_json::Value::from(*value)) } @@ -72,6 +74,9 @@ pub fn infer_column(column: Vec<String>, trim: bool, numeric: bool) -> InferedCo Ok((_, InferedValue::Boolean(value))) => { Arc::new(serde_json::Value::from(*value)) } + Ok((_, InferedValue::DateTime(value))) => { + Arc::new(serde_json::Value::from(value.to_rfc3339())) + } _ => unreachable!("We should have only JSON compatible types at this point"), }) .collect::<Vec<_>>(), @@ -81,7 +86,7 @@ pub fn infer_column(column: Vec<String>, trim: bool, numeric: bool) -> InferedCo // If we have booleans if has_boolean { // If we don't have only booleans, we use string instead - if has_integers || has_numeric || has_floats { + if has_integers || has_numeric || has_floats || has_datetime { return InferedColumn::String(column); } return InferedColumn::Boolean( @@ -95,6 +100,23 @@ pub fn infer_column(column: Vec<String>, trim: bool, numeric: bool) -> InferedCo ); } + // If we have datetimes + if has_datetime { + // If we don't have only datetimes, we use string instead + if has_integers || has_numeric || has_floats { + return InferedColumn::String(column); + } + return InferedColumn::DateTime( + infered_column + .iter() + .map(|value| match value { + Ok((_, InferedValue::DateTime(value))) => *value, + _ => unreachable!("We should have only datetimes at this point"), + }) + .collect::<Vec<_>>(), + ); + } + // If we have numerics if has_numeric { return InferedColumn::Numeric( @@ -135,7 +157,11 @@ pub fn infer_column(column: Vec<String>, trim: bool, numeric: bool) -> InferedCo ); } - unreachable!("failed to infer column"); + // If we reach this point, the column is supposedly empty + // Then we use Integer as a fallback + 
assert!(infered_column.is_empty()); + + InferedColumn::Integer(vec![]) } #[cfg(test)] @@ -270,7 +296,7 @@ mod tests { let infered_column = infer_column(column, true, false); assert_eq!( infered_column, - InferedColumn::JSON(vec![ + InferedColumn::Json(vec![ Arc::new(json!({"a": 1})), Arc::new(json!([{"b": 2}])), Arc::new(json!({"c": true})), @@ -287,17 +313,43 @@ mod tests { "42".to_string(), "42.83".to_string(), "true".to_string(), + "1951-10-26T00:00:00+02:00".to_string(), ]; let infered_column = infer_column(column, true, false); assert_eq!( infered_column, - InferedColumn::JSON(vec![ + InferedColumn::Json(vec![ Arc::new(json!({"a": 1})), Arc::new(json!([{"b": 2}])), Arc::new(json!(42)), Arc::new(json!(42.83)), Arc::new(json!(true)), + // Date is converted to UTC + Arc::new(json!("1951-10-26T02:00:00+00:00")), + ]) + ); + } + + #[test] + fn test_infer_column_empty() { + let infered_column = infer_column(vec![], true, false); + assert_eq!(infered_column, InferedColumn::Integer(vec![])); + } + + #[test] + fn test_datetime() { + let column = vec![ + "2020-01-01T00:00:00Z".to_string(), + "1969-358T14:21:32.0933+05:35".to_string(), + ]; + + let infered_column = infer_column(column, true, false); + assert_eq!( + infered_column, + InferedColumn::DateTime(vec![ + hifitime::Epoch::from_gregorian_utc(2020, 1, 1, 0, 0, 0, 0), + hifitime::Epoch::from_gregorian_utc(1969, 12, 24, 19, 56, 32, 93000000) ]) ); } diff --git a/src/infer/datagrid.rs b/src/infer/datagrid.rs new file mode 100644 index 0000000..72ba911 --- /dev/null +++ b/src/infer/datagrid.rs @@ -0,0 +1,4 @@ +pub struct DataGrid { + pub column_names: Vec, + +} diff --git a/src/infer/datetime_guesser.rs b/src/infer/datetime_guesser.rs index ed976ba..6beab35 100644 --- a/src/infer/datetime_guesser.rs +++ b/src/infer/datetime_guesser.rs @@ -1,4 +1,5 @@ use super::columns::InferedColumn; +use rust_decimal::Decimal; pub fn is_i64_likely_timestamp(value: i64) -> bool { // Between 2000-01-01 and 2118-01-01 @@ -11,9 +12,9 @@ pub fn is_f64_likely_timestamp(value: f64) -> bool { (946684800.0..=4670438400.0).contains(&value) } -pub fn is_decimal_likely_timestamp(value: rust_decimal::Decimal) -> bool { - let from = rust_decimal::Decimal::from(946684800i64); - let to = rust_decimal::Decimal::from(4670438400i64); +pub fn is_decimal_likely_timestamp(value: Decimal) -> bool { + let from = Decimal::from(946684800i64); + let to = Decimal::from(4670438400i64); value >= from && value <= to } @@ -23,7 +24,7 @@ pub fn is_decimal_likely_timestamp(value: rust_decimal::Decimal) -> bool { // This is more an helper function than something a production // system should rely on. Of course production systems may // rely on this, so the numbers should probably not be changed. 
-fn datetime_guesser(column_name: &str, column: &InferedColumn) -> isize { +pub fn datetime_guesser(column_name: &str, column: &InferedColumn) -> isize { let lowercase_column_name = column_name.to_lowercase(); let mut sum = 0_isize; sum += match lowercase_column_name.as_str() { @@ -52,6 +53,7 @@ fn datetime_guesser(column_name: &str, column: &InferedColumn) -> isize { } } sum += match column { + InferedColumn::DateTime(_) => 100, InferedColumn::Integer(values) => { // If all values are likely timestamps, it's likely a datetime column if values.iter().all(|value| is_i64_likely_timestamp(*value)) { @@ -81,24 +83,308 @@ fn datetime_guesser(column_name: &str, column: &InferedColumn) -> isize { } InferedColumn::Boolean(_) => -80, InferedColumn::String(_) => -100, - InferedColumn::JSON(_) => -128, + InferedColumn::Json(_) => -128, }; sum } pub fn likely_datetime_column( - column_names: &Vec, - columns: &Vec, + column_names: &[String], + columns: &[InferedColumn], ) -> Option { let best_candidate = column_names .iter() .zip(columns.iter()) .map(|(column_name, column)| (column_name, datetime_guesser(column_name, column))) + .filter(|(_, score)| *score > 0) .max_by_key(|(_, score)| *score); match best_candidate { - Some((column_name, score)) if score > 0 => Some(column_name.clone()), + Some((column_name, _)) => Some(column_name.clone()), _ => None, } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + #[test] + fn test_is_i64_likely_timestamp() { + assert!(!is_i64_likely_timestamp(0)); + // current timestamp + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + assert!(is_i64_likely_timestamp(now)); + assert!(!is_i64_likely_timestamp(-now)); + assert!(is_i64_likely_timestamp(946684801)); + assert!(is_i64_likely_timestamp(4670438400)); + assert!(!is_i64_likely_timestamp(2093009830983097)); + } + + #[test] + fn test_is_f64_likely_timestamp() { + assert!(!is_f64_likely_timestamp(0.0)); + // current timestamp + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as f64; + assert!(is_f64_likely_timestamp(now)); + assert!(!is_f64_likely_timestamp(-now)); + assert!(is_f64_likely_timestamp(946684801.0)); + assert!(is_f64_likely_timestamp(4670438400.0)); + assert!(!is_f64_likely_timestamp(2093009830983097.0)); + } + + #[test] + fn test_is_decimal_likely_timestamp() { + assert!(!is_decimal_likely_timestamp(Decimal::new(0, 0))); + // current timestamp + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + assert!(is_decimal_likely_timestamp(Decimal::new(now, 0))); + assert!(!is_decimal_likely_timestamp(Decimal::new(-now, 0))); + assert!(is_decimal_likely_timestamp(Decimal::new(946684801, 0))); + assert!(is_decimal_likely_timestamp(Decimal::new(4670438400, 0))); + assert!(!is_decimal_likely_timestamp(Decimal::new( + 2093009830983097, + 0 + ))); + } + + #[test] + fn test_datetime_guesser() { + assert_eq!( + datetime_guesser("datetime", &InferedColumn::DateTime(vec![])), + 200 + ); + assert_eq!( + datetime_guesser("timestamp", &InferedColumn::DateTime(vec![])), + 199 + ); + assert_eq!( + datetime_guesser("date", &InferedColumn::Integer(vec![])), + 92 + ); + assert_eq!( + datetime_guesser("time", &InferedColumn::Integer(vec![0, 946684801])), + 51 + ); + assert_eq!( + datetime_guesser("created_at", &InferedColumn::Float(vec![0.0, 946684801.0])), + 39 + ); + assert_eq!( + datetime_guesser("updated_at", 
&InferedColumn::Float(vec![946684801.0])), + 69 + ); + assert_eq!( + datetime_guesser( + "recorded_at", + &InferedColumn::Numeric(vec![Decimal::new(946684801, 0)]) + ), + 78 + ); + assert_eq!( + datetime_guesser( + "date_of_creation", + &InferedColumn::Numeric(vec![Decimal::new(2024, 0)]) + ), + 12 + ); + assert_eq!( + datetime_guesser("sensor_time_ok", &InferedColumn::Boolean(vec![false])), + -71 + ); + assert_eq!( + datetime_guesser( + "sensor_name", + &InferedColumn::String(vec!["toto".to_string()]) + ), + -100 + ); + assert_eq!( + datetime_guesser( + "sensor_format", + &InferedColumn::Json(vec![Arc::new(serde_json::json!({"toto": true}))]) + ), + -124 + ); + } + + #[test] + fn test_likely_datetime_column() { + assert_eq!( + likely_datetime_column( + &["timestamp".to_string(), "value".to_string()], + &[ + InferedColumn::DateTime(vec![ + hifitime::Epoch::from_unix_seconds(0.0), + hifitime::Epoch::from_unix_seconds(1.0), + hifitime::Epoch::from_unix_seconds(2.0), + ]), + InferedColumn::Float(vec![0.0, 1.0, 2.0]) + ] + ), + Some("timestamp".to_string()) + ); + assert_eq!( + likely_datetime_column( + &["date".to_string(), "time".to_string()], + &[ + InferedColumn::Integer(vec![1, 2]), + InferedColumn::Integer(vec![946684801, 4670438400]) + ] + ), + Some("time".to_string()) + ); + assert_eq!( + likely_datetime_column( + &["created_at".to_string(), "content".to_string()], + &[ + InferedColumn::DateTime(vec![ + hifitime::Epoch::from_unix_seconds(0.0), + hifitime::Epoch::from_unix_seconds(1.0), + ]), + InferedColumn::String(vec!["abc".to_string(), "def".to_string(),]) + ] + ), + Some("created_at".to_string()) + ); + // No columns + assert_eq!(likely_datetime_column(&[], &[]), None,); + + // No datetime column + assert_eq!( + likely_datetime_column( + &["name".to_string(), "content".to_string()], + &[ + InferedColumn::String(vec!["abc".to_string(), "def".to_string(),]), + InferedColumn::String(vec!["ghi".to_string(), "jkl".to_string(),]) + ] + ), + None + ); + } + + #[test] + fn test_likely_datetime_column_car1() { + assert_eq!( + likely_datetime_column( + &[ + "Time".to_string(), + "Lat".to_string(), + "Lon".to_string(), + "Bearing".to_string(), + "GPS Speed".to_string(), + "Revs(rpm)".to_string(), + "Speed(km/h)".to_string(), + "LPK(l/100km)".to_string(), + "CO₂(g/km)".to_string(), + "Coolant(°C)".to_string(), + "Baro(mb)".to_string(), + "GPS Height(m)".to_string(), + ], + &[ + InferedColumn::Integer(vec![1344450050621, 1344450050774]), + InferedColumn::Float(vec![60.0, 60.0]), + InferedColumn::Float(vec![10.0, 10.0]), + InferedColumn::Float(vec![0.0, 0.0]), + InferedColumn::Float(vec![0.0, 0.0]), + InferedColumn::Float(vec![783.0, 783.0]), + InferedColumn::Float(vec![0.0, 0.0]), + InferedColumn::Float(vec![0.0, 0.0]), + InferedColumn::Float(vec![0.0, 0.0]), + InferedColumn::Float(vec![58.0, 58.0]), + InferedColumn::Float(vec![998.12, 998.02]), + InferedColumn::Float(vec![200.0, 200.0]), + ] + ), + Some("Time".to_string()) + ); + } + + #[test] + fn test_likely_datetime_column_ebike1() { + assert_eq!( + likely_datetime_column( + &[ + "time".to_string(), + "fix".to_string(), + "sats".to_string(), + "latitute".to_string(), + "longitude".to_string(), + "sonar".to_string(), + "alt".to_string(), + "gps_alt".to_string(), + "speed".to_string(), + "CRS".to_string(), + "roll".to_string(), + "pitch".to_string(), + "yaw".to_string(), + ], + &[ + InferedColumn::Integer(vec![0, 100]), + InferedColumn::Integer(vec![1, 1]), + InferedColumn::Integer(vec![6, 6]), + InferedColumn::Float(vec![60.0, 60.0]), + 
InferedColumn::Float(vec![10.0, 10.0]), + InferedColumn::Integer(vec![0, 0]), + InferedColumn::Float(vec![207.95, 207.95]), + InferedColumn::Float(vec![204.53, 204.53]), + InferedColumn::Float(vec![0.02, 0.02]), + InferedColumn::Float(vec![111.97, 111.97]), + InferedColumn::Integer(vec![-953, -960]), + InferedColumn::Integer(vec![51, 50]), + InferedColumn::Integer(vec![17243, 17242]), + ] + ), + Some("time".to_string()) + ); + } + + #[test] + fn test_likely_datetime_column_ais() { + assert_eq!( + likely_datetime_column( + &[ + "mmsi".to_string(), + "imo_nr".to_string(), + "length".to_string(), + "date_time_utc".to_string(), + "lon".to_string(), + "lat".to_string(), + "sog".to_string(), + "cog".to_string(), + "true_heading".to_string(), + "nav_status".to_string(), + "message_nr".to_string(), + ], + &[ + InferedColumn::Integer(vec![123456789, 123456789]), + InferedColumn::Integer(vec![9876543, 9876543]), + InferedColumn::Integer(vec![100, 100]), + InferedColumn::DateTime(vec![ + hifitime::Epoch::from_unix_seconds(1420110601.0), + hifitime::Epoch::from_unix_seconds(1420110622.0), + ]), + InferedColumn::Float(vec![14.2535, 14.2549]), + InferedColumn::Float(vec![60.0, 60.0]), + InferedColumn::Float(vec![10.0, 10.0]), + InferedColumn::Float(vec![26.2, 28.0]), + InferedColumn::Integer(vec![27, 28]), + InferedColumn::Integer(vec![0, 0]), + InferedColumn::Integer(vec![1, 1]), + ] + ), + Some("date_time_utc".to_string()) + ); + } +} diff --git a/src/infer/geo_guesser.rs b/src/infer/geo_guesser.rs new file mode 100644 index 0000000..f423325 --- /dev/null +++ b/src/infer/geo_guesser.rs @@ -0,0 +1,241 @@ +use super::columns::InferedColumn; +use regex::Regex; +use rust_decimal::Decimal; + +pub fn is_f64_likely_coordinates(value: f64) -> bool { + (-180.0..=180.0).contains(&value) +} + +pub fn is_decimal_likely_coordinates(value: Decimal) -> bool { + value >= Decimal::new(-180, 0) && value <= Decimal::new(180, 0) +} + +static LATITUDE_REGEX: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { + Regex::new(r"(?i)^(gps_?(loc_?|location_?|position_?|)|geo_?(loc_?|location_?|position_?|)|position_?|pos_?|coord_?|coordinates_?|)(lat|latitude)$") + .expect("Failed to compile latitude regex") +}); + +static LONGITUDE_REGEX: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { + Regex::new( + r"(?i)^(gps_?(loc_?|location_?|position_?|)|geo_?(loc_?|location_?|position_?|)|position_?|pos_?|coord_?|coordinates_?|)(lng|lon|long|longitude)$", + ) + .expect("Failed to compile longitude regex") +}); + +pub enum GeoType { + Latitude, + Longitude, +} + +pub fn lat_long_guesser(mode: GeoType, column_name: &str, column: &InferedColumn) -> isize { + let regex = match mode { + GeoType::Latitude => LATITUDE_REGEX.clone(), + GeoType::Longitude => LONGITUDE_REGEX.clone(), + }; + + let mut sum = 0_isize; + + sum += if regex.is_match(column_name) { + 100 + } else { + -200 + }; + + sum += match column { + InferedColumn::Float(values) => { + if values.iter().all(|value| is_f64_likely_coordinates(*value)) { + 99 + } else { + -101 + } + } + InferedColumn::Numeric(values) => { + if values + .iter() + .all(|value| is_decimal_likely_coordinates(*value)) + { + 98 + } else { + -102 + } + } + _ => -103, + }; + + sum +} + +pub struct LatLonColumnNames { + pub lat: String, + pub lon: String, +} + +pub fn likely_geo_columns( + column_names: &[String], + columns: &[InferedColumn], +) -> Option { + let latitude_best_candidate = column_names + .iter() + .zip(columns.iter()) + .map(|(column_name, column)| { + ( + column_name, + 
lat_long_guesser(GeoType::Latitude, column_name, column), + ) + }) + .filter(|(_, score)| *score > 0) + .max_by_key(|(_, score)| *score); + + // No need to find a longitude if we don't have a latitude + latitude_best_candidate?; + + let longitude_best_candidate = column_names + .iter() + .zip(columns.iter()) + .map(|(column_name, column)| { + ( + column_name, + lat_long_guesser(GeoType::Longitude, column_name, column), + ) + }) + .filter(|(_, score)| *score > 0) + .max_by_key(|(_, score)| *score); + + let (lon_column_name, score_lon) = longitude_best_candidate?; + let (lat_column_name, score_lat) = latitude_best_candidate?; + + // If the score isn't identical, this is weird. + // So no automatic guessing. + if score_lat != score_lon { + return None; + } + + Some(LatLonColumnNames { + lat: lat_column_name.clone(), + lon: lon_column_name.clone(), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lat_long_guesser() { + let column = InferedColumn::Float(vec![0.0, 1.0, 2.0]); + assert_eq!(lat_long_guesser(GeoType::Latitude, "lat", &column), 199); + assert_eq!(lat_long_guesser(GeoType::Longitude, "lon", &column), 199); + assert_eq!( + lat_long_guesser(GeoType::Longitude, "geo_longitude", &column), + 199 + ); + assert_eq!( + lat_long_guesser(GeoType::Longitude, "geoloc_lng", &column), + 199 + ); + assert_eq!(lat_long_guesser(GeoType::Longitude, "speed", &column), -101); + assert_eq!( + lat_long_guesser(GeoType::Longitude, "altitude", &column), + -101 + ); + + let column = InferedColumn::Float(vec![0.0, 1.0, 2.0, 200.0]); + assert_eq!(lat_long_guesser(GeoType::Latitude, "lat", &column), -1); + assert_eq!(lat_long_guesser(GeoType::Longitude, "lon", &column), -1); + assert_eq!(lat_long_guesser(GeoType::Longitude, "speed", &column), -301); + assert_eq!( + lat_long_guesser(GeoType::Longitude, "altitude", &column), + -301 + ); + + let column = InferedColumn::Numeric(vec![ + Decimal::new(0, 0), + Decimal::new(1, 0), + Decimal::new(2, 0), + ]); + assert_eq!(lat_long_guesser(GeoType::Latitude, "lat", &column), 198); + assert_eq!(lat_long_guesser(GeoType::Longitude, "lon", &column), 198); + + let column = InferedColumn::Numeric(vec![ + Decimal::new(0, 0), + Decimal::new(1, 0), + Decimal::new(2, 0), + Decimal::new(200, 0), + ]); + assert_eq!(lat_long_guesser(GeoType::Latitude, "lat", &column), -2); + assert_eq!(lat_long_guesser(GeoType::Longitude, "lon", &column), -2); + + let column = InferedColumn::Boolean(vec![true, false, true]); + assert_eq!(lat_long_guesser(GeoType::Latitude, "lat", &column), -3); + assert_eq!(lat_long_guesser(GeoType::Longitude, "ready", &column), -303); + } + + #[test] + fn test_likely_geo_columns() { + let column_names = vec![ + "lat".to_string(), + "lon".to_string(), + "speed".to_string(), + "altitude".to_string(), + ]; + let columns = vec![ + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + ]; + let result = likely_geo_columns(&column_names, &columns); + assert!(result.is_some()); + let result = result.unwrap(); + assert_eq!(result.lat, "lat"); + assert_eq!(result.lon, "lon"); + + let column_names = vec![ + "weight".to_string(), + "height".to_string(), + "speed".to_string(), + "altitude".to_string(), + ]; + let result = likely_geo_columns(&column_names, &columns); + assert!(result.is_none()); + + let column_names = vec![ + "geo_position_latitude".to_string(), + "height".to_string(), + "speed".to_string(), + 
"altitude".to_string(), + ]; + let result = likely_geo_columns(&column_names, &columns); + assert!(result.is_none()); + + let column_names = vec![ + "geoposition_latitude".to_string(), + "geoposition_longitude".to_string(), + "speed".to_string(), + "altitude".to_string(), + ]; + let columns = vec![ + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + // Weird coordinates + InferedColumn::Float(vec![0.0, 1.0, -1000.0]), + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + ]; + let result = likely_geo_columns(&column_names, &columns); + assert!(result.is_none()); + + let columns = vec![ + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + // Different type + InferedColumn::Numeric(vec![ + Decimal::new(0, 0), + Decimal::new(1, 0), + Decimal::new(2, 0), + ]), + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + InferedColumn::Float(vec![0.0, 1.0, 2.0]), + ]; + let result = likely_geo_columns(&column_names, &columns); + assert!(result.is_none()); + } +} diff --git a/src/infer/is_header.rs b/src/infer/is_header.rs new file mode 100644 index 0000000..4d039e1 --- /dev/null +++ b/src/infer/is_header.rs @@ -0,0 +1,41 @@ +use super::parsing::{infer_type, InferedValue}; + +pub fn is_header(cells: &[String]) -> bool { + if cells.is_empty() { + return false; + } + // Infer the type of every cell + cells.iter().map(|cell| infer_type(cell)).all(|result| { + if let Ok((_, infered_value)) = result { + // matches!(infered_value, InferedValue::String(_)) + match infered_value { + InferedValue::String(string_value) => !string_value.is_empty(), + _ => false, + } + } else { + unreachable!("Error while inferring type"); + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_header() { + assert!(is_header(&["name".to_string(), "value".to_string()])); + assert!(!is_header(&[ + "name".to_string(), + "value".to_string(), + "3".to_string() + ])); + assert!(!is_header(&[ + "name".to_string(), + "value".to_string(), + "".to_string() + ])); + + assert!(!is_header(&[])); + } +} diff --git a/src/infer/mod.rs b/src/infer/mod.rs index 7f4821a..d932d48 100644 --- a/src/infer/mod.rs +++ b/src/infer/mod.rs @@ -1,3 +1,7 @@ -mod columns; -mod datetime_guesser; -mod infer; +pub mod columns; +pub mod datagrid; +pub mod datetime_guesser; +pub mod geo_guesser; +pub mod is_header; +pub mod parsing; +pub mod uuid; diff --git a/src/infer/infer.rs b/src/infer/parsing.rs similarity index 89% rename from src/infer/infer.rs rename to src/infer/parsing.rs index fb342fc..6a2dc15 100644 --- a/src/infer/infer.rs +++ b/src/infer/parsing.rs @@ -1,6 +1,5 @@ use std::{str::FromStr, sync::Arc}; -use iso8601::DateTime; use nom::{ branch::alt, bytes::complete::tag_no_case, @@ -11,7 +10,6 @@ use nom::{ sequence::{delimited, terminated}, Err, IResult, }; -use polars::chunked_array::iterator::par; use rust_decimal::Decimal; #[derive(Debug, Clone, PartialEq)] @@ -21,7 +19,7 @@ pub enum InferedValue { Float(f64), String(String), Boolean(bool), - JSON(Arc), + Json(Arc), DateTime(hifitime::Epoch), //Location, //Blob(Vec), @@ -29,13 +27,13 @@ pub enum InferedValue { } pub fn parse_integer(data: &str) -> IResult<&str, InferedValue> { - map(i64, |i| InferedValue::Integer(i))(data) + map(i64, InferedValue::Integer)(data) } pub fn parse_float(data: &str) -> IResult<&str, InferedValue> { // We use the "double" parser from nom, that returns a f64. // The parser named "float" from nom returns a f32. 
- map(double, |f| InferedValue::Float(f))(data) + map(double, InferedValue::Float)(data) } pub fn parse_numeric(data: &str) -> IResult<&str, InferedValue> { @@ -72,7 +70,7 @@ fn is_likely_json(data: &str) -> bool { pub fn parse_json(data: &str) -> IResult<&str, InferedValue> { if is_likely_json(data) { serde_json::from_str(data) - .map(|val| ("", InferedValue::JSON(val))) + .map(|val| ("", InferedValue::Json(val))) .map_err(|_| Err::Error(nom::error::Error::new(data, nom::error::ErrorKind::Fail))) } else { Err(Err::Error(nom::error::Error::new( @@ -88,9 +86,9 @@ fn convert_datetime_from_iso8601_to_hifitime( // convert the iso8601::DateTime to a std::time::Duration first let iso8601::DateTime { date, time } = dt; let (year, month, day) = match date { - iso8601::Date::YMD { year, month, day } => (year as i32, month as u8, day as u8), + iso8601::Date::YMD { year, month, day } => (year, month as u8, day as u8), iso8601::Date::Week { year, ww: _, d: _ } | iso8601::Date::Ordinal { year, ddd: _ } => { - (year as i32, 1, 1) + (year, 1, 1) } }; let iso8601::Time { @@ -103,7 +101,7 @@ fn convert_datetime_from_iso8601_to_hifitime( } = time; //hifitime::Epoch::from_gregorian_utc(year, month, day, hour, minute, second, nanos) // convert milliseconds to nanoseconds - let nanos = millisecond as u32 * 1_000_000_u32; + let nanos = millisecond * 1_000_000_u32; let mut epoch = hifitime::Epoch::maybe_from_gregorian_utc( year, month, @@ -386,24 +384,24 @@ mod tests { fn test_parse_json() { assert_eq!( parse_json("{}"), - Ok(("", InferedValue::JSON(Arc::new(serde_json::json!({}))))) + Ok(("", InferedValue::Json(Arc::new(serde_json::json!({}))))) ); assert_eq!( parse_json("[]"), - Ok(("", InferedValue::JSON(Arc::new(serde_json::json!([]))))) + Ok(("", InferedValue::Json(Arc::new(serde_json::json!([]))))) ); assert_eq!( parse_json("[{\"a\": 1}]"), Ok(( "", - InferedValue::JSON(Arc::new(serde_json::json!([{"a": 1}]))) + InferedValue::Json(Arc::new(serde_json::json!([{"a": 1}]))) )) ); assert_eq!( parse_json("[{\"a\": 1}, {\"b\": 2}]"), Ok(( "", - InferedValue::JSON(Arc::new(serde_json::json!([{"a": 1}, {"b": 2}]))) + InferedValue::Json(Arc::new(serde_json::json!([{"a": 1}, {"b": 2}]))) )) ); assert_eq!( @@ -444,13 +442,13 @@ mod tests { ); assert_eq!( infer_type("{}"), - Ok(("", InferedValue::JSON(Arc::new(serde_json::json!({}))))) + Ok(("", InferedValue::Json(Arc::new(serde_json::json!({}))))) ); assert_eq!( infer_type("[{\"a\": 1}]"), Ok(( "", - InferedValue::JSON(Arc::new(serde_json::json!([{"a": 1}]))) + InferedValue::Json(Arc::new(serde_json::json!([{"a": 1}]))) )) ); } @@ -506,7 +504,7 @@ mod tests { ); assert_eq!( infer_type_with_numeric("{}"), - Ok(("", InferedValue::JSON(Arc::new(serde_json::json!({}))))) + Ok(("", InferedValue::Json(Arc::new(serde_json::json!({}))))) ); assert_eq!( infer_type_with_numeric(" 42.12 "), @@ -595,6 +593,49 @@ mod tests { epoch, hifitime::Epoch::from_gregorian_utc(1969, 12, 24, 19, 56, 32, 93000000) ); + + // Compute a date that is not valid + let dt = iso8601::datetime("1969-02-29T00:00:00").unwrap(); + let epoch = convert_datetime_from_iso8601_to_hifitime(dt); + assert!(epoch.is_err()); + } + + #[test] + fn test_parse_iso8601_datetime() { + assert_eq!( + parse_iso8601_datetime("2020-01-01T00:00:00Z"), + Ok(( + "", + InferedValue::DateTime(hifitime::Epoch::from_gregorian_utc( + 2020, 1, 1, 0, 0, 0, 0 + )) + )) + ); + assert_eq!( + parse_iso8601_datetime("1969-358T14:21:32.0933+05:35"), + Ok(( + "", + InferedValue::DateTime(hifitime::Epoch::from_gregorian_utc( + 1969, 12, 24, 
19, 56, 32, 93000000 + )) + )) + ); + + assert_eq!( + parse_iso8601_datetime("2024-07-32T00:00:00Z"), + Err(Err::Error(nom::error::Error::new( + "2024-07-32T00:00:00Z", + nom::error::ErrorKind::Fail + ))) + ); + + assert_eq!( + parse_iso8601_datetime("1969-02-29T00:00:00Z"), + Err(Err::Error(nom::error::Error::new( + "1969-02-29T00:00:00Z", + nom::error::ErrorKind::Fail + ))) + ); } } } diff --git a/src/infer/uuid.rs b/src/infer/uuid.rs new file mode 100644 index 0000000..4c20d6d --- /dev/null +++ b/src/infer/uuid.rs @@ -0,0 +1,103 @@ +use nom::{ + character::{ + complete::{char, satisfy}, + is_hex_digit, + }, + combinator::map, + combinator::{cond, map_res}, + multi::count, + sequence::tuple, + IResult, +}; +use uuid::Uuid; + +#[inline] +fn hex_digit_char(c: char) -> bool { + is_hex_digit(c as u8) +} + +fn parse_hex_char(data: &str) -> IResult<&str, u8> { + map_res(satisfy(hex_digit_char), |s: char| match s.to_digit(16) { + Some(d) => Ok(d as u8), + None => Err("Invalid hex digit"), + })(data) +} + +pub fn parse_uuid(data: &str) -> IResult<&str, uuid::Uuid> { + map( + tuple(( + count(parse_hex_char, 8), + char('-'), + count(parse_hex_char, 4), + char('-'), + count(parse_hex_char, 4), + char('-'), + count(parse_hex_char, 4), + char('-'), + count(parse_hex_char, 12), + )), + |(a, _, b, _, c, _, d, _, e)| { + let bytes = [ + (a[0] << 4) | a[1], + (a[2] << 4) | a[3], + (a[4] << 4) | a[5], + (a[6] << 4) | a[7], + (b[0] << 4) | b[1], + (b[2] << 4) | b[3], + (c[0] << 4) | c[1], + (c[2] << 4) | c[3], + (d[0] << 4) | d[1], + (d[2] << 4) | d[3], + (e[0] << 4) | e[1], + (e[2] << 4) | e[3], + (e[4] << 4) | e[5], + (e[6] << 4) | e[7], + (e[8] << 4) | e[9], + (e[10] << 4) | e[11], + ]; + Uuid::from_bytes(bytes) + }, + )(data) +} + +pub fn attempt_uuid_parsing(s: &str) -> Option { + match parse_uuid(s) { + Ok((_, uuid)) => Some(uuid), + Err(_) => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_uuid() { + let uuid = parse_uuid("01234567-89ab-cdef-0123-456789abcdef") + .unwrap() + .1; + assert_eq!(uuid.to_string(), "01234567-89ab-cdef-0123-456789abcdef"); + } + + #[test] + fn test_attempt_uuid_parsing() { + let uuid = attempt_uuid_parsing("01234567-89ab-cdef-0123-456789abcdef").unwrap(); + assert_eq!(uuid.to_string(), "01234567-89ab-cdef-0123-456789abcdef"); + let uuid = attempt_uuid_parsing("01234567-89AB-CDEF-0123-456789ABCDEF").unwrap(); + assert_eq!(uuid.to_string(), "01234567-89ab-cdef-0123-456789abcdef"); + let uuid = attempt_uuid_parsing("aa6e8b8f-5b0b-5b7a-8c4d-2b9f1c1b1b1b").unwrap(); + assert_eq!(uuid.to_string(), "aa6e8b8f-5b0b-5b7a-8c4d-2b9f1c1b1b1b"); + } + + #[test] + fn test_attempt_uuid_parsing_invalid() { + let uuid = attempt_uuid_parsing("01234567-89ab-cdef-0123-456789abcde"); + assert!(uuid.is_none()); + let uuid = attempt_uuid_parsing("01234567-89ab-cdef-0123-456789abcdeg"); + assert!(uuid.is_none()); + let uuid = attempt_uuid_parsing(""); + assert!(uuid.is_none()); + let uuid = attempt_uuid_parsing("auniestau"); + assert!(uuid.is_none()); + } +} diff --git a/src/main.rs b/src/main.rs index 2a2a361..ff0c6e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use axum::routing::get; use axum::routing::post; use axum::Json; use axum::Router; +use config::SensAppConfig; use futures::stream::StreamExt; use futures::TryStreamExt; use polars::prelude::*; @@ -29,14 +30,25 @@ use tower_http::{timeout::TimeoutLayer, trace::TraceLayer, ServiceBuilderExt}; use tracing::event; use tracing::Level; mod bus; +mod config; mod datamodel; mod http; mod 
importers; mod infer; +mod name_to_uuid; mod storage; #[tokio::main] async fn main() { + // Load configuration + let config = match config::SensAppConfig::load() { + Ok(config) => Arc::new(config), + Err(err) => { + panic!("Failed to load configuration: {:?}", err); + } + }; + config::set(config.clone()).expect("Failed to set configuration"); + /*let (tx, rx) = tokio::sync::mpsc::channel(100); // Channel with buffer size 100 // Simulate event emitter @@ -69,6 +81,10 @@ async fn main() { println!("Hello, world!"); + let columns = infer::columns::infer_column(vec![], false, true); + let _ = infer::datetime_guesser::likely_datetime_column(&vec![], &vec![]); + let _ = infer::geo_guesser::likely_geo_columns(&vec![], &vec![]); + /*let event_bus = event_bus::EVENT_BUS .get_or_init(|| event_bus::init_event_bus()) .await; @@ -109,15 +125,25 @@ async fn main() { } }); - run_http_server( + let endpoint = config.endpoint; + let port = config.port; + println!("📡 HTTP server listening on {}:{}", endpoint, port); + match run_http_server( HttpServerState { name: "SensApp".to_string(), event_bus: event_bus, }, - SocketAddr::from((Ipv4Addr::LOCALHOST, 3000)), + SocketAddr::from((endpoint, port)), ) .await - .expect("Failed to run HTTP server"); + { + Ok(_) => { + event!(Level::INFO, "HTTP server stopped"); + } + Err(err) => { + event!(Level::ERROR, "HTTP server failed: {:?}", err); + } + } } async fn handler() -> &'static str { diff --git a/src/name_to_uuid.rs b/src/name_to_uuid.rs new file mode 100644 index 0000000..087a5d7 --- /dev/null +++ b/src/name_to_uuid.rs @@ -0,0 +1,101 @@ +use crate::infer::uuid::attempt_uuid_parsing; +use anyhow::{anyhow, Error}; +use once_cell::sync::OnceCell; +use std::sync::Arc; +use uuid::Uuid; + +type NameToUuidKey = [u8; 32]; +static UUID_HASH_MAC: OnceCell> = OnceCell::new(); + +pub fn initialise_uuid_hash_mac(salt: &str) -> Result<(), Error> { + const KEY_CONTEXT: &str = "SENSAPP uuid hash mac 2024-01-19 strings to unique ids"; + let key = blake3::derive_key(KEY_CONTEXT, salt.as_bytes()); + + match UUID_HASH_MAC.set(Arc::new(key)) { + Ok(_) => Ok(()), + Err(e) => Err(anyhow!("Failed to set UUID_HASH_MAC: {:?}", e)), + } +} + +pub fn uuid_v8_blake3(name: &str) -> Result { + // uuid::Uuid::from_bytes(uuid::v5::NAMESPACE_DNS, name.as_bytes()) + // Using a UUID v5 (SHA1) or v3 (MD5) is too easy to implement. + // It's friday, let's take terrible decisions and use Blake3 instead. + + let key = UUID_HASH_MAC.get().ok_or_else(|| { + anyhow!("UUID_HASH_MAC not initialised. 
Please call initialise_uuid_hash_mac() before using name_to_uuid()") + })?; + + // Create the random bytes + let mut hash_output = [0; 16]; + let mut hasher = blake3::Hasher::new_keyed(key); + hasher.update(name.as_bytes()); + hasher.finalize_xof().fill(&mut hash_output); + + Ok(uuid::Builder::from_custom_bytes(hash_output).into_uuid()) +} + +pub fn name_to_uuid(name: &str) -> Result { + match attempt_uuid_parsing(name) { + Some(uuid) => Ok(uuid), + None => uuid_v8_blake3(name), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_uuid_v8_blake3() { + let _ = initialise_uuid_hash_mac("sensapp tests"); + + let uuid = uuid_v8_blake3("test").unwrap(); + assert_eq!(uuid.to_string(), "a2794553-385f-8d6c-9d2f-843cf728307a"); + + let uuid = uuid_v8_blake3("test2").unwrap(); + assert_eq!(uuid.to_string(), "daa4b5f3-70b5-820f-819b-787344e7a4c7"); + + // This is case sensitive + let uuid = uuid_v8_blake3("TEST").unwrap(); + assert_eq!(uuid.to_string(), "6aa50a6c-9f4f-899f-9f24-93efacb0c9e5"); + + let uuid = uuid_v8_blake3("").unwrap(); + assert_eq!(uuid.to_string(), "58748fa2-0c24-86b3-925b-59e65e916af0"); + + // Giving an UUID will return another UUID + let uuid = uuid_v8_blake3("aa6e8b8f0-5b0b-5b7a-8c4d-2b9f1c1b1b1b").unwrap(); + assert_eq!(uuid.to_string(), "d90a33ab-0e7e-8e19-99ab-847c5399884a"); + + // Already initialised + let is_err = initialise_uuid_hash_mac("sensapp tests 2"); + assert!(is_err.is_err()); + } + + #[test] + fn test_name_to_uuid() { + let _ = initialise_uuid_hash_mac("sensapp tests"); + let uuid = name_to_uuid("test").unwrap(); + assert_eq!(uuid.to_string(), "a2794553-385f-8d6c-9d2f-843cf728307a"); + + let uuid = name_to_uuid("test2").unwrap(); + assert_eq!(uuid.to_string(), "daa4b5f3-70b5-820f-819b-787344e7a4c7"); + + let uuid = name_to_uuid("").unwrap(); + assert_eq!(uuid.to_string(), "58748fa2-0c24-86b3-925b-59e65e916af0"); + + // Giving an UUID will return the same UUID + let uuid = name_to_uuid("aa6e8b8f-5b0b-5b7a-8c4d-2b9f1c1b1b1b").unwrap(); + assert_eq!(uuid.to_string(), "aa6e8b8f-5b0b-5b7a-8c4d-2b9f1c1b1b1b"); + + // This is not case sensitive + let uuid = name_to_uuid("AA6E8B8F-5b0b-5B7A-8c4d-2B9F1C1B1B1B").unwrap(); + assert_eq!(uuid.to_string(), "aa6e8b8f-5b0b-5b7a-8c4d-2b9f1c1b1b1b"); + + // If it's not a valid UUID, even it's almost, it will return a new UUID + // This may be a bit confusing to the users + // But I'm not sure that trying to detect almost UUIDs is sound. 
+ let uuid = name_to_uuid("aa6e8b8f0-5b0b-5b7a-8c4d-2b9f1c1b1b1G").unwrap(); + assert_eq!(uuid.to_string(), "48dfa368-01bd-8d5a-892e-1e009653c92b"); + } +} diff --git a/src/storage/duckdb/mod.rs b/src/storage/duckdb/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a6b35bb..7e29212 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,3 +1,4 @@ +pub mod duckdb; pub mod postgresql; pub mod sqlite; pub mod storage; diff --git a/src/storage/sqlite/migrations/20240110093153_init.sql b/src/storage/sqlite/migrations/20240110093153_init.sql index a7bff85..f9ef384 100644 --- a/src/storage/sqlite/migrations/20240110093153_init.sql +++ b/src/storage/sqlite/migrations/20240110093153_init.sql @@ -3,7 +3,7 @@ CREATE TABLE units ( id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing primary key name TEXT NOT NULL UNIQUE, -- Unique name, cannot be null description TEXT -- Optional description -); +) STRICT; -- Create the 'sensors' table with both UUID and auto-incrementing sensor_id CREATE TABLE sensors ( @@ -13,7 +13,7 @@ CREATE TABLE sensors ( type TEXT NOT NULL, -- Type of the sensor (e.g., integer, float, etc.), cannot be null unit INTEGER, -- References 'units' (optional) FOREIGN KEY (unit) REFERENCES units(id) -- Foreign key to 'units' table -); +) STRICT; -- Create the 'labels' table CREATE TABLE labels ( @@ -22,25 +22,25 @@ CREATE TABLE labels ( description INTEGER, -- ID for the description in the dictionary (optional) PRIMARY KEY (sensor_id, named), FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table -); +) STRICT; -- Create the 'labels_name_dictionary' table CREATE TABLE labels_name_dictionary ( id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing primary key name TEXT NOT NULL UNIQUE -- Unique name for label, cannot be null -); +) STRICT; -- Create the 'labels_description_dictionary' table CREATE TABLE labels_description_dictionary ( id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing primary key description TEXT NOT NULL UNIQUE -- Unique description for label, cannot be null -); +) STRICT; -- Create the 'strings_values_dictionary' table CREATE TABLE strings_values_dictionary ( id INTEGER PRIMARY KEY AUTOINCREMENT, -- Auto-incrementing primary key value TEXT NOT NULL UNIQUE -- Unique text value, cannot be null -); +) STRICT; -- Create the 'integer_values' table CREATE TABLE integer_values ( @@ -48,15 +48,15 @@ CREATE TABLE integer_values ( timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null value INTEGER NOT NULL, -- Integer value, cannot be null FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table -); +) STRICT; -- Create the 'numeric_values' table CREATE TABLE numeric_values ( sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null - value NUMERIC NOT NULL, -- Numeric value, cannot be null + value TEXT NOT NULL, -- Numeric value, cannot be null FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table -); +) STRICT; -- Create the 'float_values' table CREATE TABLE float_values ( @@ -64,7 +64,7 @@ CREATE TABLE float_values ( timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null value REAL NOT NULL, -- Real (float) value, cannot be null FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table -); +) STRICT; -- Create 
the 'string_values' table CREATE TABLE string_values ( @@ -73,15 +73,15 @@ CREATE TABLE string_values ( value INTEGER NOT NULL, -- References 'strings_values_dictionary', cannot be null FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id), -- Foreign key to 'sensors' table FOREIGN KEY (value) REFERENCES strings_values_dictionary(id) -- Foreign key to 'strings_values_dictionary' -); +) STRICT; -- Create the 'boolean_values' table CREATE TABLE boolean_values ( sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null - value BOOLEAN NOT NULL, -- Boolean value, cannot be null + value INTEGER NOT NULL, -- Integer Boolean value, cannot be null FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table -); +) STRICT; -- Create the 'localisations' table CREATE TABLE localisations ( @@ -90,4 +90,21 @@ CREATE TABLE localisations ( latitude REAL NOT NULL, -- Latitude value, cannot be null longitude REAL NOT NULL, -- Longitude value, cannot be null FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table -); +) STRICT; + +-- Create the 'json_values' table +CREATE TABLE json_values ( + sensor_id INTEGER NOT NULL, -- References 'sensors' (sensor_id), cannot be null + timestamp_ms INTEGER NOT NULL, -- Unix timestamp in milliseconds, cannot be null + value BLOB NOT NULL, -- BLOB JSONB value, cannot be null + FOREIGN KEY (sensor_id) REFERENCES sensors(sensor_id) -- Foreign key to 'sensors' table +) STRICT; + +-- Indexes +CREATE INDEX index_integer_values ON integer_values(sensor_id, timestamp_ms); +CREATE INDEX index_numeric_values ON numeric_values(sensor_id, timestamp_ms); +CREATE INDEX index_float_values ON float_values(sensor_id, timestamp_ms); +CREATE INDEX index_string_values ON string_values(sensor_id, timestamp_ms); +CREATE INDEX index_boolean_values ON boolean_values(sensor_id, timestamp_ms); +CREATE INDEX index_localisations ON localisations(sensor_id, timestamp_ms); +CREATE INDEX index_json_values ON json_values(sensor_id, timestamp_ms); diff --git a/src/storage/sqlite/sqlite.rs b/src/storage/sqlite/sqlite.rs index 3ed2db7..ee38a77 100644 --- a/src/storage/sqlite/sqlite.rs +++ b/src/storage/sqlite/sqlite.rs @@ -163,7 +163,7 @@ impl SqliteStorage { ) -> Result<()> { let mut transaction = self.pool.begin().await?; for value in values { - let lol = sqlx::query!( + let query = sqlx::query!( r#" INSERT INTO integer_values (sensor_id, timestamp_ms, value) VALUES (?, ?, ?) @@ -172,9 +172,57 @@ impl SqliteStorage { value.timestamp_ms, value.value ); - transaction.execute(lol).await?; + transaction.execute(query).await?; } transaction.commit().await?; Ok(()) } + + async fn publish_float_values(&self, sensor_id: i64, values: &Vec>) -> Result<()> { + let mut transaction = self.pool.begin().await?; + for value in values { + let query = sqlx::query!( + r#" + INSERT INTO float_values (sensor_id, timestamp_ms, value) + VALUES (?, ?, ?) 
+ "#, + sensor_id, + value.timestamp_ms, + value.value + ); + transaction.execute(query).await?; + } + transaction.commit().await?; + Ok(()) + } + + async fn vacuum(&self) -> Result<()> { + let mut transaction = self.pool.begin().await?; + transaction + .execute(sqlx::query!( + r#" + DELETE FROM integer_values WHERE rowid NOT IN ( + SELECT MIN(rowid) FROM integer_values GROUP BY sensor_id, timestamp_ms, value + ) + "# + )) + .await?; + + transaction + .execute(sqlx::query!( + r#" + DELETE FROM float_values WHERE rowid NOT IN ( + SELECT MIN(rowid) FROM float_values GROUP BY sensor_id, timestamp_ms, value + ) + "# + )) + .await?; + + transaction.commit().await?; + + let vacuum = sqlx::query!("VACUUM"); + vacuum.execute(&self.pool).await?; + + Ok(()) + } }