Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the PoolQueue middleware #2540

Merged
merged 14 commits into from
Dec 2, 2023
20 changes: 20 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,26 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-proxy-pool"
version = "0.1.0"
dependencies = [
"futures",
"linkerd-error",
"linkerd-proxy-core",
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
"pin-project",
"thiserror",
"tokio",
"tokio-stream",
"tokio-test",
"tokio-util",
"tower-test",
"tracing",
]

[[package]]
name = "linkerd-proxy-resolve"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ members = [
"linkerd/proxy/dns-resolve",
"linkerd/proxy/http",
"linkerd/proxy/identity-client",
"linkerd/proxy/pool",
"linkerd/proxy/resolve",
"linkerd/proxy/server-policy",
"linkerd/proxy/tap",
Expand Down
28 changes: 28 additions & 0 deletions linkerd/proxy/pool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "linkerd-proxy-pool"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
futures = { version = "0.3", default-features = false }
linkerd-error = { path = "../../error" }
# linkerd-metrics = { path = "../../metrics" }
linkerd-proxy-core = { path = "../core" }
linkerd-stack = { path = "../../stack" }
parking_lot = "0.12"
pin-project = "1"
# rand = "0.8"
thiserror = "1"
tokio = { version = "1", features = ["rt", "sync", "time"] }
# tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = "0.7"
tracing = "0.1"

[dev-dependencies]
linkerd-tracing = { path = "../../tracing" }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-test = "0.4"
tower-test = "0.4"
48 changes: 48 additions & 0 deletions linkerd/proxy/pool/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! Error types for the `Buffer` middleware.
olix0r marked this conversation as resolved.
Show resolved Hide resolved

use linkerd_error::Error;
use std::{fmt, sync::Arc};

/// A shareable, terminal error produced by either a service or discovery
/// resolution.
///
/// [`Service`]: crate::Service
/// [`Buffer`]: crate::buffer::Buffer
#[derive(Clone, Debug)]
pub struct TerminalFailure {
inner: Arc<Error>,
}

/// An error produced when the a buffer's worker closes unexpectedly.
#[derive(Debug, thiserror::Error)]
#[error("buffer worker closed unexpectedly")]
pub struct Closed(());

// === impl ServiceError ===

impl TerminalFailure {
pub(crate) fn new(inner: Error) -> TerminalFailure {
let inner = Arc::new(inner);
TerminalFailure { inner }
}
}

impl fmt::Display for TerminalFailure {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "buffered service failed: {}", self.inner)

Check warning on line 32 in linkerd/proxy/pool/src/error.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/proxy/pool/src/error.rs#L31-L32

Added lines #L31 - L32 were not covered by tests
}
}

impl std::error::Error for TerminalFailure {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&**self.inner)

Check warning on line 38 in linkerd/proxy/pool/src/error.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/proxy/pool/src/error.rs#L37-L38

Added lines #L37 - L38 were not covered by tests
}
}

// === impl Closed ====

impl Closed {
pub(crate) fn new() -> Self {
Closed(())
}
}
130 changes: 130 additions & 0 deletions linkerd/proxy/pool/src/failfast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use linkerd_stack::gate;
use std::pin::Pin;
use tokio::time;

/// Manages the failfast state for a pool.
#[derive(Debug)]
pub(super) struct Failfast {
timeout: time::Duration,
sleep: Pin<Box<time::Sleep>>,
state: Option<State>,
gate: gate::Tx,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) enum State {
Waiting { since: time::Instant },
Failfast { since: time::Instant },
}

// === impl Failfast ===

impl Failfast {
pub(super) fn new(timeout: time::Duration, gate: gate::Tx) -> Self {
Self {
timeout,
sleep: Box::pin(time::sleep(time::Duration::MAX)),
state: None,
gate,
}
}

pub(super) fn duration(&self) -> time::Duration {
self.timeout
}

/// Returns true if we are currently in a failfast state.
pub(super) fn is_active(&self) -> bool {
matches!(self.state, Some(State::Failfast { .. }))
}

/// Clears any waiting or failfast state.
pub(super) fn set_ready(&mut self) -> Option<State> {
let state = self.state.take()?;
if matches!(state, State::Failfast { .. }) {
tracing::trace!("Exiting failfast");
let _ = self.gate.open();
}
Some(state)
}

/// Waits for the failfast timeout to expire and enters the failfast state.
pub(super) async fn timeout(&mut self) {
let since = match self.state {
// If we're already in failfast, then we don't need to wait.
Some(State::Failfast { .. }) => {
return;
}

// Ensure that the timer's been initialized.
Some(State::Waiting { since }) => since,
None => {
let now = time::Instant::now();
self.sleep.as_mut().reset(now + self.timeout);
self.state = Some(State::Waiting { since: now });
now
}
};

// Wait for the failfast timer to expire.
tracing::trace!("Waiting for failfast timeout");
self.sleep.as_mut().await;
tracing::trace!("Entering failfast");

// Once we enter failfast, shut the upstream gate so that we can
// advertise backpressure past the queue.
self.state = Some(State::Failfast { since });
let _ = self.gate.shut();
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::prelude::*;

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn failfast() {
let (tx, gate_rx) = gate::channel();
let dur = time::Duration::from_secs(1);
let mut failfast = Failfast::new(dur, tx);

assert_eq!(dur, failfast.duration());
assert!(gate_rx.is_open());

// The failfast timeout should not be initialized until the first
// request is received.
assert!(!failfast.is_active(), "failfast should be active");

failfast.timeout().await;
assert!(failfast.is_active(), "failfast should be active");
assert!(gate_rx.is_shut(), "gate should be shut");

failfast
.timeout()
.now_or_never()
.expect("timeout must return immediately when in failfast");
assert!(failfast.is_active(), "failfast should be active");
assert!(gate_rx.is_shut(), "gate should be shut");

failfast.set_ready();
assert!(!failfast.is_active(), "failfast should be inactive");
assert!(gate_rx.is_open(), "gate should be open");

tokio::select! {
_ = time::sleep(time::Duration::from_millis(10)) => {}
_ = failfast.timeout() => unreachable!("timed out too quick"),
}
assert!(!failfast.is_active(), "failfast should be inactive");
assert!(gate_rx.is_open(), "gate should be open");

assert!(
matches!(failfast.state, Some(State::Waiting { .. })),
"failfast should be waiting"
);

failfast.timeout().await;
assert!(failfast.is_active(), "failfast should be active");
assert!(gate_rx.is_shut(), "gate should be shut");
}
}
77 changes: 77 additions & 0 deletions linkerd/proxy/pool/src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! Future types for the [`Buffer`] middleware.
//!
//! [`Buffer`]: crate::buffer::Buffer
olix0r marked this conversation as resolved.
Show resolved Hide resolved

use super::{error::Closed, message};
use futures::ready;
use linkerd_error::Error;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[pin_project]
/// Future that completes when the buffered service eventually services the submitted request.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
}

#[pin_project(project = ResponseStateProj)]
#[derive(Debug)]
enum ResponseState<T> {
Failed {
error: Option<Error>,
},
Rx {
#[pin]
rx: message::Rx<T>,
},
Poll {
#[pin]
fut: T,
},
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(rx: message::Rx<T>) -> Self {
ResponseFuture {
state: ResponseState::Rx { rx },
}
}

pub(crate) fn failed(err: Error) -> Self {

Check warning on line 46 in linkerd/proxy/pool/src/future.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/proxy/pool/src/future.rs#L46

Added line #L46 was not covered by tests
ResponseFuture {
state: ResponseState::Failed { error: Some(err) },

Check warning on line 48 in linkerd/proxy/pool/src/future.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/proxy/pool/src/future.rs#L48

Added line #L48 was not covered by tests
}
}
}

impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<Error>,
{
type Output = Result<T, Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
match this.state.as_mut().project() {
ResponseStateProj::Failed { error } => {
return Poll::Ready(Err(error.take().expect("polled after error")));

Check warning on line 66 in linkerd/proxy/pool/src/future.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/proxy/pool/src/future.rs#L65-L66

Added lines #L65 - L66 were not covered by tests
}
ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
Ok(Err(e)) => return Poll::Ready(Err(e)),
Err(_) => return Poll::Ready(Err(Closed::new().into())),

Check warning on line 71 in linkerd/proxy/pool/src/future.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/proxy/pool/src/future.rs#L71

Added line #L71 was not covered by tests
},
ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
}
}
}
}
39 changes: 39 additions & 0 deletions linkerd/proxy/pool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Adapted from [`tower::buffer`][buffer].
//!
//! [buffer]: https://github.com/tower-rs/tower/tree/bf4ea948346c59a5be03563425a7d9f04aadedf2/tower/src/buffer
//
// Copyright (c) 2019 Tower Contributors

#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]

mod error;
mod failfast;
mod future;
mod message;
mod service;
#[cfg(test)]
mod tests;
mod worker;

pub use self::service::PoolQueue;
pub use linkerd_proxy_core::Update;

use linkerd_stack::Service;

/// A collection of services updated from a resolution.
pub trait Pool<T, Req>: Service<Req> {
/// Updates the pool's endpoints.
fn update_pool(&mut self, update: Update<T>);

/// Polls pending endpoints to ready.
///
/// This is complementary to [`Service::poll_ready`], which may also handle
/// driving endpoints to ready. Unlike [`Service::poll_ready`], which
/// returns ready when *at least one* inner endpoint is ready,
/// [`Pool::poll_pool`] returns ready when *all* inner endpoints are ready.
olix0r marked this conversation as resolved.
Show resolved Hide resolved
fn poll_pool(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>>;
}
26 changes: 26 additions & 0 deletions linkerd/proxy/pool/src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use linkerd_error::Result;
use tokio::{sync::oneshot, time};

/// Message sent over buffer
#[derive(Debug)]
pub(crate) struct Message<Req, Fut> {
pub(crate) req: Req,
pub(crate) tx: Tx<Fut>,
pub(crate) span: tracing::Span,
pub(crate) t0: time::Instant,
}

/// Response sender
type Tx<Fut> = oneshot::Sender<Result<Fut>>;

/// Response receiver
pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut>>;

impl<Req, Fut> Message<Req, Fut> {
pub(crate) fn channel(req: Req) -> (Self, Rx<Fut>) {
let (tx, rx) = oneshot::channel();
let t0 = time::Instant::now();
let span = tracing::Span::current();
(Message { req, span, tx, t0 }, rx)
}
}
Loading