diff --git a/Cargo.lock b/Cargo.lock index c9d267769c..bd56a6e850 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,9 +69,9 @@ dependencies = [ [[package]] name = "actix-http" -version = "3.3.1" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2079246596c18b4a33e274ae10c0e50613f4d32a4198e09c7b93771013fed74" +checksum = "a92ef85799cba03f76e4f7c10f533e66d87c9a7e7055f3391f09000ad8351bc9" dependencies = [ "actix-codec", "actix-rt", @@ -80,7 +80,7 @@ dependencies = [ "actix-utils", "ahash 0.8.3", "base64 0.21.0", - "bitflags 1.3.2", + "bitflags 2.4.0", "brotli", "bytes", "bytestring", @@ -171,19 +171,22 @@ dependencies = [ [[package]] name = "actix-tls" -version = "3.0.3" +version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fde0cf292f7cdc7f070803cb9a0d45c018441321a78b1042ffbbb81ec333297" +checksum = "72616e7fbec0aa99c6f3164677fa48ff5a60036d0799c98cab894a44f3e0efc3" dependencies = [ - "actix-codec", "actix-rt", "actix-service", "actix-utils", "futures-core", - "log", + "impl-more", "pin-project-lite", + "rustls 0.21.7", + "rustls-webpki 0.101.5", + "tokio", "tokio-rustls 0.23.4", "tokio-util 0.7.8", + "tracing", "webpki-roots 0.22.6", ] @@ -688,26 +691,13 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341" -[[package]] -name = "async-compression" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" -dependencies = [ - "brotli", - "flate2", - "futures-core", - "memchr", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-compression" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6" dependencies = [ + "brotli", "bzip2", "flate2", "futures-core", @@ -1812,8 +1802,8 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" dependencies = [ - "prost", - "prost-types", + "prost 0.11.9", + "prost-types 0.11.9", "tonic 0.9.2", "tracing-core", ] @@ -1830,7 +1820,7 @@ dependencies = [ "futures", "hdrhistogram", "humantime", - "prost-types", + "prost-types 0.11.9", "serde", "serde_json", "thread_local", @@ -2273,7 +2263,7 @@ dependencies = [ "arrow", "arrow-array", "arrow-schema", - "async-compression 0.4.1", + "async-compression", "async-trait", "bytes", "bzip2", @@ -2423,7 +2413,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "object_store", - "prost", + "prost 0.11.9", ] [[package]] @@ -2603,13 +2593,11 @@ dependencies = [ "metrics", "openapiv3", "pin-project", - "prost", "prost-build", "prost-reflect", "tempdir", "tokio", "tokio-stream", - "tonic 0.8.3", "tonic-build", "tonic-reflection", "tonic-web", @@ -2674,7 +2662,7 @@ dependencies = [ "tempfile", "tokio", "tokio-stream", - "tonic 0.8.3", + "tonic 0.10.0", "tower", "uuid", "webbrowser", @@ -2726,7 +2714,6 @@ dependencies = [ "parquet", "postgres-protocol", "postgres-types", - "prost", "prost-reflect", "rand 0.8.5", "rdkafka", @@ -2739,7 +2726,6 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-postgres-rustls", - "tonic 0.8.3", "tonic-reflection", "tonic-web", "tower-http", @@ -2871,6 +2857,7 @@ dependencies = [ "atty", "console-subscriber", "dozer-types", + "futures-util", "metrics", "metrics-exporter-prometheus", "opentelemetry", @@ -2897,8 +2884,8 @@ dependencies = [ "ordered-float 3.9.1", "parking_lot", "prettytable-rs", - "prost", - "prost-types", + "prost 0.12.0", + "prost-types 0.12.0", "pyo3", "rust_decimal", "serde", @@ -2907,7 +2894,7 @@ dependencies = [ "serde_yaml", "thiserror", "tokio-postgres", - "tonic 0.8.3", + "tonic 0.10.0", "tonic-build", "tracing", ] @@ -4019,6 +4006,12 @@ dependencies = [ "parity-scale-codec", ] +[[package]] +name = "impl-more" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d" + [[package]] name = "impl-rlp" version = "0.3.0" @@ -4171,11 +4164,12 @@ checksum = "30e22bd8629359895450b59ea7a776c850561b96a3b1d31321c1949d9e6c9146" [[package]] name = "iri-string" -version = "0.4.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f0f7638c1e223529f1bfdc48c8b133b9e0b434094d1d28473161ee48b235f78" +checksum = "21859b667d66a4c1dacd9df0863b3efb65785474255face87f5bca39dd8407c0" dependencies = [ - "nom", + "memchr", + "serde", ] [[package]] @@ -4599,25 +4593,34 @@ dependencies = [ [[package]] name = "logos" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf8b031682c67a8e3d5446840f9573eb7fe26efe7ec8d195c9ac4c0647c502f1" +checksum = "c000ca4d908ff18ac99b93a062cb8958d331c3220719c52e77cb19cc6ac5d2c1" dependencies = [ "logos-derive", ] [[package]] -name = "logos-derive" -version = "0.12.1" +name = "logos-codegen" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d849148dbaf9661a6151d1ca82b13bb4c4c128146a88d05253b38d4e2f496c" +checksum = "dc487311295e0002e452025d6b580b77bb17286de87b57138f3b5db711cded68" dependencies = [ "beef", "fnv", "proc-macro2", "quote", "regex-syntax 0.6.28", - "syn 1.0.109", + "syn 2.0.29", +] + +[[package]] +name = "logos-derive" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbfc0d229f1f42d790440136d941afd806bc9e949e2bcb8faa813b0f00d1267e" +dependencies = [ + "logos-codegen", ] [[package]] @@ -4822,9 +4825,9 @@ dependencies = [ [[package]] name = "mime" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "mime_guess" @@ -5462,7 +5465,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry_api", "opentelemetry_sdk", - "prost", + "prost 0.11.9", "thiserror", "tokio", "tonic 0.9.2", @@ -5476,7 +5479,7 @@ checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb" dependencies = [ "opentelemetry_api", "opentelemetry_sdk", - "prost", + "prost 0.11.9", "tonic 0.9.2", ] @@ -5974,12 +5977,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "prettyplease" -version = "0.1.24" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ebcd279d20a4a0a2404a33056388e950504d891c855c7975b9a8fef75f3bf04" +checksum = "6c64d9ba0963cdcea2e1b2230fbae2bab30eb25a174be395c41e764bfb65dd62" dependencies = [ "proc-macro2", - "syn 1.0.109", + "syn 2.0.29", ] [[package]] @@ -6138,41 +6141,51 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", +] + +[[package]] +name = "prost" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa8473a65b88506c106c28ae905ca4a2b83a2993640467a41bb3080627ddfd2c" +dependencies = [ + "bytes", + "prost-derive 0.12.0", ] [[package]] name = "prost-build" -version = "0.11.6" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e" +checksum = "30d3e647e9eb04ddfef78dfee2d5b3fefdf94821c84b710a3d8ebc89ede8b164" dependencies = [ "bytes", "heck", - "itertools 0.10.5", - "lazy_static", + "itertools 0.11.0", "log", "multimap", + "once_cell", "petgraph 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", "prettyplease", - "prost", - "prost-types", + "prost 0.12.0", + "prost-types 0.12.0", "regex", - "syn 1.0.109", + "syn 2.0.29", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea9b0f8cbe5e15a8a042d030bd96668db28ecb567ec37d691971ff5731d2b1b" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", "itertools 0.10.5", @@ -6181,28 +6194,50 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56075c27b20ae524d00f247b8a4dc333e5784f889fe63099f8e626bc8d73486c" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "prost-reflect" -version = "0.10.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c848c6e1773cb6f51228eda412c8872a61d24cb11a22efe7f9ea88fbff4f70" +checksum = "057237efdb71cf4b3f9396302a3d6599a92fa94063ba537b66130980ea9909f3" dependencies = [ "base64 0.21.0", "logos", "once_cell", - "prost", - "prost-types", + "prost 0.12.0", + "prost-types 0.12.0", "serde", "serde-value", ] [[package]] name = "prost-types" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ - "prost", + "prost 0.11.9", +] + +[[package]] +name = "prost-types" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cebe0a918c97f86c217b0f76fd754e966f8b9f41595095cf7d74cb4e59d730f6" +dependencies = [ + "prost 0.12.0", ] [[package]] @@ -7014,7 +7049,7 @@ checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", "ring", - "rustls-webpki 0.101.3", + "rustls-webpki 0.101.5", "sct 0.7.0", ] @@ -7063,9 +7098,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.3" +version = "0.101.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" +checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed" dependencies = [ "ring", "untrusted", @@ -8200,14 +8235,13 @@ dependencies = [ [[package]] name = "tonic" -version = "0.8.3" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ - "async-stream", "async-trait", "axum", - "base64 0.13.1", + "base64 0.21.0", "bytes", "futures-core", "futures-util", @@ -8218,33 +8252,26 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", - "prost-derive", - "rustls-native-certs 0.6.3", - "rustls-pemfile", + "prost 0.11.9", "tokio", - "tokio-rustls 0.23.4", "tokio-stream", - "tokio-util 0.7.8", "tower", "tower-layer", "tower-service", "tracing", - "tracing-futures", ] [[package]] name = "tonic" -version = "0.9.2" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +checksum = "5469afaf78a11265c343a88969045c1568aa8ecc6c787dbf756e92e70f199861" dependencies = [ + "async-stream", "async-trait", "axum", "base64 0.21.0", "bytes", - "futures-core", - "futures-util", "h2", "http", "http-body", @@ -8252,8 +8279,11 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.12.0", + "rustls-native-certs 0.6.3", + "rustls-pemfile", "tokio", + "tokio-rustls 0.24.0", "tokio-stream", "tower", "tower-layer", @@ -8263,45 +8293,46 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.8.4" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4" +checksum = "8b477abbe1d18c0b08f56cd01d1bc288668c5b5cfd19b2ae1886bbf599c546f1" dependencies = [ "prettyplease", "proc-macro2", "prost-build", "quote", - "syn 1.0.109", + "syn 2.0.29", ] [[package]] name = "tonic-reflection" -version = "0.6.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67494bad4dda4c9bffae901dfe14e2b2c0f760adb4706dc10beeb81799f7f7b2" +checksum = "16e61add39c1426d5f21eae2cc196e97e1f5a5ea7bcf491df3885797992a86eb" dependencies = [ - "bytes", - "prost", - "prost-types", + "prost 0.12.0", + "prost-types 0.12.0", "tokio", "tokio-stream", - "tonic 0.8.3", + "tonic 0.10.0", ] [[package]] name = "tonic-web" -version = "0.4.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e392f7556972523aa87ddb0fc7f2d2ce530559956706aa081bb0bd8fed158559" +checksum = "605028b8adec50b03ee93c1cf6a9dd0d861f508d82fbda569f4a813b411862c1" dependencies = [ - "base64 0.13.1", + "base64 0.21.0", "bytes", - "futures-core", "http", "http-body", "hyper", "pin-project", - "tonic 0.8.3", + "tokio-stream", + "tonic 0.10.0", + "tower-http", + "tower-layer", "tower-service", "tracing", ] @@ -8328,13 +8359,13 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.3.5" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "async-compression 0.3.15", - "base64 0.13.1", - "bitflags 1.3.2", + "async-compression", + "base64 0.21.0", + "bitflags 2.4.0", "bytes", "futures-core", "futures-util", @@ -8414,16 +8445,6 @@ dependencies = [ "valuable", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "tracing-log" version = "0.1.3" diff --git a/dozer-api/Cargo.toml b/dozer-api/Cargo.toml index dc768ac271..f96139fec7 100644 --- a/dozer-api/Cargo.toml +++ b/dozer-api/Cargo.toml @@ -14,7 +14,7 @@ dozer-tracing = { path = "../dozer-tracing" } dozer-core = { path = "../dozer-core" } actix-cors = "0.6.3" -actix-http = { version = "3.3.0", default-features = false, features = [ +actix-http = { version = "3.4.0", default-features = false, features = [ "rustls", ] } actix-web = { version = "4.2.1", default-features = false, features = [ @@ -28,23 +28,21 @@ actix-web = { version = "4.2.1", default-features = false, features = [ actix-web-httpauth = "0.8.0" handlebars = "4.3.7" openapiv3 = "1.0.2" -tonic-build = "0.8.2" +tonic-build = "0.10.0" tokio = { version = "1", features = ["full"] } -tonic = { version = "0.8.3" } -prost = "0.11.8" -prost-reflect = { version = "0.10.2", features = ["serde", "text-format"] } -tonic-reflection = "0.6.0" +prost-reflect = { version = "0.12.0", features = ["serde", "text-format"] } +tonic-reflection = "0.10.0" Inflector = "0.11.4" futures-util = "0.3.28" -prost-build = "0.11.6" -tonic-web = "0.4.0" +prost-build = "0.12.0" +tonic-web = "0.10.0" jsonwebtoken = "8.3.0" tokio-stream = "0.1.12" async-trait = "0.1.73" tracing-actix-web = "0.7.2" tower = "0.4.13" hyper = "0.14.24" -tower-http = { version = "0.3.5", features = ["full"] } +tower-http = { version = "0.4", features = ["full"] } arc-swap = "1.6.0" metrics = "0.21.0" gethostname = "0.4.3" diff --git a/dozer-api/src/auth/api.rs b/dozer-api/src/auth/api.rs index 22d1a9d110..132473b295 100644 --- a/dozer-api/src/auth/api.rs +++ b/dozer-api/src/auth/api.rs @@ -4,8 +4,8 @@ use actix_web::{ Error, HttpMessage, HttpRequest, HttpResponse, }; use actix_web_httpauth::extractors::bearer::BearerAuth; +use dozer_types::tonic::{Response, Status}; use dozer_types::{models::api_security::ApiSecurity, serde_json::json}; -use tonic::{Response, Status}; use crate::errors::{ApiError, AuthError}; diff --git a/dozer-api/src/errors.rs b/dozer-api/src/errors.rs index 5a6bcddcb2..33a1206474 100644 --- a/dozer-api/src/errors.rs +++ b/dozer-api/src/errors.rs @@ -66,9 +66,9 @@ pub enum GrpcError { Listen(SocketAddr, #[source] BoxedError), } -impl From for tonic::Status { +impl From for dozer_types::tonic::Status { fn from(input: ApiError) -> Self { - tonic::Status::new(tonic::Code::Unknown, input.to_string()) + dozer_types::tonic::Status::new(dozer_types::tonic::Code::Unknown, input.to_string()) } } diff --git a/dozer-api/src/grpc/auth/service.rs b/dozer-api/src/grpc/auth/service.rs index b4536b1326..60b60408b4 100644 --- a/dozer-api/src/grpc/auth/service.rs +++ b/dozer-api/src/grpc/auth/service.rs @@ -1,6 +1,6 @@ use crate::auth::Access; -use tonic::{Request, Response, Status}; +use dozer_types::tonic::{self, Request, Response, Status}; use crate::auth::api::auth_grpc; use dozer_types::grpc_types::auth::auth_grpc_service_server::AuthGrpcService; diff --git a/dozer-api/src/grpc/auth_middleware.rs b/dozer-api/src/grpc/auth_middleware.rs index bed2de2b11..7fd30ff09c 100644 --- a/dozer-api/src/grpc/auth_middleware.rs +++ b/dozer-api/src/grpc/auth_middleware.rs @@ -1,12 +1,12 @@ use dozer_types::models::api_security::ApiSecurity; -use futures_util::future::BoxFuture; -use hyper::{Body, Method}; -use std::task::{Context, Poll}; -use tonic::{ +use dozer_types::tonic::{ body::{empty_body, BoxBody}, codegen::http, transport::NamedService, }; +use futures_util::future::BoxFuture; +use hyper::{Body, Method}; +use std::task::{Context, Poll}; use tower::{Layer, Service}; use crate::auth::Authorizer; diff --git a/dozer-api/src/grpc/client_server.rs b/dozer-api/src/grpc/client_server.rs index 6658161c11..67cc930f36 100644 --- a/dozer-api/src/grpc/client_server.rs +++ b/dozer-api/src/grpc/client_server.rs @@ -3,6 +3,7 @@ use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed:: use crate::api_helper::get_api_security; use crate::errors::ApiInitError; use crate::grpc::auth::AuthService; +use crate::grpc::grpc_web_middleware::enable_grpc_web; use crate::grpc::health::HealthService; use crate::grpc::{common, run_server, typed}; use crate::{errors::GrpcError, CacheEndpoint}; @@ -14,6 +15,8 @@ use dozer_types::grpc_types::{ common::common_grpc_service_server::CommonGrpcServiceServer, health::health_grpc_service_server::HealthGrpcServiceServer, }; +use dozer_types::tonic::transport::server::TcpIncoming; +use dozer_types::tonic::transport::Server; use dozer_types::tracing::Level; use dozer_types::{ log::info, @@ -22,8 +25,6 @@ use dozer_types::{ use futures_util::Future; use std::{collections::HashMap, sync::Arc}; use tokio::sync::broadcast::{self, Receiver}; -use tonic::transport::server::TcpIncoming; -use tonic::transport::Server; use tonic_reflection::server::{ServerReflection, ServerReflectionServer}; use tower::Layer; use tower_http::trace::{self, TraceLayer}; @@ -92,27 +93,24 @@ impl ApiServer { operations_receiver: Option>, labels: LabelsAndProgress, default_max_num_records: usize, - ) -> Result>, ApiInitError> { + ) -> Result>, ApiInitError> + { // Create our services. - let mut web_config = tonic_web::config(); - if self.flags.grpc_web { - web_config = web_config.allow_all_origins(); - } - let common_service = CommonGrpcServiceServer::new(CommonService::new( cache_endpoints.clone(), operations_receiver.as_ref().map(|r| r.resubscribe()), default_max_num_records, )); - let common_service = web_config.enable(common_service); + let common_service = enable_grpc_web(common_service, self.flags.grpc_web); let (typed_service, reflection_service) = self.get_dynamic_service( cache_endpoints, operations_receiver, default_max_num_records, )?; - let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service)); - let reflection_service = web_config.enable(reflection_service); + let typed_service = + typed_service.map(|typed_service| enable_grpc_web(typed_service, self.flags.grpc_web)); + let reflection_service = enable_grpc_web(reflection_service, self.flags.grpc_web); let mut service_map: HashMap = HashMap::new(); service_map.insert("".to_string(), ServingStatus::Serving); @@ -125,7 +123,7 @@ impl ApiServer { let health_service = HealthGrpcServiceServer::new(HealthService { serving_status: service_map, }); - let health_service = web_config.enable(health_service); + let health_service = enable_grpc_web(health_service, self.flags.grpc_web); // Auth middleware. let security = get_api_security(self.security.to_owned()); @@ -146,9 +144,10 @@ impl ApiServer { let mut auth_service = None; let security = get_api_security(self.security.to_owned()); if security.is_some() { - let service = web_config.enable(AuthGrpcServiceServer::new(AuthService::new( - security.to_owned(), - ))); + let service = enable_grpc_web( + AuthGrpcServiceServer::new(AuthService::new(security.to_owned())), + self.flags.grpc_web, + ); auth_service = Some(auth_middleware.layer(service)); } let metric_middleware = MetricMiddlewareLayer::new(labels); diff --git a/dozer-api/src/grpc/common/service.rs b/dozer-api/src/grpc/common/service.rs index de349106b5..3096a25dd8 100644 --- a/dozer-api/src/grpc/common/service.rs +++ b/dozer-api/src/grpc/common/service.rs @@ -9,8 +9,8 @@ use crate::CacheEndpoint; use dozer_types::grpc_types::common::common_grpc_service_server::CommonGrpcService; use dozer_types::grpc_types::conversions::field_definition_to_grpc; use dozer_types::indexmap::IndexMap; +use dozer_types::tonic::{self, Request, Response, Status}; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status}; use dozer_types::grpc_types::common::{ CountResponse, GetEndpointsRequest, GetEndpointsResponse, GetFieldsRequest, GetFieldsResponse, @@ -24,9 +24,9 @@ type ResponseStream = ReceiverStream>; // #[derive(Clone)] pub struct CommonService { /// For look up endpoint from its name. `key == value.endpoint.name`. Using index map to keep endpoint order. - pub endpoint_map: IndexMap>, - pub event_notifier: Option>, - pub default_max_num_records: usize, + endpoint_map: IndexMap>, + event_notifier: Option>, + default_max_num_records: usize, } impl CommonService { diff --git a/dozer-api/src/grpc/common/tests/mod.rs b/dozer-api/src/grpc/common/tests/mod.rs index 6d5a05b3a5..a092170327 100644 --- a/dozer-api/src/grpc/common/tests/mod.rs +++ b/dozer-api/src/grpc/common/tests/mod.rs @@ -11,7 +11,7 @@ use dozer_types::grpc_types::{ value, EventFilter, EventType, FieldDefinition, OperationType, RecordWithId, Type, Value, }, }; -use tonic::Request; +use dozer_types::tonic::Request; use super::CommonService; diff --git a/dozer-api/src/grpc/grpc_web_middleware.rs b/dozer-api/src/grpc/grpc_web_middleware.rs new file mode 100644 index 0000000000..76cb718056 --- /dev/null +++ b/dozer-api/src/grpc/grpc_web_middleware.rs @@ -0,0 +1,101 @@ +use dozer_types::tonic::{body::BoxBody, transport::NamedService}; +use futures_util::Future; +use pin_project::pin_project; +use tonic_web::{CorsGrpcWeb, GrpcWebLayer, GrpcWebService}; +use tower::{BoxError, Layer, Service}; +use tower_http::cors::{Cors, CorsLayer}; + +#[derive(Debug, Clone)] +pub enum MaybeGrpcWebService { + GrpcWeb(Cors>), + NoGrpcWeb(S), +} + +#[pin_project(project = MaybeGrpcWebServiceFutureProj)] +pub enum MaybeGrpcWebServiceFuture +where + S: Service, Response = http::Response>, + S: Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, +{ + GrpcWeb(#[pin] as Service>>::Future), + NoGrpcWeb(#[pin] >>::Future), +} + +impl Service> for MaybeGrpcWebService +where + S: Service, Response = http::Response>, + S: Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, +{ + type Response = S::Response; + type Error = S::Error; + type Future = MaybeGrpcWebServiceFuture; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self { + MaybeGrpcWebService::GrpcWeb(service) => service.poll_ready(cx), + MaybeGrpcWebService::NoGrpcWeb(service) => service.poll_ready(cx), + } + } + + fn call(&mut self, req: http::Request) -> Self::Future { + match self { + MaybeGrpcWebService::GrpcWeb(service) => { + MaybeGrpcWebServiceFuture::GrpcWeb(service.call(req)) + } + MaybeGrpcWebService::NoGrpcWeb(service) => { + MaybeGrpcWebServiceFuture::NoGrpcWeb(service.call(req)) + } + } + } +} + +impl NamedService for MaybeGrpcWebService +where + S: NamedService, +{ + const NAME: &'static str = S::NAME; +} + +impl Future for MaybeGrpcWebServiceFuture +where + S: Service, Response = http::Response>, + S: Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, +{ + type Output = <>>::Future as Future>::Output; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + match this { + MaybeGrpcWebServiceFutureProj::GrpcWeb(fut) => fut.poll(cx), + MaybeGrpcWebServiceFutureProj::NoGrpcWeb(fut) => fut.poll(cx), + } + } +} + +pub fn enable_grpc_web(service: S, enabled: bool) -> MaybeGrpcWebService +where + S: Service, Response = http::Response>, + S: Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, +{ + if enabled { + let service = GrpcWebLayer::new().layer(service); + let service = CorsLayer::very_permissive().layer(service); + MaybeGrpcWebService::GrpcWeb(service) + } else { + MaybeGrpcWebService::NoGrpcWeb(service) + } +} diff --git a/dozer-api/src/grpc/health/service.rs b/dozer-api/src/grpc/health/service.rs index e574643ed3..92bce4e987 100644 --- a/dozer-api/src/grpc/health/service.rs +++ b/dozer-api/src/grpc/health/service.rs @@ -1,9 +1,9 @@ use dozer_types::grpc_types::health::health_check_response::ServingStatus; use dozer_types::grpc_types::health::health_grpc_service_server::HealthGrpcService; use dozer_types::grpc_types::health::{HealthCheckRequest, HealthCheckResponse}; +use dozer_types::tonic::{self, Request, Response, Status}; use std::collections::HashMap; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status}; type ResponseStream = ReceiverStream>; diff --git a/dozer-api/src/grpc/health/tests/mod.rs b/dozer-api/src/grpc/health/tests/mod.rs index 071180609f..2fad4fe15d 100644 --- a/dozer-api/src/grpc/health/tests/mod.rs +++ b/dozer-api/src/grpc/health/tests/mod.rs @@ -1,5 +1,5 @@ +use dozer_types::tonic::Request; use std::collections::HashMap; -use tonic::Request; use dozer_types::grpc_types::health::health_check_response::ServingStatus; use dozer_types::grpc_types::health::health_grpc_service_server::HealthGrpcService; @@ -29,5 +29,8 @@ async fn test_grpc_health_check() { service: "non-existent".to_string(), })) .await; - assert_eq!(response.unwrap_err().code(), tonic::Code::NotFound); + assert_eq!( + response.unwrap_err().code(), + dozer_types::tonic::Code::NotFound + ); } diff --git a/dozer-api/src/grpc/internal/internal_pipeline_server.rs b/dozer-api/src/grpc/internal/internal_pipeline_server.rs index 463c0a61a1..f09053f2a6 100644 --- a/dozer-api/src/grpc/internal/internal_pipeline_server.rs +++ b/dozer-api/src/grpc/internal/internal_pipeline_server.rs @@ -12,15 +12,15 @@ use dozer_types::log::info; use dozer_types::models::api_config::AppGrpcOptions; use dozer_types::models::api_endpoint::ApiEndpoint; use dozer_types::parking_lot::Mutex; +use dozer_types::tonic::transport::server::TcpIncoming; +use dozer_types::tonic::transport::Server; +use dozer_types::tonic::{self, Request, Response, Status, Streaming}; use futures_util::future::Either; use futures_util::stream::BoxStream; use futures_util::{Future, StreamExt, TryStreamExt}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use tonic::transport::server::TcpIncoming; -use tonic::transport::Server; -use tonic::{Request, Response, Status, Streaming}; use crate::errors::GrpcError; use crate::grpc::run_server; diff --git a/dozer-api/src/grpc/metric_middleware.rs b/dozer-api/src/grpc/metric_middleware.rs index cb49d6847c..40332c4da4 100644 --- a/dozer-api/src/grpc/metric_middleware.rs +++ b/dozer-api/src/grpc/metric_middleware.rs @@ -1,4 +1,5 @@ use dozer_tracing::LabelsAndProgress; +use dozer_types::tonic::{body::BoxBody, transport::NamedService}; use futures_util::future::BoxFuture; use hyper::Body; use metrics::{histogram, increment_counter}; @@ -6,7 +7,6 @@ use std::{ task::{Context, Poll}, time::Instant, }; -use tonic::{body::BoxBody, transport::NamedService}; use tower::{Layer, Service}; use crate::api_helper::{API_LATENCY_HISTOGRAM_NAME, API_REQUEST_COUNTER_NAME}; diff --git a/dozer-api/src/grpc/mod.rs b/dozer-api/src/grpc/mod.rs index b3e8af7fb5..5726ae14bc 100644 --- a/dozer-api/src/grpc/mod.rs +++ b/dozer-api/src/grpc/mod.rs @@ -5,6 +5,7 @@ pub mod health; pub mod internal; // pub mod dynamic; mod auth_middleware; +mod grpc_web_middleware; mod metric_middleware; mod shared_impl; pub mod typed; @@ -13,20 +14,20 @@ pub mod types_helper; use bytes::Bytes; pub use client_server::ApiServer; use dozer_types::errors::internal::BoxedError; +use dozer_types::tonic::transport::server::{Router, Routes, TcpIncoming}; use futures_util::{ stream::{AbortHandle, Abortable, Aborted}, Future, }; use http::{Request, Response}; use hyper::Body; -use tonic::transport::server::{Router, Routes, TcpIncoming}; use tower::{Layer, Service}; async fn run_server( server: Router, incoming: TcpIncoming, shutdown: impl Future + Send + 'static, -) -> Result<(), tonic::transport::Error> +) -> Result<(), dozer_types::tonic::transport::Error> where L: Layer, L::Service: Service, Response = Response> + Clone + Send + 'static, diff --git a/dozer-api/src/grpc/shared_impl/mod.rs b/dozer-api/src/grpc/shared_impl/mod.rs index cb53311a92..86c0fa1271 100644 --- a/dozer-api/src/grpc/shared_impl/mod.rs +++ b/dozer-api/src/grpc/shared_impl/mod.rs @@ -6,11 +6,11 @@ use dozer_cache::CacheReader; use dozer_types::grpc_types::types::Operation; use dozer_types::log::warn; use dozer_types::serde_json; +use dozer_types::tonic::{Code, Response, Status}; use dozer_types::types::Schema; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Code, Response, Status}; use crate::api_helper::{get_records, get_records_count}; use crate::auth::Access; diff --git a/dozer-api/src/grpc/typed/codec.rs b/dozer-api/src/grpc/typed/codec.rs index 654ebfeacb..8a55e48e60 100644 --- a/dozer-api/src/grpc/typed/codec.rs +++ b/dozer-api/src/grpc/typed/codec.rs @@ -1,9 +1,9 @@ -use prost_reflect::prost::Message; -use prost_reflect::{DynamicMessage, MethodDescriptor}; -use tonic::{ +use dozer_types::tonic::{ codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}, Status, }; +use prost_reflect::prost::Message; +use prost_reflect::{DynamicMessage, MethodDescriptor}; use super::TypedResponse; diff --git a/dozer-api/src/grpc/typed/service.rs b/dozer-api/src/grpc/typed/service.rs index 5197e14894..cf21448a50 100644 --- a/dozer-api/src/grpc/typed/service.rs +++ b/dozer-api/src/grpc/typed/service.rs @@ -17,17 +17,20 @@ use crate::{ CacheEndpoint, }; use dozer_cache::CacheReader; +use dozer_types::tonic::{ + self, + codegen::{ + self, empty_body, Body, BoxFuture, Context, EnabledCompressionEncodings, Poll, StdError, + }, + metadata::MetadataMap, + Code, Extensions, Request, Response, Status, +}; use dozer_types::{grpc_types::types::Operation, models::api_security::ApiSecurity}; use dozer_types::{log::error, types::Schema}; use futures_util::future; use prost_reflect::{MethodDescriptor, Value}; -use std::{borrow::Cow, collections::HashMap, convert::Infallible}; +use std::{borrow::Cow, collections::HashMap, convert::Infallible, sync::Arc}; use tokio_stream::wrappers::ReceiverStream; -use tonic::{ - codegen::{self, *}, - metadata::MetadataMap, - Code, Extensions, Request, Response, Status, -}; #[derive(Debug, Clone)] struct TypedEndpoint { diff --git a/dozer-api/src/grpc/typed/tests/service.rs b/dozer-api/src/grpc/typed/tests/service.rs index 494623cde1..22069adc03 100644 --- a/dozer-api/src/grpc/typed/tests/service.rs +++ b/dozer-api/src/grpc/typed/tests/service.rs @@ -16,6 +16,12 @@ use futures_util::FutureExt; use std::{env, str::FromStr, sync::Arc, time::Duration}; use crate::test_utils; +use dozer_types::tonic::{ + self, + metadata::MetadataValue, + transport::{Endpoint, Server}, + Code, Request, +}; use tokio::{ sync::{ broadcast::{self, Receiver}, @@ -24,11 +30,6 @@ use tokio::{ time::timeout, }; use tokio_stream::StreamExt; -use tonic::{ - metadata::MetadataValue, - transport::{Endpoint, Server}, - Code, Request, -}; pub async fn setup_pipeline() -> (Vec>, Receiver) { // Copy this file from dozer-types compilation output directory if it changes diff --git a/dozer-api/src/lib.rs b/dozer-api/src/lib.rs index 6fe9cdbe68..ea3d1c59c9 100644 --- a/dozer-api/src/lib.rs +++ b/dozer-api/src/lib.rs @@ -183,11 +183,11 @@ pub use actix_web_httpauth; pub use api_helper::API_LATENCY_HISTOGRAM_NAME; pub use api_helper::API_REQUEST_COUNTER_NAME; pub use async_trait; +pub use dozer_types::tonic; use errors::ApiInitError; pub use openapiv3; pub use tokio; use tokio::{sync::broadcast::Sender, task::JoinHandle}; -pub use tonic; pub use tracing_actix_web; #[cfg(test)] mod test_utils; diff --git a/dozer-cli/Cargo.toml b/dozer-cli/Cargo.toml index d535ed134f..fbee3e99aa 100644 --- a/dozer-cli/Cargo.toml +++ b/dozer-cli/Cargo.toml @@ -24,7 +24,7 @@ tokio = { version = "1", features = ["full"] } tempdir = "0.3.7" clap = { version = "4.4.1", features = ["derive"] } ctrlc = "3.2.5" -tonic = { version = "0.8.3", features = ["tls", "tls-roots"] } +tonic = { version = "0.10.0", features = ["tls", "tls-roots"] } tokio-stream = "0.1.12" include_dir = "0.7.3" handlebars = "4.3.7" diff --git a/dozer-cli/src/live/server.rs b/dozer-cli/src/live/server.rs index 4999fadfc5..b2616de3f7 100644 --- a/dozer-cli/src/live/server.rs +++ b/dozer-cli/src/live/server.rs @@ -209,13 +209,9 @@ pub async fn serve( let code_service = CodeServiceServer::new(live_server); let api_explorer_service = ApiExplorerServiceServer::new(api_explorer_server); // Enable CORS for local development - let contract_service = tonic_web::config() - .allow_all_origins() - .enable(contract_service); - let code_service = tonic_web::config().allow_all_origins().enable(code_service); - let api_explorer_service = tonic_web::config() - .allow_all_origins() - .enable(api_explorer_service); + let contract_service = tonic_web::enable(contract_service); + let code_service = tonic_web::enable(code_service); + let api_explorer_service = tonic_web::enable(api_explorer_service); let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set( diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index 23d38697c9..1f4f6f8760 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -40,12 +40,10 @@ schema_registry_converter = { version = "3.1.0", features = [ "avro", ], optional = true } regex = "1" -tonic = { version = "0.8.3" } -tonic-web = "0.4.0" -tonic-reflection = "0.6.0" -tower-http = { version = "0.3.5", features = ["full"] } -prost = "0.11.8" -prost-reflect = { version = "0.10.2", features = ["serde", "text-format"] } +tonic-web = "0.10.0" +tonic-reflection = "0.10.0" +tower-http = { version = "0.4", features = ["full"] } +prost-reflect = { version = "0.12.0", features = ["serde", "text-format"] } deltalake = { version = "0.13.0", default-features = false, features = [ "s3", "datafusion", diff --git a/dozer-ingestion/benches/grpc.rs b/dozer-ingestion/benches/grpc.rs index 1f214a2bbf..8c9fa1604d 100644 --- a/dozer-ingestion/benches/grpc.rs +++ b/dozer-ingestion/benches/grpc.rs @@ -2,13 +2,13 @@ use std::{sync::Arc, thread}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use dozer_ingestion::test_util::create_test_runtime; +use dozer_types::tonic::transport::Channel; use dozer_types::{ arrow::array::{Int32Array, StringArray}, grpc_types::ingest::{ingest_service_client::IngestServiceClient, IngestArrowRequest}, indicatif::{MultiProgress, ProgressBar}, serde_yaml, }; -use tonic::transport::Channel; mod helper; use crate::helper::TestConfig; use dozer_types::{ diff --git a/dozer-ingestion/src/connectors/delta_lake/connector.rs b/dozer-ingestion/src/connectors/delta_lake/connector.rs index 404cfeaf39..b7e1bb10a8 100644 --- a/dozer-ingestion/src/connectors/delta_lake/connector.rs +++ b/dozer-ingestion/src/connectors/delta_lake/connector.rs @@ -8,7 +8,7 @@ use crate::connectors::{ use crate::errors::ConnectorError; use crate::ingestion::Ingestor; use dozer_types::ingestion_types::DeltaLakeConfig; -use tonic::async_trait; +use dozer_types::tonic::async_trait; #[derive(Debug)] pub struct DeltaLakeConnector { diff --git a/dozer-ingestion/src/connectors/dozer/connector.rs b/dozer-ingestion/src/connectors/dozer/connector.rs index b6e43af654..bc76955c4c 100644 --- a/dozer-ingestion/src/connectors/dozer/connector.rs +++ b/dozer-ingestion/src/connectors/dozer/connector.rs @@ -4,6 +4,7 @@ use dozer_log::{ reader::{LogReaderBuilder, LogReaderOptions}, replication::LogOperation, }; +use dozer_types::tonic::{async_trait, transport::Channel}; use dozer_types::{ errors::types::DeserializationError, grpc_types::internal::{ @@ -21,7 +22,6 @@ use tokio::{ sync::mpsc::{channel, Sender}, task::JoinSet, }; -use tonic::{async_trait, transport::Channel}; use crate::{ connectors::{ diff --git a/dozer-ingestion/src/connectors/ethereum/log/connector.rs b/dozer-ingestion/src/connectors/ethereum/log/connector.rs index c1d1e89bfa..2498f28c44 100644 --- a/dozer-ingestion/src/connectors/ethereum/log/connector.rs +++ b/dozer-ingestion/src/connectors/ethereum/log/connector.rs @@ -11,7 +11,7 @@ use dozer_types::ingestion_types::{EthFilter, EthLogConfig}; use dozer_types::log::warn; use dozer_types::serde_json; -use tonic::async_trait; +use dozer_types::tonic::async_trait; use super::helper; use super::sender::{run, EthDetails}; diff --git a/dozer-ingestion/src/connectors/ethereum/trace/connector.rs b/dozer-ingestion/src/connectors/ethereum/trace/connector.rs index 08a3bf8834..149a379bbd 100644 --- a/dozer-ingestion/src/connectors/ethereum/trace/connector.rs +++ b/dozer-ingestion/src/connectors/ethereum/trace/connector.rs @@ -8,7 +8,7 @@ use crate::{connectors::TableInfo, errors::ConnectorError, ingestion::Ingestor}; use dozer_types::ingestion_types::{EthTraceConfig, IngestionMessage}; use dozer_types::log::{error, info, warn}; -use tonic::async_trait; +use dozer_types::tonic::async_trait; #[derive(Debug)] pub struct EthTraceConnector { diff --git a/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs b/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs index db53079b25..5989a2f496 100644 --- a/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs +++ b/dozer-ingestion/src/connectors/grpc/adapter/arrow.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use dozer_types::tonic::async_trait; use dozer_types::{ arrow::datatypes::Schema as ArrowSchema, arrow::{self, ipc::reader::StreamReader}, @@ -11,7 +12,6 @@ use dozer_types::{ serde_json, types::{Operation, Record, Schema}, }; -use tonic::async_trait; use crate::{ connectors::{CdcType, SourceSchema}, diff --git a/dozer-ingestion/src/connectors/grpc/adapter/default.rs b/dozer-ingestion/src/connectors/grpc/adapter/default.rs index af2e846e01..ca010825d1 100644 --- a/dozer-ingestion/src/connectors/grpc/adapter/default.rs +++ b/dozer-ingestion/src/connectors/grpc/adapter/default.rs @@ -1,5 +1,5 @@ use dozer_types::serde_json; -use tonic::async_trait; +use dozer_types::tonic::async_trait; use crate::{connectors::SourceSchema, errors::ConnectorError}; diff --git a/dozer-ingestion/src/connectors/grpc/adapter/mod.rs b/dozer-ingestion/src/connectors/grpc/adapter/mod.rs index b2be2204f7..3e08229549 100644 --- a/dozer-ingestion/src/connectors/grpc/adapter/mod.rs +++ b/dozer-ingestion/src/connectors/grpc/adapter/mod.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use dozer_types::grpc_types::ingest::{IngestArrowRequest, IngestRequest}; -use tonic::async_trait; +use dozer_types::tonic::async_trait; use crate::{connectors::SourceSchema, errors::ConnectorError, ingestion::Ingestor}; diff --git a/dozer-ingestion/src/connectors/grpc/connector.rs b/dozer-ingestion/src/connectors/grpc/connector.rs index 95dd13103e..564ac8be51 100644 --- a/dozer-ingestion/src/connectors/grpc/connector.rs +++ b/dozer-ingestion/src/connectors/grpc/connector.rs @@ -10,9 +10,9 @@ use crate::{connectors::TableInfo, errors::ConnectorError, ingestion::Ingestor}; use dozer_types::grpc_types::ingest::ingest_service_server::IngestServiceServer; use dozer_types::ingestion_types::GrpcConfig; use dozer_types::log::{info, warn}; +use dozer_types::tonic::async_trait; +use dozer_types::tonic::transport::Server; use dozer_types::tracing::Level; -use tonic::async_trait; -use tonic::transport::Server; use tower_http::trace::{self, TraceLayer}; #[derive(Debug)] @@ -83,9 +83,7 @@ where let ingestor = unsafe { std::mem::transmute::<&'_ Ingestor, &'static Ingestor>(ingestor) }; let ingest_service = IngestorServiceImpl::new(adapter, ingestor, tables); - let ingest_service = tonic_web::config() - .allow_all_origins() - .enable(IngestServiceServer::new(ingest_service)); + let ingest_service = tonic_web::enable(IngestServiceServer::new(ingest_service)); let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set( diff --git a/dozer-ingestion/src/connectors/grpc/ingest.rs b/dozer-ingestion/src/connectors/grpc/ingest.rs index 26746b4481..6222f1325e 100644 --- a/dozer-ingestion/src/connectors/grpc/ingest.rs +++ b/dozer-ingestion/src/connectors/grpc/ingest.rs @@ -1,8 +1,8 @@ use std::sync::Arc; +use dozer_types::tonic::{self, Streaming}; use dozer_types::{grpc_types::ingest::IngestArrowRequest, log::error}; use futures::StreamExt; -use tonic::Streaming; use dozer_types::grpc_types::ingest::{ ingest_service_server::IngestService, IngestRequest, IngestResponse, diff --git a/dozer-ingestion/src/connectors/grpc/tests.rs b/dozer-ingestion/src/connectors/grpc/tests.rs index a9da0e3f52..6a9a5b6d6f 100644 --- a/dozer-ingestion/src/connectors/grpc/tests.rs +++ b/dozer-ingestion/src/connectors/grpc/tests.rs @@ -23,13 +23,13 @@ use dozer_types::{ use dozer_types::json_types::JsonValue as dozer_JsonValue; use dozer_types::ordered_float::OrderedFloat; +use dozer_types::tonic::transport::Channel; use dozer_types::types::{FieldDefinition, FieldType, Schema as DozerSchema, SourceDefinition}; use dozer_types::{ ingestion_types::{GrpcConfig, GrpcConfigSchemas}, serde_json::json, }; use tokio::runtime::Runtime; -use tonic::transport::Channel; use super::connector::GrpcConnector; use super::IngestAdapter; diff --git a/dozer-ingestion/src/connectors/kafka/connector.rs b/dozer-ingestion/src/connectors/kafka/connector.rs index 862398de86..5bd2354235 100644 --- a/dozer-ingestion/src/connectors/kafka/connector.rs +++ b/dozer-ingestion/src/connectors/kafka/connector.rs @@ -10,7 +10,7 @@ use dozer_types::ingestion_types::KafkaConfig; use rdkafka::consumer::Consumer; use rdkafka::util::Timeout; -use tonic::async_trait; +use dozer_types::tonic::async_trait; use crate::connectors::kafka::no_schema_registry_basic::NoSchemaRegistryBasic; diff --git a/dozer-ingestion/src/connectors/kafka/debezium/stream_consumer.rs b/dozer-ingestion/src/connectors/kafka/debezium/stream_consumer.rs index d02c8db47d..6342064590 100644 --- a/dozer-ingestion/src/connectors/kafka/debezium/stream_consumer.rs +++ b/dozer-ingestion/src/connectors/kafka/debezium/stream_consumer.rs @@ -15,8 +15,8 @@ use dozer_types::serde_json::Value; use dozer_types::types::{Operation, Record}; use crate::connectors::TableToIngest; +use dozer_types::tonic::async_trait; use rdkafka::{ClientConfig, Message}; -use tonic::async_trait; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(crate = "dozer_types::serde")] diff --git a/dozer-ingestion/src/connectors/kafka/stream_consumer.rs b/dozer-ingestion/src/connectors/kafka/stream_consumer.rs index adbbe0e2c6..9c6d7deb59 100644 --- a/dozer-ingestion/src/connectors/kafka/stream_consumer.rs +++ b/dozer-ingestion/src/connectors/kafka/stream_consumer.rs @@ -2,8 +2,8 @@ use crate::errors::ConnectorError; use crate::ingestion::Ingestor; use crate::connectors::TableToIngest; +use dozer_types::tonic::async_trait; use rdkafka::ClientConfig; -use tonic::async_trait; #[async_trait] pub trait StreamConsumer { diff --git a/dozer-ingestion/src/connectors/kafka/stream_consumer_basic.rs b/dozer-ingestion/src/connectors/kafka/stream_consumer_basic.rs index 6a1063d9c0..67b8a4e402 100644 --- a/dozer-ingestion/src/connectors/kafka/stream_consumer_basic.rs +++ b/dozer-ingestion/src/connectors/kafka/stream_consumer_basic.rs @@ -16,7 +16,7 @@ use dozer_types::types::{Field, Operation, Record}; use crate::connectors::kafka::no_schema_registry_basic::NoSchemaRegistryBasic; use crate::connectors::kafka::schema_registry_basic::SchemaRegistryBasic; -use tonic::async_trait; +use dozer_types::tonic::async_trait; use crate::connectors::TableToIngest; use crate::errors::KafkaStreamError::PollingError; diff --git a/dozer-ingestion/src/connectors/mod.rs b/dozer-ingestion/src/connectors/mod.rs index 192a6bbd1a..57cc3e296c 100644 --- a/dozer-ingestion/src/connectors/mod.rs +++ b/dozer-ingestion/src/connectors/mod.rs @@ -28,7 +28,7 @@ use dozer_types::log::debug; use dozer_types::models::connection::Connection; use dozer_types::models::connection::ConnectionConfig; use dozer_types::node::OpIdentifier; -use tonic::async_trait; +use dozer_types::tonic::async_trait; use crate::connectors::object_store::connector::ObjectStoreConnector; diff --git a/dozer-ingestion/src/connectors/mongodb/mod.rs b/dozer-ingestion/src/connectors/mongodb/mod.rs index 40b61af9e6..714b4fdf86 100644 --- a/dozer-ingestion/src/connectors/mongodb/mod.rs +++ b/dozer-ingestion/src/connectors/mongodb/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use bson::{doc, Bson, Document, Timestamp}; +use dozer_types::tonic::async_trait; use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, TryStreamExt}; use mongodb::{ change_stream::event::ChangeStreamEvent, @@ -8,7 +9,6 @@ use mongodb::{ options::{ChangeStreamOptions, ClientOptions, ConnectionString}, }; use tokio::sync::mpsc::{channel, Sender}; -use tonic::async_trait; use crate::{errors::ConnectorError, ingestion::Ingestor}; use dozer_types::{ diff --git a/dozer-ingestion/src/connectors/mysql/connector.rs b/dozer-ingestion/src/connectors/mysql/connector.rs index d9d12a6e00..8a217f878f 100644 --- a/dozer-ingestion/src/connectors/mysql/connector.rs +++ b/dozer-ingestion/src/connectors/mysql/connector.rs @@ -13,13 +13,13 @@ use crate::{ errors::MySQLConnectorError, }; use crate::{errors::ConnectorError, ingestion::Ingestor}; +use dozer_types::tonic::async_trait; use dozer_types::{ ingestion_types::IngestionMessage, types::{FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition}, }; use mysql_async::{Opts, Pool}; use mysql_common::Row; -use tonic::async_trait; #[derive(Debug)] pub struct MySQLConnector { diff --git a/dozer-ingestion/src/connectors/object_store/connector.rs b/dozer-ingestion/src/connectors/object_store/connector.rs index 0ce241f3cb..9c481a93ec 100644 --- a/dozer-ingestion/src/connectors/object_store/connector.rs +++ b/dozer-ingestion/src/connectors/object_store/connector.rs @@ -1,9 +1,9 @@ use dozer_types::ingestion_types::IngestionMessage; +use dozer_types::tonic::async_trait; use futures::future::join_all; use std::collections::HashMap; use tokio::sync::mpsc::channel; use tokio::task::JoinSet; -use tonic::async_trait; use crate::connectors::object_store::adapters::DozerObjectStore; use crate::connectors::object_store::schema_mapper; diff --git a/dozer-ingestion/src/connectors/object_store/csv/csv_table.rs b/dozer-ingestion/src/connectors/object_store/csv/csv_table.rs index 562e29e2ba..870458a20a 100644 --- a/dozer-ingestion/src/connectors/object_store/csv/csv_table.rs +++ b/dozer-ingestion/src/connectors/object_store/csv/csv_table.rs @@ -5,10 +5,10 @@ use dozer_types::{ }; use std::{collections::HashMap, path::Path, sync::Arc, time::Duration}; +use dozer_types::tonic::async_trait; use futures::StreamExt; use object_store::ObjectStore; use tokio::sync::mpsc::Sender; -use tonic::async_trait; use crate::{ connectors::{ diff --git a/dozer-ingestion/src/connectors/object_store/delta/delta_table.rs b/dozer-ingestion/src/connectors/object_store/delta/delta_table.rs index 2b26ce8704..1f6d53a896 100644 --- a/dozer-ingestion/src/connectors/object_store/delta/delta_table.rs +++ b/dozer-ingestion/src/connectors/object_store/delta/delta_table.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use deltalake::{datafusion::prelude::SessionContext, s3_storage_options}; use dozer_types::chrono::{DateTime, Utc}; use dozer_types::ingestion_types::IngestionMessage; +use dozer_types::tonic::async_trait; use dozer_types::{ arrow_types::from_arrow::{map_schema_to_dozer, map_value_to_dozer_field}, ingestion_types::DeltaConfig, @@ -12,7 +13,6 @@ use dozer_types::{ use futures::StreamExt; use tokio::sync::mpsc::Sender; use tokio::task::JoinHandle; -use tonic::async_trait; use crate::{ connectors::{ diff --git a/dozer-ingestion/src/connectors/object_store/parquet/parquet_table.rs b/dozer-ingestion/src/connectors/object_store/parquet/parquet_table.rs index 79ed540c81..b50a6d87ec 100644 --- a/dozer-ingestion/src/connectors/object_store/parquet/parquet_table.rs +++ b/dozer-ingestion/src/connectors/object_store/parquet/parquet_table.rs @@ -11,6 +11,7 @@ use deltalake::{ }; use dozer_types::ingestion_types::IngestionMessage; +use dozer_types::tonic::async_trait; use dozer_types::{ chrono::{DateTime, Utc}, ingestion_types::ParquetConfig, @@ -21,7 +22,6 @@ use object_store::ObjectStore; use std::{collections::HashMap, path::Path, sync::Arc, time::Duration}; use tokio::sync::mpsc::Sender; use tokio::task::JoinHandle; -use tonic::async_trait; use crate::connectors::object_store::helper::is_marker_file_exist; use crate::{ diff --git a/dozer-ingestion/src/connectors/object_store/table_reader.rs b/dozer-ingestion/src/connectors/object_store/table_reader.rs index f7137c0730..84284d724e 100644 --- a/dozer-ingestion/src/connectors/object_store/table_reader.rs +++ b/dozer-ingestion/src/connectors/object_store/table_reader.rs @@ -14,11 +14,11 @@ use deltalake::datafusion::prelude::SessionContext; use dozer_types::arrow_types::from_arrow::{map_schema_to_dozer, map_value_to_dozer_field}; use dozer_types::ingestion_types::IngestionMessage; use dozer_types::log::error; +use dozer_types::tonic::async_trait; use dozer_types::types::{Operation, Record}; use futures::StreamExt; use std::sync::Arc; use tokio::sync::mpsc::Sender; -use tonic::async_trait; pub struct TableReader { pub(crate) config: T, diff --git a/dozer-ingestion/src/connectors/object_store/table_watcher.rs b/dozer-ingestion/src/connectors/object_store/table_watcher.rs index b3959b0298..eb9a300be9 100644 --- a/dozer-ingestion/src/connectors/object_store/table_watcher.rs +++ b/dozer-ingestion/src/connectors/object_store/table_watcher.rs @@ -6,9 +6,9 @@ use std::collections::HashMap; use dozer_types::chrono::{DateTime, Utc}; use dozer_types::ingestion_types::IngestionMessage; +use dozer_types::tonic::async_trait; use tokio::sync::mpsc::Sender; use tokio::task::JoinHandle; -use tonic::async_trait; #[derive(Debug, Eq, Clone)] pub struct FileInfo { diff --git a/dozer-ingestion/src/connectors/object_store/watcher.rs b/dozer-ingestion/src/connectors/object_store/watcher.rs index cfac4e152e..d71ac33c76 100644 --- a/dozer-ingestion/src/connectors/object_store/watcher.rs +++ b/dozer-ingestion/src/connectors/object_store/watcher.rs @@ -4,11 +4,11 @@ use deltalake::{ datafusion::{datasource::listing::ListingTableUrl, prelude::SessionContext}, Path as DeltaPath, }; +use dozer_types::tonic::async_trait; use dozer_types::tracing::info; use futures::StreamExt; use object_store::ObjectStore; use tokio::sync::mpsc::Sender; -use tonic::async_trait; use crate::{ connectors::{object_store::helper::map_listing_options, TableInfo}, diff --git a/dozer-ingestion/src/connectors/postgres/connector.rs b/dozer-ingestion/src/connectors/postgres/connector.rs index 0117e120a1..00c36890df 100644 --- a/dozer-ingestion/src/connectors/postgres/connector.rs +++ b/dozer-ingestion/src/connectors/postgres/connector.rs @@ -5,11 +5,11 @@ use crate::connectors::{ }; use crate::errors::ConnectorError; use crate::ingestion::Ingestor; +use dozer_types::tonic::async_trait; use dozer_types::tracing::info; use postgres_types::PgLsn; use rand::distributions::Alphanumeric; use rand::Rng; -use tonic::async_trait; use crate::connectors::postgres::schema::helper::{SchemaHelper, DEFAULT_SCHEMA_NAME}; use crate::errors::ConnectorError::PostgresConnectorError; diff --git a/dozer-ingestion/src/connectors/snowflake/connector/snowflake.rs b/dozer-ingestion/src/connectors/snowflake/connector/snowflake.rs index c7951697a3..50d38472ba 100644 --- a/dozer-ingestion/src/connectors/snowflake/connector/snowflake.rs +++ b/dozer-ingestion/src/connectors/snowflake/connector/snowflake.rs @@ -8,8 +8,8 @@ use crate::errors::ConnectorError; use crate::ingestion::Ingestor; use dozer_types::ingestion_types::SnowflakeConfig; use dozer_types::node::OpIdentifier; +use dozer_types::tonic::async_trait; use odbc::create_environment_v3; -use tonic::async_trait; use crate::connectors::snowflake::stream_consumer::StreamConsumer; diff --git a/dozer-ingestion/src/errors.rs b/dozer-ingestion/src/errors.rs index 0629f9e851..f732f37236 100644 --- a/dozer-ingestion/src/errors.rs +++ b/dozer-ingestion/src/errors.rs @@ -155,10 +155,10 @@ pub enum NestedDozerConnectorError { MissingGrpcConfig, #[error("Failed to connect to upstream dozer at {0}: {1:?}")] - ConnectionError(String, #[source] tonic::transport::Error), + ConnectionError(String, #[source] dozer_types::tonic::transport::Error), #[error("Failed to query endpoints from upstream dozer app: {0}")] - DescribeEndpointsError(#[source] tonic::Status), + DescribeEndpointsError(#[source] dozer_types::tonic::Status), #[error(transparent)] ReaderError(#[from] ReaderError), diff --git a/dozer-ingestion/tests/test_suite/connectors/dozer.rs b/dozer-ingestion/tests/test_suite/connectors/dozer.rs index 527bfe50e5..e514f3a737 100644 --- a/dozer-ingestion/tests/test_suite/connectors/dozer.rs +++ b/dozer-ingestion/tests/test_suite/connectors/dozer.rs @@ -24,11 +24,11 @@ use dozer_types::{ serde_json, }; +use dozer_types::tonic::async_trait; +use dozer_types::tonic::transport::Channel; use futures::lock::Mutex; use tempdir::TempDir; use tokio::runtime::Runtime; -use tonic::async_trait; -use tonic::transport::Channel; use crate::test_suite::records::Operation; use crate::test_suite::{ diff --git a/dozer-ingestion/tests/test_suite/connectors/mongodb.rs b/dozer-ingestion/tests/test_suite/connectors/mongodb.rs index fbad6b9472..0c85af2561 100644 --- a/dozer-ingestion/tests/test_suite/connectors/mongodb.rs +++ b/dozer-ingestion/tests/test_suite/connectors/mongodb.rs @@ -1,9 +1,9 @@ use bson::doc; use dozer_ingestion::connectors::mongodb::MongodbConnector; +use dozer_types::tonic::async_trait; use dozer_utils::{process::run_docker_compose, Cleanup}; use mongodb::options::{ClientOptions, InsertOneOptions, WriteConcern}; use tempdir::TempDir; -use tonic::async_trait; use crate::test_suite::DataReadyConnectorTest; diff --git a/dozer-ingestion/tests/test_suite/connectors/object_store/local_storage.rs b/dozer-ingestion/tests/test_suite/connectors/object_store/local_storage.rs index bbac064c3e..cc1f24a698 100644 --- a/dozer-ingestion/tests/test_suite/connectors/object_store/local_storage.rs +++ b/dozer-ingestion/tests/test_suite/connectors/object_store/local_storage.rs @@ -1,12 +1,12 @@ use dozer_ingestion::connectors::object_store::connector::ObjectStoreConnector; +use dozer_types::tonic::async_trait; use dozer_types::{ arrow, ingestion_types::{LocalDetails, LocalStorage, ParquetConfig, Table, TableConfig}, types::Field, }; use tempdir::TempDir; -use tonic::async_trait; use crate::test_suite::{DataReadyConnectorTest, FieldsAndPk, InsertOnlyConnectorTest}; diff --git a/dozer-ingestion/tests/test_suite/connectors/postgres.rs b/dozer-ingestion/tests/test_suite/connectors/postgres.rs index 393f145c8e..019c474fa1 100644 --- a/dozer-ingestion/tests/test_suite/connectors/postgres.rs +++ b/dozer-ingestion/tests/test_suite/connectors/postgres.rs @@ -2,10 +2,10 @@ use dozer_ingestion::connectors::postgres::{ connection::{client::Client, helper::connect}, connector::{PostgresConfig, PostgresConnector}, }; +use dozer_types::tonic::async_trait; use dozer_types::types::Field; use dozer_utils::{process::run_docker_compose, Cleanup}; use tempdir::TempDir; -use tonic::async_trait; use crate::test_suite::{ records::Operation, CudConnectorTest, DataReadyConnectorTest, FieldsAndPk, diff --git a/dozer-ingestion/tests/test_suite/mod.rs b/dozer-ingestion/tests/test_suite/mod.rs index 667c6d9baf..5a17099f45 100644 --- a/dozer-ingestion/tests/test_suite/mod.rs +++ b/dozer-ingestion/tests/test_suite/mod.rs @@ -51,4 +51,4 @@ pub use connectors::MongodbConnectorTest; pub use connectors::{ DozerConnectorTest, LocalStorageObjectStoreConnectorTest, PostgresConnectorTest, }; -use tonic::async_trait; +use dozer_types::tonic::async_trait; diff --git a/dozer-tracing/Cargo.toml b/dozer-tracing/Cargo.toml index 30c5ad6fba..9bddf59ecd 100644 --- a/dozer-tracing/Cargo.toml +++ b/dozer-tracing/Cargo.toml @@ -24,6 +24,7 @@ atty = "0.2.14" opentelemetry-otlp = "0.13.0" metrics = "0.21.1" console-subscriber = { version = "0.1.10", optional = true } +futures-util = "0.3.28" [features] tokio-console = ["tokio/tracing", "console-subscriber"] diff --git a/dozer-tracing/src/exporter.rs b/dozer-tracing/src/exporter.rs index 64b0f6c0ea..f53c398fa6 100644 --- a/dozer-tracing/src/exporter.rs +++ b/dozer-tracing/src/exporter.rs @@ -38,10 +38,8 @@ impl SpanExporter for DozerExporter { fn export( &mut self, batch: Vec, - ) -> dozer_types::tonic::codegen::futures_core::future::BoxFuture< - 'static, - opentelemetry::sdk::export::trace::ExportResult, - > { + ) -> futures_util::future::BoxFuture<'static, opentelemetry::sdk::export::trace::ExportResult> + { let endpoint = self.config.endpoint.clone(); let seq_no = self.seq_no.clone(); Box::pin(async move { diff --git a/dozer-types/Cargo.toml b/dozer-types/Cargo.toml index 11750e0b7b..67a8e45729 100644 --- a/dozer-types/Cargo.toml +++ b/dozer-types/Cargo.toml @@ -25,9 +25,9 @@ prettytable-rs = "0.10.0" indicatif = "0.17.3" geo = { version = "0.26.0", features = ["use-serde"] } pyo3 = { version = "0.18.1", optional = true } -tonic = { version = "0.8.3" } -prost-types = "0.11.1" -prost = "0.11.8" +tonic = { version = "0.10.0" } +prost-types = "0.12.0" +prost = "0.12.0" arrow = { version = "42.0.0" } arrow-schema = { version = "42.0.0", features = ["serde"] } tokio-postgres = { version = "0.7.7", features = [ @@ -38,7 +38,7 @@ tokio-postgres = { version = "0.7.7", features = [ serde_bytes = "0.11.12" [build-dependencies] -tonic-build = "0.8.2" +tonic-build = "0.10.0" [features] python-extension-module = ["dep:pyo3", "pyo3?/extension-module"]