diff --git a/Cargo.lock b/Cargo.lock index ed355d77..1237abb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -656,7 +656,6 @@ dependencies = [ "serde", "tokio", "tokio-stream", - "tonic-web", "tracing", "tracing-subscriber", "volo", diff --git a/benchmark/scripts/reports/diff.py b/benchmark/scripts/reports/diff.py index e4e57ba0..1640e6a8 100644 --- a/benchmark/scripts/reports/diff.py +++ b/benchmark/scripts/reports/diff.py @@ -23,7 +23,7 @@ class bcolors: def diff(from_csv, to_csv): from_reader = list(csv.reader(open(from_csv))) to_reader = csv.reader(open(to_csv)) - title = ['Kind', 'Concurrency', 'Data Size', 'QPS', 'P99', 'P999', 'Client CPU', 'Server CPU'] + title = ['Kind', 'Concurrency', 'Data Size', 'QPS', 'P99', 'P999', 'Server CPU', 'Client CPU'] results = [] for line_num, line in enumerate(to_reader): diff --git a/benchmark/scripts/reports/render_images.py b/benchmark/scripts/reports/render_images.py index c24e3a7a..6664ba48 100644 --- a/benchmark/scripts/reports/render_images.py +++ b/benchmark/scripts/reports/render_images.py @@ -57,7 +57,7 @@ def parse_y(lines, idx, times=1): # TODO color_dict = { - "[volo]": "royalblue", + "[thrift]": "royalblue", } diff --git a/benchmark/scripts/util.sh b/benchmark/scripts/util.sh index 1a5ed5df..4b79acb8 100755 --- a/benchmark/scripts/util.sh +++ b/benchmark/scripts/util.sh @@ -4,4 +4,4 @@ function check_supported_env() { darwin*) ;; *) echo "[ERROR] volo benchmark is not supported on $OSTYPE"; exit 1;; esac -} \ No newline at end of file +} diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 48eb7928..6eca718e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -125,7 +125,6 @@ motore.workspace = true serde.workspace = true tokio = { workspace = true, features = ["full"] } tokio-stream.workspace = true -tonic-web.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/volo-thrift/src/codec/default/mod.rs b/volo-thrift/src/codec/default/mod.rs index beea4a08..2e48297d 100644 --- a/volo-thrift/src/codec/default/mod.rs +++ b/volo-thrift/src/codec/default/mod.rs @@ -32,9 +32,9 @@ use std::future::Future; use bytes::Bytes; use linkedbytes::LinkedBytes; use pilota::thrift::ThriftException; -use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, Interest}; use tracing::{trace, warn}; -use volo::util::buf_reader::BufReader; +use volo::{net::ready::AsyncReady, util::buf_reader::BufReader}; use self::{framed::MakeFramedCodec, thrift::MakeThriftCodec, ttheader::MakeTTHeaderCodec}; use super::{Decoder, Encoder, MakeCodec}; @@ -115,7 +115,7 @@ pub struct DefaultEncoder { linked_bytes: LinkedBytes, } -impl Encoder +impl Encoder for DefaultEncoder { #[inline] @@ -180,6 +180,20 @@ impl Encoder } // write_result } + + async fn is_closed(&self) -> bool { + match self + .writer + .ready(Interest::READABLE | Interest::WRITABLE) + .await + { + Ok(ready) => ready.is_read_closed() || ready.is_write_closed(), + Err(e) => { + warn!("[VOLO] thrift codec write half ready error: {}", e); + true + } + } + } } pub struct DefaultDecoder { @@ -187,7 +201,7 @@ pub struct DefaultDecoder { reader: BufReader, } -impl Decoder +impl Decoder for DefaultDecoder { #[inline] @@ -274,8 +288,8 @@ impl Default for DefaultMakeCodec MakeCodec for DefaultMakeCodec where MkZC: MakeZeroCopyCodec, - R: AsyncRead + Unpin + Send + Sync + 'static, - W: AsyncWrite + Unpin + Send + Sync + 'static, + R: AsyncRead + AsyncReady + Unpin + Send + Sync + 'static, + W: AsyncWrite + AsyncReady + Unpin + Send + Sync + 'static, { type Encoder = DefaultEncoder; type Decoder = DefaultDecoder; diff --git a/volo-thrift/src/codec/mod.rs b/volo-thrift/src/codec/mod.rs index ee3ce43e..6f191b1b 100644 --- a/volo-thrift/src/codec/mod.rs +++ b/volo-thrift/src/codec/mod.rs @@ -14,22 +14,30 @@ pub use default::DefaultMakeCodec; /// Returning an Ok(None) indicates the EOF has been reached. /// /// Note: [`Decoder`] should be designed to be ready for reuse. -pub trait Decoder: Send + 'static { +pub trait Decoder: Send + Sync + 'static { fn decode( &mut self, cx: &mut Cx, ) -> impl Future>, ThriftException>> + Send; + + fn is_closed(&self) -> impl Future + Send { + async { false } + } } /// [`Encoder`] writes a [`ThriftMessage`] to an [`AsyncWrite`] and flushes the data. /// /// Note: [`Encoder`] should be designed to be ready for reuse. -pub trait Encoder: Send + 'static { +pub trait Encoder: Send + Sync + 'static { fn encode( &mut self, cx: &mut Cx, msg: ThriftMessage, ) -> impl Future> + Send; + + fn is_closed(&self) -> impl Future + Send { + async { false } + } } /// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a diff --git a/volo-thrift/src/transport/multiplex/client.rs b/volo-thrift/src/transport/multiplex/client.rs index e874fca1..dd07fe86 100644 --- a/volo-thrift/src/transport/multiplex/client.rs +++ b/volo-thrift/src/transport/multiplex/client.rs @@ -150,7 +150,7 @@ where } } if cx.transport.should_reuse && resp.is_ok() { - transport.reuse(); + transport.reuse().await; } resp } diff --git a/volo-thrift/src/transport/multiplex/thrift_transport.rs b/volo-thrift/src/transport/multiplex/thrift_transport.rs index 55e171c4..b672b29a 100644 --- a/volo-thrift/src/transport/multiplex/thrift_transport.rs +++ b/volo-thrift/src/transport/multiplex/thrift_transport.rs @@ -359,8 +359,8 @@ where } } -impl Poolable for ThriftTransport { - fn reusable(&self) -> bool { +impl Poolable for ThriftTransport { + async fn reusable(&self) -> bool { !self.write_error.load(std::sync::atomic::Ordering::Relaxed) && !self.read_error.load(std::sync::atomic::Ordering::Relaxed) && !self.read_closed.load(std::sync::atomic::Ordering::Relaxed) diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index 5ea84e92..a3c942be 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -137,7 +137,7 @@ where } } if cx.transport.should_reuse && resp.is_ok() { - transport.reuse(); + transport.reuse().await; } resp } diff --git a/volo-thrift/src/transport/pingpong/thrift_transport.rs b/volo-thrift/src/transport/pingpong/thrift_transport.rs index 76badb22..f7ad2fa8 100644 --- a/volo-thrift/src/transport/pingpong/thrift_transport.rs +++ b/volo-thrift/src/transport/pingpong/thrift_transport.rs @@ -146,7 +146,10 @@ where E: Encoder, D: Decoder, { - fn reusable(&self) -> bool { - self.read_half.reusable && self.write_half.reusable + async fn reusable(&self) -> bool { + self.read_half.reusable + && self.write_half.reusable + && !self.read_half.decoder.is_closed().await + && !self.write_half.encoder.is_closed().await } } diff --git a/volo-thrift/src/transport/pool/mod.rs b/volo-thrift/src/transport/pool/mod.rs index cd52e37d..62365576 100644 --- a/volo-thrift/src/transport/pool/mod.rs +++ b/volo-thrift/src/transport/pool/mod.rs @@ -44,7 +44,7 @@ pub enum Ver { pub trait Poolable: Sized { // check if the connection is opened - fn reusable(&self) -> bool; + fn reusable(&self) -> impl Future + Send; /// Reserve this connection. /// @@ -158,53 +158,6 @@ impl Expiration { } } -/// Pop off this list, looking for a usable connection that hasn't expired. -struct IdlePopper<'a, K: Key, T> { - key: &'a K, - list: &'a mut VecDeque>, -} - -impl<'a, K: Key, T: Poolable + 'a> IdlePopper<'a, K, T> { - fn pop(self, expiration: &Expiration) -> Option> { - while let Some(entry) = self.list.pop_front() { - // If the connection has been closed, or is older than our idle - // timeout, simply drop it and keep looking... - if !entry.inner.reusable() { - tracing::trace!("[VOLO] removing closed connection for {:?}", self.key); - continue; - } - // TODO: Actually, since the `idle` list is pushed to the end always, - // that would imply that if *this* entry is expired, then anything - // "earlier" in the list would *have* to be expired also... Right? - // - // In that case, we could just break out of the loop and drop the - // whole list... - if expiration.expires(entry.idle_at) { - tracing::trace!("[VOLO] removing expired connection for {:?}", self.key); - continue; - } - - let value = match entry.inner.reserve() { - Reservation::Shared(to_reinsert, to_return) => { - self.list.push_back(Idle { - idle_at: Instant::now(), - inner: to_reinsert, - }); - to_return - } - Reservation::Unique(unique) => unique, - }; - - return Some(Idle { - idle_at: entry.idle_at, - inner: value, - }); - } - - None - } -} - impl Pool { #[allow(dead_code)] pub fn new(cfg: Option) -> Self { @@ -267,19 +220,56 @@ impl Pool { MT::Error: Into + Send, { let (rx, _waiter_token) = { - let mut inner = self.inner.lock().volo_unwrap(); - // 1. check the idle and opened connections - let expiration = Expiration::new(Some(inner.timeout)); - let entry = inner.idle.get_mut(&key).and_then(|list| { - tracing::trace!("[VOLO] take? {:?}: expiration = {:?}", key, expiration.0); - { - let popper = IdlePopper { key: &key, list }; - popper.pop(&expiration) + let entry = 'outer: loop { + let entry = 'inner: { + let mut inner = self.inner.lock().volo_unwrap(); + // 1. check the idle and opened connections + let expiration = Expiration::new(Some(inner.timeout)); + + if let Some(list) = inner.idle.get_mut(&key) { + tracing::trace!("[VOLO] take? {:?}: expiration = {:?}", key, expiration.0); + while let Some(entry) = list.pop_front() { + // TODO: Actually, since the `idle` list is pushed to the end always, + // that would imply that if *this* entry is expired, then anything + // "earlier" in the list would *have* to be expired also... Right? + // + // In that case, we could just break out of the loop and drop the + // whole list... + if expiration.expires(entry.idle_at) { + tracing::trace!("[VOLO] removing expired connection for {:?}", key); + continue; + } + break 'inner entry; + } + break 'outer None; + } else { + break 'outer None; + } + }; + // If the connection has been closed, or is older than our idle + // timeout, simply drop it and keep looking... + if !entry.inner.reusable().await { + continue; } - }); + break 'outer Some(entry); + }; + + let mut inner = self.inner.lock().volo_unwrap(); if let Some(t) = entry { - return Ok(self.reuse(&key, t.inner)); + let value = match t.inner.reserve() { + Reservation::Shared(to_reinsert, to_return) => { + if let Some(list) = inner.idle.get_mut(&key) { + list.push_back(Idle { + idle_at: Instant::now(), + inner: to_reinsert, + }) + } + to_return + } + Reservation::Unique(unique) => unique, + }; + return Ok(self.reuse(&key, value)); } // 2. no valid idle then add caller into waiters and make connection let waiters = if let Some(waiter) = inner.waiters.get_mut(&key) { @@ -440,9 +430,9 @@ impl Pooled { } } - pub(crate) fn reuse(mut self) { + pub(crate) async fn reuse(mut self) { let inner = self.t.take().volo_unwrap(); - if !inner.reusable() { + if !inner.reusable().await { // If we *already* know the connection is done here, // it shouldn't be re-inserted back into the pool. return; @@ -544,11 +534,10 @@ impl Inner { let now = Instant::now(); self.idle.retain(|key, values| { values.retain(|entry| { + // if !entry.inner.reusable().await { + // continue; + // } // TODO: check has_idle && remove the (idle, waiters) key - if !entry.inner.reusable() { - tracing::trace!("[VOLO] idle interval evicting closed for {:?}", key); - return false; - } if now - entry.idle_at > timeout { tracing::trace!("[VOLO] idle interval evicting expired for {:?}", key); return false; diff --git a/volo/src/net/mod.rs b/volo/src/net/mod.rs index 097a2d5c..93b85f6f 100644 --- a/volo/src/net/mod.rs +++ b/volo/src/net/mod.rs @@ -1,6 +1,7 @@ pub mod conn; pub mod dial; pub mod incoming; +pub mod ready; #[cfg(feature = "__tls")] #[cfg_attr(docsrs, doc(cfg(any(feature = "rustls", feature = "native-tls"))))] pub mod tls; diff --git a/volo/src/net/ready.rs b/volo/src/net/ready.rs new file mode 100644 index 00000000..ded91d9e --- /dev/null +++ b/volo/src/net/ready.rs @@ -0,0 +1,43 @@ +use futures::Future; +use tokio::io::{self, Interest, Ready}; + +use super::conn::{OwnedReadHalf, OwnedWriteHalf}; + +/// Asynchronous IO readiness. +/// +/// Like [`tokio::io::AsyncRead`] or [`tokio::io::AsyncWrite`], but for +/// readiness events. +pub trait AsyncReady { + /// Checks for IO readiness. + /// + /// See [`tokio::net::TcpStream::ready`] for details. + fn ready(&self, interest: Interest) -> impl Future> + Send; +} + +impl AsyncReady for OwnedReadHalf { + async fn ready(&self, interest: Interest) -> io::Result { + match self { + OwnedReadHalf::Tcp(half) => half.ready(interest).await, + #[cfg(target_family = "unix")] + OwnedReadHalf::Unix(half) => half.ready(interest).await, + #[cfg(feature = "rustls")] + OwnedReadHalf::Rustls(_) => todo!(), + #[cfg(feature = "native-tls")] + OwnedReadHalf::NativeTls(_) => todo!(), + } + } +} + +impl AsyncReady for OwnedWriteHalf { + async fn ready(&self, interest: Interest) -> io::Result { + match self { + OwnedWriteHalf::Tcp(half) => half.ready(interest).await, + #[cfg(target_family = "unix")] + OwnedWriteHalf::Unix(half) => half.ready(interest).await, + #[cfg(feature = "rustls")] + OwnedWriteHalf::Rustls(_) => todo!(), + #[cfg(feature = "native-tls")] + OwnedWriteHalf::NativeTls(_) => todo!(), + } + } +} diff --git a/volo/src/util/buf_reader.rs b/volo/src/util/buf_reader.rs index e6742db2..1b831e70 100644 --- a/volo/src/util/buf_reader.rs +++ b/volo/src/util/buf_reader.rs @@ -10,6 +10,8 @@ use std::{ use pin_project::pin_project; use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf}; +use crate::net::ready::AsyncReady; + // used by `BufReader` and `BufWriter` // https://github.com/rust-lang/rust/blob/master/library/std/src/sys_common/io.rs#L1 const DEFAULT_BUF_SIZE: usize = 8 * 1024; @@ -223,6 +225,12 @@ impl AsyncBufRead for BufReader { } } +impl AsyncReady for BufReader { + async fn ready(&self, interest: tokio::io::Interest) -> io::Result { + self.inner.ready(interest).await + } +} + impl fmt::Debug for BufReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader")