From 4c15a89546cf940b2eef3a91e7f84a1acb06088a Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 28 Dec 2023 22:03:23 +0000 Subject: [PATCH] pool: Decompose the pool and balancer crates * Pool::update_pool is replaced with a narrower API: * Avoids need for unneeded allocations; * Slightly modifies the p2c endpoint update metrics, removing the DNE label. * Split up crates: * Move the pool trait to a minimal crate * Move the balancer's P2cPool types into a library crate * Rename NewBalancePeakEwma to NewBalance --- Cargo.lock | 89 ++++--- Cargo.toml | 4 +- linkerd/app/core/src/control.rs | 9 +- .../app/outbound/src/http/concrete/balance.rs | 2 +- linkerd/app/outbound/src/opaq/concrete.rs | 2 +- linkerd/pool/Cargo.toml | 9 + linkerd/pool/p2c/Cargo.toml | 34 +++ .../src/pool/p2c.rs => pool/p2c/src/lib.rs} | 242 +++++++----------- linkerd/pool/src/lib.rs | 24 ++ linkerd/proxy/balance/Cargo.toml | 26 +- .../proxy/{pool => balance/queue}/Cargo.toml | 14 +- .../{pool => balance/queue}/src/error.rs | 0 .../{pool => balance/queue}/src/failfast.rs | 0 .../{pool => balance/queue}/src/future.rs | 0 .../proxy/{pool => balance/queue}/src/lib.rs | 19 +- .../{pool => balance/queue}/src/message.rs | 0 .../{pool => balance/queue}/src/service.rs | 0 .../{pool => balance/queue}/src/tests.rs | 2 +- .../{pool => balance/queue}/src/tests/mock.rs | 25 +- .../{pool => balance/queue}/src/worker.rs | 38 ++- linkerd/proxy/balance/src/lib.rs | 206 ++++++++++++++- linkerd/proxy/balance/src/pool.rs | 198 -------------- linkerd/proxy/http/src/balance.rs | 4 +- linkerd/proxy/http/src/lib.rs | 2 +- linkerd/proxy/tcp/src/balance.rs | 4 +- linkerd/proxy/tcp/src/lib.rs | 2 +- 26 files changed, 494 insertions(+), 461 deletions(-) create mode 100644 linkerd/pool/Cargo.toml create mode 100644 linkerd/pool/p2c/Cargo.toml rename linkerd/{proxy/balance/src/pool/p2c.rs => pool/p2c/src/lib.rs} (77%) create mode 100644 linkerd/pool/src/lib.rs rename linkerd/proxy/{pool => balance/queue}/Cargo.toml (62%) rename linkerd/proxy/{pool => balance/queue}/src/error.rs (100%) rename linkerd/proxy/{pool => balance/queue}/src/failfast.rs (100%) rename linkerd/proxy/{pool => balance/queue}/src/future.rs (100%) rename linkerd/proxy/{pool => balance/queue}/src/lib.rs (81%) rename linkerd/proxy/{pool => balance/queue}/src/message.rs (100%) rename linkerd/proxy/{pool => balance/queue}/src/service.rs (100%) rename linkerd/proxy/{pool => balance/queue}/src/tests.rs (99%) rename linkerd/proxy/{pool => balance/queue}/src/tests/mock.rs (80%) rename linkerd/proxy/{pool => balance/queue}/src/worker.rs (89%) delete mode 100644 linkerd/proxy/balance/src/pool.rs diff --git a/Cargo.lock b/Cargo.lock index 0d202fdc82..ff41772fde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -653,17 +653,6 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" -[[package]] -name = "futures-macro" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.39", -] - [[package]] name = "futures-sink" version = "0.3.28" @@ -685,7 +674,6 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", - "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1670,6 +1658,37 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-pool" +version = "0.1.0" +dependencies = [ + "tower-service", +] + +[[package]] +name = "linkerd-pool-p2c" +version = "0.1.0" +dependencies = [ + "ahash", + "futures", + "futures-util", + "indexmap", + "linkerd-error", + "linkerd-metrics", + "linkerd-pool", + "linkerd-stack", + "linkerd-tracing", + "parking_lot", + "prometheus-client", + "quickcheck", + "rand", + "tokio", + "tokio-test", + "tower", + "tower-test", + "tracing", +] + [[package]] name = "linkerd-proxy-api-resolve" version = "0.1.0" @@ -1697,25 +1716,39 @@ version = "0.1.0" dependencies = [ "ahash", "futures", - "futures-util", "indexmap", "linkerd-error", "linkerd-metrics", + "linkerd-pool-p2c", + "linkerd-proxy-balance-queue", + "linkerd-proxy-core", + "linkerd-stack", + "prometheus-client", + "rand", + "tokio", + "tower", + "tracing", +] + +[[package]] +name = "linkerd-proxy-balance-queue" +version = "0.1.0" +dependencies = [ + "futures", + "linkerd-error", + "linkerd-metrics", + "linkerd-pool", "linkerd-proxy-core", - "linkerd-proxy-pool", "linkerd-stack", "linkerd-tracing", "parking_lot", "pin-project", "prometheus-client", - "quickcheck", - "rand", "thiserror", "tokio", "tokio-stream", "tokio-test", "tokio-util", - "tower", "tower-test", "tracing", ] @@ -1820,28 +1853,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "linkerd-proxy-pool" -version = "0.1.0" -dependencies = [ - "futures", - "linkerd-error", - "linkerd-metrics", - "linkerd-proxy-core", - "linkerd-stack", - "linkerd-tracing", - "parking_lot", - "pin-project", - "prometheus-client", - "thiserror", - "tokio", - "tokio-stream", - "tokio-test", - "tokio-util", - "tower-test", - "tracing", -] - [[package]] name = "linkerd-proxy-resolve" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index bb3eb79313..5dee1534d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,14 +39,16 @@ members = [ "linkerd/meshtls/verifier", "linkerd/metrics", "linkerd/opencensus", + "linkerd/pool", + "linkerd/pool/p2c", "linkerd/proxy/api-resolve", "linkerd/proxy/balance", + "linkerd/proxy/balance/queue", "linkerd/proxy/client-policy", "linkerd/proxy/core", "linkerd/proxy/dns-resolve", "linkerd/proxy/http", "linkerd/proxy/identity-client", - "linkerd/proxy/pool", "linkerd/proxy/resolve", "linkerd/proxy/server-policy", "linkerd/proxy/tap", diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 28bb4fec85..f104d1c455 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -242,17 +242,12 @@ mod balance { recover: R, ) -> impl svc::Layer< N, - Service = http::NewBalancePeakEwma< - B, - Params, - recover::Resolve, - NewIntoTarget, - >, + Service = http::NewBalance, NewIntoTarget>, > { let resolve = recover::Resolve::new(recover, DnsResolve::new(dns)); let metrics = Params(http::balance::MetricFamilies::register(registry)); svc::layer::mk(move |inner| { - http::NewBalancePeakEwma::new(NewIntoTarget { inner }, resolve.clone(), metrics.clone()) + http::NewBalance::new(NewIntoTarget { inner }, resolve.clone(), metrics.clone()) }) } diff --git a/linkerd/app/outbound/src/http/concrete/balance.rs b/linkerd/app/outbound/src/http/concrete/balance.rs index 8f78cffd77..cd3affda48 100644 --- a/linkerd/app/outbound/src/http/concrete/balance.rs +++ b/linkerd/app/outbound/src/http/concrete/balance.rs @@ -155,7 +155,7 @@ where .push(svc::ArcNewService::layer()); endpoint - .push(http::NewBalancePeakEwma::layer( + .push(http::NewBalance::layer( resolve.clone(), metrics_params.clone(), )) diff --git a/linkerd/app/outbound/src/opaq/concrete.rs b/linkerd/app/outbound/src/opaq/concrete.rs index 58c18fa367..ac441578cc 100644 --- a/linkerd/app/outbound/src/opaq/concrete.rs +++ b/linkerd/app/outbound/src/opaq/concrete.rs @@ -174,7 +174,7 @@ impl Outbound { }, ) .lift_new_with_target() - .push(tcp::NewBalancePeakEwma::layer(resolve, metrics_params)) + .push(tcp::NewBalance::layer(resolve, metrics_params)) .push(svc::NewMapErr::layer_from_target::()) .push_on_service( rt.metrics diff --git a/linkerd/pool/Cargo.toml b/linkerd/pool/Cargo.toml new file mode 100644 index 0000000000..94309b472d --- /dev/null +++ b/linkerd/pool/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "linkerd-pool" +version = "0.1.0" +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +tower-service = "0.3" diff --git a/linkerd/pool/p2c/Cargo.toml b/linkerd/pool/p2c/Cargo.toml new file mode 100644 index 0000000000..8172810c6f --- /dev/null +++ b/linkerd/pool/p2c/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "linkerd-pool-p2c" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +ahash = "0.8" +futures = { version = "0.3", default-features = false } +indexmap = "1" +prometheus-client = "0.22" +rand = { version = "0.8", features = ["small_rng"] } +tokio = { version = "1", features = ["rt", "sync", "time"] } +tracing = "0.1" + +linkerd-error = { path = "../../error" } +linkerd-metrics = { path = "../../metrics" } +linkerd-pool = { path = ".." } +linkerd-stack = { path = "../../stack" } + +[dependencies.tower] +version = "0.4.13" +default-features = false +features = ["load", "ready-cache"] + +[dev-dependencies] +futures-util = { version = "0.3", default-features = false } +linkerd-tracing = { path = "../../tracing" } +parking_lot = "0.12" +quickcheck = { version = "1", default-features = false } +tokio-test = "0.4" +tower-test = "0.4" diff --git a/linkerd/proxy/balance/src/pool/p2c.rs b/linkerd/pool/p2c/src/lib.rs similarity index 77% rename from linkerd/proxy/balance/src/pool/p2c.rs rename to linkerd/pool/p2c/src/lib.rs index 101b35e6c2..3b5a8b854a 100644 --- a/linkerd/proxy/balance/src/pool/p2c.rs +++ b/linkerd/pool/p2c/src/lib.rs @@ -2,11 +2,14 @@ //! // Based on tower::p2c::Balance. Copyright (c) 2019 Tower Contributors -use super::{Pool, Update}; +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + use ahash::AHashMap; -use futures_util::TryFutureExt; +use futures::prelude::*; use linkerd_error::Error; use linkerd_metrics::prom; +use linkerd_pool::Pool; use linkerd_stack::{NewService, Service}; use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; use std::{ @@ -49,10 +52,6 @@ pub struct P2cMetrics { /// Measures the number of Remove updates received from service discovery. updates_rm: prom::Counter, - - /// Measures the number of DoesNotExist updates received from service - /// discovery. - updates_dne: prom::Counter, } #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] @@ -62,11 +61,10 @@ pub struct UpdateLabels { } #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, prom::encoding::EncodeLabelValue)] -pub enum UpdateOp { +enum UpdateOp { Reset, Add, Remove, - DoesNotExist, } impl P2cPool @@ -89,102 +87,6 @@ where } } - /// Resets the pool to include the given targets without unnecessarily - /// rebuilding inner services. - /// - /// Returns true if the pool was changed. - fn reset(&mut self, targets: Vec<(SocketAddr, T)>) -> bool { - let mut changed = false; - let mut remaining = std::mem::take(&mut self.endpoints); - for (addr, target) in targets.into_iter() { - let t = remaining.remove(&addr); - if t.as_ref() == Some(&target) { - tracing::debug!(?addr, "Endpoint unchanged"); - } else { - if t.is_none() { - tracing::info!(?addr, "Adding endpoint"); - self.metrics.endpoints.inc(); - } else { - tracing::info!(?addr, "Updating endpoint"); - } - - let svc = self.new_endpoint.new_service((addr, target.clone())); - self.pool.push(addr, svc); - changed = true; - } - - self.endpoints.insert(addr, target); - } - - for (addr, _) in remaining.drain() { - tracing::info!(?addr, "Removing endpoint"); - self.pool.evict(&addr); - self.metrics.endpoints.dec(); - changed = true; - } - - changed - } - - /// Adds endpoints to the pool without unnecessarily rebuilding inner - /// services. - /// - /// Returns true if the pool was changed. - fn add(&mut self, targets: Vec<(SocketAddr, T)>) -> bool { - let mut changed = false; - for (addr, target) in targets.into_iter() { - match self.endpoints.entry(addr) { - Entry::Occupied(e) if e.get() == &target => { - tracing::debug!(?addr, "Endpoint unchanged"); - continue; - } - Entry::Occupied(mut e) => { - e.insert(target.clone()); - } - Entry::Vacant(e) => { - e.insert(target.clone()); - self.metrics.endpoints.inc(); - } - } - tracing::info!(?addr, "Adding endpoint"); - let svc = self.new_endpoint.new_service((addr, target)); - self.pool.push(addr, svc); - changed = true; - } - changed - } - - /// Removes endpoint services. - /// - /// Returns true if the pool was changed. - fn remove(&mut self, addrs: Vec) -> bool { - let mut changed = false; - for addr in addrs.into_iter() { - if self.endpoints.remove(&addr).is_some() { - tracing::info!(?addr, "Removing endpoint"); - self.pool.evict(&addr); - self.metrics.endpoints.dec(); - changed = true; - } else { - tracing::debug!(?addr, "Unknown endpoint"); - } - } - changed - } - - /// Clear all endpoints from the pool. - /// - /// Returns true if the pool was changed. - fn clear(&mut self) -> bool { - let changed = !self.endpoints.is_empty(); - for (addr, _) in self.endpoints.drain() { - tracing::info!(?addr, "Removing endpoint"); - self.pool.evict(&addr); - self.metrics.endpoints.dec(); - } - changed - } - fn p2c_ready_index(&mut self) -> Option { match self.pool.ready_len() { 0 => None, @@ -236,20 +138,76 @@ where S::Future: Send + 'static, S::Metric: std::fmt::Debug, { - fn update_pool(&mut self, update: Update) { - tracing::trace!(?update); - self.metrics.inc(&update); - let changed = match update { - Update::Reset(targets) => self.reset(targets), - Update::Add(targets) => self.add(targets), - Update::Remove(addrs) => self.remove(addrs), - Update::DoesNotExist => self.clear(), - }; + fn reset_pool(&mut self, update: Vec<(SocketAddr, T)>) { + let mut changed = false; + let mut remaining = std::mem::take(&mut self.endpoints); + for (addr, target) in update.into_iter() { + let t = remaining.remove(&addr); + if t.as_ref() == Some(&target) { + tracing::debug!(?addr, "Endpoint unchanged"); + } else { + if t.is_none() { + tracing::info!(?addr, "Adding endpoint"); + self.metrics.endpoints.inc(); + } else { + tracing::info!(?addr, "Updating endpoint"); + } + + let svc = self.new_endpoint.new_service((addr, target.clone())); + self.pool.push(addr, svc); + changed = true; + } + + self.endpoints.insert(addr, target); + } + + for (addr, _) in remaining.drain() { + tracing::info!(?addr, "Removing endpoint"); + self.pool.evict(&addr); + self.metrics.endpoints.dec(); + changed = true; + } + if changed { + self.metrics.updates_reset.inc(); self.next_idx = None; } } + fn add_endpoint(&mut self, addr: SocketAddr, target: T) { + match self.endpoints.entry(addr) { + Entry::Occupied(e) if e.get() == &target => { + tracing::debug!(?addr, "Endpoint unchanged"); + return; + } + Entry::Occupied(mut e) => { + e.insert(target.clone()); + } + Entry::Vacant(e) => { + e.insert(target.clone()); + self.metrics.endpoints.inc(); + } + } + + tracing::info!(?addr, "Adding endpoint"); + let svc = self.new_endpoint.new_service((addr, target)); + self.pool.push(addr, svc); + self.metrics.updates_add.inc(); + } + + fn remove_endpoint(&mut self, addr: SocketAddr) { + if self.endpoints.remove(&addr).is_none() { + tracing::debug!(?addr, "Unknown endpoint"); + return; + } + + tracing::info!(?addr, "Removing endpoint"); + self.pool.evict(&addr); + self.metrics.endpoints.dec(); + self.metrics.updates_rm.inc(); + self.next_idx = None; + } + /// Moves pending endpoints to ready. /// /// This must be called from the same task that invokes Service::poll_ready. @@ -281,8 +239,8 @@ where /// used to select one. /// /// NOTE that this may return `Pending` when there are no endpoints. In such - /// cases, the caller must invoke `update_pool` and then wait for new - /// endpoints to become ready. + /// cases, the caller must add endpoints and then wait for new endpoints to + /// become ready. fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { tracing::trace!(pending = self.pool.pending_len(), "Polling pending"); @@ -365,37 +323,17 @@ where labels: labels.clone(), }) .clone(); - let updates_dne: prom::Counter = self - .updates - .get_or_create(&UpdateLabels { - op: UpdateOp::DoesNotExist, - labels: labels.clone(), - }) - .clone(); P2cMetrics { endpoints, updates_reset, updates_add, updates_rm, - updates_dne, } } } // === impl P2cMetrics === -impl P2cMetrics { - fn inc(&self, up: &Update) { - match up { - Update::Reset(..) => &self.updates_reset, - Update::Add(..) => &self.updates_add, - Update::Remove(..) => &self.updates_rm, - Update::DoesNotExist { .. } => &self.updates_dne, - } - .inc(); - } -} - impl prom::encoding::EncodeLabelSet for UpdateLabels { fn encode(&self, mut enc: prom::encoding::LabelSetEncoder<'_>) -> std::fmt::Result { use prom::encoding::EncodeLabel; @@ -408,7 +346,6 @@ impl prom::encoding::EncodeLabelSet for Updat mod tests { use super::*; use ahash::HashSet; - use futures::prelude::*; use linkerd_stack::ServiceExt; use parking_lot::Mutex; use std::sync::Arc; @@ -448,67 +385,66 @@ mod tests { ) }); - pool.update_pool(Update::Reset(vec![(addr0, 0)])); + pool.reset_pool(vec![(addr0, 0)]); assert_eq!(pool.endpoints.len(), 1); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(pool.endpoints.get(&addr0), Some(&0)); - pool.update_pool(Update::Add(vec![(addr0, 1)])); + pool.add_endpoint(addr0, 1); assert_eq!(pool.endpoints.len(), 1); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(pool.endpoints.get(&addr0), Some(&1)); - pool.update_pool(Update::Reset(vec![(addr0, 1)])); + pool.reset_pool(vec![(addr0, 1)]); assert_eq!(pool.endpoints.len(), 1); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(pool.endpoints.get(&addr0), Some(&1)); - pool.update_pool(Update::Add(vec![(addr1, 1)])); + pool.add_endpoint(addr1, 1); assert_eq!(pool.endpoints.len(), 2); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(pool.endpoints.get(&addr1), Some(&1)); - pool.update_pool(Update::Add(vec![(addr1, 1)])); + pool.add_endpoint(addr1, 1); assert_eq!(pool.endpoints.len(), 2); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(pool.endpoints.get(&addr1), Some(&1)); - pool.update_pool(Update::Remove(vec![addr0])); + pool.remove_endpoint(addr0); assert_eq!(pool.endpoints.len(), 1); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); - pool.update_pool(Update::Remove(vec![addr0])); + pool.remove_endpoint(addr0); assert_eq!(pool.endpoints.len(), 1); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); - pool.update_pool(Update::Reset(vec![(addr0, 2), (addr1, 2)])); + pool.reset_pool(vec![(addr0, 2), (addr1, 2)]); assert_eq!(pool.endpoints.len(), 2); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(pool.endpoints.get(&addr0), Some(&2)); assert_eq!(pool.endpoints.get(&addr1), Some(&2)); - pool.update_pool(Update::Reset(vec![(addr0, 2)])); + pool.reset_pool(vec![(addr0, 2)]); assert_eq!(pool.endpoints.len(), 1); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(pool.endpoints.get(&addr0), Some(&2)); - pool.update_pool(Update::Reset(vec![(addr0, 3)])); + pool.reset_pool(vec![(addr0, 3)]); assert_eq!(pool.endpoints.len(), 1); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(pool.endpoints.get(&addr0), Some(&3)); - pool.update_pool(Update::DoesNotExist); + pool.reset_pool(vec![]); assert_eq!(pool.endpoints.len(), 0); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); - pool.update_pool(Update::DoesNotExist); + pool.reset_pool(vec![]); assert_eq!(pool.endpoints.len(), 0); assert_eq!(metrics.endpoints.get(), pool.endpoints.len() as i64); assert_eq!(metrics.updates_reset.get(), 5); - assert_eq!(metrics.updates_add.get(), 3); - assert_eq!(metrics.updates_rm.get(), 2); - assert_eq!(metrics.updates_dne.get(), 2); + assert_eq!(metrics.updates_add.get(), 2); + assert_eq!(metrics.updates_rm.get(), 1); } #[tokio::test(flavor = "current_thread", start_paused = true)] @@ -548,7 +484,7 @@ mod tests { assert!(pool.ready().now_or_never().is_none()); assert!(pool.next_idx.is_none()); - pool.update_pool(Update::Reset(vec![(addr0, ())])); + pool.reset_pool(vec![(addr0, ())]); assert!(pool.ready().now_or_never().is_none()); assert!(pool.next_idx.is_none()); @@ -558,7 +494,7 @@ mod tests { h1.allow(1); h2.allow(1); - pool.update_pool(Update::Reset(vec![(addr0, ()), (addr1, ()), (addr2, ())])); + pool.reset_pool(vec![(addr0, ()), (addr1, ()), (addr2, ())]); assert!(pool.next_idx.is_none()); assert!(pool.ready().now_or_never().is_some()); assert!(pool.next_idx.is_some()); diff --git a/linkerd/pool/src/lib.rs b/linkerd/pool/src/lib.rs new file mode 100644 index 0000000000..91d43ec3bf --- /dev/null +++ b/linkerd/pool/src/lib.rs @@ -0,0 +1,24 @@ +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + +use std::{ + net::SocketAddr, + task::{Context, Poll}, +}; + +/// A collection of services updated from a resolution. +pub trait Pool: tower_service::Service { + fn reset_pool(&mut self, update: Vec<(SocketAddr, T)>); + + fn add_endpoint(&mut self, addr: SocketAddr, endpoint: T); + + fn remove_endpoint(&mut self, addr: SocketAddr); + + /// Polls to update the pool while the Service is ready. + /// + /// [`Service::poll_ready`] should do the same work, but will return ready + /// as soon as there at least one ready endpoint. This method will continue + /// to drive the pool until ready is returned (indicating that the pool need + /// not be updated before another request is processed). + fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll>; +} diff --git a/linkerd/proxy/balance/Cargo.toml b/linkerd/proxy/balance/Cargo.toml index 9202b4926b..e787a847f2 100644 --- a/linkerd/proxy/balance/Cargo.toml +++ b/linkerd/proxy/balance/Cargo.toml @@ -8,30 +8,20 @@ publish = false [dependencies] ahash = "0.8" futures = { version = "0.3", default-features = false } -futures-util = "0.3" indexmap = "1" -linkerd-error = { path = "../../error" } -linkerd-metrics = { path = "../../metrics" } -linkerd-proxy-core = { path = "../core" } -linkerd-proxy-pool = { path = "../pool" } -linkerd-stack = { path = "../../stack" } -parking_lot = "0.12" -pin-project = "1" prometheus-client = "0.22" rand = "0.8" -thiserror = "1" tokio = { version = "1", features = ["rt", "sync", "time"] } -tokio-stream = { version = "0.1", features = ["sync"] } -tokio-util = "0.7" tracing = "0.1" +linkerd-error = { path = "../../error" } +linkerd-metrics = { path = "../../metrics" } +linkerd-pool-p2c = { path = "../../pool/p2c" } +linkerd-proxy-core = { path = "../core" } +linkerd-proxy-balance-queue = { path = "queue" } +linkerd-stack = { path = "../../stack" } + [dependencies.tower] version = "0.4.13" default-features = false -features = ["balance", "load", "ready-cache"] - -[dev-dependencies] -linkerd-tracing = { path = "../../tracing" } -tokio-test = "0.4" -tower-test = "0.4" -quickcheck = { version = "1", default-features = false } +features = ["load"] diff --git a/linkerd/proxy/pool/Cargo.toml b/linkerd/proxy/balance/queue/Cargo.toml similarity index 62% rename from linkerd/proxy/pool/Cargo.toml rename to linkerd/proxy/balance/queue/Cargo.toml index 52f7360cff..698654969f 100644 --- a/linkerd/proxy/pool/Cargo.toml +++ b/linkerd/proxy/balance/queue/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "linkerd-proxy-pool" +name = "linkerd-proxy-balance-queue" version = "0.1.0" authors = ["Linkerd Developers "] license = "Apache-2.0" @@ -8,10 +8,6 @@ publish = false [dependencies] futures = { version = "0.3", default-features = false } -linkerd-error = { path = "../../error" } -linkerd-metrics = { path = "../../metrics" } -linkerd-proxy-core = { path = "../core" } -linkerd-stack = { path = "../../stack" } parking_lot = "0.12" pin-project = "1" prometheus-client = "0.22" @@ -20,8 +16,14 @@ tokio = { version = "1", features = ["rt", "sync", "time"] } tokio-util = "0.7" tracing = "0.1" +linkerd-error = { path = "../../../error" } +linkerd-metrics = { path = "../../../metrics" } +linkerd-proxy-core = { path = "../../core" } +linkerd-pool = { path = "../../../pool" } +linkerd-stack = { path = "../../../stack" } + [dev-dependencies] -linkerd-tracing = { path = "../../tracing" } +linkerd-tracing = { path = "../../../tracing" } tokio-stream = { version = "0.1", features = ["sync"] } tokio-test = "0.4" tower-test = "0.4" diff --git a/linkerd/proxy/pool/src/error.rs b/linkerd/proxy/balance/queue/src/error.rs similarity index 100% rename from linkerd/proxy/pool/src/error.rs rename to linkerd/proxy/balance/queue/src/error.rs diff --git a/linkerd/proxy/pool/src/failfast.rs b/linkerd/proxy/balance/queue/src/failfast.rs similarity index 100% rename from linkerd/proxy/pool/src/failfast.rs rename to linkerd/proxy/balance/queue/src/failfast.rs diff --git a/linkerd/proxy/pool/src/future.rs b/linkerd/proxy/balance/queue/src/future.rs similarity index 100% rename from linkerd/proxy/pool/src/future.rs rename to linkerd/proxy/balance/queue/src/future.rs diff --git a/linkerd/proxy/pool/src/lib.rs b/linkerd/proxy/balance/queue/src/lib.rs similarity index 81% rename from linkerd/proxy/pool/src/lib.rs rename to linkerd/proxy/balance/queue/src/lib.rs index 8f52b51655..f1fd7cfdb1 100644 --- a/linkerd/proxy/pool/src/lib.rs +++ b/linkerd/proxy/balance/queue/src/lib.rs @@ -8,7 +8,6 @@ #![forbid(unsafe_code)] use linkerd_metrics::prom; -use linkerd_stack::Service; mod error; mod failfast; @@ -20,27 +19,11 @@ mod tests; mod worker; pub use self::service::PoolQueue; +pub use linkerd_pool::Pool; pub use linkerd_proxy_core::Update; use self::failfast::{GateMetricFamilies, GateMetrics}; -/// A collection of services updated from a resolution. -pub trait Pool: Service { - /// Updates the pool's endpoints. - fn update_pool(&mut self, update: Update); - - /// Polls to update the pool while the Service is ready. - /// - /// [`Service::poll_ready`] should do the same work, but will return ready - /// as soon as there at least one ready endpoint. This method will continue - /// to drive the pool until ready is returned (indicating that the pool need - /// not be updated before another request is processed). - fn poll_pool( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll>; -} - #[derive(Clone, Debug)] pub struct QueueMetricFamilies { length: prom::Family, diff --git a/linkerd/proxy/pool/src/message.rs b/linkerd/proxy/balance/queue/src/message.rs similarity index 100% rename from linkerd/proxy/pool/src/message.rs rename to linkerd/proxy/balance/queue/src/message.rs diff --git a/linkerd/proxy/pool/src/service.rs b/linkerd/proxy/balance/queue/src/service.rs similarity index 100% rename from linkerd/proxy/pool/src/service.rs rename to linkerd/proxy/balance/queue/src/service.rs diff --git a/linkerd/proxy/pool/src/tests.rs b/linkerd/proxy/balance/queue/src/tests.rs similarity index 99% rename from linkerd/proxy/pool/src/tests.rs rename to linkerd/proxy/balance/queue/src/tests.rs index 2d6cb53876..7a3d5ee7fa 100644 --- a/linkerd/proxy/pool/src/tests.rs +++ b/linkerd/proxy/balance/queue/src/tests.rs @@ -155,7 +155,7 @@ async fn updates_while_idle() { tokio::task::yield_now().await; assert_eq!( handle.rx.try_recv().expect("must receive update"), - Update::Reset(vec![("192.168.1.44:80".parse().unwrap(), (),)]) + mock::Change::Reset(vec![("192.168.1.44:80".parse().unwrap(), (),)]) ); } diff --git a/linkerd/proxy/pool/src/tests/mock.rs b/linkerd/proxy/balance/queue/src/tests/mock.rs similarity index 80% rename from linkerd/proxy/pool/src/tests/mock.rs rename to linkerd/proxy/balance/queue/src/tests/mock.rs index 17ba0a739a..46d7f2eff8 100644 --- a/linkerd/proxy/pool/src/tests/mock.rs +++ b/linkerd/proxy/balance/queue/src/tests/mock.rs @@ -1,6 +1,6 @@ use linkerd_error::Error; -use linkerd_proxy_core::Update; use parking_lot::Mutex; +use std::net::SocketAddr; use std::{ sync::Arc, task::{Context, Poll, Waker}, @@ -29,17 +29,24 @@ pub fn pool() -> (MockPool, PoolHandle) { } pub struct MockPool { - tx: mpsc::UnboundedSender>, + tx: mpsc::UnboundedSender>, state: Arc>, svc: mock::Mock, } pub struct PoolHandle { state: Arc>, - pub rx: mpsc::UnboundedReceiver>, + pub rx: mpsc::UnboundedReceiver>, pub svc: mock::Handle, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Change { + Add(SocketAddr, T), + Remove(SocketAddr), + Reset(Vec<(SocketAddr, T)>), +} + struct State { poll: Poll>, waker: Option, @@ -54,8 +61,16 @@ pub struct PoolError; pub struct ResolutionError; impl crate::Pool for MockPool { - fn update_pool(&mut self, update: Update) { - self.tx.send(update).ok().unwrap(); + fn reset_pool(&mut self, update: Vec<(SocketAddr, T)>) { + let _ = self.tx.send(Change::Reset(update)); + } + + fn add_endpoint(&mut self, addr: SocketAddr, endpoint: T) { + let _ = self.tx.send(Change::Add(addr, endpoint)); + } + + fn remove_endpoint(&mut self, addr: SocketAddr) { + let _ = self.tx.send(Change::Remove(addr)); } fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll> { diff --git a/linkerd/proxy/pool/src/worker.rs b/linkerd/proxy/balance/queue/src/worker.rs similarity index 89% rename from linkerd/proxy/pool/src/worker.rs rename to linkerd/proxy/balance/queue/src/worker.rs index 4ac0da7386..6d67044a8a 100644 --- a/linkerd/proxy/pool/src/worker.rs +++ b/linkerd/proxy/balance/queue/src/worker.rs @@ -164,7 +164,24 @@ where }; tracing::debug!(?update, "Discovered"); - self.pool.pool.update_pool(update); + match update { + Update::Reset(eps) => { + self.pool.pool.reset_pool(eps); + } + Update::Add(eps) => { + for (addr, ep) in eps.into_iter() { + self.pool.pool.add_endpoint(addr, ep); + } + } + Update::Remove(addrs) => { + for addr in addrs.into_iter() { + self.pool.pool.remove_endpoint(addr); + } + } + Update::DoesNotExist => { + self.pool.pool.reset_pool(vec![]); + } + } } } @@ -185,7 +202,24 @@ where }; tracing::debug!(?update, "Discovered"); - self.pool.pool.update_pool(update); + match update { + Update::Reset(eps) => { + self.pool.pool.reset_pool(eps); + } + Update::Add(eps) => { + for (addr, ep) in eps.into_iter() { + self.pool.pool.add_endpoint(addr, ep); + } + } + Update::Remove(addrs) => { + for addr in addrs.into_iter() { + self.pool.pool.remove_endpoint(addr); + } + } + Update::DoesNotExist => { + self.pool.pool.reset_pool(vec![]); + } + } } } } diff --git a/linkerd/proxy/balance/src/lib.rs b/linkerd/proxy/balance/src/lib.rs index 3c51464757..b3d3095fdb 100644 --- a/linkerd/proxy/balance/src/lib.rs +++ b/linkerd/proxy/balance/src/lib.rs @@ -1,10 +1,23 @@ +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + +use futures::prelude::*; +use linkerd_error::Error; +use linkerd_metrics::prom; +use linkerd_pool_p2c::{P2cMetricFamilies, P2cMetrics, P2cPool}; +use linkerd_proxy_balance_queue::PoolQueue; +use linkerd_proxy_core::Resolve; +use linkerd_stack::{layer, queue, ExtractParam, Gate, NewService, Param, Service}; +use std::{fmt::Debug, marker::PhantomData, net::SocketAddr}; +use tokio::time; +use tower::load::{self, PeakEwma}; + +// TODO(ver) Endpoint gauges should be pulled up into this module once it's +// updated to use the new prometheus registry. mod gauge_endpoints; -mod pool; -pub use self::{ - gauge_endpoints::{EndpointsGauges, NewGaugeEndpoints}, - pool::*, -}; +pub use self::gauge_endpoints::{EndpointsGauges, NewGaugeEndpoints}; +pub use linkerd_proxy_balance_queue::{Pool, QueueMetricFamilies, QueueMetrics, Update}; pub use tower::load::peak_ewma; #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -12,3 +25,186 @@ pub struct EwmaConfig { pub default_rtt: std::time::Duration, pub decay: std::time::Duration, } + +#[derive(Clone, Debug)] +pub struct MetricFamilies { + queue: QueueMetricFamilies, + p2c: P2cMetricFamilies, +} + +#[derive(Clone, Debug)] +pub struct Metrics { + queue: QueueMetrics, + p2c: P2cMetrics, +} + +/// Configures a stack to resolve targets to balance requests over `N`-typed +/// endpoint stacks. +#[derive(Debug)] +pub struct NewBalance { + resolve: R, + inner: N, + params: X, + _marker: PhantomData C>, +} + +pub type Balance = Gate>; + +/// Wraps the inner services in [`PeakEwma`] services so their load is tracked +/// for the p2c balancer. +#[derive(Debug)] +struct NewPeakEwma { + config: EwmaConfig, + inner: N, + _marker: PhantomData C>, +} + +// === impl NewBalance === + +impl NewBalance { + pub fn new(inner: N, resolve: R, params: X) -> Self { + Self { + resolve, + inner, + params, + _marker: PhantomData, + } + } + + pub fn layer(resolve: R, params: X) -> impl layer::Layer + Clone + where + R: Clone, + X: Clone, + Self: NewService, + { + layer::mk(move |inner| Self::new(inner, resolve.clone(), params.clone())) + } +} + +impl NewService for NewBalance +where + T: Param + Param + Param + Clone + Send, + X: ExtractParam, + R: Resolve, + R::Resolution: Unpin, + R::Error: Send, + M: NewService + Clone, + N: NewService<(SocketAddr, R::Endpoint), Service = S> + Send + 'static, + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Error: Into, + C: load::TrackCompletion + Default + Send + 'static, + Req: Send + 'static, + Balance as Service>::Future, Error>>: Service, +{ + type Service = Balance as Service>::Future, Error>>; + + fn new_service(&self, target: T) -> Self::Service { + // Initialize a resolution stream to discover endpoint updates. This + // stream should be effectively inifite (and, i.e., handle errors + // gracefully). + // + // If the resolution stream ends, the balancer will simply stop + // processing endpoint updates. + // + // If the resolution stream fails, the balancer will return an error. + let disco = self.resolve.resolve(target.clone()).try_flatten_stream(); + tracing::debug!("Resolving"); + + let queue::Capacity(capacity) = target.param(); + let queue::Timeout(failfast) = target.param(); + let metrics = self.params.extract_param(&target); + + // The pool wraps the inner endpoint stack so that its inner ready cache + // can be updated without requiring the service to process requests. + let pool = { + let ewma = target.param(); + tracing::debug!(?ewma); + let new_endpoint = self.inner.new_service(target); + P2cPool::new(metrics.p2c, NewPeakEwma::new(ewma, new_endpoint)) + }; + + // The queue runs on a dedicated task, owning the resolution stream and + // all of the inner endpoint services. A cloneable Service is returned + // that allows passing requests to the service. When all clones of the + // service are dropped, the queue task completes, dropping the + // resolution and all inner services. + tracing::debug!(capacity, ?failfast, "Spawning p2c pool queue"); + PoolQueue::spawn(capacity, failfast, metrics.queue, disco, pool) + } +} + +impl Clone for NewBalance { + fn clone(&self) -> Self { + Self { + resolve: self.resolve.clone(), + inner: self.inner.clone(), + params: self.params.clone(), + _marker: self._marker, + } + } +} + +// === impl NewPeakEwma === + +impl NewPeakEwma { + fn new(config: EwmaConfig, inner: N) -> Self { + Self { + config, + inner, + _marker: PhantomData, + } + } +} + +impl NewService for NewPeakEwma +where + C: load::TrackCompletion + Default, + N: NewService, + S: Service, +{ + type Service = PeakEwma; + + fn new_service(&self, target: T) -> Self::Service { + // Converts durations to nanos in f64. + // + // Due to a lossy transformation, the maximum value that can be + // represented is ~585 years, which, I hope, is more than enough to + // represent request latencies. + fn nanos(d: time::Duration) -> f64 { + const NANOS_PER_SEC: u64 = 1_000_000_000; + let n = f64::from(d.subsec_nanos()); + let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64; + n + s + } + + PeakEwma::new( + self.inner.new_service(target), + self.config.default_rtt, + nanos(self.config.decay), + C::default(), + ) + } +} + +// === impl MetricFamilies === + +impl MetricFamilies +where + L: prom::encoding::EncodeLabelSet + std::fmt::Debug + std::hash::Hash, + L: Eq + Clone + Send + Sync + 'static, +{ + pub fn register(reg: &mut prom::registry::Registry) -> Self { + let p2c = P2cMetricFamilies::register(reg.sub_registry_with_prefix("p2c")); + let queue = QueueMetricFamilies::register(reg.sub_registry_with_prefix("queue")); + Self { p2c, queue } + } + + pub fn metrics(&self, labels: &L) -> Metrics { + tracing::trace!(?labels, "Budilding metrics"); + Metrics { + p2c: self.p2c.metrics(labels), + queue: self.queue.metrics(labels), + } + } +} diff --git a/linkerd/proxy/balance/src/pool.rs b/linkerd/proxy/balance/src/pool.rs deleted file mode 100644 index 169f8ef6eb..0000000000 --- a/linkerd/proxy/balance/src/pool.rs +++ /dev/null @@ -1,198 +0,0 @@ -use crate::EwmaConfig; -use futures::prelude::*; -use linkerd_error::Error; -use linkerd_metrics::prom; -use linkerd_proxy_core::Resolve; -use linkerd_proxy_pool::PoolQueue; -use linkerd_stack::{layer, queue, ExtractParam, Gate, NewService, Param, Service}; -use std::{fmt::Debug, marker::PhantomData, net::SocketAddr}; -use tokio::time; -use tower::load::{self, PeakEwma}; - -mod p2c; - -pub use self::p2c::{P2cMetricFamilies, P2cMetrics, P2cPool}; -pub use linkerd_proxy_pool::{Pool, QueueMetricFamilies, QueueMetrics, Update}; - -#[derive(Clone, Debug)] -pub struct MetricFamilies { - queue: QueueMetricFamilies, - p2c: P2cMetricFamilies, -} - -#[derive(Clone, Debug)] -pub struct Metrics { - queue: QueueMetrics, - p2c: P2cMetrics, -} - -/// Configures a stack to resolve targets to balance requests over `N`-typed -/// endpoint stacks. -#[derive(Debug)] -pub struct NewBalancePeakEwma { - resolve: R, - inner: N, - params: X, - _marker: PhantomData C>, -} - -pub type Balance = Gate>; - -/// Wraps the inner services in [`PeakEwma`] services so their load is tracked -/// for the p2c balancer. -#[derive(Debug)] -pub struct NewPeakEwma { - config: EwmaConfig, - inner: N, - _marker: PhantomData C>, -} - -// === impl NewBalancePeakEwma === - -impl NewBalancePeakEwma { - pub fn new(inner: N, resolve: R, params: X) -> Self { - Self { - resolve, - inner, - params, - _marker: PhantomData, - } - } - - pub fn layer(resolve: R, params: X) -> impl layer::Layer + Clone - where - R: Clone, - X: Clone, - Self: NewService, - { - layer::mk(move |inner| Self::new(inner, resolve.clone(), params.clone())) - } -} - -impl NewService for NewBalancePeakEwma -where - T: Param + Param + Param + Clone + Send, - X: ExtractParam, - R: Resolve, - R::Resolution: Unpin, - R::Error: Send, - M: NewService + Clone, - N: NewService<(SocketAddr, R::Endpoint), Service = S> + Send + 'static, - S: Service + Send + 'static, - S::Future: Send + 'static, - S::Error: Into, - C: load::TrackCompletion + Default + Send + 'static, - Req: Send + 'static, - Balance as Service>::Future, Error>>: Service, -{ - type Service = Balance as Service>::Future, Error>>; - - fn new_service(&self, target: T) -> Self::Service { - // Initialize a resolution stream to discover endpoint updates. This - // stream should be effectively inifite (and, i.e., handle errors - // gracefully). - // - // If the resolution stream ends, the balancer will simply stop - // processing endpoint updates. - // - // If the resolution stream fails, the balancer will return an error. - let disco = self.resolve.resolve(target.clone()).try_flatten_stream(); - tracing::debug!("Resolving"); - - let queue::Capacity(capacity) = target.param(); - let queue::Timeout(failfast) = target.param(); - let metrics = self.params.extract_param(&target); - - // The pool wraps the inner endpoint stack so that its inner ready cache - // can be updated without requiring the service to process requests. - let pool = { - let ewma = target.param(); - tracing::debug!(?ewma); - let new_endpoint = self.inner.new_service(target); - P2cPool::new(metrics.p2c, NewPeakEwma::new(ewma, new_endpoint)) - }; - - // The queue runs on a dedicated task, owning the resolution stream and - // all of the inner endpoint services. A cloneable Service is returned - // that allows passing requests to the service. When all clones of the - // service are dropped, the queue task completes, dropping the - // resolution and all inner services. - tracing::debug!(capacity, ?failfast, "Spawning p2c pool queue"); - PoolQueue::spawn(capacity, failfast, metrics.queue, disco, pool) - } -} - -impl Clone for NewBalancePeakEwma { - fn clone(&self) -> Self { - Self { - resolve: self.resolve.clone(), - inner: self.inner.clone(), - params: self.params.clone(), - _marker: self._marker, - } - } -} - -// === impl NewPeakEwma === - -impl NewPeakEwma { - fn new(config: EwmaConfig, inner: N) -> Self { - Self { - config, - inner, - _marker: PhantomData, - } - } -} - -impl NewService for NewPeakEwma -where - C: load::TrackCompletion + Default, - N: NewService, - S: Service, -{ - type Service = PeakEwma; - - fn new_service(&self, target: T) -> Self::Service { - // Converts durations to nanos in f64. - // - // Due to a lossy transformation, the maximum value that can be - // represented is ~585 years, which, I hope, is more than enough to - // represent request latencies. - fn nanos(d: time::Duration) -> f64 { - const NANOS_PER_SEC: u64 = 1_000_000_000; - let n = f64::from(d.subsec_nanos()); - let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64; - n + s - } - - PeakEwma::new( - self.inner.new_service(target), - self.config.default_rtt, - nanos(self.config.decay), - C::default(), - ) - } -} - -// === impl MetricFamilies === - -impl MetricFamilies -where - L: prom::encoding::EncodeLabelSet + std::fmt::Debug + std::hash::Hash, - L: Eq + Clone + Send + Sync + 'static, -{ - pub fn register(reg: &mut prom::registry::Registry) -> Self { - let p2c = P2cMetricFamilies::register(reg.sub_registry_with_prefix("p2c")); - let queue = QueueMetricFamilies::register(reg.sub_registry_with_prefix("queue")); - Self { p2c, queue } - } - - pub fn metrics(&self, labels: &L) -> Metrics { - tracing::trace!(?labels, "Budilding metrics"); - Metrics { - p2c: self.p2c.metrics(labels), - queue: self.queue.metrics(labels), - } - } -} diff --git a/linkerd/proxy/http/src/balance.rs b/linkerd/proxy/http/src/balance.rs index fba6a0e6c2..a002ddd3c4 100644 --- a/linkerd/proxy/http/src/balance.rs +++ b/linkerd/proxy/http/src/balance.rs @@ -3,5 +3,5 @@ pub use linkerd_proxy_balance::*; pub type Body = PendingUntilFirstDataBody; -pub type NewBalancePeakEwma = - linkerd_proxy_balance::NewBalancePeakEwma, X, R, N>; +pub type NewBalance = + linkerd_proxy_balance::NewBalance, X, R, N>; diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index cf6e2d3cd8..e4c15fea82 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -26,7 +26,7 @@ pub mod upgrade; pub mod version; pub use self::{ - balance::NewBalancePeakEwma, + balance::NewBalance, classify::{ Classify, ClassifyEos, ClassifyResponse, NewClassifyGate, NewClassifyGateSet, NewInsertClassifyResponse, diff --git a/linkerd/proxy/tcp/src/balance.rs b/linkerd/proxy/tcp/src/balance.rs index 51c5019ace..3d28bc9617 100644 --- a/linkerd/proxy/tcp/src/balance.rs +++ b/linkerd/proxy/tcp/src/balance.rs @@ -1,5 +1,5 @@ pub use linkerd_proxy_balance::*; pub use tower::load::CompleteOnResponse; -pub type NewBalancePeakEwma = - linkerd_proxy_balance::NewBalancePeakEwma; +pub type NewBalance = + linkerd_proxy_balance::NewBalance; diff --git a/linkerd/proxy/tcp/src/lib.rs b/linkerd/proxy/tcp/src/lib.rs index 6bce36b14f..b90e1cefd4 100644 --- a/linkerd/proxy/tcp/src/lib.rs +++ b/linkerd/proxy/tcp/src/lib.rs @@ -4,4 +4,4 @@ pub mod balance; pub mod forward; -pub use self::{balance::NewBalancePeakEwma, forward::Forward}; +pub use self::{balance::NewBalance, forward::Forward};