From a06b0bf5e244daa8dc40fcc32488d609d795304f Mon Sep 17 00:00:00 2001 From: Solomon <> Date: Fri, 3 Nov 2023 08:26:44 +0100 Subject: [PATCH] feat: enable SQL queries on cache endpoints --- Cargo.lock | 224 ++++++--- dozer-api/Cargo.toml | 5 + dozer-api/src/lib.rs | 1 + dozer-api/src/pgwire/datafusion.rs | 335 ++++++++++++++ dozer-api/src/pgwire/mod.rs | 515 +++++++++++++++++++++ dozer-api/src/pgwire/predicate_pushdown.rs | 310 +++++++++++++ dozer-cli/src/errors.rs | 2 + dozer-cli/src/simple/orchestrator.rs | 16 +- dozer-ingestion/Cargo.toml | 2 +- dozer-ingestion/deltalake/Cargo.toml | 2 +- dozer-ingestion/object-store/Cargo.toml | 4 +- dozer-types/Cargo.toml | 5 +- dozer-types/src/arrow_types/to_arrow.rs | 23 +- dozer-types/src/lib.rs | 1 + dozer-types/src/models/api_config.rs | 17 + json_schemas/dozer.json | 23 + 16 files changed, 1399 insertions(+), 86 deletions(-) create mode 100644 dozer-api/src/pgwire/datafusion.rs create mode 100644 dozer-api/src/pgwire/mod.rs create mode 100644 dozer-api/src/pgwire/predicate_pushdown.rs diff --git a/Cargo.lock b/Cargo.lock index 65ca87980a..634020b012 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -539,9 +539,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7104b9e9761613ae92fe770c741d6bbf1dbc791a0fe204400aebdd429875741" +checksum = "04a8801ebb147ad240b2d978d3ab9f73c9ccd4557ba6a03e7800496770ed10e0" dependencies = [ "ahash 0.8.3", "arrow-arith", @@ -561,9 +561,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38e597a8e8efb8ff52c50eaf8f4d85124ce3c1bf20fab82f476d73739d9ab1c2" +checksum = "895263144bd4a69751cbe6a34a53f26626e19770b313a9fa792c415cd0e78f11" dependencies = [ "arrow-array", "arrow-buffer", @@ -576,9 +576,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a86d9c1473db72896bd2345ebb6b8ad75b8553ba390875c76708e8dc5c5492d" +checksum = "226fdc6c3a4ae154a74c24091d36a90b514f0ed7112f5b8322c1d8f354d8e20d" dependencies = [ "ahash 0.8.3", "arrow-buffer", @@ -593,19 +593,20 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234b3b1c8ed00c874bf95972030ac4def6f58e02ea5a7884314388307fb3669b" +checksum = "fc4843af4dd679c2f35b69c572874da8fde33be53eb549a5fb128e7a4b763510" dependencies = [ + "bytes", "half 2.3.1", "num", ] [[package]] name = "arrow-cast" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22f61168b853c7faea8cea23a2169fdff9c82fb10ae5e2c07ad1cab8f6884931" +checksum = "35e8b9990733a9b635f656efda3c9b8308c7a19695c9ec2c7046dd154f9b144b" dependencies = [ "arrow-array", "arrow-buffer", @@ -621,9 +622,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b545c114d9bf8569c84d2fbe2020ac4eea8db462c0a37d0b65f41a90d066fe" +checksum = "646fbb4e11dd0afb8083e883f53117713b8caadb4413b3c9e63e3f535da3683c" dependencies = [ "arrow-array", "arrow-buffer", @@ -640,9 +641,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "45.0.0" 
+version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6b6852635e7c43e5b242841c7470606ff0ee70eef323004cacc3ecedd33dd8f" +checksum = "da900f31ff01a0a84da0572209be72b2b6f980f3ea58803635de47913191c188" dependencies = [ "arrow-buffer", "arrow-schema", @@ -652,9 +653,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66da9e16aecd9250af0ae9717ae8dd7ea0d8ca5a3e788fe3de9f4ee508da751" +checksum = "2707a8d7ee2d345d045283ece3ae43416175873483e5d96319c929da542a0b1f" dependencies = [ "arrow-array", "arrow-buffer", @@ -666,9 +667,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60ee0f9d8997f4be44a60ee5807443e396e025c23cf14d2b74ce56135cb04474" +checksum = "5d1b91a63c356d14eedc778b76d66a88f35ac8498426bb0799a769a49a74a8b4" dependencies = [ "arrow-array", "arrow-buffer", @@ -686,9 +687,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcab05410e6b241442abdab6e1035177dc082bdb6f17049a4db49faed986d63" +checksum = "584325c91293abbca7aaaabf8da9fe303245d641f5f4a18a6058dc68009c7ebf" dependencies = [ "arrow-array", "arrow-buffer", @@ -701,9 +702,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91a847dd9eb0bacd7836ac63b3475c68b2210c2c96d0ec1b808237b973bd5d73" +checksum = "0e32afc1329f7b372463b21c6ca502b07cf237e1ed420d87706c1770bb0ebd38" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -716,18 +717,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54df8c47918eb634c20e29286e69494fdc20cafa5173eb6dad49c7f6acece733" +checksum = "b104f5daa730f00fde22adc03a12aa5a2ae9ccbbf99cbd53d284119ddc90e03d" dependencies = [ "serde", ] [[package]] name = "arrow-select" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "941dbe481da043c4bd40c805a19ec2fc008846080c4953171b62bcad5ee5f7fb" +checksum = "73b3ca55356d1eae07cf48808d8c462cea674393ae6ad1e0b120f40b422eb2b4" dependencies = [ "arrow-array", "arrow-buffer", @@ -738,9 +739,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "359b2cd9e071d5a3bcf44679f9d85830afebc5b9c98a08019a570a65ae933e0f" +checksum = "af1433ce02590cae68da0a18ed3a3ed868ffac2c6f24c533ddd2067f7ee04b4a" dependencies = [ "arrow-array", "arrow-buffer", @@ -1385,6 +1386,16 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bcder" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf16bec990f8ea25cab661199904ef452fcf11f565c404ce6cffbdf3f8cbbc47" +dependencies = [ + "bytes", + "smallvec", +] + [[package]] name = "beef" version = "0.5.2" @@ -1798,9 +1809,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.26" +version = "0.4.31" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "arbitrary", @@ -1808,9 +1819,8 @@ dependencies = [ "js-sys", "num-traits", "serde", - "time 0.1.45", "wasm-bindgen", - "winapi", + "windows-targets 0.48.0", ] [[package]] @@ -2570,9 +2580,9 @@ checksum = "41b319d1b62ffbd002e057f36bebd1f42b9f97927c9577461d855f3513c4289f" [[package]] name = "datafusion" -version = "30.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45e3bb3a788d9fa793268e9cec2601d79831ed1be437ba74d1deb32b226ae734" +checksum = "6a4e4fc25698a14c90b34dda647ba10a5a966dc04b036d22e77fb1048663375d" dependencies = [ "ahash 0.8.3", "arrow", @@ -2597,7 +2607,6 @@ dependencies = [ "hashbrown 0.14.0", "indexmap 2.0.0", "itertools 0.11.0", - "lazy_static", "log", "num_cpus", "object_store", @@ -2606,8 +2615,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rand 0.8.5", - "smallvec", - "sqlparser 0.36.1", + "sqlparser 0.37.0", "tempfile", "tokio", "tokio-util 0.7.8", @@ -2619,9 +2627,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "30.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dd256483875270612d4fa439359bafa6f1760bae080ecb69eecc59a92b5016f" +checksum = "c23ad0229ea4a85bf76b236d8e75edf539881fdb02ce4e2394f9a76de6055206" dependencies = [ "arrow", "arrow-array", @@ -2634,7 +2642,7 @@ dependencies = [ "num_cpus", "object_store", "parquet", - "sqlparser 0.36.1", + "sqlparser 0.37.0", "tokio", "tokio-util 0.7.8", "xz2", @@ -2643,9 +2651,9 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "30.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4973610d680bdc38f409a678c838d3873356cc6c29a543d1f56d7b4801e8d0a4" +checksum = "9b37d2fc1a213baf34e0a57c85b8e6648f1a95152798fd6738163ee96c19203f" dependencies = [ "arrow", "dashmap 5.4.0", @@ -2663,24 +2671,23 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "30.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f3599f4cfcf22490f7b7d6d2fc70610ca8045b8bdcd99ef9d4309cf2b387537" +checksum = "d6ea9844395f537730a145e5d87f61fecd37c2bc9d54e1dc89b35590d867345d" dependencies = [ "ahash 0.8.3", "arrow", "datafusion-common", - "lazy_static", - "sqlparser 0.36.1", + "sqlparser 0.37.0", "strum 0.25.0", "strum_macros 0.25.1", ] [[package]] name = "datafusion-optimizer" -version = "30.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f067401eea6a0967c83021e714746f9153368cca964d45c4a1a4f99869a1512f" +checksum = "c8a30e0f79c5d59ba14d3d70f2500e87e0ff70236ad5e47f9444428f054fd2be" dependencies = [ "arrow", "async-trait", @@ -2696,9 +2703,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "30.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "964c19161288d374fe066535f84de37a1dab419e47a24e02f3a0ca6413744451" +checksum = "766c567082c9bbdcb784feec8fe40c7049cedaeb3a18d54f563f75fe0dc1932c" dependencies = [ "ahash 0.8.3", "arrow", @@ -2716,7 +2723,6 @@ dependencies = [ "hex", "indexmap 2.0.0", "itertools 0.11.0", - "lazy_static", "libc", "log", "md-5 0.10.5", @@ -2731,9 +2737,9 @@ dependencies = [ [[package]] name = 
"datafusion-proto" -version = "30.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fde2768f10f1a5d47d164e0219ececb00f0dcd36f33079b656e03ad20e33c68" +checksum = "440553913695dd8db587b43f54fdc7bbd8649326172d16a4904bb41e792b7297" dependencies = [ "arrow", "chrono", @@ -2746,16 +2752,16 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "30.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0939df21e440efcb35078c22b0192c537f7a53ebf1a34288a3a134753dd364" +checksum = "811fd084cf2d78aa0c76b74320977c7084ad0383690612528b580795764b4dd0" dependencies = [ "arrow", "arrow-schema", "datafusion-common", "datafusion-expr", "log", - "sqlparser 0.36.1", + "sqlparser 0.37.0", ] [[package]] @@ -2770,9 +2776,9 @@ dependencies = [ [[package]] name = "deltalake" -version = "0.15.0" +version = "0.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39fbcd162d595e3b7e7af762b05abbdb14218615d24e8c40d23b1cfe1a408589" +checksum = "acf47a902e00dcc8af977c60319592d45040d56544e781afd440996d6c205f49" dependencies = [ "arrow", "arrow-array", @@ -2808,6 +2814,7 @@ dependencies = [ "parking_lot", "parquet", "percent-encoding", + "rand 0.8.5", "regex", "rusoto_core", "rusoto_credential", @@ -2815,7 +2822,7 @@ dependencies = [ "rusoto_sts", "serde", "serde_json", - "sqlparser 0.36.1", + "sqlparser 0.37.0", "thiserror", "tokio", "url", @@ -3446,6 +3453,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive-new" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3418329ca0ad70234b9735dc4ceed10af4df60eff9c8e7b06cb5e520d92c3535" +dependencies = [ + "proc-macro2 1.0.63", + "quote 1.0.30", + "syn 1.0.109", +] + [[package]] name = "derive_arbitrary" version = "1.3.1" @@ -3574,6 +3592,9 @@ dependencies = [ "async-stream", "async-trait", "bytes", + "chrono", + "datafusion", + "datafusion-expr", "dozer-cache", "dozer-core", "dozer-tracing", @@ -3587,9 +3608,11 @@ dependencies = [ "jsonwebtoken", "metrics", "openapiv3", + "pgwire", "pin-project", "prost-build 0.12.0", "prost-reflect", + "serde_json", "tempdir", "tokio", "tokio-stream", @@ -4026,6 +4049,7 @@ dependencies = [ "ahash 0.8.3", "arbitrary", "arrow", + "arrow-cast", "arrow-schema", "bincode", "bytes", @@ -4963,6 +4987,18 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "getset" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9" +dependencies = [ + "proc-macro-error 1.0.4", + "proc-macro2 1.0.63", + "quote 1.0.30", + "syn 1.0.109", +] + [[package]] name = "ghash" version = "0.5.0" @@ -6334,6 +6370,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.6.4" @@ -6981,9 +7023,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.6.1" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27c776db4f332b571958444982ff641d2531417a326ca368995073b639205d58" +checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" dependencies = [ "async-trait", "base64 0.21.0", @@ -6992,7 +7034,7 @@ dependencies = [ "futures", "humantime", "hyper 
0.14.27", - "itertools 0.10.5", + "itertools 0.11.0", "parking_lot", "percent-encoding", "quick-xml", @@ -7390,9 +7432,9 @@ dependencies = [ [[package]] name = "parquet" -version = "45.0.0" +version = "46.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f9739b984380582bdb7749ae5b5d28839bce899212cf16465c1ac1f8b65d79" +checksum = "1ad2cba786ae07da4d73371a88b9e0f9d3ffac1a9badc83922e0e15814f5c5fa" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -7593,6 +7635,34 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "pgwire" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06d04982366efd653d4365175426acbabd55efb07231869e92b9e1f5b3faf7df" +dependencies = [ + "async-trait", + "base64 0.21.0", + "bytes", + "chrono", + "derive-new", + "futures", + "getset", + "hex", + "log", + "md5", + "postgres-types", + "rand 0.8.5", + "ring", + "stringprep", + "thiserror", + "time 0.3.20", + "tokio", + "tokio-rustls 0.24.0", + "tokio-util 0.7.8", + "x509-certificate", +] + [[package]] name = "phf" version = "0.10.1" @@ -8333,9 +8403,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quick-xml" -version = "0.28.2" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce5e73202a820a31f8a0ee32ada5e21029c81fd9e3ebf668a40832e4219d9d1" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" dependencies = [ "memchr", "serde", @@ -9971,9 +10041,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.36.1" +version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eaa1e88e78d2c2460d78b7dc3f0c08dbb606ab4222f9aff36f420d36e307d87" +checksum = "37ae05a8250b968a3f7db93155a84d68b2e6cea1583949af5ca5b5170c76c075" dependencies = [ "log", "sqlparser_derive", @@ -12193,6 +12263,24 @@ dependencies = [ "zeroize", ] +[[package]] +name = "x509-certificate" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5d27c90840e84503cf44364de338794d5d5680bdd1da6272d13f80b0769ee0" +dependencies = [ + "bcder", + "bytes", + "chrono", + "der 0.7.8", + "hex", + "pem 2.0.1", + "ring", + "signature 2.1.0", + "spki 0.7.2", + "thiserror", +] + [[package]] name = "x509-parser" version = "0.15.1" diff --git a/dozer-api/Cargo.toml b/dozer-api/Cargo.toml index 5400a810ca..8403f638b7 100644 --- a/dozer-api/Cargo.toml +++ b/dozer-api/Cargo.toml @@ -52,4 +52,9 @@ http = "0.2.9" pin-project = "1.1.3" async-stream = "0.3.5" uuid = "1.4.1" +chrono = "0.4.31" +datafusion = "31.0.0" +datafusion-expr = "31.0.0" +serde_json = { version = "1.0.107", features = ["arbitrary_precision"] } +pgwire = "0.16.1" tempdir = "0.3.7" diff --git a/dozer-api/src/lib.rs b/dozer-api/src/lib.rs index 2b3eabcfec..2672c841d1 100644 --- a/dozer-api/src/lib.rs +++ b/dozer-api/src/lib.rs @@ -12,6 +12,7 @@ pub use tonic_reflection; pub use tonic_web; pub use tower_http; mod api_helper; +pub mod pgwire; pub use api_helper::get_api_security; #[derive(Debug)] diff --git a/dozer-api/src/pgwire/datafusion.rs b/dozer-api/src/pgwire/datafusion.rs new file mode 100644 index 0000000000..6ad28cdfe2 --- /dev/null +++ b/dozer-api/src/pgwire/datafusion.rs @@ -0,0 +1,335 @@ +use std::{any::Any, sync::Arc}; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::catalog::schema::{MemorySchemaProvider, SchemaProvider}; +use 
datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::{SessionState, TaskContext};
+use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+    Statistics,
+};
+use datafusion::prelude::{DataFrame, SessionConfig, SessionContext};
+use datafusion::sql::TableReference;
+use datafusion_expr::{Expr, TableProviderFilterPushDown};
+use dozer_types::arrow::datatypes::SchemaRef;
+use dozer_types::arrow::record_batch::RecordBatch;
+
+use dozer_cache::cache::{expression::QueryExpression, CacheRecord};
+use dozer_types::arrow_types::to_arrow::{map_record_to_arrow, map_to_arrow_schema};
+use dozer_types::log::debug;
+use dozer_types::types::Schema as DozerSchema;
+use futures_util::stream::BoxStream;
+use futures_util::StreamExt;
+
+use crate::api_helper::get_records;
+use crate::CacheEndpoint;
+
+use super::predicate_pushdown::{predicate_pushdown, supports_predicates_pushdown};
+
+pub struct SQLExecutor {
+    ctx: SessionContext,
+}
+
+impl SQLExecutor {
+    pub fn new(cache_endpoints: Vec<Arc<CacheEndpoint>>) -> Self {
+        let ctx = SessionContext::with_config(
+            SessionConfig::new()
+                .with_information_schema(true)
+                .with_default_catalog_and_schema("public", "dozer"),
+        );
+        for cache_endpoint in cache_endpoints {
+            let data_source = CacheEndpointDataSource::new(cache_endpoint.clone());
+            let _provider = ctx
+                .register_table(
+                    TableReference::Bare {
+                        table: cache_endpoint.endpoint().name.clone().into(),
+                    },
+                    Arc::new(data_source),
+                )
+                .unwrap();
+        }
+        {
+            let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
+            let pg_type = Arc::new(PgTypesView::new());
+            schema.register_table("pg_type".into(), pg_type).unwrap();
+            ctx.catalog("public")
+                .unwrap()
+                .register_schema("pg_catalog", schema.clone())
+                .unwrap();
+        }
+        Self { ctx }
+    }
+
+    pub async fn execute(&self, sql: &str) -> Result<DataFrame> {
+        self.ctx.sql(sql).await
+    }
+}
+
+/// A custom datasource, used to expose a cache endpoint as a table
+#[derive(Debug, Clone)]
+pub struct CacheEndpointDataSource {
+    cache_endpoint: Arc<CacheEndpoint>,
+    schema: SchemaRef,
+}
+
+impl CacheEndpointDataSource {
+    pub fn new(cache_endpoint: Arc<CacheEndpoint>) -> Self {
+        let schema = {
+            let cache_reader = &cache_endpoint.cache_reader();
+            let schema = &cache_reader.get_schema().0;
+            Arc::new(map_to_arrow_schema(schema).unwrap())
+        };
+        Self {
+            cache_endpoint,
+            schema,
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for CacheEndpointDataSource {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        // filters and limit can be used here to inject some push-down operations if needed
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(CacheEndpointExec::try_new(
+            self.cache_endpoint.clone(),
+            self.schema.clone(),
+            projection,
+            filters.to_vec(),
+            limit,
+        )?))
+    }
+
+    // fn supports_filter_pushdown(&self, filter: &Expr) -> Result<TableProviderFilterPushDown> {
+    //     supports_predicate_pushdown(filter)
+    // }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        Ok(supports_predicates_pushdown(
+            filters,
+            self.cache_endpoint.clone(),
+        ))
+    }
+}
+
+#[derive(Debug)]
+pub struct CacheEndpointExec {
+    cache_endpoint: Arc<CacheEndpoint>,
+    projection: Option<Arc<[usize]>>,
+    projected_schema: SchemaRef,
+    filters: Vec<Expr>,
+    limit: Option<usize>,
+}
+
+impl CacheEndpointExec {
+    /// Try to create a new [`CacheEndpointExec`], returning an error if the schema is incorrect
+    pub fn try_new(
+        cache_endpoint: Arc<CacheEndpoint>,
+        schema: SchemaRef,
+        projection: Option<&Vec<usize>>,
+        filters: Vec<Expr>,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let projected_schema = match projection {
+            Some(p) => Arc::new(schema.project(p)?),
+            None => schema,
+        };
+
+        Ok(Self {
+            cache_endpoint,
+            projected_schema,
+            projection: projection.cloned().map(Into::into),
+            filters,
+            limit,
+        })
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CacheEndpointExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Ok(false)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unreachable!("Children cannot be replaced in {self:?}")
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _ctx: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = futures_util::stream::iter({
+            let cache_reader = &self.cache_endpoint.cache_reader();
+            let mut expr = QueryExpression::default();
+            expr.limit = self.limit;
+            expr.filter = predicate_pushdown(self.filters.iter());
+            debug!("Using predicate pushdown {:?}", expr.filter);
+            let records = get_records(
+                cache_reader,
+                &mut expr,
+                &self.cache_endpoint.endpoint.name,
+                None,
+            )
+            .unwrap();
+
+            transpose(cache_reader.get_schema().0.clone(), records)
+        });
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.projected_schema.clone(),
+            match self.projection.clone() {
+                Some(projection) => Box::pin(stream.map(move |x| {
+                    x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
+                })) as BoxStream<'_, Result<RecordBatch>>,
+                None => Box::pin(stream),
+            },
+        )))
+    }
+
+    fn statistics(&self) -> Statistics {
+        Default::default()
+    }
+}
+
+impl DisplayAs for CacheEndpointExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "CacheEndpointExec",)
+            }
+        }
+    }
+}
+
+fn transpose(
+    schema: DozerSchema,
+    records: Vec<CacheRecord>,
+) -> impl Iterator<Item = Result<RecordBatch>> {
+    records.into_iter().map(move |CacheRecord { record, .. }| {
+        map_record_to_arrow(record, &schema).map_err(DataFusionError::ArrowError)
+    })
+}
+
+#[derive(Debug)]
+pub struct PgTypesView {
+    schema: SchemaRef,
+}
+
+impl PgTypesView {
+    pub fn new() -> Self {
+        Self {
+            schema: Arc::new(Schema::new(vec![
+                Field::new("oid", DataType::Utf8, false),
+                Field::new("typname", DataType::Utf8, false),
+                Field::new("typnamespace", DataType::Utf8, false),
+                Field::new("typowner", DataType::Utf8, false),
+                Field::new("typlen", DataType::Int16, false),
+                Field::new("typbyval", DataType::Boolean, false),
+                Field::new("typtype", DataType::Utf8, false),
+                Field::new("typcategory", DataType::Utf8, false),
+                Field::new("typispreferred", DataType::Boolean, false),
+                Field::new("typisdefined", DataType::Boolean, false),
+                Field::new("typdelim", DataType::Utf8, false),
+                Field::new("typrelid", DataType::Utf8, false),
+                Field::new("typelem", DataType::Utf8, false),
+                Field::new("typarray", DataType::Utf8, false),
+                Field::new("typinput", DataType::Utf8, false),
+                Field::new("typoutput", DataType::Utf8, false),
+                Field::new("typreceive", DataType::Utf8, false),
+                Field::new("typsend", DataType::Utf8, false),
+                Field::new("typmodin", DataType::Utf8, false),
+                Field::new("typmodout", DataType::Utf8, false),
+                Field::new("typanalyze", DataType::Utf8, false),
+                Field::new("typalign", DataType::Utf8, false),
+                Field::new("typstorage", DataType::Utf8, false),
+                Field::new("typnotnull", DataType::Boolean, false),
+                Field::new("typbasetype", DataType::Utf8, false),
+                Field::new("typtypmod", DataType::Int32, false),
+                Field::new("typndims", DataType::Int32, false),
+                Field::new("typcollation", DataType::Utf8, false),
+                Field::new("typdefaultbin", DataType::Binary, true),
+                Field::new("typdefault", DataType::Utf8, true),
+                Field::new("typacl", DataType::Utf8, true),
+            ])),
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for PgTypesView {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        // filters and limit can be used here to inject some push-down operations if needed
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryExec::try_new(
+            &vec![],
+            self.schema.clone(),
+            projection.cloned(),
+        )?))
+    }
+}
diff --git a/dozer-api/src/pgwire/mod.rs b/dozer-api/src/pgwire/mod.rs
new file mode 100644
index 0000000000..a22608ee13
--- /dev/null
+++ b/dozer-api/src/pgwire/mod.rs
@@ -0,0 +1,515 @@
+mod datafusion;
+mod predicate_pushdown;
+
+use std::sync::Arc;
+
+use ::datafusion::arrow::datatypes::DECIMAL128_MAX_PRECISION;
+use ::datafusion::error::DataFusionError;
+use async_trait::async_trait;
+use dozer_types::arrow::array::{
+    Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array,
+    DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
+    DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
+    Int16Array, Int32Array, Int8Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
+    IntervalYearMonthArray, LargeStringArray, StringArray, Time32MillisecondArray,
+    Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
+    UInt32Array, UInt64Array, UInt8Array,
+};
+use dozer_types::arrow::array::{Int64Array, LargeBinaryArray};
+use dozer_types::arrow::datatypes::{DataType, IntervalUnit};
+use dozer_types::arrow::datatypes::{TimeUnit, DECIMAL_DEFAULT_SCALE};
+use dozer_types::log::{debug, info};
+use dozer_types::models::api_config::{default_host, default_sql_port, SqlOptions};
+use dozer_types::rust_decimal::Decimal;
+use futures_util::stream::BoxStream;
+use futures_util::{stream, StreamExt};
+use pgwire::api::portal::Portal;
+use pgwire::api::stmt::NoopQueryParser;
+use pgwire::api::store::MemPortalStore;
+use pgwire::messages::data::DataRow;
+use tokio::net::TcpListener;
+
+use pgwire::api::auth::noop::NoopStartupHandler;
+use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler, StatementOrPortal};
+use pgwire::api::results::{
+    DataRowEncoder, DescribeResponse, FieldFormat, FieldInfo, QueryResponse, Response,
+};
+use pgwire::api::{ClientInfo, MakeHandler, StatelessMakeHandler, Type};
+use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
+use pgwire::tokio::process_socket;
+use tokio::select;
+
+use crate::pgwire::datafusion::SQLExecutor;
+use crate::shutdown::ShutdownReceiver;
+use crate::CacheEndpoint;
+
+pub struct PgWireServer {
+    config: SqlOptions,
+}
+
+impl PgWireServer {
+    pub fn new(config: SqlOptions) -> Self {
+        Self { config }
+    }
+
+    pub async fn run(
+        &self,
+        shutdown: ShutdownReceiver,
+        cache_endpoints: Vec<Arc<CacheEndpoint>>,
+    ) -> std::io::Result<()> {
+        let config = self.config.clone();
+        let query_processor = Arc::new(QueryProcessor::new(cache_endpoints));
+        let processor = Arc::new(StatelessMakeHandler::new(query_processor.clone()));
+        // Extended query support is not fully implemented; reuse the same
+        // processor as a placeholder handler for the extended protocol.
+        let placeholder = Arc::new(StatelessMakeHandler::new(query_processor));
+        let authenticator = Arc::new(StatelessMakeHandler::new(Arc::new(NoopStartupHandler)));
+
+        let host = config.host.unwrap_or_else(default_host);
+        let port = config.port.unwrap_or_else(default_sql_port);
+        let server_addr = format!("{}:{}", host, port);
+        let listener = TcpListener::bind(&server_addr).await?;
+        info!("Starting Postgres Wire Protocol Server on {server_addr}");
+        loop {
+            select! {
+                accept_result = listener.accept() => {
+                    let incoming_socket = accept_result?;
+                    let authenticator_ref = authenticator.make();
+                    let processor_ref = processor.make();
+                    let placeholder_ref = placeholder.make();
+                    tokio::spawn(async move {
+                        process_socket(
+                            incoming_socket.0,
+                            None,
+                            authenticator_ref,
+                            processor_ref,
+                            placeholder_ref,
+                        )
+                        .await
+                    });
+                }
+                _ = shutdown.create_shutdown_future() => break
+            }
+        }
+        Ok(())
+    }
+}
+
+struct QueryProcessor {
+    sql_executor: SQLExecutor,
+    portal_store: Arc<MemPortalStore<String>>,
+    query_parser: Arc<NoopQueryParser>,
+}
+
+impl QueryProcessor {
+    pub fn new(cache_endpoints: Vec<Arc<CacheEndpoint>>) -> Self {
+        let sql_executor = SQLExecutor::new(cache_endpoints);
+        Self {
+            sql_executor,
+            portal_store: Arc::new(MemPortalStore::new()),
+            query_parser: Arc::new(NoopQueryParser::new()),
+        }
+    }
+
+    async fn execute_query<'a>(&self, query: String) -> PgWireResult<Response<'a>> {
+        debug!("Executing SQL query {:?}", query);
+
+        fn error_info(err: DataFusionError) -> Box<ErrorInfo> {
+            Box::new(generic_error_info(err.to_string()))
+        }
+        let result = self.sql_executor.execute(&query).await;
+        if let Err(err) = result {
+            return Ok(Response::Error(error_info(err)));
+        }
+
+        let dataframe = result.unwrap();
+        let schema = Arc::new(
+            dataframe
+                .schema()
+                .fields()
+                .iter()
+                .map(|field| {
+                    let datatype = map_data_type(field.data_type());
+                    FieldInfo::new(
+                        field.name().clone(),
+                        None,
+                        None,
+                        datatype,
+                        FieldFormat::Text,
+                    )
+                })
+                .collect::<Vec<FieldInfo>>(),
+        );
+
+        let result = dataframe.execute_stream().await;
+        if let Err(err) = result {
+            return Ok(Response::Error(error_info(err)));
+        }
+
+        let recordbatch_stream = result.unwrap();
+        let schema_ref = schema.clone();
+        let data_row_stream = recordbatch_stream
+            .map(move |recordbatch_result| {
+                if let Err(err) = recordbatch_result {
+                    return Box::pin(stream::once(async move {
+                        Err(PgWireError::UserError(error_info(err)))
+                    })) as BoxStream<'_, Result<DataRow, PgWireError>>;
+                }
+                let recordbatch = recordbatch_result.unwrap();
+                let datafusion_schema = recordbatch.schema();
+                let pgwire_schema = schema_ref.clone();
+                Box::pin(stream::iter((0..recordbatch.num_rows()).map(move |i| {
+                    let mut encoder = DataRowEncoder::new(pgwire_schema.clone());
+                    for (j, column) in recordbatch.columns().iter().enumerate() {
+                        encode_field(
+                            &mut encoder,
+                            &column,
+                            datafusion_schema.fields()[j].data_type(),
+                            i,
+                        )?
+                    }
+                    encoder.finish()
+                }))) as BoxStream<'_, Result<DataRow, PgWireError>>
+            })
+            .flatten();
+
+        Ok(Response::Query(QueryResponse::new(schema, data_row_stream)))
+    }
+}
+
+#[async_trait]
+impl SimpleQueryHandler for QueryProcessor {
+    async fn do_query<'a, C>(&self, _client: &C, query: &'a str) -> PgWireResult<Vec<Response<'a>>>
+    where
+        C: ClientInfo + Unpin + Send + Sync,
+    {
+        self.execute_query(query.to_string())
+            .await
+            .map(|response| vec![response])
+    }
+}
+
+#[async_trait]
+impl ExtendedQueryHandler for QueryProcessor {
+    type Statement = String;
+    type PortalStore = MemPortalStore<Self::Statement>;
+    type QueryParser = NoopQueryParser;
+
+    fn portal_store(&self) -> Arc<Self::PortalStore> {
+        self.portal_store.clone()
+    }
+
+    fn query_parser(&self) -> Arc<Self::QueryParser> {
+        self.query_parser.clone()
+    }
+
+    async fn do_query<'a, 'b: 'a, C>(
+        &'b self,
+        _client: &mut C,
+        portal: &'a Portal<Self::Statement>,
+        _max_rows: usize,
+    ) -> PgWireResult<Response<'a>>
+    where
+        C: ClientInfo + Unpin + Send + Sync,
+    {
+        let query = portal.statement().statement();
+        self.execute_query(query.to_string()).await
+    }
+
+    async fn do_describe<C>(
+        &self,
+        _client: &mut C,
+        target: StatementOrPortal<'_, Self::Statement>,
+    ) -> PgWireResult<DescribeResponse>
+    where
+        C: ClientInfo + Unpin + Send + Sync,
+    {
+        let _query = match target {
+            StatementOrPortal::Statement(stmt) => {
+                let query = stmt.statement();
+                query
+            }
+            StatementOrPortal::Portal(portal) => {
+                let query = portal.statement().statement();
+                query
+            }
+        };
+        // TODO: proper implementations
+        Ok(DescribeResponse::new(None, Vec::new()))
+        // unimplemented!("Extended Query is not implemented on this server.")
+    }
+}
+
+fn map_data_type(datafusion_type: &DataType) -> Type {
+    match datafusion_type {
+        DataType::Null => Type::BOOL,
+        DataType::Boolean => Type::BOOL,
+        DataType::Int8 => Type::INT2,
+        DataType::Int16 => Type::INT2,
+        DataType::Int32 => Type::INT4,
+        DataType::Int64 => Type::INT8,
+        DataType::UInt8 => Type::INT2,
+        DataType::UInt16 => Type::INT4,
+        DataType::UInt32 => Type::INT8,
+        DataType::UInt64 => Type::NUMERIC,
+        DataType::Float16 => Type::FLOAT4,
+        DataType::Float32 => Type::FLOAT4,
+        DataType::Float64 => Type::FLOAT8,
+        DataType::Timestamp(_, None) => Type::TIMESTAMP,
+        DataType::Timestamp(_, Some(_)) => Type::TIMESTAMPTZ,
+        DataType::Date32 => Type::DATE,
+        DataType::Date64 => Type::DATE,
+        DataType::Time32(_) => Type::TIME,
+        DataType::Time64(_) => Type::TIME,
+        DataType::Duration(_) => Type::INTERVAL,
+        DataType::Interval(_) => Type::INTERVAL,
+        DataType::Binary => Type::BYTEA,
+        DataType::FixedSizeBinary(_) => Type::BYTEA,
+        DataType::LargeBinary => Type::BYTEA,
+        DataType::Utf8 => Type::VARCHAR,
+        DataType::LargeUtf8 => Type::VARCHAR,
+        DataType::Decimal128(_, _) => Type::NUMERIC,
+        DataType::Decimal256(_, _) => Type::NUMERIC,
+        DataType::List(_)
+        | DataType::FixedSizeList(_, _)
+        | DataType::LargeList(_)
+        | DataType::Struct(_)
+        | DataType::Union(_, _)
+        | DataType::Dictionary(_, _)
+        | DataType::Map(_, _)
+        | DataType::RunEndEncoded(_, _) => unimplemented!(),
+    }
+}
+
+macro_rules! cast_array {
+    ($array:tt as $type:tt) => {
+        $array.as_any().downcast_ref::<$type>().unwrap()
+    };
+}
+
+fn encode_field(
+    encoder: &mut DataRowEncoder,
+    column_data: &dyn Array,
+    column_data_type: &DataType,
+    row_index: usize,
+) -> Result<(), PgWireError> {
+    match column_data_type {
+        DataType::Null => encoder.encode_field(&None::<i8>),
+        DataType::Boolean => {
+            encoder.encode_field(&cast_array!(column_data as BooleanArray).value(row_index))
+        }
+        DataType::Int8 => {
+            encoder.encode_field(&cast_array!(column_data as Int8Array).value(row_index))
+        }
+        DataType::Int16 => {
+            encoder.encode_field(&cast_array!(column_data as Int16Array).value(row_index))
+        }
+        DataType::Int32 => {
+            encoder.encode_field(&cast_array!(column_data as Int32Array).value(row_index))
+        }
+        DataType::Int64 => {
+            encoder.encode_field(&cast_array!(column_data as Int64Array).value(row_index))
+        }
+        DataType::UInt8 => encoder.encode_field(
+            &cast_array!(column_data as UInt8Array)
+                .value(row_index)
+                .to_string(),
+        ),
+        DataType::UInt16 => encoder.encode_field(
+            &cast_array!(column_data as UInt16Array)
+                .value(row_index)
+                .to_string(),
+        ),
+        DataType::UInt32 => {
+            encoder.encode_field(&cast_array!(column_data as UInt32Array).value(row_index))
+        }
+        DataType::UInt64 => encoder.encode_field(
+            &cast_array!(column_data as UInt64Array)
+                .value(row_index)
+                .to_string(),
+        ),
+        DataType::Float16 => encoder.encode_field(
+            &cast_array!(column_data as Float16Array)
+                .value(row_index)
+                .to_f32(),
+        ),
+        DataType::Float32 => {
+            encoder.encode_field(&cast_array!(column_data as Float32Array).value(row_index))
+        }
+        DataType::Float64 => {
+            encoder.encode_field(&cast_array!(column_data as Float64Array).value(row_index))
+        }
+        DataType::Timestamp(TimeUnit::Second, _) => encoder.encode_field(
+            &chrono::NaiveDateTime::from_timestamp_opt(
+                cast_array!(column_data as TimestampSecondArray).value(row_index),
+                0,
+            )
+            .unwrap(),
+        ),
+        DataType::Timestamp(TimeUnit::Millisecond, _) => encoder.encode_field(
+            &chrono::NaiveDateTime::from_timestamp_millis(
+                cast_array!(column_data as TimestampMillisecondArray).value(row_index),
+            )
+            .unwrap(),
+        ),
+        DataType::Timestamp(TimeUnit::Microsecond, _) => encoder.encode_field(
+            &(chrono::NaiveDateTime::from_timestamp_micros(
+                cast_array!(column_data as TimestampMicrosecondArray).value(row_index),
+            )
+            .unwrap()),
+        ),
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => encoder.encode_field(&{
+            let nsecs = cast_array!(column_data as TimestampNanosecondArray).value(row_index);
+            let secs = nsecs.div_euclid(1_000_000_000);
+            let nsecs = nsecs.rem_euclid(1_000_000_000) as u32;
+            chrono::NaiveDateTime::from_timestamp_opt(secs, nsecs).unwrap()
+        }),
+        DataType::Date32 => encoder.encode_field(
+            // Date32 is days since the Unix epoch; 719_163 is the day number
+            // of 1970-01-01 in chrono's proleptic Gregorian day count.
+            &chrono::NaiveDate::from_num_days_from_ce_opt(
+                719_163 + cast_array!(column_data as Date32Array).value(row_index),
+            )
+            .unwrap(),
+        ),
+        DataType::Date64 => encoder.encode_field(
+            &chrono::NaiveDateTime::from_timestamp_millis(
+                cast_array!(column_data as Date64Array).value(row_index),
+            )
+            .unwrap()
+            .date(),
+        ),
+        DataType::Time32(TimeUnit::Second) => encoder.encode_field(
+            &chrono::NaiveTime::from_num_seconds_from_midnight_opt(
+                cast_array!(column_data as Time32SecondArray).value(row_index) as u32,
+                0,
+            )
+            .unwrap(),
+        ),
+        DataType::Time32(TimeUnit::Millisecond) => encoder.encode_field(&{
+            // from_hms_milli_opt only accepts sub-minute values; split into
+            // whole seconds and a nanosecond remainder instead.
+            let millis = cast_array!(column_data as Time32MillisecondArray).value(row_index);
+            chrono::NaiveTime::from_num_seconds_from_midnight_opt(
+                millis.div_euclid(1_000) as u32,
+                (millis.rem_euclid(1_000) * 1_000_000) as u32,
+            )
+            .unwrap()
+        }),
+        DataType::Time64(TimeUnit::Microsecond) => encoder.encode_field(&{
+            let micros = cast_array!(column_data as Time64MicrosecondArray).value(row_index);
+            let secs = micros.div_euclid(1_000_000) as u32;
+            let nanos = (micros.rem_euclid(1_000_000) * 1_000) as u32;
+            chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
+        }),
+        DataType::Time64(TimeUnit::Nanosecond) => encoder.encode_field(&{
+            let nanos = cast_array!(column_data as Time64NanosecondArray).value(row_index);
+            let secs = nanos.div_euclid(1_000_000_000) as u32;
+            let nanos = nanos.rem_euclid(1_000_000_000) as u32;
+            chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
+        }),
+        DataType::Duration(TimeUnit::Second) => encoder.encode_field({
+            let secs = cast_array!(column_data as DurationSecondArray).value(row_index);
+            &Iso8601Duration::Duration(chrono::Duration::seconds(secs)).to_string()
+        }),
+        DataType::Duration(TimeUnit::Millisecond) => encoder.encode_field({
+            let millis = cast_array!(column_data as DurationMillisecondArray).value(row_index);
+            &Iso8601Duration::Duration(chrono::Duration::milliseconds(millis)).to_string()
+        }),
+        DataType::Duration(TimeUnit::Microsecond) => encoder.encode_field({
+            let micros = cast_array!(column_data as DurationMicrosecondArray).value(row_index);
+            &Iso8601Duration::Duration(chrono::Duration::microseconds(micros)).to_string()
+        }),
+        DataType::Duration(TimeUnit::Nanosecond) => encoder.encode_field({
+            let nanos = cast_array!(column_data as DurationNanosecondArray).value(row_index);
+            &Iso8601Duration::Duration(chrono::Duration::nanoseconds(nanos)).to_string()
+        }),
+        DataType::Interval(IntervalUnit::DayTime) => encoder.encode_field({
+            let value = cast_array!(column_data as IntervalDayTimeArray).value(row_index);
+            // arrow packs days into the high 32 bits and milliseconds into the low 32
+            let (days, milliseconds) = ((value >> 32) as i32, value as i32);
+            &Iso8601Duration::DaysMilliseconds(days, milliseconds).to_string()
+        }),
+        DataType::Interval(IntervalUnit::MonthDayNano) => encoder.encode_field({
+            let value = cast_array!(column_data as IntervalMonthDayNanoArray).value(row_index);
+            // arrow packs months into bits 96..128, days into 64..96, nanoseconds into 0..64
+            let (months, days, nanoseconds) =
+                ((value >> 96) as i32, (value >> 64) as i32, value as i64);
+            &Iso8601Duration::MonthsDaysNanoseconds(months, days, nanoseconds).to_string()
+        }),
+        DataType::Interval(IntervalUnit::YearMonth) => encoder.encode_field({
+            let months = cast_array!(column_data as IntervalYearMonthArray).value(row_index);
+            &Iso8601Duration::Months(months).to_string()
+        }),
+        DataType::Binary => {
+            encoder.encode_field(&cast_array!(column_data as BinaryArray).value(row_index))
+        }
+        DataType::FixedSizeBinary(_) => {
+            encoder.encode_field(&cast_array!(column_data as FixedSizeBinaryArray).value(row_index))
+        }
+        DataType::LargeBinary => {
+            encoder.encode_field(&cast_array!(column_data as LargeBinaryArray).value(row_index))
+        }
+        DataType::Utf8 => {
+            encoder.encode_field(&cast_array!(column_data as StringArray).value(row_index))
+        }
+        DataType::LargeUtf8 => {
+            encoder.encode_field(&cast_array!(column_data as LargeStringArray).value(row_index))
+        }
+        DataType::Decimal128(_, scale) => encoder.encode_field({
+            let value = cast_array!(column_data as Decimal128Array).value(row_index);
+            &Decimal::from_i128_with_scale(value, *scale as u32).to_string()
+        }),
+        DataType::Decimal256(_, _) => encoder.encode_field({
+            let array = cast_array!(column_data as Decimal256Array).slice(row_index, 1);
+            let precision = DECIMAL128_MAX_PRECISION;
+            let scale = DECIMAL_DEFAULT_SCALE;
+            let array =
+                dozer_types::arrow_cast::cast(&array, &DataType::Decimal128(precision, scale))
+                    .map_err(|err| {
+                        PgWireError::UserError(Box::new(generic_error_info(err.to_string())))
+                    })?;
+            let value = cast_array!(array as Decimal128Array).value(0);
+            &Decimal::from_i128_with_scale(value, scale as u32).to_string()
+        }),
+
+        DataType::List(_)
+        | DataType::FixedSizeList(_, _)
+        | DataType::LargeList(_)
+        | DataType::Struct(_)
+        | DataType::Union(_, _)
+        | DataType::Dictionary(_, _)
+        | DataType::Map(_, _)
+        | DataType::RunEndEncoded(_, _) => unimplemented!(),
+
+        DataType::Time32(TimeUnit::Microsecond)
+        | DataType::Time32(TimeUnit::Nanosecond)
+        | DataType::Time64(TimeUnit::Second)
+        | DataType::Time64(TimeUnit::Millisecond) => unreachable!(),
+    }
+}
+
+#[derive(Debug)]
+enum Iso8601Duration {
+    Duration(chrono::Duration),
+    DaysMilliseconds(i32, i32),
+    MonthsDaysNanoseconds(i32, i32, i64),
+    Months(i32),
+}
+
+impl std::fmt::Display for Iso8601Duration {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Iso8601Duration::Duration(d) => std::fmt::Display::fmt(&d, f),
+            Iso8601Duration::DaysMilliseconds(days, msecs) => {
+                let secs = msecs.div_euclid(1_000);
+                let msecs = msecs.rem_euclid(1_000);
+                write!(f, "P{}DT{}.{:03}S", days, secs, msecs)
+            }
+            Iso8601Duration::MonthsDaysNanoseconds(months, days, nanos) => {
+                let secs = nanos.div_euclid(1_000_000_000);
+                let nanos = nanos.rem_euclid(1_000_000_000);
+                write!(f, "P{}M{}DT{}.{:09}S", months, days, secs, nanos)
+            }
+            Iso8601Duration::Months(months) => {
+                write!(f, "P{}M", months)
+            }
+        }
+    }
+}
+
+fn generic_error_info(err: String) -> ErrorInfo {
+    ErrorInfo::new("ERROR".to_string(), "2F000".to_string(), err)
+}
diff --git a/dozer-api/src/pgwire/predicate_pushdown.rs b/dozer-api/src/pgwire/predicate_pushdown.rs
new file mode 100644
index 0000000000..8f72ec9c52
--- /dev/null
+++ b/dozer-api/src/pgwire/predicate_pushdown.rs
@@ -0,0 +1,310 @@
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::{Decimal128Type, Decimal256Type, DecimalType};
+use datafusion::scalar::ScalarValue;
+use datafusion_expr::{Between, BinaryExpr, Expr, Operator, TableProviderFilterPushDown};
+use dozer_cache::cache::expression::{
+    FilterExpression, Operator as CacheFilterOperator, QueryExpression,
+};
+use dozer_types::log::debug;
+
+use crate::api_helper::get_records;
+use crate::CacheEndpoint;
+
+pub(crate) fn supports_predicates_pushdown(
+    exprs: &[&Expr],
+    cache_endpoint: Arc<CacheEndpoint>,
+) -> Vec<TableProviderFilterPushDown> {
+    // preliminary check for potential predicate pushdown support
+    let results = exprs
+        .iter()
+        .copied()
+        .map(supports_predicate_pushdown)
+        .collect::<Vec<_>>();
+
+    if results
+        .iter()
+        .all(|result| matches!(result, TableProviderFilterPushDown::Unsupported))
+    {
+        return results;
+    }
+
+    let supported_exprs = exprs
+        .iter()
+        .zip(results.iter())
+        .filter_map(|(expr, result)| {
+            (!matches!(result, TableProviderFilterPushDown::Unsupported)).then_some(expr)
+        })
+        .copied();
+
+    // test a query with predicate pushdown to check if there are missing indexes
+    let mut query_expr = QueryExpression::with_limit(0);
+    query_expr.filter = predicate_pushdown(supported_exprs);
+    let query_result = get_records(
+        &cache_endpoint.cache_reader(),
+        &mut query_expr,
+        &cache_endpoint.endpoint.name,
+        None,
+    );
+
+    if let Err(err) = query_result {
+        debug!("Predicate pushdown failed due to {err}");
+        vec![TableProviderFilterPushDown::Unsupported; exprs.len()]
+    } else {
+        results
+    }
+}
+
+fn supports_predicate_pushdown(expr: &Expr) -> TableProviderFilterPushDown {
+    let is_applicable = match expr {
+        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
+            matches!(
+                op,
+                Operator::Eq | Operator::Gt | Operator::GtEq | Operator::Lt | Operator::LtEq
+            ) && matches!((&**left, &**right),
+                (Expr::Column(_), Expr::Literal(v)) | (Expr::Literal(v), Expr::Column(_)) if is_suitable_for_pushdown(v))
+        }
+
+        Expr::Like(_) => todo!(),
+
+        Expr::IsNull(expr) | Expr::IsTrue(expr) | Expr::IsFalse(expr) => {
+            matches!(&**expr, Expr::Column(_))
+        }
+
+        Expr::Between(Between {
+            expr,
+            negated: false,
+            low,
+            high,
+        }) => {
+            matches!(
+                (&**expr, &**low, &**high),
+                (Expr::Column(_), Expr::Literal(v1), Expr::Literal(v2)) if [v1, v2].into_iter().all(is_suitable_for_pushdown)
+            )
+        }
+
+        _ => false,
+    };
+
+    debug!(
+        "Predicate pushdown is {}possible for {expr:?}",
+        if is_applicable { "" } else { "not " }
+    );
+
+    if is_applicable {
+        TableProviderFilterPushDown::Exact
+    } else {
+        TableProviderFilterPushDown::Unsupported
+    }
+}
+
+pub(crate) fn predicate_pushdown<'a>(
+    predicates: impl Iterator<Item = &'a Expr>,
+) -> Option<FilterExpression> {
+    use serde_json::Value;
+    let mut and_list = Vec::new();
+    predicates.into_iter().for_each(|expr| match expr {
+        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
+            let (c, op, v) = match (&**left, &**right) {
+                (Expr::Column(c), Expr::Literal(v)) => {
+                    let op = match op {
+                        Operator::Eq => CacheFilterOperator::EQ,
+                        Operator::Lt => CacheFilterOperator::LT,
+                        Operator::LtEq => CacheFilterOperator::LTE,
+                        Operator::Gt => CacheFilterOperator::GT,
+                        Operator::GtEq => CacheFilterOperator::GTE,
+                        _ => unreachable!(),
+                    };
+                    (c, op, v)
+                }
+                (Expr::Literal(v), Expr::Column(c)) => {
+                    let op = match op {
+                        Operator::Eq => CacheFilterOperator::EQ,
+                        Operator::Lt => CacheFilterOperator::GT,
+                        Operator::LtEq => CacheFilterOperator::GTE,
+                        Operator::Gt => CacheFilterOperator::LT,
+                        Operator::GtEq => CacheFilterOperator::LTE,
+                        _ => unreachable!(),
+                    };
+                    (c, op, v)
+                }
+                _ => unreachable!(),
+            };
+
+            let column = c.name.clone();
+            let value = serde_json_value_from_scalar_value(v.clone());
+            and_list.push(FilterExpression::Simple(column, op, value))
+        }
+
+        Expr::IsNull(expr) => match &**expr {
+            Expr::Column(c) => and_list.push(FilterExpression::Simple(
+                c.name.clone(),
+                CacheFilterOperator::EQ,
+                Value::Null,
+            )),
+            _ => unreachable!(),
+        },
+
+        Expr::IsTrue(expr) => match &**expr {
+            Expr::Column(c) => and_list.push(FilterExpression::Simple(
+                c.name.clone(),
+                CacheFilterOperator::EQ,
+                Value::Bool(true),
+            )),
+            _ => unreachable!(),
+        },
+
+        Expr::IsFalse(expr) => match &**expr {
+            Expr::Column(c) => and_list.push(FilterExpression::Simple(
+                c.name.clone(),
+                CacheFilterOperator::EQ,
+                Value::Bool(false),
+            )),
+            _ => unreachable!(),
+        },
+
+        Expr::Between(Between {
+            expr,
+            negated: false,
+            low,
+            high,
+        }) => match (&**expr, &**low, &**high) {
+            (Expr::Column(c), Expr::Literal(low), Expr::Literal(high)) => {
+                let column = c.name.clone();
+                let low = serde_json_value_from_scalar_value(low.clone());
+                let high = serde_json_value_from_scalar_value(high.clone());
+                and_list.push(FilterExpression::Simple(
+                    column.clone(),
+                    CacheFilterOperator::GTE,
+                    low,
+                ));
+                and_list.push(FilterExpression::Simple(
+                    column,
+                    CacheFilterOperator::LTE,
+                    high,
+                ));
+            }
+            _ => unreachable!(),
+        },
+
+        Expr::Like(_) => todo!(),
+
+        _ => unreachable!(),
+    });
+
+    if and_list.is_empty() {
+        None
+    } else if and_list.len() == 1 {
+        Some(and_list.pop().unwrap())
+    } else {
+        Some(FilterExpression::And(and_list))
+    }
+}
+
+fn is_suitable_for_pushdown(value: &ScalarValue) -> bool {
+    match value {
+        ScalarValue::Null
+        | ScalarValue::Boolean(_)
+        | ScalarValue::Float32(None)
+        | ScalarValue::Float64(None)
+        | ScalarValue::Int8(_)
+        | ScalarValue::Int16(_)
+        | ScalarValue::Int32(_)
+        | ScalarValue::Int64(_)
+        | ScalarValue::UInt8(_)
+        | ScalarValue::UInt16(_)
+        | ScalarValue::UInt32(_)
+        | ScalarValue::UInt64(_)
+        | ScalarValue::Utf8(_)
+        | ScalarValue::LargeUtf8(_)
+        | ScalarValue::Binary(_)
+        | ScalarValue::FixedSizeBinary(_, _)
+        | ScalarValue::LargeBinary(_) => true,

+        ScalarValue::Float32(Some(f)) if f.is_finite() => true,
+        ScalarValue::Float64(Some(f)) if f.is_finite() => true,
+
+        _ => false,
+    }
+}
+
+fn serde_json_value_from_scalar_value(value: ScalarValue) -> serde_json::Value {
+    use serde_json::{Number, Value};
+    let is_null = matches!(
+        &value,
+        ScalarValue::Null
+            | ScalarValue::Boolean(None)
+            | ScalarValue::Float32(None)
+            | ScalarValue::Float64(None)
+            | ScalarValue::Decimal128(None, _, _)
+            | ScalarValue::Decimal256(None, _, _)
+            | ScalarValue::Int8(None)
+            | ScalarValue::Int16(None)
+            | ScalarValue::Int32(None)
+            | ScalarValue::Int64(None)
+            | ScalarValue::UInt8(None)
+            | ScalarValue::UInt16(None)
+            | ScalarValue::UInt32(None)
+            | ScalarValue::UInt64(None)
+            | ScalarValue::Utf8(None)
+            | ScalarValue::LargeUtf8(None)
+            | ScalarValue::Binary(None)
+            | ScalarValue::FixedSizeBinary(_, None)
+            | ScalarValue::LargeBinary(None)
+    );
+    if is_null {
+        Value::Null
+    } else {
+        match value {
+            ScalarValue::Null => Value::Null,
+            ScalarValue::Boolean(Some(v)) => Value::Bool(v),
+            ScalarValue::Float32(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::Float64(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::Int8(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::Decimal128(Some(v), precision, scale) => {
+                Value::Number(Number::from_string_unchecked(
+                    <Decimal128Type as DecimalType>::format_decimal(v, precision, scale),
+                ))
+            }
+            ScalarValue::Decimal256(Some(v), precision, scale) => {
+                Value::Number(Number::from_string_unchecked(
+                    <Decimal256Type as DecimalType>::format_decimal(v, precision, scale),
+                ))
+            }
+            ScalarValue::Int16(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::Int32(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::Int64(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::UInt8(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::UInt16(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::UInt32(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::UInt64(Some(v)) => {
+                Value::Number(Number::from_string_unchecked(v.to_string()))
+            }
+            ScalarValue::Utf8(Some(v)) | ScalarValue::LargeUtf8(Some(v)) => Value::String(v),
+            ScalarValue::Binary(Some(v))
+            | ScalarValue::FixedSizeBinary(_, Some(v))
+            | ScalarValue::LargeBinary(Some(v)) => {
+                Value::String(String::from_utf8_lossy(&v).into())
+            }
+            _ => unreachable!(),
+        }
+    }
+}
diff --git a/dozer-cli/src/errors.rs b/dozer-cli/src/errors.rs
index 7a96e733a4..f5b51579e8 100644
--- a/dozer-cli/src/errors.rs
+++ b/dozer-cli/src/errors.rs
@@ -61,6 +61,8 @@ pub enum OrchestrationError {
     RestServeFailed(#[source] std::io::Error),
     #[error("Failed to server gRPC API: {0:?}")]
     GrpcServeFailed(#[source] tonic::transport::Error),
+    #[error("Failed to serve pgwire: {0}")]
+    PGWireServerFailed(#[source] std::io::Error),
     #[error("Failed to initialize internal server: {0}")]
     InternalServerFailed(#[source] GrpcError),
     #[error("{0}:
Failed to initialize cache. Have you run `dozer build`?")] diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index 8ae0ee608a..512eb0de32 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -14,7 +14,7 @@ use crate::{flatten_join_handle, join_handle_map_err}; use dozer_api::auth::{Access, Authorizer}; use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server; use dozer_api::shutdown::ShutdownReceiver; -use dozer_api::{get_api_security, grpc, rest, CacheEndpoint}; +use dozer_api::{get_api_security, grpc, pgwire, rest, CacheEndpoint}; use dozer_cache::cache::LmdbRwCacheManager; use dozer_cache::dozer_log::camino::Utf8PathBuf; use dozer_cache::dozer_log::home_dir::HomeDir; @@ -157,7 +157,7 @@ impl SimpleOrchestrator { let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags); let grpc_server = grpc_server .run( - cache_endpoints, + cache_endpoints.clone(), shutdown.clone(), operations_receiver, self.labels.clone(), @@ -174,8 +174,20 @@ impl SimpleOrchestrator { tokio::spawn(async move { Ok::<(), OrchestrationError>(()) }) }; + let pgwire_handle = { + let pgwire_config = self.config.api.sql.clone(); + let pgwire_server = pgwire::PgWireServer::new(pgwire_config); + tokio::spawn(async move { + pgwire_server + .run(shutdown, cache_endpoints) + .await + .map_err(OrchestrationError::PGWireServerFailed) + }) + }; + futures.push(flatten_join_handle(rest_handle)); futures.push(flatten_join_handle(grpc_handle)); + futures.push(flatten_join_handle(pgwire_handle)); while let Some(result) = futures.next().await { result?; diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index 332335b4de..6dd4215271 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -33,7 +33,7 @@ criterion = { version = "0.4.0", features = ["html_reports"] } serial_test = "1.0.0" dozer-tracing = { path = "../dozer-tracing" } tempdir = "0.3.7" -parquet = "45.0.0" +parquet = "46.0.0" env_logger = "0.10.0" hex = "0.4.3" dozer-utils = { path = "../dozer-utils" } diff --git a/dozer-ingestion/deltalake/Cargo.toml b/dozer-ingestion/deltalake/Cargo.toml index 79fd372458..be4b7525c3 100644 --- a/dozer-ingestion/deltalake/Cargo.toml +++ b/dozer-ingestion/deltalake/Cargo.toml @@ -8,6 +8,6 @@ edition = "2021" [dependencies] dozer-ingestion-connector = { path = "../connector" } dozer-ingestion-object-store = { path = "../object-store" } -deltalake = { version = "0.15.0", default-features = false, features = [ +deltalake = { version = "0.16.2", default-features = false, features = [ "datafusion", ] } diff --git a/dozer-ingestion/object-store/Cargo.toml b/dozer-ingestion/object-store/Cargo.toml index 36389876a2..a0e7599346 100644 --- a/dozer-ingestion/object-store/Cargo.toml +++ b/dozer-ingestion/object-store/Cargo.toml @@ -7,9 +7,9 @@ edition = "2021" [dependencies] dozer-ingestion-connector = { path = "../connector" } -deltalake = { version = "0.15.0", default-features = false, features = [ +deltalake = { version = "0.16.2", default-features = false, features = [ "s3", "datafusion", ] } -object_store = { version = "0.6.1", features = ["aws"] } +object_store = { version = "0.7.1", features = ["aws"] } url = "2.4.1" diff --git a/dozer-types/Cargo.toml b/dozer-types/Cargo.toml index f46e51a3fd..6499c355ae 100644 --- a/dozer-types/Cargo.toml +++ b/dozer-types/Cargo.toml @@ -29,8 +29,9 @@ pyo3 = { version = "0.18.1", optional = true } tonic = { version = "0.10.0" } prost-types = 
"0.12.0" prost = "0.12.0" -arrow = { version = "45.0.0" } -arrow-schema = { version = "45.0.0", features = ["serde"] } +arrow = { version = "46.0.0" } +arrow-cast = { version = "46.0.0" } +arrow-schema = { version = "46.0.0", features = ["serde"] } tokio-postgres = { version = "0.7.7", features = [ "with-chrono-0_4", "with-geo-types-0_7", diff --git a/dozer-types/src/arrow_types/to_arrow.rs b/dozer-types/src/arrow_types/to_arrow.rs index 370b371268..c740ab4a9e 100644 --- a/dozer-types/src/arrow_types/to_arrow.rs +++ b/dozer-types/src/arrow_types/to_arrow.rs @@ -5,7 +5,9 @@ use arrow::{ datatypes::i256, record_batch::RecordBatch, }; -use arrow_schema::TimeUnit; +use arrow_schema::{ + TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, +}; use std::{collections::HashMap, sync::Arc}; pub const DOZER_SCHEMA_KEY: &str = "dozer_schema"; @@ -35,11 +37,11 @@ pub fn map_record_to_arrow( rec: Record, schema: &Schema, ) -> Result { - let mut rows = vec![]; + let mut columns = vec![]; for (idx, f) in rec.values.iter().enumerate() { let fd = schema.fields.get(idx).unwrap(); - let r = match (f, fd.typ) { + let column = match (f, fd.typ) { (Field::UInt(v), FieldType::UInt) => { Arc::new(arrow_array::UInt64Array::from_iter_values([*v])) as ArrayRef } @@ -76,10 +78,11 @@ pub fn map_record_to_arrow( (Field::Null, FieldType::Text) => Arc::new(arrow_array::LargeStringArray::from(vec![ None as Option, ])) as ArrayRef, - (Field::Decimal(v), FieldType::Decimal) => Arc::new(arrow_array::Decimal256Array::from( - i256::from_string(&v.to_string()) - .map_or(vec![None as Option], |f| vec![Some(f)]), - )) as ArrayRef, + (Field::Decimal(v), FieldType::Decimal) => arrow_cast::cast( + &arrow_array::Decimal128Array::from(vec![v.mantissa()]) + .with_precision_and_scale(DECIMAL128_MAX_PRECISION, v.scale() as i8)?, + &DataType::Decimal256(DECIMAL256_MAX_PRECISION, DECIMAL128_MAX_SCALE), + )?, (Field::Null, FieldType::Decimal) => Arc::new(arrow_array::Decimal256Array::from(vec![ None as Option, ])) as ArrayRef, @@ -133,11 +136,11 @@ pub fn map_record_to_arrow( "Invalid field type {b:?} for the field: {a:?}", )))?, }; - rows.push(r); + columns.push(column); } let schema = map_to_arrow_schema(schema).unwrap(); - RecordBatch::try_new(Arc::new(schema), rows) + RecordBatch::try_new(Arc::new(schema), columns) } // Maps the dozer field type to the arrow data type @@ -153,7 +156,7 @@ pub fn map_field_type(typ: FieldType) -> DataType { FieldType::Boolean => DataType::Boolean, FieldType::String => DataType::Utf8, FieldType::Text => DataType::LargeUtf8, - FieldType::Decimal => DataType::Decimal256(10, 5), // TODO: Map this correctly + FieldType::Decimal => DataType::Decimal256(DECIMAL256_MAX_PRECISION, DECIMAL128_MAX_SCALE), FieldType::Timestamp => DataType::Timestamp(arrow_types::TimeUnit::Nanosecond, None), FieldType::Date => DataType::Date64, FieldType::Binary => DataType::Binary, diff --git a/dozer-types/src/lib.rs b/dozer-types/src/lib.rs index f63914f2f8..0a5fd6a455 100644 --- a/dozer-types/src/lib.rs +++ b/dozer-types/src/lib.rs @@ -19,6 +19,7 @@ pub use helper::json_value_to_field; // Re-exports pub use arrow; +pub use arrow_cast; pub use bincode; pub use bytes; pub use chrono; diff --git a/dozer-types/src/models/api_config.rs b/dozer-types/src/models/api_config.rs index 302b296c92..ad99982e60 100644 --- a/dozer-types/src/models/api_config.rs +++ b/dozer-types/src/models/api_config.rs @@ -18,6 +18,9 @@ pub struct ApiConfig { #[serde(default, skip_serializing_if = "equal_default")] pub app_grpc: 
AppGrpcOptions, + #[serde(default, skip_serializing_if = "equal_default")] + pub sql: SqlOptions, + #[serde(skip_serializing_if = "Option::is_none")] // max records to be returned from the endpoints pub default_max_num_records: Option, @@ -68,6 +71,16 @@ pub struct AppGrpcOptions { pub host: Option, } +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] +#[serde(deny_unknown_fields)] +pub struct SqlOptions { + #[serde(skip_serializing_if = "Option::is_none")] + pub port: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub host: Option, +} + pub fn default_app_grpc_port() -> u32 { 50053 } @@ -84,6 +97,10 @@ pub fn default_rest_port() -> u16 { 8080 } +pub fn default_sql_port() -> u32 { + 50054 +} + pub fn default_host() -> String { "0.0.0.0".to_owned() } diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index 940b1052d1..d061e18935 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -155,6 +155,9 @@ }, "rest": { "$ref": "#/definitions/RestApiOptions" + }, + "sql": { + "$ref": "#/definitions/SqlOptions" } }, "additionalProperties": false @@ -1741,6 +1744,26 @@ }, "additionalProperties": false }, + "SqlOptions": { + "type": "object", + "properties": { + "host": { + "type": [ + "string", + "null" + ] + }, + "port": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + } + }, + "additionalProperties": false + }, "Table": { "type": "object", "required": [