Skip to content

Commit

Permalink
chore: Update tonic crates (#2039)
Browse files Browse the repository at this point in the history
* chore: Remove direct `tonic` dependency from `dozer-api` and `dozer-ingestion`

* chore: Update `tonic` crates

* chore: Update `tower-http`

* chore: Remove some unnecessary `pub`s

* fix: Make cors more permissive

* chore: Update `tonic` crates
  • Loading branch information
chubei authored Sep 15, 2023
1 parent 82a42f2 commit 1a2cdd4
Show file tree
Hide file tree
Showing 59 changed files with 358 additions and 240 deletions.
255 changes: 138 additions & 117 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 7 additions & 9 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dozer-tracing = { path = "../dozer-tracing" }
dozer-core = { path = "../dozer-core" }

actix-cors = "0.6.3"
actix-http = { version = "3.3.0", default-features = false, features = [
actix-http = { version = "3.4.0", default-features = false, features = [
"rustls",
] }
actix-web = { version = "4.2.1", default-features = false, features = [
Expand All @@ -28,23 +28,21 @@ actix-web = { version = "4.2.1", default-features = false, features = [
actix-web-httpauth = "0.8.0"
handlebars = "4.3.7"
openapiv3 = "1.0.2"
tonic-build = "0.8.2"
tonic-build = "0.10.0"
tokio = { version = "1", features = ["full"] }
tonic = { version = "0.8.3" }
prost = "0.11.8"
prost-reflect = { version = "0.10.2", features = ["serde", "text-format"] }
tonic-reflection = "0.6.0"
prost-reflect = { version = "0.12.0", features = ["serde", "text-format"] }
tonic-reflection = "0.10.0"
Inflector = "0.11.4"
futures-util = "0.3.28"
prost-build = "0.11.6"
tonic-web = "0.4.0"
prost-build = "0.12.0"
tonic-web = "0.10.0"
jsonwebtoken = "8.3.0"
tokio-stream = "0.1.12"
async-trait = "0.1.73"
tracing-actix-web = "0.7.2"
tower = "0.4.13"
hyper = "0.14.24"
tower-http = { version = "0.3.5", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
arc-swap = "1.6.0"
metrics = "0.21.0"
gethostname = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/auth/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use actix_web::{
Error, HttpMessage, HttpRequest, HttpResponse,
};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use dozer_types::tonic::{Response, Status};
use dozer_types::{models::api_security::ApiSecurity, serde_json::json};
use tonic::{Response, Status};

use crate::errors::{ApiError, AuthError};

Expand Down
4 changes: 2 additions & 2 deletions dozer-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ pub enum GrpcError {
Listen(SocketAddr, #[source] BoxedError),
}

impl From<ApiError> for tonic::Status {
impl From<ApiError> for dozer_types::tonic::Status {
fn from(input: ApiError) -> Self {
tonic::Status::new(tonic::Code::Unknown, input.to_string())
dozer_types::tonic::Status::new(dozer_types::tonic::Code::Unknown, input.to_string())
}
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/auth/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::auth::Access;

use tonic::{Request, Response, Status};
use dozer_types::tonic::{self, Request, Response, Status};

use crate::auth::api::auth_grpc;
use dozer_types::grpc_types::auth::auth_grpc_service_server::AuthGrpcService;
Expand Down
8 changes: 4 additions & 4 deletions dozer-api/src/grpc/auth_middleware.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use dozer_types::models::api_security::ApiSecurity;
use futures_util::future::BoxFuture;
use hyper::{Body, Method};
use std::task::{Context, Poll};
use tonic::{
use dozer_types::tonic::{
body::{empty_body, BoxBody},
codegen::http,
transport::NamedService,
};
use futures_util::future::BoxFuture;
use hyper::{Body, Method};
use std::task::{Context, Poll};
use tower::{Layer, Service};

use crate::auth::Authorizer;
Expand Down
29 changes: 14 additions & 15 deletions dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::
use crate::api_helper::get_api_security;
use crate::errors::ApiInitError;
use crate::grpc::auth::AuthService;
use crate::grpc::grpc_web_middleware::enable_grpc_web;
use crate::grpc::health::HealthService;
use crate::grpc::{common, run_server, typed};
use crate::{errors::GrpcError, CacheEndpoint};
Expand All @@ -14,6 +15,8 @@ use dozer_types::grpc_types::{
common::common_grpc_service_server::CommonGrpcServiceServer,
health::health_grpc_service_server::HealthGrpcServiceServer,
};
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
use dozer_types::tracing::Level;
use dozer_types::{
log::info,
Expand All @@ -22,8 +25,6 @@ use dozer_types::{
use futures_util::Future;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::broadcast::{self, Receiver};
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
use tower::Layer;
use tower_http::trace::{self, TraceLayer};
Expand Down Expand Up @@ -92,27 +93,24 @@ impl ApiServer {
operations_receiver: Option<Receiver<Operation>>,
labels: LabelsAndProgress,
default_max_num_records: usize,
) -> Result<impl Future<Output = Result<(), tonic::transport::Error>>, ApiInitError> {
) -> Result<impl Future<Output = Result<(), dozer_types::tonic::transport::Error>>, ApiInitError>
{
// Create our services.
let mut web_config = tonic_web::config();
if self.flags.grpc_web {
web_config = web_config.allow_all_origins();
}

let common_service = CommonGrpcServiceServer::new(CommonService::new(
cache_endpoints.clone(),
operations_receiver.as_ref().map(|r| r.resubscribe()),
default_max_num_records,
));
let common_service = web_config.enable(common_service);
let common_service = enable_grpc_web(common_service, self.flags.grpc_web);

let (typed_service, reflection_service) = self.get_dynamic_service(
cache_endpoints,
operations_receiver,
default_max_num_records,
)?;
let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
let reflection_service = web_config.enable(reflection_service);
let typed_service =
typed_service.map(|typed_service| enable_grpc_web(typed_service, self.flags.grpc_web));
let reflection_service = enable_grpc_web(reflection_service, self.flags.grpc_web);

let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
service_map.insert("".to_string(), ServingStatus::Serving);
Expand All @@ -125,7 +123,7 @@ impl ApiServer {
let health_service = HealthGrpcServiceServer::new(HealthService {
serving_status: service_map,
});
let health_service = web_config.enable(health_service);
let health_service = enable_grpc_web(health_service, self.flags.grpc_web);

// Auth middleware.
let security = get_api_security(self.security.to_owned());
Expand All @@ -146,9 +144,10 @@ impl ApiServer {
let mut auth_service = None;
let security = get_api_security(self.security.to_owned());
if security.is_some() {
let service = web_config.enable(AuthGrpcServiceServer::new(AuthService::new(
security.to_owned(),
)));
let service = enable_grpc_web(
AuthGrpcServiceServer::new(AuthService::new(security.to_owned())),
self.flags.grpc_web,
);
auth_service = Some(auth_middleware.layer(service));
}
let metric_middleware = MetricMiddlewareLayer::new(labels);
Expand Down
8 changes: 4 additions & 4 deletions dozer-api/src/grpc/common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::CacheEndpoint;
use dozer_types::grpc_types::common::common_grpc_service_server::CommonGrpcService;
use dozer_types::grpc_types::conversions::field_definition_to_grpc;
use dozer_types::indexmap::IndexMap;
use dozer_types::tonic::{self, Request, Response, Status};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

use dozer_types::grpc_types::common::{
CountResponse, GetEndpointsRequest, GetEndpointsResponse, GetFieldsRequest, GetFieldsResponse,
Expand All @@ -24,9 +24,9 @@ type ResponseStream = ReceiverStream<Result<Operation, tonic::Status>>;
// #[derive(Clone)]
pub struct CommonService {
/// For look up endpoint from its name. `key == value.endpoint.name`. Using index map to keep endpoint order.
pub endpoint_map: IndexMap<String, Arc<CacheEndpoint>>,
pub event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
pub default_max_num_records: usize,
endpoint_map: IndexMap<String, Arc<CacheEndpoint>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
default_max_num_records: usize,
}

impl CommonService {
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/common/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use dozer_types::grpc_types::{
value, EventFilter, EventType, FieldDefinition, OperationType, RecordWithId, Type, Value,
},
};
use tonic::Request;
use dozer_types::tonic::Request;

use super::CommonService;

Expand Down
101 changes: 101 additions & 0 deletions dozer-api/src/grpc/grpc_web_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use dozer_types::tonic::{body::BoxBody, transport::NamedService};
use futures_util::Future;
use pin_project::pin_project;
use tonic_web::{CorsGrpcWeb, GrpcWebLayer, GrpcWebService};
use tower::{BoxError, Layer, Service};
use tower_http::cors::{Cors, CorsLayer};

#[derive(Debug, Clone)]
pub enum MaybeGrpcWebService<S> {
GrpcWeb(Cors<GrpcWebService<S>>),
NoGrpcWeb(S),
}

#[pin_project(project = MaybeGrpcWebServiceFutureProj)]
pub enum MaybeGrpcWebServiceFuture<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
GrpcWeb(#[pin] <CorsGrpcWeb<S> as Service<http::Request<hyper::Body>>>::Future),
NoGrpcWeb(#[pin] <S as Service<http::Request<hyper::Body>>>::Future),
}

impl<S> Service<http::Request<hyper::Body>> for MaybeGrpcWebService<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
type Response = S::Response;
type Error = S::Error;
type Future = MaybeGrpcWebServiceFuture<S>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
match self {
MaybeGrpcWebService::GrpcWeb(service) => service.poll_ready(cx),
MaybeGrpcWebService::NoGrpcWeb(service) => service.poll_ready(cx),
}
}

fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
match self {
MaybeGrpcWebService::GrpcWeb(service) => {
MaybeGrpcWebServiceFuture::GrpcWeb(service.call(req))
}
MaybeGrpcWebService::NoGrpcWeb(service) => {
MaybeGrpcWebServiceFuture::NoGrpcWeb(service.call(req))
}
}
}
}

impl<S> NamedService for MaybeGrpcWebService<S>
where
S: NamedService,
{
const NAME: &'static str = S::NAME;
}

impl<S> Future for MaybeGrpcWebServiceFuture<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
type Output = <<S as Service<http::Request<hyper::Body>>>::Future as Future>::Output;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
match this {
MaybeGrpcWebServiceFutureProj::GrpcWeb(fut) => fut.poll(cx),
MaybeGrpcWebServiceFutureProj::NoGrpcWeb(fut) => fut.poll(cx),
}
}
}

pub fn enable_grpc_web<S>(service: S, enabled: bool) -> MaybeGrpcWebService<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
if enabled {
let service = GrpcWebLayer::new().layer(service);
let service = CorsLayer::very_permissive().layer(service);
MaybeGrpcWebService::GrpcWeb(service)
} else {
MaybeGrpcWebService::NoGrpcWeb(service)
}
}
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/health/service.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::grpc_types::health::health_grpc_service_server::HealthGrpcService;
use dozer_types::grpc_types::health::{HealthCheckRequest, HealthCheckResponse};
use dozer_types::tonic::{self, Request, Response, Status};
use std::collections::HashMap;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

type ResponseStream = ReceiverStream<Result<HealthCheckResponse, Status>>;

Expand Down
7 changes: 5 additions & 2 deletions dozer-api/src/grpc/health/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dozer_types::tonic::Request;
use std::collections::HashMap;
use tonic::Request;

use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::grpc_types::health::health_grpc_service_server::HealthGrpcService;
Expand Down Expand Up @@ -29,5 +29,8 @@ async fn test_grpc_health_check() {
service: "non-existent".to_string(),
}))
.await;
assert_eq!(response.unwrap_err().code(), tonic::Code::NotFound);
assert_eq!(
response.unwrap_err().code(),
dozer_types::tonic::Code::NotFound
);
}
6 changes: 3 additions & 3 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use dozer_types::log::info;
use dozer_types::models::api_config::AppGrpcOptions;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::parking_lot::Mutex;
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
use dozer_types::tonic::{self, Request, Response, Status, Streaming};
use futures_util::future::Either;
use futures_util::stream::BoxStream;
use futures_util::{Future, StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};

use crate::errors::GrpcError;
use crate::grpc::run_server;
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/metric_middleware.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use dozer_tracing::LabelsAndProgress;
use dozer_types::tonic::{body::BoxBody, transport::NamedService};
use futures_util::future::BoxFuture;
use hyper::Body;
use metrics::{histogram, increment_counter};
use std::{
task::{Context, Poll},
time::Instant,
};
use tonic::{body::BoxBody, transport::NamedService};
use tower::{Layer, Service};

use crate::api_helper::{API_LATENCY_HISTOGRAM_NAME, API_REQUEST_COUNTER_NAME};
Expand Down
5 changes: 3 additions & 2 deletions dozer-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod health;
pub mod internal;
// pub mod dynamic;
mod auth_middleware;
mod grpc_web_middleware;
mod metric_middleware;
mod shared_impl;
pub mod typed;
Expand All @@ -13,20 +14,20 @@ pub mod types_helper;
use bytes::Bytes;
pub use client_server::ApiServer;
use dozer_types::errors::internal::BoxedError;
use dozer_types::tonic::transport::server::{Router, Routes, TcpIncoming};
use futures_util::{
stream::{AbortHandle, Abortable, Aborted},
Future,
};
use http::{Request, Response};
use hyper::Body;
use tonic::transport::server::{Router, Routes, TcpIncoming};
use tower::{Layer, Service};

async fn run_server<L, ResBody>(
server: Router<L>,
incoming: TcpIncoming,
shutdown: impl Future<Output = ()> + Send + 'static,
) -> Result<(), tonic::transport::Error>
) -> Result<(), dozer_types::tonic::transport::Error>
where
L: Layer<Routes>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
Expand Down
Loading

0 comments on commit 1a2cdd4

Please sign in to comment.