Skip to content

Commit

Permalink
feat: add timer support for legacy::Pool (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
dswij authored Jan 8, 2024
1 parent b96757e commit 784109d
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 33 deletions.
40 changes: 36 additions & 4 deletions src/client/legacy/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use tracing::{debug, trace, warn};
use super::connect::HttpConnector;
use super::connect::{Alpn, Connect, Connected, Connection};
use super::pool::{self, Ver};
use crate::common::{lazy as hyper_lazy, Exec, Lazy, SyncWrapper};

use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};

type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

Expand Down Expand Up @@ -975,6 +976,7 @@ pub struct Builder {
#[cfg(feature = "http2")]
h2_builder: hyper::client::conn::http2::Builder<Exec>,
pool_config: pool::Config,
pool_timer: Option<timer::Timer>,
}

impl Builder {
Expand All @@ -999,13 +1001,34 @@ impl Builder {
idle_timeout: Some(Duration::from_secs(90)),
max_idle_per_host: std::usize::MAX,
},
pool_timer: None,
}
}
/// Set an optional timeout for idle sockets being kept-alive.
/// A `Timer` is required for this to take effect. See `Builder::pool_timer`
///
/// Pass `None` to disable timeout.
///
/// Default is 90 seconds.
///
/// # Example
///
/// ```
/// # #[cfg(feature = "tokio")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper_util::client::legacy::Client;
/// use hyper_util::rt::{TokioExecutor, TokioTimer};
///
/// let client = Client::builder(TokioExecutor::new())
/// .pool_idle_timeout(Duration::from_secs(30))
/// .pool_timer(TokioTimer::new())
/// .build_http();
///
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
/// # }
/// # fn main() {}
/// ```
pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
where
D: Into<Option<Duration>>,
Expand Down Expand Up @@ -1366,7 +1389,7 @@ impl Builder {
self
}

/// Provide a timer to be used for timeouts and intervals.
/// Provide a timer to be used for h2
///
/// See the documentation of [`h2::client::Builder::timer`] for more
/// details.
Expand All @@ -1378,7 +1401,15 @@ impl Builder {
{
#[cfg(feature = "http2")]
self.h2_builder.timer(timer);
// TODO(https://github.com/hyperium/hyper/issues/3167) set for pool as well
self
}

/// Provide a timer to be used for timeouts and intervals in connection pools.
pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
where
M: Timer + Clone + Send + Sync + 'static,
{
self.pool_timer = Some(timer::Timer::new(timer.clone()));
self
}

Expand Down Expand Up @@ -1447,6 +1478,7 @@ impl Builder {
B::Data: Send,
{
let exec = self.exec.clone();
let timer = self.pool_timer.clone();
Client {
config: self.client_config,
exec: exec.clone(),
Expand All @@ -1455,7 +1487,7 @@ impl Builder {
#[cfg(feature = "http2")]
h2_builder: self.h2_builder.clone(),
connector,
pool: pool::Pool::new(self.pool_config, exec),
pool: pool::Pool::new(self.pool_config, exec, timer),
}
}
}
Expand Down
76 changes: 47 additions & 29 deletions src/client/legacy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use futures_channel::oneshot;
use futures_util::ready;
use tracing::{debug, trace};

use crate::common::exec::{self, Exec};
use hyper::rt::Sleep;
use hyper::rt::Timer as _;

use crate::common::{exec, exec::Exec, timer::Timer};

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
Expand Down Expand Up @@ -97,6 +100,7 @@ struct PoolInner<T, K: Eq + Hash> {
// the Pool completely drops. That way, the interval can cancel immediately.
idle_interval_ref: Option<oneshot::Sender<Infallible>>,
exec: Exec,
timer: Option<Timer>,
timeout: Option<Duration>,
}

Expand All @@ -117,11 +121,13 @@ impl Config {
}

impl<T, K: Key> Pool<T, K> {
pub fn new<E>(config: Config, executor: E) -> Pool<T, K>
pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
where
E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
M: hyper::rt::Timer + Send + Sync + Clone + 'static,
{
let exec = Exec::new(executor);
let timer = timer.map(|t| Timer::new(t));
let inner = if config.is_enabled() {
Some(Arc::new(Mutex::new(PoolInner {
connecting: HashSet::new(),
Expand All @@ -130,6 +136,7 @@ impl<T, K: Key> Pool<T, K> {
max_idle_per_host: config.max_idle_per_host,
waiters: HashMap::new(),
exec,
timer,
timeout: config.idle_timeout,
})))
} else {
Expand Down Expand Up @@ -411,31 +418,33 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
self.waiters.remove(key);
}

fn spawn_idle_interval(&mut self, _pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
// TODO
/*
let (dur, rx) = {
if self.idle_interval_ref.is_some() {
return;
}
if let Some(dur) = self.timeout {
let (tx, rx) = oneshot::channel();
self.idle_interval_ref = Some(tx);
(dur, rx)
} else {
return;
}
fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
if self.idle_interval_ref.is_some() {
return;
}
let dur = if let Some(dur) = self.timeout {
dur
} else {
return;
};
let timer = if let Some(timer) = self.timer.clone() {
timer
} else {
return;
};
let (tx, rx) = oneshot::channel();
self.idle_interval_ref = Some(tx);

let interval = IdleTask {
interval: tokio::time::interval(dur),
timer: timer.clone(),
duration: dur,
deadline: Instant::now(),
fut: timer.sleep_until(Instant::now()), // ready at first tick
pool: WeakOpt::downgrade(pool_ref),
pool_drop_notifier: rx,
};

self.exec.execute(interval);
*/
}
}

Expand Down Expand Up @@ -755,11 +764,12 @@ impl Expiration {
}
}

/*
pin_project_lite::pin_project! {
struct IdleTask<T, K: Key> {
#[pin]
interval: Interval,
timer: Timer,
duration: Duration,
deadline: Instant,
fut: Pin<Box<dyn Sleep>>,
pool: WeakOpt<Mutex<PoolInner<T, K>>>,
// This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
Expand All @@ -784,7 +794,15 @@ impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
}
}

ready!(this.interval.as_mut().poll_tick(cx));
ready!(Pin::new(&mut this.fut).poll(cx));
// Set this task to run after the next deadline
// If the poll missed the deadline by a lot, set the deadline
// from the current time instead
*this.deadline = *this.deadline + *this.duration;
if *this.deadline < Instant::now() - Duration::from_millis(5) {
*this.deadline = Instant::now() + *this.duration;
}
*this.fut = this.timer.sleep_until(*this.deadline);

if let Some(inner) = this.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
Expand All @@ -797,7 +815,6 @@ impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
}
}
}
*/

impl<T> WeakOpt<T> {
fn none() -> Self {
Expand All @@ -823,7 +840,9 @@ mod tests {
use std::time::Duration;

use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
use crate::rt::TokioExecutor;
use crate::rt::{TokioExecutor, TokioTimer};

use crate::common::timer;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct KeyImpl(http::uri::Scheme, http::uri::Authority);
Expand Down Expand Up @@ -870,6 +889,7 @@ mod tests {
max_idle_per_host: max_idle,
},
TokioExecutor::new(),
Option::<timer::Timer>::None,
);
pool.no_timer();
pool
Expand Down Expand Up @@ -960,16 +980,14 @@ mod tests {
}

#[tokio::test]
#[ignore] // TODO
async fn test_pool_timer_removes_expired() {
tokio::time::pause();

let pool = Pool::new(
super::Config {
idle_timeout: Some(Duration::from_millis(10)),
max_idle_per_host: std::usize::MAX,
},
TokioExecutor::new(),
Some(TokioTimer::new()),
);

let key = host_key("foo");
Expand All @@ -984,7 +1002,7 @@ mod tests {
);

// Let the timer tick passed the expiration...
tokio::time::advance(Duration::from_millis(30)).await;
tokio::time::sleep(Duration::from_millis(30)).await;
// Yield so the Interval can reap...
tokio::task::yield_now().await;

Expand Down
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod lazy;
pub(crate) mod rewind;
#[cfg(feature = "client")]
mod sync;
pub(crate) mod timer;

#[cfg(feature = "client")]
pub(crate) use exec::Exec;
Expand Down
38 changes: 38 additions & 0 deletions src/common/timer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#![allow(dead_code)]

use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use hyper::rt::Sleep;

#[derive(Clone)]
pub(crate) struct Timer(Arc<dyn hyper::rt::Timer + Send + Sync>);

// =====impl Timer=====
impl Timer {
pub(crate) fn new<T>(inner: T) -> Self
where
T: hyper::rt::Timer + Send + Sync + 'static,
{
Self(Arc::new(inner))
}
}

impl fmt::Debug for Timer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Timer").finish()
}
}

impl hyper::rt::Timer for Timer {
fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
self.0.sleep(duration)
}

fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
self.0.sleep_until(deadline)
}
}

0 comments on commit 784109d

Please sign in to comment.