diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index ff029f042..99388e357 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -28,7 +28,7 @@ gzip = ["dep:flate2"] zstd = ["dep:zstd"] default = ["transport", "codegen", "prost"] prost = ["dep:prost"] -tls = ["dep:rustls-pki-types", "dep:rustls-pemfile", "transport", "dep:tokio-rustls", "tokio/rt", "tokio/macros"] +tls = ["dep:rustls-pki-types", "dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:tokio", "tokio?/rt", "tokio?/macros"] tls-roots = ["tls-roots-common", "dep:rustls-native-certs"] tls-roots-common = ["tls"] tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"] @@ -38,8 +38,7 @@ transport = [ "channel", "dep:h2", "dep:hyper", - "tokio/net", - "tokio/time", + "dep:tokio", "tokio?/net", "tokio?/time", "dep:tower", "dep:hyper-timeout", ] @@ -55,7 +54,6 @@ bytes = "1.0" http = "0.2" tracing = "0.1" -tokio = "1.0.1" http-body = "0.4.4" percent-encoding = "2.1" pin-project = "1.0.11" @@ -72,6 +70,7 @@ async-trait = {version = "0.1.13", optional = true} h2 = {version = "0.3.24", optional = true} hyper = {version = "0.14.26", features = ["full"], optional = true} hyper-timeout = {version = "0.4", optional = true} +tokio = {version = "1.0.1", optional = true} tokio-stream = "0.1" tower = {version = "0.4.7", default-features = false, features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true} axum = {version = "0.6.9", default_features = false, optional = true} diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index e070f08d3..b02ebb949 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -11,7 +11,7 @@ use http::{ uri::{PathAndQuery, Uri}, }; use http_body::Body; -use std::{fmt, future}; +use std::{fmt, future, pin::pin}; use tokio_stream::{Stream, StreamExt}; /// A gRPC client dispatcher. @@ -239,7 +239,7 @@ impl Grpc { let (mut parts, body, extensions) = self.streaming(request, path, codec).await?.into_parts(); - tokio::pin!(body); + let mut body = pin!(body); let message = body .try_next() diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index d2f1652f4..aa872a9ba 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -84,6 +84,7 @@ mod tests { use crate::{Code, Status}; use bytes::{Buf, BufMut, BytesMut}; use http_body::Body; + use std::pin::pin; const LEN: usize = 10000; // The maximum uncompressed size in bytes for a message. Set to 2MB. @@ -157,15 +158,13 @@ mod tests { let messages = std::iter::repeat_with(move || Ok::<_, Status>(msg.clone())).take(10000); let source = tokio_stream::iter(messages); - let body = encode_server( + let mut body = pin!(encode_server( encoder, source, None, SingleMessageCompressionOverride::default(), None, - ); - - tokio::pin!(body); + )); while let Some(r) = body.data().await { r.unwrap(); @@ -181,15 +180,13 @@ mod tests { let messages = std::iter::once(Ok::<_, Status>(msg)); let source = tokio_stream::iter(messages); - let body = encode_server( + let mut body = pin!(encode_server( encoder, source, None, SingleMessageCompressionOverride::default(), Some(MAX_MESSAGE_SIZE), - ); - - tokio::pin!(body); + )); assert!(body.data().await.is_none()); assert_eq!( @@ -215,15 +212,13 @@ mod tests { let messages = std::iter::once(Ok::<_, Status>(msg)); let source = tokio_stream::iter(messages); - let body = encode_server( + let mut body = pin!(encode_server( encoder, source, None, SingleMessageCompressionOverride::default(), Some(usize::MAX), - ); - - tokio::pin!(body); + )); assert!(body.data().await.is_none()); assert_eq!( diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index ec94b97fb..5330b30ed 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -8,7 +8,7 @@ use crate::{ Code, Request, Status, }; use http_body::Body; -use std::fmt; +use std::{fmt, pin::pin}; use tokio_stream::{Stream, StreamExt}; macro_rules! t { @@ -375,14 +375,12 @@ where let (parts, body) = request.into_parts(); - let stream = Streaming::new_request( + let mut stream = pin!(Streaming::new_request( self.codec.decoder(), body, request_compression_encoding, self.max_decoding_message_size, - ); - - tokio::pin!(stream); + )); let message = stream .try_next() diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index 61aadc93d..bc1bb7650 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -6,7 +6,7 @@ use hyper::server::{ }; use std::{ net::SocketAddr, - pin::Pin, + pin::{pin, Pin}, task::{Context, Poll}, time::Duration, }; @@ -26,7 +26,7 @@ where IE: Into, { async_stream::try_stream! { - tokio::pin!(incoming); + let mut incoming = pin!(incoming); while let Some(item) = incoming.next().await { yield item.map(ServerIo::new_io)? @@ -44,7 +44,7 @@ where IE: Into, { async_stream::try_stream! { - tokio::pin!(incoming); + let mut incoming = pin!(incoming); let mut tasks = tokio::task::JoinSet::new();