Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

identity: add spire identity client #2580

Merged
merged 17 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,7 @@ dependencies = [
"linkerd-proxy-identity-client",
"linkerd-proxy-resolve",
"linkerd-proxy-server-policy",
"linkerd-proxy-spire-client",
"linkerd-proxy-tap",
"linkerd-proxy-tcp",
"linkerd-proxy-transport",
Expand Down Expand Up @@ -1913,6 +1914,28 @@ dependencies = [
"thiserror",
]

[[package]]
name = "linkerd-proxy-spire-client"
version = "0.1.0"
dependencies = [
"futures",
"linkerd-error",
"linkerd-exp-backoff",
"linkerd-identity",
"linkerd-proxy-http",
"linkerd-stack",
"linkerd-tonic-watch",
"rcgen",
"simple_asn1",
"spiffe-proto",
"tokio",
"tokio-test",
"tonic",
"tower",
"tracing",
"x509-parser",
]

[[package]]
name = "linkerd-proxy-tap"
version = "0.1.0"
Expand Down Expand Up @@ -3061,6 +3084,18 @@ dependencies = [
"libc",
]

[[package]]
name = "simple_asn1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085"
dependencies = [
"num-bigint",
"num-traits",
"thiserror",
"time",
]

[[package]]
name = "slab"
version = "0.4.8"
Expand Down Expand Up @@ -3096,6 +3131,17 @@ dependencies = [
"windows-sys 0.48.0",
]

[[package]]
name = "spiffe-proto"
version = "0.1.0"
dependencies = [
"bytes",
"prost",
"prost-types",
"tonic",
"tonic-build",
]

[[package]]
name = "spin"
version = "0.5.2"
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ members = [
"linkerd/proxy/dns-resolve",
"linkerd/proxy/http",
"linkerd/proxy/identity-client",
"linkerd/proxy/spire-client",
"linkerd/proxy/resolve",
"linkerd/proxy/server-policy",
"linkerd/proxy/tap",
Expand All @@ -73,6 +74,7 @@ members = [
"linkerd/transport-metrics",
"linkerd2-proxy",
"opencensus-proto",
"spiffe-proto",
"tools",
]

Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
linkerd-proxy-dns-resolve = { path = "../../proxy/dns-resolve" }
linkerd-proxy-http = { path = "../../proxy/http" }
linkerd-proxy-identity-client = { path = "../../proxy/identity-client" }
linkerd-proxy-spire-client = { path = "../../proxy/spire-client" }
linkerd-proxy-resolve = { path = "../../proxy/resolve" }
linkerd-proxy-server-policy = { path = "../../proxy/server-policy" }
linkerd-proxy-tap = { path = "../../proxy/tap" }
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod identity {
pub use linkerd_identity::*;
pub use linkerd_meshtls::*;
pub use linkerd_proxy_identity_client as client;
pub use linkerd_proxy_spire_client as spire_client;
}

pub const CANONICAL_DST_HEADER: &str = "l5d-dst-canonical";
Expand Down
16 changes: 9 additions & 7 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,13 +813,15 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
outbound.http_request_queue.failfast_timeout
};
identity::Config {
certify,
control: ControlConfig {
addr,
connect,
buffer: QueueConfig {
capacity: DEFAULT_CONTROL_QUEUE_CAPACITY,
failfast_timeout,
provider: identity::Provider::ControlPlane {
certify,
control: ControlConfig {
addr,
connect,
buffer: QueueConfig {
capacity: DEFAULT_CONTROL_QUEUE_CAPACITY,
failfast_timeout,
},
},
},
params,
Expand Down
106 changes: 72 additions & 34 deletions linkerd/app/src/identity.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::spire;
pub use linkerd_app_core::identity::{
client::{certify, TokenSource},
spire_client::Spire,
Id,
};
use linkerd_app_core::{
Expand All @@ -9,14 +11,23 @@
metrics::{prom, ControlHttp as ClientMetrics},
Error, Result,
};
use std::{future::Future, pin::Pin, time::SystemTime};
use std::{future::Future, pin::Pin, sync::Arc, time::SystemTime};
use tokio::sync::watch;
use tracing::Instrument;

#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Provider {
ControlPlane {
control: control::Config,
certify: certify::Config,
},
Spire(spire::Config),
}

#[derive(Clone, Debug)]
pub struct Config {
pub control: control::Config,
pub certify: certify::Config,
pub provider: Provider,
pub params: TlsParams,
}

Expand All @@ -27,8 +38,14 @@
pub trust_anchors_pem: String,
}

#[derive(Clone)]
pub enum Addr {
Spire(Arc<String>),
Linkerd(control::ControlAddr),
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to move this into the Provider type to avoid having redundant enums?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in the App, here. It was returning a control::ControlAddr before.


pub struct Identity {
addr: control::ControlAddr,
addr: Addr,
receiver: creds::Receiver,
ready: watch::Receiver<bool>,
task: Task,
Expand Down Expand Up @@ -62,37 +79,58 @@
&self.params.trust_anchors_pem,
)?;

let certify = Certify::from(self.certify);

let addr = self.control.addr.clone();

let (tx, ready) = watch::channel(false);
let cert_metrics =
CertMetrics::register(registry.sub_registry_with_prefix("identity_cert"));
let cred = WithCertMetrics::new(cert_metrics, NotifyReady { store, tx });

let identity = match self.provider {
Provider::ControlPlane { control, certify } => {
let certify = Certify::from(certify);
let addr = control.addr.clone();

let task = Box::pin({
let addr = addr.clone();

let svc = control.build(
dns,
client_metrics,
registry.sub_registry_with_prefix("control_identity"),
receiver.new_client(),
);

certify.run(name, cred, svc).instrument(
tracing::debug_span!("identity", server.addr = %addr).or_current(),
)
});

Identity {
addr: Addr::Linkerd(addr),
receiver,
ready,
task,
}
}
Provider::Spire(cfg) => {
let addr = cfg.socket_addr.clone();
let spire = Spire::new(self.params.server_id.clone());
let task = Box::pin({
let client = spire::Client::from(cfg);
spire.run(cred, client).instrument(
tracing::debug_span!("spire identity", server.addr = %addr).or_current(),

Check warning on line 120 in linkerd/app/src/identity.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/identity.rs#L114-L120

Added lines #L114 - L120 were not covered by tests
)
});

Identity {
addr: Addr::Spire(addr),

Check warning on line 125 in linkerd/app/src/identity.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/identity.rs#L125

Added line #L125 was not covered by tests
receiver,
ready,
task,
}
}
};

// Save to be spawned on an auxiliary runtime.
let task = Box::pin({
let addr = addr.clone();
let svc = self.control.build(
dns,
client_metrics,
registry.sub_registry_with_prefix("control_identity"),
receiver.new_client(),
);

let cert_metrics =
CertMetrics::register(registry.sub_registry_with_prefix("identity_cert"));
let cred = WithCertMetrics::new(cert_metrics, NotifyReady { store, tx });

certify
.run(name, cred, svc)
.instrument(tracing::debug_span!("identity", server.addr = %addr).or_current())
});

Ok(Identity {
addr,
receiver,
ready,
task,
})
Ok(identity)
}
}

Expand All @@ -113,7 +151,7 @@
// === impl Identity ===

impl Identity {
pub fn addr(&self) -> control::ControlAddr {
pub fn addr(&self) -> Addr {

Check warning on line 154 in linkerd/app/src/identity.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/identity.rs#L154

Added line #L154 was not covered by tests
self.addr.clone()
}

Expand Down
7 changes: 6 additions & 1 deletion linkerd/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
pub mod identity;
pub mod oc_collector;
pub mod policy;
pub mod spire;
pub mod tap;

pub use self::metrics::Metrics;
Expand Down Expand Up @@ -340,7 +341,11 @@
self.identity.receiver().server_name().clone()
}

pub fn identity_addr(&self) -> ControlAddr {
pub fn local_tls_id(&self) -> identity::Id {
self.identity.receiver().local_id().clone()

Check warning on line 345 in linkerd/app/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/lib.rs#L344-L345

Added lines #L344 - L345 were not covered by tests
}

pub fn identity_addr(&self) -> identity::Addr {

Check warning on line 348 in linkerd/app/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/lib.rs#L348

Added line #L348 was not covered by tests
self.identity.addr()
}

Expand Down
67 changes: 67 additions & 0 deletions linkerd/app/src/spire.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
pub use linkerd_app_core::identity::spire_client;
use linkerd_app_core::{exp_backoff::ExponentialBackoff, Error};
use std::sync::Arc;
use tokio::net::UnixStream;
use tokio::sync::watch;
use tonic::transport::{Endpoint, Uri};

const UNIX_PREFIX: &str = "unix:";
const TONIC_DEFAULT_URI: &str = "http://[::]:50051";

#[derive(Clone, Debug)]
pub struct Config {
pub(crate) socket_addr: Arc<String>,
pub(crate) backoff: ExponentialBackoff,
}

// Connects to SPIRE workload API via Unix Domain Socket
pub struct Client {
config: Config,
}

// === impl Client ===

impl From<Config> for Client {
fn from(config: Config) -> Self {

Check warning on line 25 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L25

Added line #L25 was not covered by tests
Self { config }
}
}

impl tower::Service<()> for Client {
type Response = tonic::Response<watch::Receiver<spire_client::SvidUpdate>>;
type Error = Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(

Check warning on line 35 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L35

Added line #L35 was not covered by tests
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))

Check warning on line 39 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L39

Added line #L39 was not covered by tests
}

fn call(&mut self, _req: ()) -> Self::Future {
let socket = self.config.socket_addr.clone();
let backoff = self.config.backoff;
Box::pin(async move {

Check warning on line 45 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L42-L45

Added lines #L42 - L45 were not covered by tests
// Strip the 'unix:' prefix for tonic compatibility.
let stripped_path = socket

Check warning on line 47 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L47

Added line #L47 was not covered by tests
.strip_prefix(UNIX_PREFIX)
.unwrap_or(socket.as_str())
.to_string();

Check warning on line 50 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L49-L50

Added lines #L49 - L50 were not covered by tests

// We will ignore this uri because uds do not use it
// if your connector does use the uri it will be provided
// as the request to the `MakeConnection`.
let chan = Endpoint::try_from(TONIC_DEFAULT_URI)?
.connect_with_connector(tower::util::service_fn(move |_: Uri| {
UnixStream::connect(stripped_path.clone())

Check warning on line 57 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L55-L57

Added lines #L55 - L57 were not covered by tests
}))
.await?;

Check warning on line 59 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L59

Added line #L59 was not covered by tests

let api = spire_client::Api::watch(chan, backoff);
let receiver = api.spawn_watch(()).await?;

Check warning on line 62 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L61-L62

Added lines #L61 - L62 were not covered by tests

Ok(receiver)

Check warning on line 64 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L64

Added line #L64 was not covered by tests
})
}
}
27 changes: 27 additions & 0 deletions linkerd/proxy/spire-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "linkerd-proxy-spire-client"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
futures = { version = "0.3", default-features = false }
linkerd-error = { path = "../../error" }
linkerd-proxy-http = { path = "../../proxy/http" }
linkerd-identity = { path = "../../identity" }
spiffe-proto = { path = "../../../spiffe-proto" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd-exp-backoff = { path = "../../exp-backoff" }
linkerd-stack = { path = "../../stack" }
tokio = { version = "1", features = ["time", "sync"] }
tonic = "0.10"
tower = "0.4"
tracing = "0.1"
x509-parser = "0.15.1"
asn1 = { version = "0.6", package = "simple_asn1" }

[dev-dependencies]
rcgen = "0.11.3"
tokio-test = "0.4"
Loading
Loading