Skip to content

Commit

Permalink
feat: don't reuse connection when it closed
Browse files Browse the repository at this point in the history
  • Loading branch information
Millione committed Aug 1, 2024
1 parent e32aac4 commit 95f9ef9
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 83 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion benchmark/scripts/reports/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class bcolors:
def diff(from_csv, to_csv):
from_reader = list(csv.reader(open(from_csv)))
to_reader = csv.reader(open(to_csv))
title = ['Kind', 'Concurrency', 'Data Size', 'QPS', 'P99', 'P999', 'Client CPU', 'Server CPU']
title = ['Kind', 'Concurrency', 'Data Size', 'QPS', 'P99', 'P999', 'Server CPU', 'Client CPU']
results = []

for line_num, line in enumerate(to_reader):
Expand Down
2 changes: 1 addition & 1 deletion benchmark/scripts/reports/render_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def parse_y(lines, idx, times=1):

# TODO
color_dict = {
"[volo]": "royalblue",
"[thrift]": "royalblue",
}


Expand Down
2 changes: 1 addition & 1 deletion benchmark/scripts/util.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ function check_supported_env() {
darwin*) ;;
*) echo "[ERROR] volo benchmark is not supported on $OSTYPE"; exit 1;;
esac
}
}
1 change: 0 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ motore.workspace = true
serde.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-stream.workspace = true
tonic-web.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true

Expand Down
26 changes: 20 additions & 6 deletions volo-thrift/src/codec/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use std::future::Future;
use bytes::Bytes;
use linkedbytes::LinkedBytes;
use pilota::thrift::ThriftException;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, Interest};
use tracing::{trace, warn};
use volo::util::buf_reader::BufReader;
use volo::{net::ready::AsyncReady, util::buf_reader::BufReader};

use self::{framed::MakeFramedCodec, thrift::MakeThriftCodec, ttheader::MakeTTHeaderCodec};
use super::{Decoder, Encoder, MakeCodec};
Expand Down Expand Up @@ -115,7 +115,7 @@ pub struct DefaultEncoder<E, W> {
linked_bytes: LinkedBytes,
}

impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
impl<E: ZeroCopyEncoder, W: AsyncWrite + AsyncReady + Unpin + Send + Sync + 'static> Encoder
for DefaultEncoder<E, W>
{
#[inline]
Expand Down Expand Up @@ -180,14 +180,28 @@ impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
}
// write_result
}

async fn is_closed(&self) -> bool {
match self
.writer
.ready(Interest::READABLE | Interest::WRITABLE)
.await
{
Ok(ready) => ready.is_read_closed() || ready.is_write_closed(),
Err(e) => {
warn!("[VOLO] thrift codec write half ready error: {}", e);
true
}
}
}
}

pub struct DefaultDecoder<D, R> {
decoder: D,
reader: BufReader<R>,
}

impl<D: ZeroCopyDecoder, R: AsyncRead + Unpin + Send + Sync + 'static> Decoder
impl<D: ZeroCopyDecoder, R: AsyncRead + AsyncReady + Unpin + Send + Sync + 'static> Decoder
for DefaultDecoder<D, R>
{
#[inline]
Expand Down Expand Up @@ -274,8 +288,8 @@ impl Default for DefaultMakeCodec<MakeTTHeaderCodec<MakeFramedCodec<MakeThriftCo
impl<MkZC, R, W> MakeCodec<R, W> for DefaultMakeCodec<MkZC>
where
MkZC: MakeZeroCopyCodec,
R: AsyncRead + Unpin + Send + Sync + 'static,
W: AsyncWrite + Unpin + Send + Sync + 'static,
R: AsyncRead + AsyncReady + Unpin + Send + Sync + 'static,
W: AsyncWrite + AsyncReady + Unpin + Send + Sync + 'static,
{
type Encoder = DefaultEncoder<MkZC::Encoder, W>;
type Decoder = DefaultDecoder<MkZC::Decoder, R>;
Expand Down
12 changes: 10 additions & 2 deletions volo-thrift/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,30 @@ pub use default::DefaultMakeCodec;
/// Returning an Ok(None) indicates the EOF has been reached.
///
/// Note: [`Decoder`] should be designed to be ready for reuse.
pub trait Decoder: Send + 'static {
pub trait Decoder: Send + Sync + 'static {
fn decode<Msg: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
) -> impl Future<Output = Result<Option<ThriftMessage<Msg>>, ThriftException>> + Send;

fn is_closed(&self) -> impl Future<Output = bool> + Send {
async { false }
}
}

/// [`Encoder`] writes a [`ThriftMessage`] to an [`AsyncWrite`] and flushes the data.
///
/// Note: [`Encoder`] should be designed to be ready for reuse.
pub trait Encoder: Send + 'static {
pub trait Encoder: Send + Sync + 'static {
fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> impl Future<Output = Result<(), ThriftException>> + Send;

fn is_closed(&self) -> impl Future<Output = bool> + Send {
async { false }
}
}

/// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a
Expand Down
2 changes: 1 addition & 1 deletion volo-thrift/src/transport/multiplex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ where
}
}
if cx.transport.should_reuse && resp.is_ok() {
transport.reuse();
transport.reuse().await;
}
resp
}
Expand Down
4 changes: 2 additions & 2 deletions volo-thrift/src/transport/multiplex/thrift_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ where
}
}

impl<TTEncoder, Resp> Poolable for ThriftTransport<TTEncoder, Resp> {
fn reusable(&self) -> bool {
impl<TTEncoder: Send, Resp: Send> Poolable for ThriftTransport<TTEncoder, Resp> {
async fn reusable(&self) -> bool {
!self.write_error.load(std::sync::atomic::Ordering::Relaxed)
&& !self.read_error.load(std::sync::atomic::Ordering::Relaxed)
&& !self.read_closed.load(std::sync::atomic::Ordering::Relaxed)
Expand Down
2 changes: 1 addition & 1 deletion volo-thrift/src/transport/pingpong/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ where
}
}
if cx.transport.should_reuse && resp.is_ok() {
transport.reuse();
transport.reuse().await;
}
resp
}
Expand Down
7 changes: 5 additions & 2 deletions volo-thrift/src/transport/pingpong/thrift_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ where
E: Encoder,
D: Decoder,
{
fn reusable(&self) -> bool {
self.read_half.reusable && self.write_half.reusable
async fn reusable(&self) -> bool {
self.read_half.reusable
&& self.write_half.reusable
&& !self.read_half.decoder.is_closed().await
&& !self.write_half.encoder.is_closed().await
}
}
117 changes: 53 additions & 64 deletions volo-thrift/src/transport/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub enum Ver {

pub trait Poolable: Sized {
// check if the connection is opened
fn reusable(&self) -> bool;
fn reusable(&self) -> impl Future<Output = bool> + Send;

/// Reserve this connection.
///
Expand Down Expand Up @@ -158,53 +158,6 @@ impl Expiration {
}
}

/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, K: Key, T> {
key: &'a K,
list: &'a mut VecDeque<Idle<T>>,
}

impl<'a, K: Key, T: Poolable + 'a> IdlePopper<'a, K, T> {
fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
while let Some(entry) = self.list.pop_front() {
// If the connection has been closed, or is older than our idle
// timeout, simply drop it and keep looking...
if !entry.inner.reusable() {
tracing::trace!("[VOLO] removing closed connection for {:?}", self.key);
continue;
}
// TODO: Actually, since the `idle` list is pushed to the end always,
// that would imply that if *this* entry is expired, then anything
// "earlier" in the list would *have* to be expired also... Right?
//
// In that case, we could just break out of the loop and drop the
// whole list...
if expiration.expires(entry.idle_at) {
tracing::trace!("[VOLO] removing expired connection for {:?}", self.key);
continue;
}

let value = match entry.inner.reserve() {
Reservation::Shared(to_reinsert, to_return) => {
self.list.push_back(Idle {
idle_at: Instant::now(),
inner: to_reinsert,
});
to_return
}
Reservation::Unique(unique) => unique,
};

return Some(Idle {
idle_at: entry.idle_at,
inner: value,
});
}

None
}
}

impl<K: Key, T: Poolable + Send + 'static> Pool<K, T> {
#[allow(dead_code)]
pub fn new(cfg: Option<Config>) -> Self {
Expand Down Expand Up @@ -267,19 +220,56 @@ impl<K: Key, T: Poolable + Send + 'static> Pool<K, T> {
MT::Error: Into<crate::ClientError> + Send,
{
let (rx, _waiter_token) = {
let mut inner = self.inner.lock().volo_unwrap();
// 1. check the idle and opened connections
let expiration = Expiration::new(Some(inner.timeout));
let entry = inner.idle.get_mut(&key).and_then(|list| {
tracing::trace!("[VOLO] take? {:?}: expiration = {:?}", key, expiration.0);
{
let popper = IdlePopper { key: &key, list };
popper.pop(&expiration)
let entry = 'outer: loop {
let entry = 'inner: {
let mut inner = self.inner.lock().volo_unwrap();
// 1. check the idle and opened connections
let expiration = Expiration::new(Some(inner.timeout));

if let Some(list) = inner.idle.get_mut(&key) {
tracing::trace!("[VOLO] take? {:?}: expiration = {:?}", key, expiration.0);
while let Some(entry) = list.pop_front() {
// TODO: Actually, since the `idle` list is pushed to the end always,
// that would imply that if *this* entry is expired, then anything
// "earlier" in the list would *have* to be expired also... Right?
//
// In that case, we could just break out of the loop and drop the
// whole list...
if expiration.expires(entry.idle_at) {
tracing::trace!("[VOLO] removing expired connection for {:?}", key);
continue;
}
break 'inner entry;
}
break 'outer None;
} else {
break 'outer None;
}
};
// If the connection has been closed, or is older than our idle
// timeout, simply drop it and keep looking...
if !entry.inner.reusable().await {
continue;
}
});
break 'outer Some(entry);
};

let mut inner = self.inner.lock().volo_unwrap();

if let Some(t) = entry {
return Ok(self.reuse(&key, t.inner));
let value = match t.inner.reserve() {
Reservation::Shared(to_reinsert, to_return) => {
if let Some(list) = inner.idle.get_mut(&key) {
list.push_back(Idle {
idle_at: Instant::now(),
inner: to_reinsert,
})
}
to_return
}
Reservation::Unique(unique) => unique,
};
return Ok(self.reuse(&key, value));
}
// 2. no valid idle then add caller into waiters and make connection
let waiters = if let Some(waiter) = inner.waiters.get_mut(&key) {
Expand Down Expand Up @@ -440,9 +430,9 @@ impl<K: Key, T: Poolable> Pooled<K, T> {
}
}

pub(crate) fn reuse(mut self) {
pub(crate) async fn reuse(mut self) {
let inner = self.t.take().volo_unwrap();
if !inner.reusable() {
if !inner.reusable().await {
// If we *already* know the connection is done here,
// it shouldn't be re-inserted back into the pool.
return;
Expand Down Expand Up @@ -544,11 +534,10 @@ impl<K: Key, T: Poolable> Inner<K, T> {
let now = Instant::now();
self.idle.retain(|key, values| {
values.retain(|entry| {
// if !entry.inner.reusable().await {
// continue;
// }
// TODO: check has_idle && remove the (idle, waiters) key
if !entry.inner.reusable() {
tracing::trace!("[VOLO] idle interval evicting closed for {:?}", key);
return false;
}
if now - entry.idle_at > timeout {
tracing::trace!("[VOLO] idle interval evicting expired for {:?}", key);
return false;
Expand Down
1 change: 1 addition & 0 deletions volo/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod conn;
pub mod dial;
pub mod incoming;
pub mod ready;
#[cfg(feature = "__tls")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "rustls", feature = "native-tls"))))]
pub mod tls;
Expand Down
43 changes: 43 additions & 0 deletions volo/src/net/ready.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use futures::Future;
use tokio::io::{self, Interest, Ready};

use super::conn::{OwnedReadHalf, OwnedWriteHalf};

/// Asynchronous IO readiness.
///
/// Like [`tokio::io::AsyncRead`] or [`tokio::io::AsyncWrite`], but for
/// readiness events.
pub trait AsyncReady {
/// Checks for IO readiness.
///
/// See [`tokio::net::TcpStream::ready`] for details.
fn ready(&self, interest: Interest) -> impl Future<Output = io::Result<Ready>> + Send;
}

impl AsyncReady for OwnedReadHalf {
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
match self {
OwnedReadHalf::Tcp(half) => half.ready(interest).await,
#[cfg(target_family = "unix")]
OwnedReadHalf::Unix(half) => half.ready(interest).await,
#[cfg(feature = "rustls")]
OwnedReadHalf::Rustls(_) => todo!(),
#[cfg(feature = "native-tls")]
OwnedReadHalf::NativeTls(_) => todo!(),
}
}
}

impl AsyncReady for OwnedWriteHalf {
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
match self {
OwnedWriteHalf::Tcp(half) => half.ready(interest).await,
#[cfg(target_family = "unix")]
OwnedWriteHalf::Unix(half) => half.ready(interest).await,
#[cfg(feature = "rustls")]
OwnedWriteHalf::Rustls(_) => todo!(),
#[cfg(feature = "native-tls")]
OwnedWriteHalf::NativeTls(_) => todo!(),
}
}
}
Loading

0 comments on commit 95f9ef9

Please sign in to comment.