Skip to content

Commit

Permalink
pool: Decompose the pool and balancer crates
Browse files Browse the repository at this point in the history
* Pool::update_pool is replaced with a narrower API:
  * Avoids need for unneeded allocations;
  * Slightly modifies the p2c endpoint update metrics, removing the DNE
    label.
* Split up crates:
  * Move the pool trait to a minimal crate
  * Move the balancer's P2cPool types into a library crate
* Rename NewBalancePeakEwma to NewBalance
  • Loading branch information
olix0r committed Dec 29, 2023
1 parent f191eac commit 4c15a89
Show file tree
Hide file tree
Showing 26 changed files with 494 additions and 461 deletions.
89 changes: 50 additions & 39 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -653,17 +653,6 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"

[[package]]
name = "futures-macro"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]

[[package]]
name = "futures-sink"
version = "0.3.28"
Expand All @@ -685,7 +674,6 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
Expand Down Expand Up @@ -1670,6 +1658,37 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-pool"
version = "0.1.0"
dependencies = [
"tower-service",
]

[[package]]
name = "linkerd-pool-p2c"
version = "0.1.0"
dependencies = [
"ahash",
"futures",
"futures-util",
"indexmap",
"linkerd-error",
"linkerd-metrics",
"linkerd-pool",
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
"prometheus-client",
"quickcheck",
"rand",
"tokio",
"tokio-test",
"tower",
"tower-test",
"tracing",
]

[[package]]
name = "linkerd-proxy-api-resolve"
version = "0.1.0"
Expand Down Expand Up @@ -1697,25 +1716,39 @@ version = "0.1.0"
dependencies = [
"ahash",
"futures",
"futures-util",
"indexmap",
"linkerd-error",
"linkerd-metrics",
"linkerd-pool-p2c",
"linkerd-proxy-balance-queue",
"linkerd-proxy-core",
"linkerd-stack",
"prometheus-client",
"rand",
"tokio",
"tower",
"tracing",
]

[[package]]
name = "linkerd-proxy-balance-queue"
version = "0.1.0"
dependencies = [
"futures",
"linkerd-error",
"linkerd-metrics",
"linkerd-pool",
"linkerd-proxy-core",
"linkerd-proxy-pool",
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
"pin-project",
"prometheus-client",
"quickcheck",
"rand",
"thiserror",
"tokio",
"tokio-stream",
"tokio-test",
"tokio-util",
"tower",
"tower-test",
"tracing",
]
Expand Down Expand Up @@ -1820,28 +1853,6 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-proxy-pool"
version = "0.1.0"
dependencies = [
"futures",
"linkerd-error",
"linkerd-metrics",
"linkerd-proxy-core",
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
"pin-project",
"prometheus-client",
"thiserror",
"tokio",
"tokio-stream",
"tokio-test",
"tokio-util",
"tower-test",
"tracing",
]

[[package]]
name = "linkerd-proxy-resolve"
version = "0.1.0"
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ members = [
"linkerd/meshtls/verifier",
"linkerd/metrics",
"linkerd/opencensus",
"linkerd/pool",
"linkerd/pool/p2c",
"linkerd/proxy/api-resolve",
"linkerd/proxy/balance",
"linkerd/proxy/balance/queue",
"linkerd/proxy/client-policy",
"linkerd/proxy/core",
"linkerd/proxy/dns-resolve",
"linkerd/proxy/http",
"linkerd/proxy/identity-client",
"linkerd/proxy/pool",
"linkerd/proxy/resolve",
"linkerd/proxy/server-policy",
"linkerd/proxy/tap",
Expand Down
9 changes: 2 additions & 7 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,12 @@ mod balance {
recover: R,
) -> impl svc::Layer<
N,
Service = http::NewBalancePeakEwma<
B,
Params,
recover::Resolve<R, DnsResolve>,
NewIntoTarget<N>,
>,
Service = http::NewBalance<B, Params, recover::Resolve<R, DnsResolve>, NewIntoTarget<N>>,
> {
let resolve = recover::Resolve::new(recover, DnsResolve::new(dns));
let metrics = Params(http::balance::MetricFamilies::register(registry));
svc::layer::mk(move |inner| {
http::NewBalancePeakEwma::new(NewIntoTarget { inner }, resolve.clone(), metrics.clone())
http::NewBalance::new(NewIntoTarget { inner }, resolve.clone(), metrics.clone())
})
}

Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/http/concrete/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ where
.push(svc::ArcNewService::layer());

endpoint
.push(http::NewBalancePeakEwma::layer(
.push(http::NewBalance::layer(
resolve.clone(),
metrics_params.clone(),
))
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/opaq/concrete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<C> Outbound<C> {
},
)
.lift_new_with_target()
.push(tcp::NewBalancePeakEwma::layer(resolve, metrics_params))
.push(tcp::NewBalance::layer(resolve, metrics_params))
.push(svc::NewMapErr::layer_from_target::<ConcreteError, _>())
.push_on_service(
rt.metrics
Expand Down
9 changes: 9 additions & 0 deletions linkerd/pool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "linkerd-pool"
version = "0.1.0"
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
tower-service = "0.3"
34 changes: 34 additions & 0 deletions linkerd/pool/p2c/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "linkerd-pool-p2c"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
ahash = "0.8"
futures = { version = "0.3", default-features = false }
indexmap = "1"
prometheus-client = "0.22"
rand = { version = "0.8", features = ["small_rng"] }
tokio = { version = "1", features = ["rt", "sync", "time"] }
tracing = "0.1"

linkerd-error = { path = "../../error" }
linkerd-metrics = { path = "../../metrics" }
linkerd-pool = { path = ".." }
linkerd-stack = { path = "../../stack" }

[dependencies.tower]
version = "0.4.13"
default-features = false
features = ["load", "ready-cache"]

[dev-dependencies]
futures-util = { version = "0.3", default-features = false }
linkerd-tracing = { path = "../../tracing" }
parking_lot = "0.12"
quickcheck = { version = "1", default-features = false }
tokio-test = "0.4"
tower-test = "0.4"
Loading

0 comments on commit 4c15a89

Please sign in to comment.