-
Notifications
You must be signed in to change notification settings - Fork 271
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add the PoolQueue middleware (#2540)
Tower's Buffer middleware is used to make a Service shareable across tasks. Within Linkerd, we've augmented the Buffer with failfast behavior, etc, as a Queue middleware. This queue plays a vital role in dispatching requests to load balancers, ensuring that load is shed when a balancer has no available endpoints. The tower Balancer additionally holds a Stream of endpoint updates that are processed as the balancer is driven to readiness. Crucially, this means that updates to the balancer can only occur while requests are being processed. We cannot eagerly drop defunct endpoints, nor can we eagerly connect to new endpoints. We have observed situations where long-lived, mostly-idle balancers can buffer discovery updates indefinitely, bloating memory and even forcing backpressure to the control plane. To correct this behavior, this change introduces the PoolQueue middleware. The PoolQueue is based on the tower's Buffer, but instead of composing over an inner Service, it composes over an inner Pool. Pool is a new interface that provides an additional interface to update the pool's members and to drive all pending endpoints in the pool to be ready (decoupling the semantics of Service::poll_ready and Pool::poll_pool). Pool implementations will typically hold a ReadyCache of inner endpoints (as the tower balancer does). This change, however, does not include the concrete implementation of a Pool to be replace the balancer. A p2c pool will be introduced in a followup change. This change has the added benefit of simplifying the endpoint discovery pipeline. We currently process Updates (including an array of endpoints) from the discovery API and convert that into a stream of discrete endpoint updates for the balancer, requiring redundant caching. The Pool interface processes Updates directly, so there is no need for the extra translation.
- Loading branch information
Showing
13 changed files
with
1,305 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
[package] | ||
name = "linkerd-proxy-pool" | ||
version = "0.1.0" | ||
authors = ["Linkerd Developers <[email protected]>"] | ||
license = "Apache-2.0" | ||
edition = "2021" | ||
publish = false | ||
|
||
[dependencies] | ||
futures = { version = "0.3", default-features = false } | ||
linkerd-error = { path = "../../error" } | ||
# linkerd-metrics = { path = "../../metrics" } | ||
linkerd-proxy-core = { path = "../core" } | ||
linkerd-stack = { path = "../../stack" } | ||
parking_lot = "0.12" | ||
pin-project = "1" | ||
# rand = "0.8" | ||
thiserror = "1" | ||
tokio = { version = "1", features = ["rt", "sync", "time"] } | ||
# tokio-stream = { version = "0.1", features = ["sync"] } | ||
tokio-util = "0.7" | ||
tracing = "0.1" | ||
|
||
[dev-dependencies] | ||
linkerd-tracing = { path = "../../tracing" } | ||
tokio-stream = { version = "0.1", features = ["sync"] } | ||
tokio-test = "0.4" | ||
tower-test = "0.4" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
//! Error types for the `PoolQueue` middleware. | ||
use linkerd_error::Error; | ||
use std::{fmt, sync::Arc}; | ||
|
||
/// A shareable, terminal error produced by either a service or discovery | ||
/// resolution. | ||
#[derive(Clone, Debug)] | ||
pub struct TerminalFailure(Arc<Error>); | ||
|
||
// === impl TerminalFailure === | ||
|
||
impl TerminalFailure { | ||
pub(crate) fn new(inner: Error) -> TerminalFailure { | ||
TerminalFailure(Arc::new(inner)) | ||
} | ||
} | ||
|
||
impl fmt::Display for TerminalFailure { | ||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
write!(fmt, "pool failed: {}", self.0) | ||
} | ||
} | ||
|
||
impl std::error::Error for TerminalFailure { | ||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { | ||
Some(&**self.0) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
use linkerd_stack::gate; | ||
use std::pin::Pin; | ||
use tokio::time; | ||
|
||
/// Manages the failfast state for a pool. | ||
#[derive(Debug)] | ||
pub(super) struct Failfast { | ||
timeout: time::Duration, | ||
sleep: Pin<Box<time::Sleep>>, | ||
state: Option<State>, | ||
gate: gate::Tx, | ||
} | ||
|
||
#[derive(Copy, Clone, Debug, PartialEq, Eq)] | ||
pub(super) enum State { | ||
Waiting { since: time::Instant }, | ||
Failfast { since: time::Instant }, | ||
} | ||
|
||
// === impl Failfast === | ||
|
||
impl Failfast { | ||
pub(super) fn new(timeout: time::Duration, gate: gate::Tx) -> Self { | ||
Self { | ||
timeout, | ||
sleep: Box::pin(time::sleep(time::Duration::MAX)), | ||
state: None, | ||
gate, | ||
} | ||
} | ||
|
||
pub(super) fn duration(&self) -> time::Duration { | ||
self.timeout | ||
} | ||
|
||
/// Returns true if we are currently in a failfast state. | ||
pub(super) fn is_active(&self) -> bool { | ||
matches!(self.state, Some(State::Failfast { .. })) | ||
} | ||
|
||
/// Clears any waiting or failfast state. | ||
pub(super) fn set_ready(&mut self) -> Option<State> { | ||
let state = self.state.take()?; | ||
if matches!(state, State::Failfast { .. }) { | ||
tracing::trace!("Exiting failfast"); | ||
let _ = self.gate.open(); | ||
} | ||
Some(state) | ||
} | ||
|
||
/// Waits for the failfast timeout to expire and enters the failfast state. | ||
pub(super) async fn entered(&mut self) { | ||
let since = match self.state { | ||
// If we're already in failfast, then we don't need to wait. | ||
Some(State::Failfast { .. }) => { | ||
return; | ||
} | ||
|
||
// Ensure that the timer's been initialized. | ||
Some(State::Waiting { since }) => since, | ||
None => { | ||
let now = time::Instant::now(); | ||
self.sleep.as_mut().reset(now + self.timeout); | ||
self.state = Some(State::Waiting { since: now }); | ||
now | ||
} | ||
}; | ||
|
||
// Wait for the failfast timer to expire. | ||
tracing::trace!("Waiting for failfast timeout"); | ||
self.sleep.as_mut().await; | ||
tracing::trace!("Entering failfast"); | ||
|
||
// Once we enter failfast, shut the upstream gate so that we can | ||
// advertise backpressure past the queue. | ||
self.state = Some(State::Failfast { since }); | ||
let _ = self.gate.shut(); | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use futures::prelude::*; | ||
|
||
#[tokio::test(flavor = "current_thread", start_paused = true)] | ||
async fn failfast() { | ||
let (tx, gate_rx) = gate::channel(); | ||
let dur = time::Duration::from_secs(1); | ||
let mut failfast = Failfast::new(dur, tx); | ||
|
||
assert_eq!(dur, failfast.duration()); | ||
assert!(gate_rx.is_open()); | ||
|
||
// The failfast timeout should not be initialized until the first | ||
// request is received. | ||
assert!(!failfast.is_active(), "failfast should be active"); | ||
|
||
failfast.entered().await; | ||
assert!(failfast.is_active(), "failfast should be active"); | ||
assert!(gate_rx.is_shut(), "gate should be shut"); | ||
|
||
failfast | ||
.entered() | ||
.now_or_never() | ||
.expect("timeout must return immediately when in failfast"); | ||
assert!(failfast.is_active(), "failfast should be active"); | ||
assert!(gate_rx.is_shut(), "gate should be shut"); | ||
|
||
failfast.set_ready(); | ||
assert!(!failfast.is_active(), "failfast should be inactive"); | ||
assert!(gate_rx.is_open(), "gate should be open"); | ||
|
||
tokio::select! { | ||
_ = time::sleep(time::Duration::from_millis(10)) => {} | ||
_ = failfast.entered() => unreachable!("timed out too quick"), | ||
} | ||
assert!(!failfast.is_active(), "failfast should be inactive"); | ||
assert!(gate_rx.is_open(), "gate should be open"); | ||
|
||
assert!( | ||
matches!(failfast.state, Some(State::Waiting { .. })), | ||
"failfast should be waiting" | ||
); | ||
|
||
failfast.entered().await; | ||
assert!(failfast.is_active(), "failfast should be active"); | ||
assert!(gate_rx.is_shut(), "gate should be shut"); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
use super::message; | ||
use futures::ready; | ||
use linkerd_error::Error; | ||
use pin_project::pin_project; | ||
use std::{ | ||
future::Future, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
#[pin_project] | ||
/// Future that completes when the buffered service eventually services the submitted request. | ||
#[derive(Debug)] | ||
pub struct ResponseFuture<T> { | ||
#[pin] | ||
state: ResponseState<T>, | ||
} | ||
|
||
#[pin_project(project = ResponseStateProj)] | ||
#[derive(Debug)] | ||
enum ResponseState<T> { | ||
Failed { | ||
error: Option<Error>, | ||
}, | ||
Rx { | ||
#[pin] | ||
rx: message::Rx<T>, | ||
}, | ||
Poll { | ||
#[pin] | ||
fut: T, | ||
}, | ||
} | ||
|
||
impl<T> ResponseFuture<T> { | ||
pub(crate) fn new(rx: message::Rx<T>) -> Self { | ||
ResponseFuture { | ||
state: ResponseState::Rx { rx }, | ||
} | ||
} | ||
|
||
pub(crate) fn failed(err: Error) -> Self { | ||
ResponseFuture { | ||
state: ResponseState::Failed { error: Some(err) }, | ||
} | ||
} | ||
} | ||
|
||
impl<F, T, E> Future for ResponseFuture<F> | ||
where | ||
F: Future<Output = Result<T, E>>, | ||
E: Into<Error>, | ||
{ | ||
type Output = Result<T, Error>; | ||
|
||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let mut this = self.project(); | ||
|
||
loop { | ||
match this.state.as_mut().project() { | ||
ResponseStateProj::Failed { error } => { | ||
return Poll::Ready(Err(error.take().expect("polled after error"))); | ||
} | ||
ResponseStateProj::Rx { rx } => { | ||
let fut = ready!(rx.poll(cx)) | ||
.expect("worker must set a failure if it exits prematurely")?; | ||
this.state.set(ResponseState::Poll { fut }); | ||
} | ||
ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
//! Adapted from [`tower::buffer`][buffer]. | ||
//! | ||
//! [buffer]: https://github.com/tower-rs/tower/tree/bf4ea948346c59a5be03563425a7d9f04aadedf2/tower/src/buffer | ||
// | ||
// Copyright (c) 2019 Tower Contributors | ||
|
||
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] | ||
#![forbid(unsafe_code)] | ||
|
||
mod error; | ||
mod failfast; | ||
mod future; | ||
mod message; | ||
mod service; | ||
#[cfg(test)] | ||
mod tests; | ||
mod worker; | ||
|
||
pub use self::service::PoolQueue; | ||
pub use linkerd_proxy_core::Update; | ||
|
||
use linkerd_stack::Service; | ||
|
||
/// A collection of services updated from a resolution. | ||
pub trait Pool<T, Req>: Service<Req> { | ||
/// Updates the pool's endpoints. | ||
fn update_pool(&mut self, update: Update<T>); | ||
|
||
/// Polls to update the pool while the Service is ready. | ||
/// | ||
/// [`Service::poll_ready`] should do the same work, but will return ready | ||
/// as soon as there at least one ready endpoint. This method will continue | ||
/// to drive the pool until ready is returned (indicating that the pool need | ||
/// not be updated before another request is processed). | ||
fn poll_pool( | ||
&mut self, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Result<(), Self::Error>>; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
use linkerd_error::{Error, Result}; | ||
use tokio::{sync::oneshot, time}; | ||
|
||
/// Message sent over buffer | ||
#[derive(Debug)] | ||
pub(crate) struct Message<Req, Fut> { | ||
pub(crate) req: Req, | ||
pub(crate) tx: Tx<Fut>, | ||
pub(crate) span: tracing::Span, | ||
pub(crate) t0: time::Instant, | ||
} | ||
|
||
/// Response sender | ||
type Tx<Fut> = oneshot::Sender<Result<Fut>>; | ||
|
||
/// Response receiver | ||
pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut>>; | ||
|
||
impl<Req, Fut> Message<Req, Fut> { | ||
pub(crate) fn channel(req: Req) -> (Self, Rx<Fut>) { | ||
let (tx, rx) = oneshot::channel(); | ||
let t0 = time::Instant::now(); | ||
let span = tracing::Span::current(); | ||
(Message { req, span, tx, t0 }, rx) | ||
} | ||
|
||
pub(crate) fn fail(self, err: impl Into<Error>) { | ||
if self.tx.send(Err(err.into())).is_ok() { | ||
tracing::debug!( | ||
latency = (time::Instant::now() - self.t0).as_secs_f64(), | ||
"Failed due to pool error" | ||
); | ||
} else { | ||
tracing::debug!("Caller dropped"); | ||
} | ||
} | ||
} |
Oops, something went wrong.