Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update tonic crates #2039

Merged
merged 6 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 138 additions & 117 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 7 additions & 9 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dozer-tracing = { path = "../dozer-tracing" }
dozer-core = { path = "../dozer-core" }

actix-cors = "0.6.3"
actix-http = { version = "3.3.0", default-features = false, features = [
actix-http = { version = "3.4.0", default-features = false, features = [
"rustls",
] }
actix-web = { version = "4.2.1", default-features = false, features = [
Expand All @@ -28,23 +28,21 @@ actix-web = { version = "4.2.1", default-features = false, features = [
actix-web-httpauth = "0.8.0"
handlebars = "4.3.7"
openapiv3 = "1.0.2"
tonic-build = "0.8.2"
tonic-build = "0.10.0"
tokio = { version = "1", features = ["full"] }
tonic = { version = "0.8.3" }
prost = "0.11.8"
prost-reflect = { version = "0.10.2", features = ["serde", "text-format"] }
tonic-reflection = "0.6.0"
prost-reflect = { version = "0.12.0", features = ["serde", "text-format"] }
tonic-reflection = "0.10.0"
Inflector = "0.11.4"
futures-util = "0.3.28"
prost-build = "0.11.6"
tonic-web = "0.4.0"
prost-build = "0.12.0"
tonic-web = "0.10.0"
jsonwebtoken = "8.3.0"
tokio-stream = "0.1.12"
async-trait = "0.1.73"
tracing-actix-web = "0.7.2"
tower = "0.4.13"
hyper = "0.14.24"
tower-http = { version = "0.3.5", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
arc-swap = "1.6.0"
metrics = "0.21.0"
gethostname = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/auth/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use actix_web::{
Error, HttpMessage, HttpRequest, HttpResponse,
};
use actix_web_httpauth::extractors::bearer::BearerAuth;
use dozer_types::tonic::{Response, Status};
use dozer_types::{models::api_security::ApiSecurity, serde_json::json};
use tonic::{Response, Status};

use crate::errors::{ApiError, AuthError};

Expand Down
4 changes: 2 additions & 2 deletions dozer-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ pub enum GrpcError {
Listen(SocketAddr, #[source] BoxedError),
}

impl From<ApiError> for tonic::Status {
impl From<ApiError> for dozer_types::tonic::Status {
fn from(input: ApiError) -> Self {
tonic::Status::new(tonic::Code::Unknown, input.to_string())
dozer_types::tonic::Status::new(dozer_types::tonic::Code::Unknown, input.to_string())
}
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/auth/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::auth::Access;

use tonic::{Request, Response, Status};
use dozer_types::tonic::{self, Request, Response, Status};

use crate::auth::api::auth_grpc;
use dozer_types::grpc_types::auth::auth_grpc_service_server::AuthGrpcService;
Expand Down
8 changes: 4 additions & 4 deletions dozer-api/src/grpc/auth_middleware.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use dozer_types::models::api_security::ApiSecurity;
use futures_util::future::BoxFuture;
use hyper::{Body, Method};
use std::task::{Context, Poll};
use tonic::{
use dozer_types::tonic::{
body::{empty_body, BoxBody},
codegen::http,
transport::NamedService,
};
use futures_util::future::BoxFuture;
use hyper::{Body, Method};
use std::task::{Context, Poll};
use tower::{Layer, Service};

use crate::auth::Authorizer;
Expand Down
29 changes: 14 additions & 15 deletions dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::
use crate::api_helper::get_api_security;
use crate::errors::ApiInitError;
use crate::grpc::auth::AuthService;
use crate::grpc::grpc_web_middleware::enable_grpc_web;
use crate::grpc::health::HealthService;
use crate::grpc::{common, run_server, typed};
use crate::{errors::GrpcError, CacheEndpoint};
Expand All @@ -14,6 +15,8 @@ use dozer_types::grpc_types::{
common::common_grpc_service_server::CommonGrpcServiceServer,
health::health_grpc_service_server::HealthGrpcServiceServer,
};
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
use dozer_types::tracing::Level;
use dozer_types::{
log::info,
Expand All @@ -22,8 +25,6 @@ use dozer_types::{
use futures_util::Future;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::broadcast::{self, Receiver};
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
use tower::Layer;
use tower_http::trace::{self, TraceLayer};
Expand Down Expand Up @@ -92,27 +93,24 @@ impl ApiServer {
operations_receiver: Option<Receiver<Operation>>,
labels: LabelsAndProgress,
default_max_num_records: usize,
) -> Result<impl Future<Output = Result<(), tonic::transport::Error>>, ApiInitError> {
) -> Result<impl Future<Output = Result<(), dozer_types::tonic::transport::Error>>, ApiInitError>
{
// Create our services.
let mut web_config = tonic_web::config();
if self.flags.grpc_web {
web_config = web_config.allow_all_origins();
}

let common_service = CommonGrpcServiceServer::new(CommonService::new(
cache_endpoints.clone(),
operations_receiver.as_ref().map(|r| r.resubscribe()),
default_max_num_records,
));
let common_service = web_config.enable(common_service);
let common_service = enable_grpc_web(common_service, self.flags.grpc_web);

let (typed_service, reflection_service) = self.get_dynamic_service(
cache_endpoints,
operations_receiver,
default_max_num_records,
)?;
let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
let reflection_service = web_config.enable(reflection_service);
let typed_service =
typed_service.map(|typed_service| enable_grpc_web(typed_service, self.flags.grpc_web));
let reflection_service = enable_grpc_web(reflection_service, self.flags.grpc_web);

let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
service_map.insert("".to_string(), ServingStatus::Serving);
Expand All @@ -125,7 +123,7 @@ impl ApiServer {
let health_service = HealthGrpcServiceServer::new(HealthService {
serving_status: service_map,
});
let health_service = web_config.enable(health_service);
let health_service = enable_grpc_web(health_service, self.flags.grpc_web);

// Auth middleware.
let security = get_api_security(self.security.to_owned());
Expand All @@ -146,9 +144,10 @@ impl ApiServer {
let mut auth_service = None;
let security = get_api_security(self.security.to_owned());
if security.is_some() {
let service = web_config.enable(AuthGrpcServiceServer::new(AuthService::new(
security.to_owned(),
)));
let service = enable_grpc_web(
AuthGrpcServiceServer::new(AuthService::new(security.to_owned())),
self.flags.grpc_web,
);
auth_service = Some(auth_middleware.layer(service));
}
let metric_middleware = MetricMiddlewareLayer::new(labels);
Expand Down
8 changes: 4 additions & 4 deletions dozer-api/src/grpc/common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::CacheEndpoint;
use dozer_types::grpc_types::common::common_grpc_service_server::CommonGrpcService;
use dozer_types::grpc_types::conversions::field_definition_to_grpc;
use dozer_types::indexmap::IndexMap;
use dozer_types::tonic::{self, Request, Response, Status};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

use dozer_types::grpc_types::common::{
CountResponse, GetEndpointsRequest, GetEndpointsResponse, GetFieldsRequest, GetFieldsResponse,
Expand All @@ -24,9 +24,9 @@ type ResponseStream = ReceiverStream<Result<Operation, tonic::Status>>;
// #[derive(Clone)]
pub struct CommonService {
/// For look up endpoint from its name. `key == value.endpoint.name`. Using index map to keep endpoint order.
pub endpoint_map: IndexMap<String, Arc<CacheEndpoint>>,
pub event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
pub default_max_num_records: usize,
endpoint_map: IndexMap<String, Arc<CacheEndpoint>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
default_max_num_records: usize,
}

impl CommonService {
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/common/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use dozer_types::grpc_types::{
value, EventFilter, EventType, FieldDefinition, OperationType, RecordWithId, Type, Value,
},
};
use tonic::Request;
use dozer_types::tonic::Request;

use super::CommonService;

Expand Down
101 changes: 101 additions & 0 deletions dozer-api/src/grpc/grpc_web_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use dozer_types::tonic::{body::BoxBody, transport::NamedService};
use futures_util::Future;
use pin_project::pin_project;
use tonic_web::{CorsGrpcWeb, GrpcWebLayer, GrpcWebService};
use tower::{BoxError, Layer, Service};
use tower_http::cors::{Cors, CorsLayer};

#[derive(Debug, Clone)]
pub enum MaybeGrpcWebService<S> {
GrpcWeb(Cors<GrpcWebService<S>>),
NoGrpcWeb(S),
}

#[pin_project(project = MaybeGrpcWebServiceFutureProj)]
pub enum MaybeGrpcWebServiceFuture<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
GrpcWeb(#[pin] <CorsGrpcWeb<S> as Service<http::Request<hyper::Body>>>::Future),
NoGrpcWeb(#[pin] <S as Service<http::Request<hyper::Body>>>::Future),
}

impl<S> Service<http::Request<hyper::Body>> for MaybeGrpcWebService<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
type Response = S::Response;
type Error = S::Error;
type Future = MaybeGrpcWebServiceFuture<S>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
match self {
MaybeGrpcWebService::GrpcWeb(service) => service.poll_ready(cx),
MaybeGrpcWebService::NoGrpcWeb(service) => service.poll_ready(cx),
}
}

fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
match self {
MaybeGrpcWebService::GrpcWeb(service) => {
MaybeGrpcWebServiceFuture::GrpcWeb(service.call(req))
}
MaybeGrpcWebService::NoGrpcWeb(service) => {
MaybeGrpcWebServiceFuture::NoGrpcWeb(service.call(req))
}
}
}
}

impl<S> NamedService for MaybeGrpcWebService<S>
where
S: NamedService,
{
const NAME: &'static str = S::NAME;
}

impl<S> Future for MaybeGrpcWebServiceFuture<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
type Output = <<S as Service<http::Request<hyper::Body>>>::Future as Future>::Output;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
match this {
MaybeGrpcWebServiceFutureProj::GrpcWeb(fut) => fut.poll(cx),
MaybeGrpcWebServiceFutureProj::NoGrpcWeb(fut) => fut.poll(cx),
}
}
}

pub fn enable_grpc_web<S>(service: S, enabled: bool) -> MaybeGrpcWebService<S>
where
S: Service<http::Request<hyper::Body>, Response = http::Response<BoxBody>>,
S: Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Send,
{
if enabled {
let service = GrpcWebLayer::new().layer(service);
let service = CorsLayer::very_permissive().layer(service);
MaybeGrpcWebService::GrpcWeb(service)
} else {
MaybeGrpcWebService::NoGrpcWeb(service)
}
}
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/health/service.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::grpc_types::health::health_grpc_service_server::HealthGrpcService;
use dozer_types::grpc_types::health::{HealthCheckRequest, HealthCheckResponse};
use dozer_types::tonic::{self, Request, Response, Status};
use std::collections::HashMap;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

type ResponseStream = ReceiverStream<Result<HealthCheckResponse, Status>>;

Expand Down
7 changes: 5 additions & 2 deletions dozer-api/src/grpc/health/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dozer_types::tonic::Request;
use std::collections::HashMap;
use tonic::Request;

use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::grpc_types::health::health_grpc_service_server::HealthGrpcService;
Expand Down Expand Up @@ -29,5 +29,8 @@ async fn test_grpc_health_check() {
service: "non-existent".to_string(),
}))
.await;
assert_eq!(response.unwrap_err().code(), tonic::Code::NotFound);
assert_eq!(
response.unwrap_err().code(),
dozer_types::tonic::Code::NotFound
);
}
6 changes: 3 additions & 3 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use dozer_types::log::info;
use dozer_types::models::api_config::AppGrpcOptions;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::parking_lot::Mutex;
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
use dozer_types::tonic::{self, Request, Response, Status, Streaming};
use futures_util::future::Either;
use futures_util::stream::BoxStream;
use futures_util::{Future, StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};

use crate::errors::GrpcError;
use crate::grpc::run_server;
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/metric_middleware.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use dozer_tracing::LabelsAndProgress;
use dozer_types::tonic::{body::BoxBody, transport::NamedService};
use futures_util::future::BoxFuture;
use hyper::Body;
use metrics::{histogram, increment_counter};
use std::{
task::{Context, Poll},
time::Instant,
};
use tonic::{body::BoxBody, transport::NamedService};
use tower::{Layer, Service};

use crate::api_helper::{API_LATENCY_HISTOGRAM_NAME, API_REQUEST_COUNTER_NAME};
Expand Down
5 changes: 3 additions & 2 deletions dozer-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod health;
pub mod internal;
// pub mod dynamic;
mod auth_middleware;
mod grpc_web_middleware;
mod metric_middleware;
mod shared_impl;
pub mod typed;
Expand All @@ -13,20 +14,20 @@ pub mod types_helper;
use bytes::Bytes;
pub use client_server::ApiServer;
use dozer_types::errors::internal::BoxedError;
use dozer_types::tonic::transport::server::{Router, Routes, TcpIncoming};
use futures_util::{
stream::{AbortHandle, Abortable, Aborted},
Future,
};
use http::{Request, Response};
use hyper::Body;
use tonic::transport::server::{Router, Routes, TcpIncoming};
use tower::{Layer, Service};

async fn run_server<L, ResBody>(
server: Router<L>,
incoming: TcpIncoming,
shutdown: impl Future<Output = ()> + Send + 'static,
) -> Result<(), tonic::transport::Error>
) -> Result<(), dozer_types::tonic::transport::Error>
where
L: Layer<Routes>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
Expand Down
Loading