From 2ddf01e72f682a6ec091f9925b2f60962fde6f56 Mon Sep 17 00:00:00 2001
From: Oliver Gould
Date: Fri, 1 Dec 2023 01:06:28 +0000
Subject: [PATCH 01/13] Add the PoolQueue middleware

Tower's Buffer middleware is used to make a Service shareable across tasks.
Within Linkerd, we've augmented the Buffer with failfast behavior, etc., as a
Queue middleware. This queue plays a vital role in dispatching requests to
load balancers, ensuring that load is shed when a balancer has no available
endpoints.

The Tower Balancer additionally holds a Stream of endpoint updates that are
processed as the balancer is driven to readiness. Crucially, this means that
updates to the balancer can only occur while requests are being processed. We
cannot eagerly drop defunct endpoints, nor can we eagerly connect to new
endpoints. We have observed situations where long-lived, mostly-idle balancers
buffer discovery updates indefinitely, bloating memory and even forcing
backpressure to the control plane.

To correct this behavior, this change introduces the PoolQueue middleware. The
PoolQueue is based on Tower's Buffer, but instead of composing over an inner
Service, it composes over an inner Pool. Pool is a new trait that extends
Service with methods to update the pool's members and to drive all pending
endpoints in the pool to readiness (decoupling the semantics of
Service::poll_ready and Pool::poll_pool). Pool implementations will typically
hold a ReadyCache of inner endpoints (as the Tower balancer does). This
change, however, does not include the concrete Pool implementation that will
replace the balancer; a p2c pool will be introduced in a followup change.

This change has the added benefit of simplifying the endpoint discovery
pipeline. We currently process Updates (each including an array of endpoints)
from the discovery API and convert them into a stream of discrete endpoint
updates for the balancer, requiring redundant caching. The Pool interface
processes Updates directly, so there is no need for the extra translation.
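For orientation, the sketch below condenses the first test added in tests.rs
(further down in this patch) to show how a PoolQueue is constructed and used.
It assumes it sits alongside the existing tests so that the test-only `mock`
helpers introduced below are in scope; the unit `()` target, request, and
response types are purely illustrative, not the types used in the proxy.

    use crate::PoolQueue;
    use linkerd_proxy_core::Update;
    use linkerd_stack::ServiceExt;
    use tokio::{sync::mpsc, time};
    use tokio_stream::wrappers::ReceiverStream;

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn queue_dispatches_to_pool() {
        // A mock Pool whose inner service is controlled through `handle`, plus
        // a discovery channel that would normally be fed by the resolver.
        let (pool, mut handle) = mock::pool::<(), (), ()>();
        let (_updates, rx) =
            mpsc::channel::<Result<Update<()>, mock::ResolutionError>>(1);

        // Buffer one request; fail pending requests if the pool stays unready
        // for a full second.
        let mut poolq = PoolQueue::spawn(
            1,
            time::Duration::from_secs(1),
            ReceiverStream::from(rx),
            pool,
        );

        // Let the mock service accept one request, then drive a call through
        // the queue: the worker dispatches it to the pool, and the caller's
        // future resolves once the pool responds.
        handle.svc.allow(1);
        poolq.ready().await.expect("queue should be ready");
        let call = poolq.call(());
        let ((), respond) = handle.svc.next_request().await.expect("request");
        respond.send_response(());
        call.await.expect("response");
    }

The tests added below additionally exercise failfast, discovery-error
propagation, and worker shutdown.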
--- Cargo.lock | 20 ++ Cargo.toml | 1 + linkerd/proxy/pool/Cargo.toml | 28 +++ linkerd/proxy/pool/src/error.rs | 69 ++++++ linkerd/proxy/pool/src/failfast.rs | 130 ++++++++++ linkerd/proxy/pool/src/future.rs | 77 ++++++ linkerd/proxy/pool/src/lib.rs | 39 +++ linkerd/proxy/pool/src/message.rs | 26 ++ linkerd/proxy/pool/src/service.rs | 120 +++++++++ linkerd/proxy/pool/src/tests.rs | 343 ++++++++++++++++++++++++++ linkerd/proxy/pool/src/tests/mock.rs | 90 +++++++ linkerd/proxy/pool/src/worker.rs | 349 +++++++++++++++++++++++++++ linkerd/stack/src/failfast.rs | 2 +- 13 files changed, 1293 insertions(+), 1 deletion(-) create mode 100644 linkerd/proxy/pool/Cargo.toml create mode 100644 linkerd/proxy/pool/src/error.rs create mode 100644 linkerd/proxy/pool/src/failfast.rs create mode 100644 linkerd/proxy/pool/src/future.rs create mode 100644 linkerd/proxy/pool/src/lib.rs create mode 100644 linkerd/proxy/pool/src/message.rs create mode 100644 linkerd/proxy/pool/src/service.rs create mode 100644 linkerd/proxy/pool/src/tests.rs create mode 100644 linkerd/proxy/pool/src/tests/mock.rs create mode 100644 linkerd/proxy/pool/src/worker.rs diff --git a/Cargo.lock b/Cargo.lock index b7503bf74f..833af50ffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1797,6 +1797,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-proxy-pool" +version = "0.1.0" +dependencies = [ + "futures", + "linkerd-error", + "linkerd-proxy-core", + "linkerd-stack", + "linkerd-tracing", + "parking_lot", + "pin-project", + "thiserror", + "tokio", + "tokio-stream", + "tokio-test", + "tokio-util", + "tower-test", + "tracing", +] + [[package]] name = "linkerd-proxy-resolve" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 118ce889c2..a00b13f1f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ members = [ "linkerd/proxy/dns-resolve", "linkerd/proxy/http", "linkerd/proxy/identity-client", + "linkerd/proxy/pool", "linkerd/proxy/resolve", "linkerd/proxy/server-policy", "linkerd/proxy/tap", diff --git a/linkerd/proxy/pool/Cargo.toml b/linkerd/proxy/pool/Cargo.toml new file mode 100644 index 0000000000..ab1f8ac7eb --- /dev/null +++ b/linkerd/proxy/pool/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "linkerd-proxy-pool" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +futures = { version = "0.3", default-features = false } +linkerd-error = { path = "../../error" } +# linkerd-metrics = { path = "../../metrics" } +linkerd-proxy-core = { path = "../core" } +linkerd-stack = { path = "../../stack" } +parking_lot = "0.12" +pin-project = "1" +# rand = "0.8" +thiserror = "1" +tokio = { version = "1", features = ["rt", "sync", "time"] } +# tokio-stream = { version = "0.1", features = ["sync"] } +tokio-util = "0.7" +tracing = "0.1" + +[dev-dependencies] +linkerd-tracing = { path = "../../tracing" } +tokio-stream = { version = "0.1", features = ["sync"] } +tokio-test = "0.4" +tower-test = "0.4" diff --git a/linkerd/proxy/pool/src/error.rs b/linkerd/proxy/pool/src/error.rs new file mode 100644 index 0000000000..511089449a --- /dev/null +++ b/linkerd/proxy/pool/src/error.rs @@ -0,0 +1,69 @@ +//! Error types for the `Buffer` middleware. + +use linkerd_error::Error; +use std::{fmt, sync::Arc}; + +/// A shareable, terminal error produced by either a service or discovery +/// resolution. 
+/// +/// [`Service`]: crate::Service +/// [`Buffer`]: crate::buffer::Buffer +#[derive(Clone, Debug)] +pub struct TerminalFailure { + inner: Arc, +} + +/// An error produced when the a buffer's worker closes unexpectedly. +pub struct Closed { + _p: (), +} + +// ===== impl ServiceError ===== + +impl TerminalFailure { + pub(crate) fn new(inner: Error) -> TerminalFailure { + let inner = Arc::new(inner); + TerminalFailure { inner } + } + + // Private to avoid exposing `Clone` trait as part of the public API + pub(crate) fn clone(&self) -> TerminalFailure { + TerminalFailure { + inner: self.inner.clone(), + } + } +} + +impl fmt::Display for TerminalFailure { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "buffered service failed: {}", self.inner) + } +} + +impl std::error::Error for TerminalFailure { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&**self.inner) + } +} + +// ===== impl Closed ===== + +impl Closed { + pub(crate) fn new() -> Self { + Closed { _p: () } + } +} + +impl fmt::Debug for Closed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_tuple("Closed").finish() + } +} + +impl fmt::Display for Closed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.write_str("buffer's worker closed unexpectedly") + } +} + +impl std::error::Error for Closed {} diff --git a/linkerd/proxy/pool/src/failfast.rs b/linkerd/proxy/pool/src/failfast.rs new file mode 100644 index 0000000000..c2bc51ffa2 --- /dev/null +++ b/linkerd/proxy/pool/src/failfast.rs @@ -0,0 +1,130 @@ +use linkerd_stack::gate; +use std::pin::Pin; +use tokio::time; + +/// Manages the failfast state for a pool. +#[derive(Debug)] +pub(super) struct Failfast { + timeout: time::Duration, + sleep: Pin>, + state: Option, + gate: gate::Tx, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(super) enum State { + Waiting { since: time::Instant }, + Failfast { since: time::Instant }, +} + +// === impl Failfast === + +impl Failfast { + pub(super) fn new(timeout: time::Duration, gate: gate::Tx) -> Self { + Self { + timeout, + sleep: Box::pin(time::sleep(time::Duration::MAX)), + state: None, + gate, + } + } + + pub(super) fn duration(&self) -> time::Duration { + self.timeout + } + + /// Returns true if we are currently in a failfast state. + pub(super) fn is_active(&self) -> bool { + matches!(self.state, Some(State::Failfast { .. })) + } + + /// Clears any waiting or failfast state. + pub(super) fn set_ready(&mut self) -> Option { + let state = self.state.take()?; + if matches!(state, State::Failfast { .. }) { + tracing::trace!("Exiting failfast"); + let _ = self.gate.open(); + } + Some(state) + } + + /// Waits for the failfast timeout to expire and enters the failfast state. + pub(super) async fn timeout(&mut self) { + let since = match self.state { + // If we're already in failfast, then we don't need to wait. + Some(State::Failfast { .. }) => { + return; + } + + // Ensure that the timer's been initialized. + Some(State::Waiting { since }) => since, + None => { + let now = time::Instant::now(); + self.sleep.as_mut().reset(now + self.timeout); + self.state = Some(State::Waiting { since: now }); + now + } + }; + + // Wait for the failfast timer to expire. + tracing::trace!("Waiting for failfast timeout"); + self.sleep.as_mut().await; + tracing::trace!("Entering failfast"); + + // Once we enter failfast, shut the upstream gate so that we can + // advertise backpressure past the queue. 
+ self.state = Some(State::Failfast { since }); + let _ = self.gate.shut(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::prelude::*; + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn failfast() { + let (tx, gate_rx) = gate::channel(); + let dur = time::Duration::from_secs(1); + let mut failfast = Failfast::new(dur, tx); + + assert_eq!(dur, failfast.duration()); + assert!(gate_rx.is_open()); + + // The failfast timeout should not be initialized until the first + // request is received. + assert!(!failfast.is_active(), "failfast should be active"); + + failfast.timeout().await; + assert!(failfast.is_active(), "failfast should be active"); + assert!(gate_rx.is_shut(), "gate should be shut"); + + failfast + .timeout() + .now_or_never() + .expect("timeout must return immediately when in failfast"); + assert!(failfast.is_active(), "failfast should be active"); + assert!(gate_rx.is_shut(), "gate should be shut"); + + failfast.set_ready(); + assert!(!failfast.is_active(), "failfast should be inactive"); + assert!(gate_rx.is_open(), "gate should be open"); + + tokio::select! { + _ = time::sleep(time::Duration::from_millis(10)) => {} + _ = failfast.timeout() => unreachable!("timed out too quick"), + } + assert!(!failfast.is_active(), "failfast should be inactive"); + assert!(gate_rx.is_open(), "gate should be open"); + + assert!( + matches!(failfast.state, Some(State::Waiting { .. })), + "failfast should be waiting" + ); + + failfast.timeout().await; + assert!(failfast.is_active(), "failfast should be active"); + assert!(gate_rx.is_shut(), "gate should be shut"); + } +} diff --git a/linkerd/proxy/pool/src/future.rs b/linkerd/proxy/pool/src/future.rs new file mode 100644 index 0000000000..ce5bd24a13 --- /dev/null +++ b/linkerd/proxy/pool/src/future.rs @@ -0,0 +1,77 @@ +//! Future types for the [`Buffer`] middleware. +//! +//! [`Buffer`]: crate::buffer::Buffer + +use super::{error::Closed, message}; +use futures::ready; +use linkerd_error::Error; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +#[pin_project] +/// Future that completes when the buffered service eventually services the submitted request. 
+#[derive(Debug)] +pub struct ResponseFuture { + #[pin] + state: ResponseState, +} + +#[pin_project(project = ResponseStateProj)] +#[derive(Debug)] +enum ResponseState { + Failed { + error: Option, + }, + Rx { + #[pin] + rx: message::Rx, + }, + Poll { + #[pin] + fut: T, + }, +} + +impl ResponseFuture { + pub(crate) fn new(rx: message::Rx) -> Self { + ResponseFuture { + state: ResponseState::Rx { rx }, + } + } + + pub(crate) fn failed(err: Error) -> Self { + ResponseFuture { + state: ResponseState::Failed { error: Some(err) }, + } + } +} + +impl Future for ResponseFuture +where + F: Future>, + E: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + match this.state.as_mut().project() { + ResponseStateProj::Failed { error } => { + return Poll::Ready(Err(error.take().expect("polled after error"))); + } + ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) { + Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }), + Ok(Err(e)) => return Poll::Ready(Err(e)), + Err(_) => return Poll::Ready(Err(Closed::new().into())), + }, + ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), + } + } + } +} diff --git a/linkerd/proxy/pool/src/lib.rs b/linkerd/proxy/pool/src/lib.rs new file mode 100644 index 0000000000..44ca4604cb --- /dev/null +++ b/linkerd/proxy/pool/src/lib.rs @@ -0,0 +1,39 @@ +//! Adapted from [`tower::buffer`][buffer]. +//! +//! [buffer]: https://github.com/tower-rs/tower/tree/bf4ea948346c59a5be03563425a7d9f04aadedf2/tower/src/buffer +// +// Copyright (c) 2019 Tower Contributors + +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + +mod error; +mod failfast; +mod future; +mod message; +mod service; +#[cfg(test)] +mod tests; +mod worker; + +pub use self::service::PoolQueue; +pub use linkerd_proxy_core::Update; + +use linkerd_stack::Service; + +/// A collection of services updated from a resolution. +pub trait Pool: Service { + /// Updates the pool's endpoints. + fn update_pool(&mut self, update: Update); + + /// Polls pending endpoints to ready. + /// + /// This is complementary to [`Service::poll_ready`], which may also handle + /// driving endpoints to ready. Unlike [`Service::poll_ready`], which + /// returns ready when *at least one* inner endpoint is ready, + /// [`Pool::poll_pool`] returns ready when *all* inner endpoints are ready. 
+ fn poll_pool( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>; +} diff --git a/linkerd/proxy/pool/src/message.rs b/linkerd/proxy/pool/src/message.rs new file mode 100644 index 0000000000..60bb4d77a5 --- /dev/null +++ b/linkerd/proxy/pool/src/message.rs @@ -0,0 +1,26 @@ +use linkerd_error::Result; +use tokio::{sync::oneshot, time}; + +/// Message sent over buffer +#[derive(Debug)] +pub(crate) struct Message { + pub(crate) req: Req, + pub(crate) tx: Tx, + pub(crate) span: tracing::Span, + pub(crate) t0: time::Instant, +} + +/// Response sender +type Tx = oneshot::Sender>; + +/// Response receiver +pub(crate) type Rx = oneshot::Receiver>; + +impl Message { + pub(crate) fn channel(req: Req) -> (Self, Rx) { + let (tx, rx) = oneshot::channel(); + let t0 = time::Instant::now(); + let span = tracing::Span::current(); + (Message { req, span, tx, t0 }, rx) + } +} diff --git a/linkerd/proxy/pool/src/service.rs b/linkerd/proxy/pool/src/service.rs new file mode 100644 index 0000000000..9901d5baf5 --- /dev/null +++ b/linkerd/proxy/pool/src/service.rs @@ -0,0 +1,120 @@ +use crate::{error, future::ResponseFuture, message::Message, worker, Pool}; +use futures::TryStream; +use linkerd_error::{Error, Result}; +use linkerd_proxy_core::Update; +use linkerd_stack::{gate, Service}; +use parking_lot::Mutex; +use std::{ + future::Future, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{sync::mpsc, time}; +use tokio_util::sync::PollSender; + +/// A shareable service backed by a dynamic endpoint. +#[derive(Debug)] +pub struct PoolQueue { + tx: PollSender>, + terminal: Arc>>, +} + +/// Provides a copy of the terminal failure error to all handles. +#[derive(Clone, Debug, Default)] +pub(crate) struct Terminate { + inner: Arc>>, +} + +// === impl SharedTerminalFailure === + +impl Terminate { + pub(crate) fn send(self, error: error::TerminalFailure) { + *self.inner.lock() = Some(error); + } +} + +impl PoolQueue +where + Req: Send + 'static, + F: Send + 'static, +{ + pub fn spawn( + capacity: usize, + failfast: time::Duration, + resolution: R, + pool: P, + ) -> gate::Gate + where + T: Clone + Eq + std::fmt::Debug + Send, + R: TryStream> + Send + Unpin + 'static, + R::Error: Into + Send, + P: Pool + Send + 'static, + P::Error: Into + Send + Sync, + Req: Send + 'static, + { + let (gate_tx, gate_rx) = gate::channel(); + let (tx, rx) = mpsc::channel(capacity); + let inner = Self::new(tx); + let terminate = Terminate { + inner: inner.terminal.clone(), + }; + worker::spawn(rx, failfast, gate_tx, terminate, resolution, pool); + gate::Gate::new(gate_rx, inner) + } + + fn new(tx: mpsc::Sender>) -> Self { + Self { + tx: PollSender::new(tx), + terminal: Default::default(), + } + } + + #[inline] + fn error_or_closed(&self) -> Error { + (*self.terminal.lock()) + .clone() + .map(Into::into) + .unwrap_or_else(|| error::Closed::new().into()) + } +} + +impl Service for PoolQueue +where + Req: Send + 'static, + F: Future> + Send + 'static, + E: Into, +{ + type Response = Rsp; + type Error = Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let poll = self.tx.poll_reserve(cx).map_err(|_| self.error_or_closed()); + tracing::trace!(?poll); + poll + } + + fn call(&mut self, req: Req) -> Self::Future { + tracing::trace!("Sending request to worker"); + let (msg, rx) = Message::channel(req); + if self.tx.send_item(msg).is_err() { + // The channel closed since poll_ready was called, so propagate the + // failure in the response future. 
+ return ResponseFuture::failed(self.error_or_closed()); + } + ResponseFuture::new(rx) + } +} + +impl Clone for PoolQueue +where + Req: Send + 'static, + F: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + terminal: self.terminal.clone(), + tx: self.tx.clone(), + } + } +} diff --git a/linkerd/proxy/pool/src/tests.rs b/linkerd/proxy/pool/src/tests.rs new file mode 100644 index 0000000000..0233381316 --- /dev/null +++ b/linkerd/proxy/pool/src/tests.rs @@ -0,0 +1,343 @@ +use crate::PoolQueue; +use futures::prelude::*; +use linkerd_proxy_core::Update; +use linkerd_stack::{Service, ServiceExt}; +use tokio::{sync::mpsc, time}; +use tokio_stream::wrappers::ReceiverStream; + +mod mock; + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn processes_requests() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (_updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 1, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(1); + assert!(poolq.ready().now_or_never().expect("ready").is_ok()); + let call = poolq.call(()); + let ((), respond) = handle.svc.next_request().await.expect("request"); + respond.send_response(()); + call.await.expect("response"); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn processes_requests_cloned() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (_updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq0 = PoolQueue::spawn( + 10, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + let mut poolq1 = poolq0.clone(); + + handle.svc.allow(2); + assert!(poolq0.ready().now_or_never().expect("ready").is_ok()); + assert!(poolq1.ready().now_or_never().expect("ready").is_ok()); + let call0 = poolq0.call(()); + let call1 = poolq1.call(()); + + let ((), respond0) = handle.svc.next_request().await.expect("request"); + respond0.send_response(()); + call0.await.expect("response"); + + let ((), respond1) = handle.svc.next_request().await.expect("request"); + respond1.send_response(()); + call1.await.expect("response"); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn limits_request_capacity() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (_updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq0 = PoolQueue::spawn( + 1, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + let mut poolq1 = poolq0.clone(); + + handle.svc.allow(0); + assert!(poolq0.ready().now_or_never().expect("ready").is_ok()); + let mut _call0 = poolq0.call(()); + + assert!( + poolq0.ready().now_or_never().is_none(), + "poolq must not be ready when at capacity" + ); + assert!( + poolq1.ready().now_or_never().is_none(), + "poolq must not be ready when at capacity" + ); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn waits_for_endpoints() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 1, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(0); + 
assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + let call = poolq.call(()); + + updates + .try_send(Ok(Update::Reset(vec![( + "192.168.1.44:80".parse().unwrap(), + (), + )]))) + .ok() + .expect("send update"); + handle.set_poll(std::task::Poll::Pending); + tokio::task::yield_now().await; + + handle.set_poll(std::task::Poll::Ready(Ok(()))); + handle.svc.allow(1); + tokio::task::yield_now().await; + + let ((), respond) = handle.svc.next_request().await.expect("request"); + respond.send_response(()); + call.await.expect("response"); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn updates_while_idle() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut _poolq = PoolQueue::spawn( + 1, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(0); + updates + .try_send(Ok(Update::Reset(vec![( + "192.168.1.44:80".parse().unwrap(), + (), + )]))) + .ok() + .expect("send update"); + + tokio::task::yield_now().await; + assert_eq!( + handle.rx.try_recv().expect("must receive update"), + Update::Reset(vec![("192.168.1.44:80".parse().unwrap(), (),)]) + ); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn complete_resolution() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 1, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + // When we drop the update stream, everything continues to work as long as + // the pool is ready. 
+ handle.svc.allow(1); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + drop(updates); + let call = poolq.call(()); + let ((), respond) = handle.svc.next_request().await.expect("request"); + respond.send_response(()); + assert!(call.await.is_ok()); + + handle.svc.allow(1); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + let call = poolq.call(()); + let ((), respond) = handle.svc.next_request().await.expect("request"); + respond.send_response(()); + assert!(call.await.is_ok()); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn error_resolution() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 10, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(0); + + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + let call0 = poolq.call(()); + + updates + .try_send(Err(mock::ResolutionError)) + .ok() + .expect("send update"); + + call0.await.expect_err("response should fail"); + + assert!( + poolq.ready().await.is_err(), + "poolq must error after failed resolution" + ); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn error_pool_while_pending() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (_updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 10, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(0); + handle.set_poll(std::task::Poll::Pending); + + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + let call = poolq.call(()); + handle.set_poll(std::task::Poll::Ready(Err(mock::PoolError))); + call.await.expect_err("response should fail"); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn error_after_ready() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 10, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(0); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + updates + .try_send(Err(mock::ResolutionError)) + .ok() + .expect("send update"); + tokio::task::yield_now().await; + poolq.call(()).await.expect_err("response should fail"); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn terminates() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 10, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(0); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + let call = poolq.call(()); + drop(poolq); + assert!( + call.await.is_err(), + "call should fail when queue is dropped" + ); + assert!(updates.is_closed()); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn failfast() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (_updates, 
u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 10, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(0); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + let call = poolq.call(()); + time::sleep(time::Duration::from_secs(1)).await; + assert!(call.await.is_err(), "call should failfast"); + if let Ok(_) = time::timeout(time::Duration::from_secs(1), poolq.ready()).await { + panic!("queue should not be ready while in failfast"); + } + + handle.svc.allow(1); + tokio::task::yield_now().await; + tracing::info!("Waiting for poolq to exit failfast"); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + // A delay doesn't impact failfast behavior when the pool is ready. + time::sleep(time::Duration::from_secs(1)).await; + let call = poolq.call(()); + let ((), respond) = handle.svc.next_request().await.expect("request"); + respond.send_response(()); + assert!(call.await.is_ok(), "call should not failfast"); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn failfast_interrupted() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); + + let (pool, mut handle) = mock::pool::<(), (), ()>(); + let (_updates, u) = mpsc::channel::, mock::ResolutionError>>(1); + let mut poolq = PoolQueue::spawn( + 10, + time::Duration::from_secs(1), + ReceiverStream::from(u), + pool, + ); + + handle.svc.allow(0); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); + let call = poolq.call(()); + // Wait for half a failfast timeout and then allow the request to be + // processed. + time::sleep(time::Duration::from_secs_f64(0.5)).await; + handle.svc.allow(1); + let ((), respond) = handle.svc.next_request().await.expect("request"); + respond.send_response(()); + assert!(call.await.is_ok(), "call should not failfast"); + assert!(poolq.ready().await.is_ok(), "poolq must be ready"); +} diff --git a/linkerd/proxy/pool/src/tests/mock.rs b/linkerd/proxy/pool/src/tests/mock.rs new file mode 100644 index 0000000000..f8317532ff --- /dev/null +++ b/linkerd/proxy/pool/src/tests/mock.rs @@ -0,0 +1,90 @@ +use linkerd_error::Error; +use linkerd_proxy_core::Update; +use parking_lot::Mutex; +use std::{ + sync::Arc, + task::{Context, Poll, Waker}, +}; +use tokio::sync::mpsc; +use tower_test::mock; + +pub fn pool() -> (MockPool, PoolHandle) { + let state = Arc::new(Mutex::new(State { + poll: Poll::Ready(Ok(())), + waker: None, + })); + let (updates_tx, updates_rx) = mpsc::unbounded_channel(); + let (mock, svc) = mock::pair(); + let h = PoolHandle { + rx: updates_rx, + state: state.clone(), + svc, + }; + let p = MockPool { + tx: updates_tx, + state, + svc: mock, + }; + (p, h) +} + +pub struct MockPool { + tx: mpsc::UnboundedSender>, + state: Arc>, + svc: mock::Mock, +} + +pub struct PoolHandle { + state: Arc>, + pub rx: mpsc::UnboundedReceiver>, + pub svc: mock::Handle, +} + +struct State { + poll: Poll>, + waker: Option, +} + +#[derive(Clone, Copy, Debug, thiserror::Error)] +#[error("mock pool error")] +pub struct PoolError; + +#[derive(Clone, Copy, Debug, thiserror::Error)] +#[error("mock resolution error")] +pub struct ResolutionError; + +impl crate::Pool for MockPool { + fn update_pool(&mut self, update: Update) { + self.tx.send(update).ok().unwrap(); + } + + fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll> { + let mut s = self.state.lock(); + s.waker.replace(cx.waker().clone()); + 
s.poll.map_err(Into::into) + } +} + +impl linkerd_stack::Service for MockPool { + type Response = Rsp; + type Error = Error; + type Future = mock::future::ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.svc.poll_ready(cx) + } + + fn call(&mut self, req: Req) -> Self::Future { + self.svc.call(req) + } +} + +impl PoolHandle { + pub fn set_poll(&self, poll: Poll>) { + let mut s = self.state.lock(); + s.poll = poll; + if let Some(w) = s.waker.take() { + w.wake(); + } + } +} diff --git a/linkerd/proxy/pool/src/worker.rs b/linkerd/proxy/pool/src/worker.rs new file mode 100644 index 0000000000..f0e44a26aa --- /dev/null +++ b/linkerd/proxy/pool/src/worker.rs @@ -0,0 +1,349 @@ +use std::future::poll_fn; + +use crate::{ + error::TerminalFailure, + failfast::{self, Failfast}, + message::Message, + service::Terminate, + Pool, +}; +use futures::{future, TryStream, TryStreamExt}; +use linkerd_error::{Error, Result}; +use linkerd_proxy_core::Update; +use linkerd_stack::{gate, FailFastError, ServiceExt}; +use tokio::{sync::mpsc, task::JoinHandle, time}; +use tracing::{debug_span, Instrument}; + +#[derive(Debug)] +struct Worker { + pool: PoolDriver

, + discovery: Discovery, +} + +/// Manages the pool's readiness state, handling failfast timeouts. +#[derive(Debug)] +struct PoolDriver

{ + pool: P, + failfast: Failfast, +} + +/// Processes endpoint updates from service discovery. +#[derive(Debug)] +struct Discovery { + resolution: R, + closed: bool, +} + +/// Spawns a task that simultaneously updates a pool of services from a +/// discovery stream and dispatches requests to it. +/// +/// If the pool service does not become ready within the failfast timeout, then +/// request are failed with a FailFastError until the pool becomes ready. While +/// in the failfast state, the provided gate is shut so that the caller may +/// exert backpressure to eliminate requests from being added to the queue. +pub(crate) fn spawn( + mut reqs_rx: mpsc::Receiver>, + failfast: time::Duration, + gate: gate::Tx, + terminate: Terminate, + updates_rx: R, + pool: P, +) -> JoinHandle> +where + Req: Send + 'static, + T: Clone + Eq + std::fmt::Debug + Send, + R: TryStream> + Unpin + Send + 'static, + R::Error: Into + Send, + P: Pool + Send + 'static, + P::Future: Send + 'static, + P::Error: Into + Send, +{ + let mut terminate = Some(terminate); + let mut terminal_failure = None; + tokio::spawn( + async move { + let mut worker = Worker { + pool: PoolDriver::new(pool, Failfast::new(failfast, gate)), + discovery: Discovery::new(updates_rx), + }; + + loop { + // Drive the pool with discovery updates while waiting for a + // request. + // + // NOTE: We do NOT require that pool become ready before + // processing a request, so this technically means that the + // queue supports capacity + 1 items. This behavior is + // inherrited from tower::buffer. Correcting this is not worth + // the complexity. + let Message { req, tx, span, t0 } = tokio::select! { + biased; + + // If either the discovery stream or the pool fail, close + // the request stream and process any remaining requests. + e = worker.discover_while_awaiting_requests(), if terminal_failure.is_none() => { + let err = TerminalFailure::new(e); + terminate.take().expect("must not fail twice").send(err.clone()); + reqs_rx.close(); + tracing::trace!("Closed"); + terminal_failure = Some(err); + continue; + } + + msg = reqs_rx.recv() => match msg { + Some(msg) => msg, + None => { + tracing::debug!("Requests channel closed"); + return Ok(()); + } + }, + }; + + // Preserve the original request's tracing context. + let _enter = span.enter(); + + // Wait for the pool to have at least one ready endpoint. + if terminal_failure.is_none() { + tracing::trace!("Waiting for pool"); + if let Err(e) = worker.ready_pool().await { + let err = TerminalFailure::new(e); + terminate + .take() + .expect("must not fail twice") + .send(err.clone()); + reqs_rx.close(); + terminal_failure = Some(err); + tracing::trace!("Closed"); + } else { + tracing::trace!("Pool ready"); + } + } + + // Process requests, either by dispatching them to the pool or + // by serving errors directly. + let _ = if let Some(e) = terminal_failure.clone() { + tx.send(Err(e.into())) + } else { + tx.send(worker.pool.call(req)) + }; + + // TODO(ver) track histogram from t0 until the request is dispatched. + tracing::debug!( + latency = (time::Instant::now() - t0).as_secs_f64(), + "Dispatched" + ); + } + } + .instrument(debug_span!("pool")), + ) +} + +// === impl Worker === + +impl Worker +where + T: Clone + Eq + std::fmt::Debug, + R: TryStream> + Unpin, + R::Error: Into, +{ + /// Attempts to update the pool with discovery updates. + /// + /// Additionally, this attempts to drive the pool to ready if it is + /// currently in failfast. + /// + /// If the discovery stream is closed, this never returns. 
+ async fn discover_while_awaiting_requests(&mut self) -> Error + where + P: Pool, + P::Error: Into, + { + tracing::trace!("Discovering while awaiting requests"); + + loop { + let update = tokio::select! { + e = self.pool.drive() => return e, + res = self.discovery.discover() => match res { + Err(e) => return e, + Ok(up) => up, + }, + }; + + tracing::debug!(?update, "Discovered"); + self.pool.pool.update_pool(update); + } + } + + /// Wait for the pool to have at least one ready endpoint, while also + /// processing service discovery updates (e.g. to provide new available + /// endpoints). + async fn ready_pool(&mut self) -> Result<(), Error> + where + P: Pool, + P::Error: Into, + { + loop { + tokio::select! { + // Tests, especially, depend on discovery updates being + // processed before ready returning. + biased; + + // If the pool updated, continue waiting for the pool to be + // ready. + res = self.discovery.discover() => { + let update = res?; + tracing::debug!(?update, "Discovered"); + self.pool.pool.update_pool(update); + } + + // When the pool is ready, clear any failfast state we may have + // set before returning. + res = self.pool.ready() => { + tracing::trace!(ready.ok = res.is_ok()); + return res; + } + } + } + } +} + +// === impl Discovery === + +impl Discovery +where + T: Clone + Eq + std::fmt::Debug, + R: TryStream> + Unpin, + R::Error: Into, +{ + fn new(resolution: R) -> Self { + Self { + resolution, + closed: false, + } + } + + /// Await the next service discovery update. + /// + /// If the discovery stream has closed, this never returns. + async fn discover(&mut self) -> Result, Error> { + if self.closed { + // Never returns. + return futures::future::pending().await; + } + + match self.resolution.try_next().await { + Ok(Some(up)) => Ok(up), + + Ok(None) => { + tracing::debug!("Resolution stream closed"); + self.closed = true; + // Never returns. + futures::future::pending().await + } + + Err(e) => { + let error = e.into(); + tracing::debug!(%error, "Resolution stream failed"); + self.closed = true; + Err(error) + } + } + } +} + +// === impl PoolDriver === + +impl

PoolDriver

{ + fn new(pool: P, failfast: Failfast) -> Self { + Self { pool, failfast } + } + + /// Drives all current endpoints to ready. + /// + /// If the service is in failfast, this clears the failfast state on readiness. + /// + /// If any endpoint fails, the error + /// is returned. + async fn drive(&mut self) -> Error + where + P: Pool, + P::Error: Into, + { + if self.failfast.is_active() { + tracing::trace!("Waiting to leave failfast"); + let res = self.pool.ready().await; + match self.failfast.set_ready() { + Some(failfast::State::Failfast { since }) => { + tracing::info!( + elapsed = (time::Instant::now() - since).as_secs_f64(), + "Available; exited failfast" + ); + } + _ => unreachable!("must be in failfast"), + } + if let Err(e) = res { + return e.into(); + } + } + + tracing::trace!("Driving pending endpoints"); + if let Err(e) = poll_fn(|cx| self.pool.poll_pool(cx)).await { + return e.into(); + } + + tracing::trace!("Driven"); + future::pending().await + } + + async fn ready(&mut self) -> Result<(), Error> + where + P: Pool, + P::Error: Into, + { + tokio::select! { + biased; + + res = self.pool.ready() => { + match self.failfast.set_ready() { + None => tracing::trace!("Ready"), + Some(failfast::State::Waiting { since }) => { + tracing::debug!( + elapsed = (time::Instant::now() - since).as_secs_f64(), + "Available" + ); + } + Some(failfast::State::Failfast { since }) => { + tracing::info!( + elapsed = (time::Instant::now() - since).as_secs_f64(), + "Available; exited failfast" + ); + } + } + if let Err(e) = res { + return Err(e.into()); + } + } + + () = self.failfast.timeout() => { + tracing::info!( + timeout = self.failfast.duration().as_secs_f64(), "Unavailable; entering failfast", + ); + } + } + + Ok(()) + } + + fn call(&mut self, req: Req) -> Result + where + P: Pool, + P::Error: Into, + { + // If we've tripped failfast, fail the request. + if self.failfast.is_active() { + return Err(FailFastError::default().into()); + } + + // Otherwise dispatch the request to the pool. + Ok(self.pool.call(req)) + } +} diff --git a/linkerd/stack/src/failfast.rs b/linkerd/stack/src/failfast.rs index 0f26dc02f2..fb96bd7dcd 100644 --- a/linkerd/stack/src/failfast.rs +++ b/linkerd/stack/src/failfast.rs @@ -44,7 +44,7 @@ pub struct FailFast { } /// An error representing that an operation timed out. 
-#[derive(Debug, Error)] +#[derive(Debug, Default, Error)] #[error("service in fail-fast")] pub struct FailFastError(()); From 55dc0595fc3fb3e9c2f8c8fd5b82510e25a1be3f Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 1 Dec 2023 01:36:54 +0000 Subject: [PATCH 02/13] lints --- linkerd/proxy/pool/src/tests.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/linkerd/proxy/pool/src/tests.rs b/linkerd/proxy/pool/src/tests.rs index 0233381316..05870d4eee 100644 --- a/linkerd/proxy/pool/src/tests.rs +++ b/linkerd/proxy/pool/src/tests.rs @@ -1,3 +1,5 @@ +#![allow(clippy::ok_expect)] + use crate::PoolQueue; use futures::prelude::*; use linkerd_proxy_core::Update; @@ -299,7 +301,10 @@ async fn failfast() { let call = poolq.call(()); time::sleep(time::Duration::from_secs(1)).await; assert!(call.await.is_err(), "call should failfast"); - if let Ok(_) = time::timeout(time::Duration::from_secs(1), poolq.ready()).await { + if time::timeout(time::Duration::from_secs(1), poolq.ready()) + .await + .is_ok() + { panic!("queue should not be ready while in failfast"); } From 220a7fe5fe8a0a880e7accaf949e739316e703f2 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 1 Dec 2023 13:50:47 +0000 Subject: [PATCH 03/13] thiserror --- linkerd/proxy/pool/src/error.rs | 33 ++++++--------------------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/linkerd/proxy/pool/src/error.rs b/linkerd/proxy/pool/src/error.rs index 511089449a..c2e58d835c 100644 --- a/linkerd/proxy/pool/src/error.rs +++ b/linkerd/proxy/pool/src/error.rs @@ -14,24 +14,17 @@ pub struct TerminalFailure { } /// An error produced when the a buffer's worker closes unexpectedly. -pub struct Closed { - _p: (), -} +#[derive(Debug, thiserror::Error)] +#[error("buffer worker closed unexpectedly")] +pub struct Closed(()); -// ===== impl ServiceError ===== +// === impl ServiceError === impl TerminalFailure { pub(crate) fn new(inner: Error) -> TerminalFailure { let inner = Arc::new(inner); TerminalFailure { inner } } - - // Private to avoid exposing `Clone` trait as part of the public API - pub(crate) fn clone(&self) -> TerminalFailure { - TerminalFailure { - inner: self.inner.clone(), - } - } } impl fmt::Display for TerminalFailure { @@ -46,24 +39,10 @@ impl std::error::Error for TerminalFailure { } } -// ===== impl Closed ===== +// === impl Closed ==== impl Closed { pub(crate) fn new() -> Self { - Closed { _p: () } - } -} - -impl fmt::Debug for Closed { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_tuple("Closed").finish() - } -} - -impl fmt::Display for Closed { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.write_str("buffer's worker closed unexpectedly") + Closed(()) } } - -impl std::error::Error for Closed {} From 9be44abb1cdaaea2043521420f8a62dac8d9d12b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 1 Dec 2023 21:10:15 +0000 Subject: [PATCH 04/13] comments --- linkerd/proxy/pool/src/error.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/linkerd/proxy/pool/src/error.rs b/linkerd/proxy/pool/src/error.rs index c2e58d835c..933f6b6234 100644 --- a/linkerd/proxy/pool/src/error.rs +++ b/linkerd/proxy/pool/src/error.rs @@ -1,13 +1,10 @@ -//! Error types for the `Buffer` middleware. +//! Error types for the `PoolQueue` middleware. use linkerd_error::Error; use std::{fmt, sync::Arc}; /// A shareable, terminal error produced by either a service or discovery /// resolution. 
-/// -/// [`Service`]: crate::Service -/// [`Buffer`]: crate::buffer::Buffer #[derive(Clone, Debug)] pub struct TerminalFailure { inner: Arc, @@ -18,7 +15,7 @@ pub struct TerminalFailure { #[error("buffer worker closed unexpectedly")] pub struct Closed(()); -// === impl ServiceError === +// === impl TerminalFailure === impl TerminalFailure { pub(crate) fn new(inner: Error) -> TerminalFailure { From 79dd7b4d0e37a70eb1ec721e652fd626fa77369c Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 1 Dec 2023 21:29:44 +0000 Subject: [PATCH 05/13] use a rwlock for shared errors --- linkerd/proxy/pool/src/service.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/linkerd/proxy/pool/src/service.rs b/linkerd/proxy/pool/src/service.rs index 9901d5baf5..9d0d6e7cf6 100644 --- a/linkerd/proxy/pool/src/service.rs +++ b/linkerd/proxy/pool/src/service.rs @@ -3,7 +3,7 @@ use futures::TryStream; use linkerd_error::{Error, Result}; use linkerd_proxy_core::Update; use linkerd_stack::{gate, Service}; -use parking_lot::Mutex; +use parking_lot::RwLock; use std::{ future::Future, sync::Arc, @@ -16,20 +16,20 @@ use tokio_util::sync::PollSender; #[derive(Debug)] pub struct PoolQueue { tx: PollSender>, - terminal: Arc>>, + terminal: Arc>>, } /// Provides a copy of the terminal failure error to all handles. #[derive(Clone, Debug, Default)] pub(crate) struct Terminate { - inner: Arc>>, + inner: Arc>>, } -// === impl SharedTerminalFailure === +// === impl Terminate === impl Terminate { pub(crate) fn send(self, error: error::TerminalFailure) { - *self.inner.lock() = Some(error); + *self.inner.write() = Some(error); } } @@ -71,7 +71,7 @@ where #[inline] fn error_or_closed(&self) -> Error { - (*self.terminal.lock()) + (*self.terminal.read()) .clone() .map(Into::into) .unwrap_or_else(|| error::Closed::new().into()) From 4c7c4c1f98e8f7b2ce703f2fded587531cd43845 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 2 Dec 2023 03:42:03 +0000 Subject: [PATCH 06/13] fixup tracing, comments --- linkerd/proxy/pool/src/future.rs | 4 ---- linkerd/proxy/pool/src/lib.rs | 10 +++++----- linkerd/proxy/pool/src/worker.rs | 28 ++++++++++++++++------------ 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/linkerd/proxy/pool/src/future.rs b/linkerd/proxy/pool/src/future.rs index ce5bd24a13..9fb088df9a 100644 --- a/linkerd/proxy/pool/src/future.rs +++ b/linkerd/proxy/pool/src/future.rs @@ -1,7 +1,3 @@ -//! Future types for the [`Buffer`] middleware. -//! -//! [`Buffer`]: crate::buffer::Buffer - use super::{error::Closed, message}; use futures::ready; use linkerd_error::Error; diff --git a/linkerd/proxy/pool/src/lib.rs b/linkerd/proxy/pool/src/lib.rs index 44ca4604cb..824a0cf002 100644 --- a/linkerd/proxy/pool/src/lib.rs +++ b/linkerd/proxy/pool/src/lib.rs @@ -26,12 +26,12 @@ pub trait Pool: Service { /// Updates the pool's endpoints. fn update_pool(&mut self, update: Update); - /// Polls pending endpoints to ready. + /// Polls to update the pool while the Service is ready. /// - /// This is complementary to [`Service::poll_ready`], which may also handle - /// driving endpoints to ready. Unlike [`Service::poll_ready`], which - /// returns ready when *at least one* inner endpoint is ready, - /// [`Pool::poll_pool`] returns ready when *all* inner endpoints are ready. + /// [`Service::poll_ready`] should do the same work, but will return ready + /// as soon as there at least one ready endpoint. 
This method will continue + /// to drive the pool until ready is returned (indicating that the pool need + /// not be updated before another request is processed). fn poll_pool( &mut self, cx: &mut std::task::Context<'_>, diff --git a/linkerd/proxy/pool/src/worker.rs b/linkerd/proxy/pool/src/worker.rs index f0e44a26aa..0c8374e526 100644 --- a/linkerd/proxy/pool/src/worker.rs +++ b/linkerd/proxy/pool/src/worker.rs @@ -99,9 +99,6 @@ where }, }; - // Preserve the original request's tracing context. - let _enter = span.enter(); - // Wait for the pool to have at least one ready endpoint. if terminal_failure.is_none() { tracing::trace!("Waiting for pool"); @@ -121,17 +118,24 @@ where // Process requests, either by dispatching them to the pool or // by serving errors directly. - let _ = if let Some(e) = terminal_failure.clone() { - tx.send(Err(e.into())) - } else { - tx.send(worker.pool.call(req)) + let call = match terminal_failure.clone() { + Some(e) => Err(e.into()), + None => { + // Preserve the original request's tracing context. + let _enter = span.enter(); + worker.pool.call(req) + } }; - // TODO(ver) track histogram from t0 until the request is dispatched. - tracing::debug!( - latency = (time::Instant::now() - t0).as_secs_f64(), - "Dispatched" - ); + if tx.send(call).is_ok() { + // TODO(ver) track histogram from t0 until the request is dispatched. + tracing::debug!( + latency = (time::Instant::now() - t0).as_secs_f64(), + "Dispatched" + ); + } else { + tracing::debug!("Caller dropped"); + } } } .instrument(debug_span!("pool")), From 16caec90414a6abd5916453158bcb1090fac3872 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 2 Dec 2023 15:21:51 +0000 Subject: [PATCH 07/13] comment --- linkerd/proxy/pool/src/worker.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/linkerd/proxy/pool/src/worker.rs b/linkerd/proxy/pool/src/worker.rs index 0c8374e526..a08d1bc388 100644 --- a/linkerd/proxy/pool/src/worker.rs +++ b/linkerd/proxy/pool/src/worker.rs @@ -155,7 +155,8 @@ where /// Additionally, this attempts to drive the pool to ready if it is /// currently in failfast. /// - /// If the discovery stream is closed, this never returns. + /// If the discovery stream is closed, this never returns. This only returns + /// errors. 
async fn discover_while_awaiting_requests(&mut self) -> Error where P: Pool, From 9becd3228ff14ec022f6c4af43bceb97772f4815 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 2 Dec 2023 16:33:21 +0000 Subject: [PATCH 08/13] log pool errors --- linkerd/proxy/pool/src/worker.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/linkerd/proxy/pool/src/worker.rs b/linkerd/proxy/pool/src/worker.rs index a08d1bc388..64c6a72b54 100644 --- a/linkerd/proxy/pool/src/worker.rs +++ b/linkerd/proxy/pool/src/worker.rs @@ -103,14 +103,14 @@ where if terminal_failure.is_none() { tracing::trace!("Waiting for pool"); if let Err(e) = worker.ready_pool().await { - let err = TerminalFailure::new(e); + let error = TerminalFailure::new(e); + tracing::debug!(%error, "Pool failed"); terminate .take() .expect("must not fail twice") - .send(err.clone()); + .send(error.clone()); reqs_rx.close(); - terminal_failure = Some(err); - tracing::trace!("Closed"); + terminal_failure = Some(error); } else { tracing::trace!("Pool ready"); } From 9cf23b52425a90c0f6a0972e619d93fb0e6ade78 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 2 Dec 2023 17:18:37 +0000 Subject: [PATCH 09/13] Move error handling and request draining into a dedicated helper --- linkerd/proxy/pool/src/service.rs | 56 ++++++------------ linkerd/proxy/pool/src/worker.rs | 95 +++++++++++++++++++------------ 2 files changed, 76 insertions(+), 75 deletions(-) diff --git a/linkerd/proxy/pool/src/service.rs b/linkerd/proxy/pool/src/service.rs index 9d0d6e7cf6..18da016d74 100644 --- a/linkerd/proxy/pool/src/service.rs +++ b/linkerd/proxy/pool/src/service.rs @@ -1,12 +1,15 @@ -use crate::{error, future::ResponseFuture, message::Message, worker, Pool}; +use crate::{ + future::ResponseFuture, + message::Message, + worker::{self, Terminate}, + Pool, +}; use futures::TryStream; use linkerd_error::{Error, Result}; use linkerd_proxy_core::Update; use linkerd_stack::{gate, Service}; -use parking_lot::RwLock; use std::{ future::Future, - sync::Arc, task::{Context, Poll}, }; use tokio::{sync::mpsc, time}; @@ -16,21 +19,7 @@ use tokio_util::sync::PollSender; #[derive(Debug)] pub struct PoolQueue { tx: PollSender>, - terminal: Arc>>, -} - -/// Provides a copy of the terminal failure error to all handles. 
-#[derive(Clone, Debug, Default)] -pub(crate) struct Terminate { - inner: Arc>>, -} - -// === impl Terminate === - -impl Terminate { - pub(crate) fn send(self, error: error::TerminalFailure) { - *self.inner.write() = Some(error); - } + terminal: Terminate, } impl PoolQueue @@ -54,28 +43,14 @@ where { let (gate_tx, gate_rx) = gate::channel(); let (tx, rx) = mpsc::channel(capacity); - let inner = Self::new(tx); - let terminate = Terminate { - inner: inner.terminal.clone(), + let terminal = Terminate::default(); + let inner = Self { + tx: PollSender::new(tx), + terminal: terminal.clone(), }; - worker::spawn(rx, failfast, gate_tx, terminate, resolution, pool); + worker::spawn(rx, failfast, gate_tx, terminal, resolution, pool); gate::Gate::new(gate_rx, inner) } - - fn new(tx: mpsc::Sender>) -> Self { - Self { - tx: PollSender::new(tx), - terminal: Default::default(), - } - } - - #[inline] - fn error_or_closed(&self) -> Error { - (*self.terminal.read()) - .clone() - .map(Into::into) - .unwrap_or_else(|| error::Closed::new().into()) - } } impl Service for PoolQueue @@ -89,7 +64,10 @@ where type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let poll = self.tx.poll_reserve(cx).map_err(|_| self.error_or_closed()); + let poll = self + .tx + .poll_reserve(cx) + .map_err(|_| self.terminal.error_or_closed()); tracing::trace!(?poll); poll } @@ -100,7 +78,7 @@ where if self.tx.send_item(msg).is_err() { // The channel closed since poll_ready was called, so propagate the // failure in the response future. - return ResponseFuture::failed(self.error_or_closed()); + return ResponseFuture::failed(self.terminal.error_or_closed()); } ResponseFuture::new(rx) } diff --git a/linkerd/proxy/pool/src/worker.rs b/linkerd/proxy/pool/src/worker.rs index 64c6a72b54..f05f21690b 100644 --- a/linkerd/proxy/pool/src/worker.rs +++ b/linkerd/proxy/pool/src/worker.rs @@ -1,19 +1,26 @@ use std::future::poll_fn; use crate::{ - error::TerminalFailure, + error, failfast::{self, Failfast}, message::Message, - service::Terminate, Pool, }; use futures::{future, TryStream, TryStreamExt}; use linkerd_error::{Error, Result}; use linkerd_proxy_core::Update; use linkerd_stack::{gate, FailFastError, ServiceExt}; +use parking_lot::RwLock; +use std::sync::Arc; use tokio::{sync::mpsc, task::JoinHandle, time}; use tracing::{debug_span, Instrument}; +/// Provides a copy of the terminal failure error to all handles. +#[derive(Clone, Debug, Default)] +pub(crate) struct Terminate { + inner: Arc>>, +} + #[derive(Debug)] struct Worker { pool: PoolDriver

, @@ -45,7 +52,7 @@ pub(crate) fn spawn( mut reqs_rx: mpsc::Receiver>, failfast: time::Duration, gate: gate::Tx, - terminate: Terminate, + terminal: Terminate, updates_rx: R, pool: P, ) -> JoinHandle> @@ -58,8 +65,6 @@ where P::Future: Send + 'static, P::Error: Into + Send, { - let mut terminate = Some(terminate); - let mut terminal_failure = None; tokio::spawn( async move { let mut worker = Worker { @@ -81,55 +86,40 @@ where // If either the discovery stream or the pool fail, close // the request stream and process any remaining requests. - e = worker.discover_while_awaiting_requests(), if terminal_failure.is_none() => { - let err = TerminalFailure::new(e); - terminate.take().expect("must not fail twice").send(err.clone()); - reqs_rx.close(); - tracing::trace!("Closed"); - terminal_failure = Some(err); - continue; + e = worker.discover_while_awaiting_requests() => { + terminal.close(reqs_rx, e).await; + return Ok(()); } msg = reqs_rx.recv() => match msg { Some(msg) => msg, None => { - tracing::debug!("Requests channel closed"); + tracing::debug!("Callers dropped"); return Ok(()); } }, }; - // Wait for the pool to have at least one ready endpoint. - if terminal_failure.is_none() { - tracing::trace!("Waiting for pool"); - if let Err(e) = worker.ready_pool().await { - let error = TerminalFailure::new(e); - tracing::debug!(%error, "Pool failed"); - terminate - .take() - .expect("must not fail twice") - .send(error.clone()); - reqs_rx.close(); - terminal_failure = Some(error); - } else { - tracing::trace!("Pool ready"); - } + // Wait for the pool to be ready to process a request. If this fails, we enter + tracing::trace!("Waiting for pool"); + if let Err(e) = worker.ready_pool().await { + terminal.close(reqs_rx, e).await; + return Ok(()); } + tracing::trace!("Pool ready"); // Process requests, either by dispatching them to the pool or // by serving errors directly. - let call = match terminal_failure.clone() { - Some(e) => Err(e.into()), - None => { - // Preserve the original request's tracing context. - let _enter = span.enter(); - worker.pool.call(req) - } + let call = { + // Preserve the original request's tracing context in + // the inner call. + let _enter = span.enter(); + worker.pool.call(req) }; if tx.send(call).is_ok() { // TODO(ver) track histogram from t0 until the request is dispatched. - tracing::debug!( + tracing::trace!( latency = (time::Instant::now() - t0).as_secs_f64(), "Dispatched" ); @@ -352,3 +342,36 @@ impl

PoolDriver

{ Ok(self.pool.call(req)) } } + +// === impl Terminate === + +impl Terminate { + #[inline] + pub(super) fn error_or_closed(&self) -> Error { + (*self.inner.read()) + .clone() + .map(Into::into) + .unwrap_or_else(|| error::Closed::new().into()) + } + + async fn close(self, mut reqs_rx: mpsc::Receiver>, error: Error) { + tracing::debug!(%error, "Closing pool"); + reqs_rx.close(); + + let error = error::TerminalFailure::new(error); + *self.inner.write() = Some(error.clone()); + + while let Some(Message { tx, t0, .. }) = reqs_rx.recv().await { + if tx.send(Err(error.clone().into())).is_ok() { + tracing::debug!( + latency = (time::Instant::now() - t0).as_secs_f64(), + "Failed due to pool error" + ); + } else { + tracing::debug!("Caller dropped"); + } + } + + tracing::debug!("Closed"); + } +} From f57eb03d9161ab392974c4d073f9ad5352398947 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 2 Dec 2023 17:32:05 +0000 Subject: [PATCH 10/13] Assert that errors are propagated Remove the error::Closed variant, since it indicates that the inner task panicked. It seems preferable to propagate the panic than to handle it gracefully. --- linkerd/proxy/pool/src/error.rs | 24 ++++-------------------- linkerd/proxy/pool/src/future.rs | 12 ++++++------ linkerd/proxy/pool/src/service.rs | 15 ++++++++++----- linkerd/proxy/pool/src/tests.rs | 18 ++++++++++++++++++ linkerd/proxy/pool/src/worker.rs | 7 ++----- 5 files changed, 40 insertions(+), 36 deletions(-) diff --git a/linkerd/proxy/pool/src/error.rs b/linkerd/proxy/pool/src/error.rs index 933f6b6234..c67ac5802a 100644 --- a/linkerd/proxy/pool/src/error.rs +++ b/linkerd/proxy/pool/src/error.rs @@ -6,40 +6,24 @@ use std::{fmt, sync::Arc}; /// A shareable, terminal error produced by either a service or discovery /// resolution. #[derive(Clone, Debug)] -pub struct TerminalFailure { - inner: Arc, -} - -/// An error produced when the a buffer's worker closes unexpectedly. 
-#[derive(Debug, thiserror::Error)] -#[error("buffer worker closed unexpectedly")] -pub struct Closed(()); +pub struct TerminalFailure(Arc); // === impl TerminalFailure === impl TerminalFailure { pub(crate) fn new(inner: Error) -> TerminalFailure { - let inner = Arc::new(inner); - TerminalFailure { inner } + TerminalFailure(Arc::new(inner)) } } impl fmt::Display for TerminalFailure { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "buffered service failed: {}", self.inner) + write!(fmt, "pool failed: {}", self.0) } } impl std::error::Error for TerminalFailure { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - Some(&**self.inner) - } -} - -// === impl Closed ==== - -impl Closed { - pub(crate) fn new() -> Self { - Closed(()) + Some(&**self.0) } } diff --git a/linkerd/proxy/pool/src/future.rs b/linkerd/proxy/pool/src/future.rs index 9fb088df9a..70881a42dd 100644 --- a/linkerd/proxy/pool/src/future.rs +++ b/linkerd/proxy/pool/src/future.rs @@ -1,4 +1,4 @@ -use super::{error::Closed, message}; +use super::message; use futures::ready; use linkerd_error::Error; use pin_project::pin_project; @@ -61,11 +61,11 @@ where ResponseStateProj::Failed { error } => { return Poll::Ready(Err(error.take().expect("polled after error"))); } - ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) { - Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }), - Ok(Err(e)) => return Poll::Ready(Err(e)), - Err(_) => return Poll::Ready(Err(Closed::new().into())), - }, + ResponseStateProj::Rx { rx } => { + let fut = ready!(rx.poll(cx)) + .expect("worker must set a failure if it exits prematurely")?; + this.state.set(ResponseState::Poll { fut }); + } ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), } } diff --git a/linkerd/proxy/pool/src/service.rs b/linkerd/proxy/pool/src/service.rs index 18da016d74..ec87e82d90 100644 --- a/linkerd/proxy/pool/src/service.rs +++ b/linkerd/proxy/pool/src/service.rs @@ -64,10 +64,11 @@ where type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let poll = self - .tx - .poll_reserve(cx) - .map_err(|_| self.terminal.error_or_closed()); + let poll = self.tx.poll_reserve(cx).map_err(|_| { + self.terminal + .failure() + .expect("worker must set a failure if it exits prematurely") + }); tracing::trace!(?poll); poll } @@ -78,7 +79,11 @@ where if self.tx.send_item(msg).is_err() { // The channel closed since poll_ready was called, so propagate the // failure in the response future. 
- return ResponseFuture::failed(self.terminal.error_or_closed()); + return ResponseFuture::failed( + self.terminal + .failure() + .expect("worker must set a failure if it exits prematurely"), + ); } ResponseFuture::new(rx) } diff --git a/linkerd/proxy/pool/src/tests.rs b/linkerd/proxy/pool/src/tests.rs index 05870d4eee..883351dbfd 100644 --- a/linkerd/proxy/pool/src/tests.rs +++ b/linkerd/proxy/pool/src/tests.rs @@ -212,6 +212,12 @@ async fn error_resolution() { poolq.ready().await.is_err(), "poolq must error after failed resolution" ); + + poolq + .ready() + .await + .err() + .expect("poolq must error after failed resolution"); } #[tokio::test(flavor = "current_thread", start_paused = true)] @@ -234,6 +240,12 @@ async fn error_pool_while_pending() { let call = poolq.call(()); handle.set_poll(std::task::Poll::Ready(Err(mock::PoolError))); call.await.expect_err("response should fail"); + + poolq + .ready() + .await + .err() + .expect("poolq must error after pool error"); } #[tokio::test(flavor = "current_thread", start_paused = true)] @@ -257,6 +269,12 @@ async fn error_after_ready() { .expect("send update"); tokio::task::yield_now().await; poolq.call(()).await.expect_err("response should fail"); + + poolq + .ready() + .await + .err() + .expect("poolq must error after pool error"); } #[tokio::test(flavor = "current_thread", start_paused = true)] diff --git a/linkerd/proxy/pool/src/worker.rs b/linkerd/proxy/pool/src/worker.rs index f05f21690b..b759c519c1 100644 --- a/linkerd/proxy/pool/src/worker.rs +++ b/linkerd/proxy/pool/src/worker.rs @@ -347,11 +347,8 @@ impl

<P> PoolDriver<P>
{ impl Terminate { #[inline] - pub(super) fn error_or_closed(&self) -> Error { - (*self.inner.read()) - .clone() - .map(Into::into) - .unwrap_or_else(|| error::Closed::new().into()) + pub(super) fn failure(&self) -> Option { + (*self.inner.read()).clone().map(Into::into) } async fn close(self, mut reqs_rx: mpsc::Receiver>, error: Error) { From a0613268a81af40269e186b519a057bd6a6af17b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 2 Dec 2023 18:38:11 +0000 Subject: [PATCH 11/13] Improve testing for error cases --- linkerd/proxy/pool/src/failfast.rs | 10 +-- linkerd/proxy/pool/src/message.rs | 13 +++- linkerd/proxy/pool/src/tests.rs | 20 +++++- linkerd/proxy/pool/src/tests/mock.rs | 4 ++ linkerd/proxy/pool/src/worker.rs | 94 ++++++++++++---------------- 5 files changed, 78 insertions(+), 63 deletions(-) diff --git a/linkerd/proxy/pool/src/failfast.rs b/linkerd/proxy/pool/src/failfast.rs index c2bc51ffa2..7b3105fb6a 100644 --- a/linkerd/proxy/pool/src/failfast.rs +++ b/linkerd/proxy/pool/src/failfast.rs @@ -49,7 +49,7 @@ impl Failfast { } /// Waits for the failfast timeout to expire and enters the failfast state. - pub(super) async fn timeout(&mut self) { + pub(super) async fn enter(&mut self) { let since = match self.state { // If we're already in failfast, then we don't need to wait. Some(State::Failfast { .. }) => { @@ -96,12 +96,12 @@ mod tests { // request is received. assert!(!failfast.is_active(), "failfast should be active"); - failfast.timeout().await; + failfast.enter().await; assert!(failfast.is_active(), "failfast should be active"); assert!(gate_rx.is_shut(), "gate should be shut"); failfast - .timeout() + .enter() .now_or_never() .expect("timeout must return immediately when in failfast"); assert!(failfast.is_active(), "failfast should be active"); @@ -113,7 +113,7 @@ mod tests { tokio::select! 
{ _ = time::sleep(time::Duration::from_millis(10)) => {} - _ = failfast.timeout() => unreachable!("timed out too quick"), + _ = failfast.enter() => unreachable!("timed out too quick"), } assert!(!failfast.is_active(), "failfast should be inactive"); assert!(gate_rx.is_open(), "gate should be open"); @@ -123,7 +123,7 @@ mod tests { "failfast should be waiting" ); - failfast.timeout().await; + failfast.enter().await; assert!(failfast.is_active(), "failfast should be active"); assert!(gate_rx.is_shut(), "gate should be shut"); } diff --git a/linkerd/proxy/pool/src/message.rs b/linkerd/proxy/pool/src/message.rs index 60bb4d77a5..697d511490 100644 --- a/linkerd/proxy/pool/src/message.rs +++ b/linkerd/proxy/pool/src/message.rs @@ -1,4 +1,4 @@ -use linkerd_error::Result; +use linkerd_error::{Error, Result}; use tokio::{sync::oneshot, time}; /// Message sent over buffer @@ -23,4 +23,15 @@ impl Message { let span = tracing::Span::current(); (Message { req, span, tx, t0 }, rx) } + + pub(crate) fn fail(self, err: impl Into) { + if self.tx.send(Err(err.into())).is_ok() { + tracing::debug!( + latency = (time::Instant::now() - self.t0).as_secs_f64(), + "Failed due to pool error" + ); + } else { + tracing::debug!("Caller dropped"); + } + } } diff --git a/linkerd/proxy/pool/src/tests.rs b/linkerd/proxy/pool/src/tests.rs index 883351dbfd..331a1914ab 100644 --- a/linkerd/proxy/pool/src/tests.rs +++ b/linkerd/proxy/pool/src/tests.rs @@ -6,6 +6,7 @@ use linkerd_proxy_core::Update; use linkerd_stack::{Service, ServiceExt}; use tokio::{sync::mpsc, time}; use tokio_stream::wrappers::ReceiverStream; +use tokio_test::{assert_pending, assert_ready}; mod mock; @@ -238,12 +239,20 @@ async fn error_pool_while_pending() { assert!(poolq.ready().await.is_ok(), "poolq must be ready"); let call = poolq.call(()); + tokio::task::yield_now().await; + handle.set_poll(std::task::Poll::Ready(Err(mock::PoolError))); - call.await.expect_err("response should fail"); + tokio::task::yield_now().await; + call.now_or_never() + .expect("response should fail immediately") + .expect_err("response should fail"); + tracing::info!("Awaiting readiness failure"); + tokio::task::yield_now().await; poolq .ready() - .await + .now_or_never() + .expect("poolq readiness fail immediately") .err() .expect("poolq must error after pool error"); } @@ -293,12 +302,19 @@ async fn terminates() { handle.svc.allow(0); assert!(poolq.ready().await.is_ok(), "poolq must be ready"); let call = poolq.call(()); + assert_pending!(handle.svc.poll_request()); + drop(poolq); + assert!( call.await.is_err(), "call should fail when queue is dropped" ); assert!(updates.is_closed()); + assert!( + assert_ready!(handle.svc.poll_request(), "poll_request should be ready").is_none(), + "poll_request should return None" + ); } #[tokio::test(flavor = "current_thread", start_paused = true)] diff --git a/linkerd/proxy/pool/src/tests/mock.rs b/linkerd/proxy/pool/src/tests/mock.rs index f8317532ff..2eeb755fa3 100644 --- a/linkerd/proxy/pool/src/tests/mock.rs +++ b/linkerd/proxy/pool/src/tests/mock.rs @@ -71,6 +71,7 @@ impl linkerd_stack::Service for MockPool { type Future = mock::future::ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + futures::ready!(crate::Pool::poll_pool(self, cx))?; self.svc.poll_ready(cx) } @@ -84,7 +85,10 @@ impl PoolHandle { let mut s = self.state.lock(); s.poll = poll; if let Some(w) = s.waker.take() { + tracing::trace!("Wake"); w.wake(); + } else { + tracing::trace!("No waker"); } } } diff --git a/linkerd/proxy/pool/src/worker.rs 
b/linkerd/proxy/pool/src/worker.rs index b759c519c1..b83a94719a 100644 --- a/linkerd/proxy/pool/src/worker.rs +++ b/linkerd/proxy/pool/src/worker.rs @@ -81,13 +81,13 @@ where // queue supports capacity + 1 items. This behavior is // inherrited from tower::buffer. Correcting this is not worth // the complexity. - let Message { req, tx, span, t0 } = tokio::select! { + let msg = tokio::select! { biased; // If either the discovery stream or the pool fail, close // the request stream and process any remaining requests. - e = worker.discover_while_awaiting_requests() => { - terminal.close(reqs_rx, e).await; + e = worker.drive_pool() => { + terminal.close(reqs_rx, error::TerminalFailure::new(e)).await; return Ok(()); } @@ -101,15 +101,18 @@ where }; // Wait for the pool to be ready to process a request. If this fails, we enter - tracing::trace!("Waiting for pool"); - if let Err(e) = worker.ready_pool().await { - terminal.close(reqs_rx, e).await; + tracing::trace!("Waiting for inner service readiness"); + if let Err(e) = worker.ready_pool_for_request().await { + let error = error::TerminalFailure::new(e); + msg.fail(error.clone()); + terminal.close(reqs_rx, error).await; return Ok(()); } tracing::trace!("Pool ready"); // Process requests, either by dispatching them to the pool or // by serving errors directly. + let Message { req, tx, span, t0 } = msg; let call = { // Preserve the original request's tracing context in // the inner call. @@ -140,14 +143,10 @@ where R: TryStream> + Unpin, R::Error: Into, { - /// Attempts to update the pool with discovery updates. + /// Drives the pool, processing discovery updates. /// - /// Additionally, this attempts to drive the pool to ready if it is - /// currently in failfast. - /// - /// If the discovery stream is closed, this never returns. This only returns - /// errors. - async fn discover_while_awaiting_requests(&mut self) -> Error + /// This never returns unless the pool or discovery stream fails. + async fn drive_pool(&mut self) -> Error where P: Pool, P::Error: Into, @@ -168,35 +167,24 @@ where } } - /// Wait for the pool to have at least one ready endpoint, while also - /// processing service discovery updates (e.g. to provide new available - /// endpoints). - async fn ready_pool(&mut self) -> Result<(), Error> + /// Waits for [`Service::poll_ready`], while also processing service + /// discovery updates (e.g. to provide new available endpoints). + async fn ready_pool_for_request(&mut self) -> Result<(), Error> where P: Pool, P::Error: Into, { loop { - tokio::select! { + let update = tokio::select! { // Tests, especially, depend on discovery updates being // processed before ready returning. biased; + res = self.discovery.discover() => res?, + res = self.pool.ready_or_failfast() => return res, + }; - // If the pool updated, continue waiting for the pool to be - // ready. - res = self.discovery.discover() => { - let update = res?; - tracing::debug!(?update, "Discovered"); - self.pool.pool.update_pool(update); - } - - // When the pool is ready, clear any failfast state we may have - // set before returning. - res = self.pool.ready() => { - tracing::trace!(ready.ok = res.is_ok()); - return res; - } - } + tracing::debug!(?update, "Discovered"); + self.pool.pool.update_pool(update); } } } @@ -252,12 +240,12 @@ impl

<P> PoolDriver<P>
{ Self { pool, failfast } } - /// Drives all current endpoints to ready. + /// Drives the inner pool, ensuring that the failfast state is cleared if appropriate. + /// The pool is advanced via [`Pool::poll_pool`], allowing it to make progress independently of request processing. /// /// If the service is in failfast, this clears the failfast state on readiness. /// - /// If any endpoint fails, the error - /// is returned. + /// This only returns if the pool fails. async fn drive(&mut self) -> Error where P: Pool, P::Error: Into, { @@ -280,16 +268,17 @@ impl

<P> PoolDriver<P>
{ } } - tracing::trace!("Driving pending endpoints"); + tracing::trace!("Driving pool"); if let Err(e) = poll_fn(|cx| self.pool.poll_pool(cx)).await { return e.into(); } - tracing::trace!("Driven"); + tracing::trace!("Pool driven"); future::pending().await } - async fn ready(&mut self) -> Result<(), Error> + /// Waits for the inner pool's [`Service::poll_ready`], entering the failfast state if the pool does not become ready before the failfast timeout elapses. + async fn ready_or_failfast(&mut self) -> Result<(), Error> where P: Pool, P::Error: Into, { @@ -301,7 +290,7 @@ impl

<P> PoolDriver<P>
{ match self.failfast.set_ready() { None => tracing::trace!("Ready"), Some(failfast::State::Waiting { since }) => { - tracing::debug!( + tracing::trace!( elapsed = (time::Instant::now() - since).as_secs_f64(), "Available" ); @@ -309,7 +298,7 @@ impl

<P> PoolDriver<P>
{ Some(failfast::State::Failfast { since }) => { tracing::info!( elapsed = (time::Instant::now() - since).as_secs_f64(), - "Available; exited failfast" + "Available; exiting failfast" ); } } @@ -318,7 +307,7 @@ impl

<P> PoolDriver<P>
{ } } - () = self.failfast.timeout() => { + () = self.failfast.enter() => { tracing::info!( timeout = self.failfast.duration().as_secs_f64(), "Unavailable; entering failfast", ); @@ -351,22 +340,17 @@ impl Terminate { (*self.inner.read()).clone().map(Into::into) } - async fn close(self, mut reqs_rx: mpsc::Receiver>, error: Error) { + async fn close( + self, + mut reqs_rx: mpsc::Receiver>, + error: error::TerminalFailure, + ) { tracing::debug!(%error, "Closing pool"); - reqs_rx.close(); - - let error = error::TerminalFailure::new(error); *self.inner.write() = Some(error.clone()); + reqs_rx.close(); - while let Some(Message { tx, t0, .. }) = reqs_rx.recv().await { - if tx.send(Err(error.clone().into())).is_ok() { - tracing::debug!( - latency = (time::Instant::now() - t0).as_secs_f64(), - "Failed due to pool error" - ); - } else { - tracing::debug!("Caller dropped"); - } + while let Some(msg) = reqs_rx.recv().await { + msg.fail(error.clone()); } tracing::debug!("Closed"); From a8ee87c6eb2d7ade67ab8fa4ef714b70e430ec9d Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 2 Dec 2023 18:47:50 +0000 Subject: [PATCH 12/13] commentary/clarity --- linkerd/proxy/pool/src/failfast.rs | 10 +++++----- linkerd/proxy/pool/src/tests.rs | 5 ++++- linkerd/proxy/pool/src/worker.rs | 5 ++++- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/linkerd/proxy/pool/src/failfast.rs b/linkerd/proxy/pool/src/failfast.rs index 7b3105fb6a..ec474a4d41 100644 --- a/linkerd/proxy/pool/src/failfast.rs +++ b/linkerd/proxy/pool/src/failfast.rs @@ -49,7 +49,7 @@ impl Failfast { } /// Waits for the failfast timeout to expire and enters the failfast state. - pub(super) async fn enter(&mut self) { + pub(super) async fn entered(&mut self) { let since = match self.state { // If we're already in failfast, then we don't need to wait. Some(State::Failfast { .. }) => { @@ -96,12 +96,12 @@ mod tests { // request is received. assert!(!failfast.is_active(), "failfast should be active"); - failfast.enter().await; + failfast.entered().await; assert!(failfast.is_active(), "failfast should be active"); assert!(gate_rx.is_shut(), "gate should be shut"); failfast - .enter() + .entered() .now_or_never() .expect("timeout must return immediately when in failfast"); assert!(failfast.is_active(), "failfast should be active"); @@ -113,7 +113,7 @@ mod tests { tokio::select! 
{ _ = time::sleep(time::Duration::from_millis(10)) => {} - _ = failfast.enter() => unreachable!("timed out too quick"), + _ = failfast.entered() => unreachable!("timed out too quick"), } assert!(!failfast.is_active(), "failfast should be inactive"); assert!(gate_rx.is_open(), "gate should be open"); @@ -123,7 +123,7 @@ mod tests { "failfast should be waiting" ); - failfast.enter().await; + failfast.entered().await; assert!(failfast.is_active(), "failfast should be active"); assert!(gate_rx.is_shut(), "gate should be shut"); } diff --git a/linkerd/proxy/pool/src/tests.rs b/linkerd/proxy/pool/src/tests.rs index 331a1914ab..ad66c865fd 100644 --- a/linkerd/proxy/pool/src/tests.rs +++ b/linkerd/proxy/pool/src/tests.rs @@ -89,7 +89,7 @@ async fn limits_request_capacity() { } #[tokio::test(flavor = "current_thread", start_paused = true)] -async fn waits_for_endpoints() { +async fn updates_while_pending() { let _trace = linkerd_tracing::test::with_default_filter("linkerd=trace"); let (pool, mut handle) = mock::pool::<(), (), ()>(); @@ -104,6 +104,7 @@ async fn waits_for_endpoints() { handle.svc.allow(0); assert!(poolq.ready().await.is_ok(), "poolq must be ready"); let call = poolq.call(()); + tokio::task::yield_now().await; updates .try_send(Ok(Update::Reset(vec![( @@ -171,6 +172,8 @@ async fn complete_resolution() { handle.svc.allow(1); assert!(poolq.ready().await.is_ok(), "poolq must be ready"); drop(updates); + tokio::task::yield_now().await; + let call = poolq.call(()); let ((), respond) = handle.svc.next_request().await.expect("request"); respond.send_response(()); diff --git a/linkerd/proxy/pool/src/worker.rs b/linkerd/proxy/pool/src/worker.rs index b83a94719a..9533dc548f 100644 --- a/linkerd/proxy/pool/src/worker.rs +++ b/linkerd/proxy/pool/src/worker.rs @@ -296,6 +296,9 @@ impl

<P> PoolDriver<P>
{ ); } Some(failfast::State::Failfast { since }) => { + // Note: It is exceptionally unlikely that we will exit + // failfast here, since the `failfast.entered()` call below + // will return immediately when in the failfast state. tracing::info!( elapsed = (time::Instant::now() - since).as_secs_f64(), "Available; exiting failfast" ); } } @@ -307,7 +310,7 @@ impl

<P> PoolDriver<P>
{ } } - () = self.failfast.enter() => { + () = self.failfast.entered() => { tracing::info!( timeout = self.failfast.duration().as_secs_f64(), "Unavailable; entering failfast", ); From 2ea8cfaa794b7fe83a251b123ccf6d6eb485ba33 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 2 Dec 2023 18:58:04 +0000 Subject: [PATCH 13/13] fixup mock semantics --- linkerd/proxy/pool/src/tests/mock.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/linkerd/proxy/pool/src/tests/mock.rs b/linkerd/proxy/pool/src/tests/mock.rs index 2eeb755fa3..17ba0a739a 100644 --- a/linkerd/proxy/pool/src/tests/mock.rs +++ b/linkerd/proxy/pool/src/tests/mock.rs @@ -71,8 +71,12 @@ impl linkerd_stack::Service for MockPool { type Future = mock::future::ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - futures::ready!(crate::Pool::poll_pool(self, cx))?; - self.svc.poll_ready(cx) + if let Poll::Ready(res) = self.svc.poll_ready(cx) { + return Poll::Ready(res); + } + // Drive the pool when the service isn't ready. + let _ = crate::Pool::poll_pool(self, cx)?; + Poll::Pending } fn call(&mut self, req: Req) -> Self::Future {