Skip to content

Commit

Permalink
feedback from @olix0r
Browse files Browse the repository at this point in the history
Signed-off-by: Zahari Dichev <[email protected]>
  • Loading branch information
zaharidichev committed Jan 9, 2024
1 parent 514e3f9 commit c9bdbb7
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 111 deletions.
6 changes: 4 additions & 2 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ pub use linkerd_transport_header as transport_header;
pub mod identity {
pub use linkerd_identity::*;
pub use linkerd_meshtls::*;
pub use linkerd_proxy_identity_client as client;
pub use linkerd_proxy_spire_client as spire_client;
pub mod client {
pub use linkerd_proxy_identity_client as linkerd;
pub use linkerd_proxy_spire_client as spire;
}
}

pub const CANONICAL_DST_HEADER: &str = "l5d-dst-canonical";
Expand Down
47 changes: 27 additions & 20 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
.unwrap_or(super::tap::Config::Disabled);

let identity = {
let (addr, certify, params) = identity_config?;
let (addr, certify, tls) = identity_config?;
// If the address doesn't have a server identity, then we're on localhost.
let connect = if addr.addr.is_loopback() {
inbound.proxy.connect.clone()
Expand All @@ -812,19 +812,17 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
} else {
outbound.http_request_queue.failfast_timeout
};
identity::Config {
provider: identity::Provider::ControlPlane {
certify,
control: ControlConfig {
addr,
connect,
buffer: QueueConfig {
capacity: DEFAULT_CONTROL_QUEUE_CAPACITY,
failfast_timeout,
},
identity::Config::Linkerd {
certify,
tls,
client: ControlConfig {
addr,
connect,
buffer: QueueConfig {
capacity: DEFAULT_CONTROL_QUEUE_CAPACITY,
failfast_timeout,
},
},
params,
}
};

Expand Down Expand Up @@ -1224,7 +1222,14 @@ pub fn parse_control_addr<S: Strings>(

pub fn parse_identity_config<S: Strings>(
strings: &S,
) -> Result<(ControlAddr, identity::certify::Config, identity::TlsParams), EnvError> {
) -> Result<
(
ControlAddr,
identity::client::linkerd::Config,
identity::TlsParams,
),
EnvError,
> {
let control = parse_control_addr(strings, ENV_IDENTITY_SVC_BASE);
let ta = parse(strings, ENV_IDENTITY_TRUST_ANCHORS, |s| {
if s.is_empty() {
Expand All @@ -1234,7 +1239,7 @@ pub fn parse_identity_config<S: Strings>(
});
let dir = parse(strings, ENV_IDENTITY_DIR, |ref s| Ok(PathBuf::from(s)));
let tok = parse(strings, ENV_IDENTITY_TOKEN_FILE, |ref s| {
identity::TokenSource::if_nonempty_file(s.to_string()).map_err(|e| {
identity::client::linkerd::TokenSource::if_nonempty_file(s.to_string()).map_err(|e| {
error!("Could not read {}: {}", ENV_IDENTITY_TOKEN_FILE, e);
ParseError::InvalidTokenSource
})
Expand Down Expand Up @@ -1265,17 +1270,19 @@ pub fn parse_identity_config<S: Strings>(
min_refresh,
max_refresh,
) => {
let certify = identity::certify::Config {
let certify = identity::client::linkerd::Config {
token,
min_refresh: min_refresh.unwrap_or(DEFAULT_IDENTITY_MIN_REFRESH),
max_refresh: max_refresh.unwrap_or(DEFAULT_IDENTITY_MAX_REFRESH),
documents: identity::certify::Documents::load(dir).map_err(|error| {
error!(%error, "Failed to read identity documents");
EnvError::InvalidEnvVar
})?,
documents: identity::client::linkerd::certify::Documents::load(dir).map_err(
|error| {
error!(%error, "Failed to read identity documents");
EnvError::InvalidEnvVar
},
)?,
};
let params = identity::TlsParams {
server_id: identity::Id::Dns(local_name.clone()),
id: identity::Id::Dns(local_name.clone()),
server_name: local_name,
trust_anchors_pem,
};
Expand Down
133 changes: 68 additions & 65 deletions linkerd/app/src/identity.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,41 @@
use crate::spire;
pub use linkerd_app_core::identity::{
client::{certify, TokenSource},
spire_client::Spire,
Id,
};

pub use linkerd_app_core::identity::{client, Id};
use linkerd_app_core::{
control, dns,
exp_backoff::{ExponentialBackoff, ExponentialBackoffStream},
identity::{client::Certify, creds, CertMetrics, Credentials, DerX509, Mode, WithCertMetrics},
identity::{
client::linkerd::Certify, creds, CertMetrics, Credentials, DerX509, Mode, WithCertMetrics,
},
metrics::{prom, ControlHttp as ClientMetrics},
Error, Result,
};
use std::{future::Future, pin::Pin, sync::Arc, time::SystemTime};
use std::{future::Future, pin::Pin, time::SystemTime};
use tokio::sync::watch;
use tracing::Instrument;

#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Provider {
ControlPlane {
control: control::Config,
certify: certify::Config,
pub enum Config {
Linkerd {
client: control::Config,
certify: client::linkerd::Config,
tls: TlsParams,
},
Spire {
client: spire::Config,
tls: TlsParams,
},
Spire(spire::Config),
}

#[derive(Clone, Debug)]
pub struct Config {
pub provider: Provider,
pub params: TlsParams,
}

#[derive(Clone, Debug)]
pub struct TlsParams {
pub server_id: Id,
pub id: Id,
pub server_name: dns::Name,
pub trust_anchors_pem: String,
}

#[derive(Clone)]
pub enum Addr {
Spire(Arc<String>),
Linkerd(control::ControlAddr),
}

pub struct Identity {
addr: Addr,
receiver: creds::Receiver,
ready: watch::Receiver<bool>,
task: Task,
Expand All @@ -72,68 +62,85 @@ impl Config {
client_metrics: ClientMetrics,
registry: &mut prom::Registry,
) -> Result<Identity> {
let name = self.params.server_name.clone();
let (store, receiver) = Mode::default().watch(
name.clone().into(),
name.clone(),
&self.params.trust_anchors_pem,
)?;

let (tx, ready) = watch::channel(false);
let cert_metrics =
CertMetrics::register(registry.sub_registry_with_prefix("identity_cert"));
let cred = WithCertMetrics::new(cert_metrics, NotifyReady { store, tx });

let identity = match self.provider {
Provider::ControlPlane { control, certify } => {
let certify = Certify::from(certify);
let addr = control.addr.clone();
Ok(match self {
Self::Linkerd {
client,
certify,
tls,
} => {
// TODO: move this validation into env.rs
let name = match (&tls.id, &tls.server_name) {
(Id::Dns(id), sni) if id == sni => id.clone(),
(_id, _sni) => {
return Err(
"Linkerd identity requires a TLS Id and server name to be the same"

Check warning on line 79 in linkerd/app/src/identity.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/identity.rs#L77-L79

Added lines #L77 - L79 were not covered by tests
.into(),
);
}
};

let task = Box::pin({
let addr = addr.clone();
let certify = Certify::from(certify);
let (store, receiver, ready) = watch(tls, cert_metrics)?;

let svc = control.build(
let task = {
let addr = client.addr.clone();
let svc = client.build(
dns,
client_metrics,
registry.sub_registry_with_prefix("control_identity"),
receiver.new_client(),
);

certify.run(name, cred, svc).instrument(
tracing::debug_span!("identity", server.addr = %addr).or_current(),
)
});

Box::pin(certify.run(name, store, svc).instrument(
tracing::info_span!("identity", server.addr = %addr).or_current(),
))
};
Identity {
addr: Addr::Linkerd(addr),
receiver,
ready,
task,
}
}
Provider::Spire(cfg) => {
let addr = cfg.socket_addr.clone();
let spire = Spire::new(self.params.server_id.clone());
let task = Box::pin({
let client = spire::Client::from(cfg);
spire.run(cred, client).instrument(
tracing::debug_span!("spire identity", server.addr = %addr).or_current(),
)
});
Self::Spire { client, tls } => {
let addr = client.socket_addr.clone();
let spire = spire::client::Spire::new(tls.id.clone());

Check warning on line 109 in linkerd/app/src/identity.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/identity.rs#L107-L109

Added lines #L107 - L109 were not covered by tests

let (store, receiver, ready) = watch(tls, cert_metrics)?;

Check warning on line 111 in linkerd/app/src/identity.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/identity.rs#L111

Added line #L111 was not covered by tests
let task =
Box::pin(spire.run(store, spire::Client::from(client)).instrument(
tracing::info_span!("spire", server.addr = %addr).or_current(),

Check warning on line 114 in linkerd/app/src/identity.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/identity.rs#L113-L114

Added lines #L113 - L114 were not covered by tests
));

Identity {
addr: Addr::Spire(addr),
receiver,
ready,

Check warning on line 119 in linkerd/app/src/identity.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/identity.rs#L117-L119

Added lines #L117 - L119 were not covered by tests
task,
}
}
};

Ok(identity)
})
}
}

fn watch(
tls: TlsParams,
metrics: CertMetrics,
) -> Result<(
WithCertMetrics<NotifyReady>,
creds::Receiver,
watch::Receiver<bool>,
)> {
let (tx, ready) = watch::channel(false);
let (store, receiver) =
Mode::default().watch(tls.id, tls.server_name, &tls.trust_anchors_pem)?;
let cred = WithCertMetrics::new(metrics, NotifyReady { store, tx });
Ok((cred, receiver, ready))
}

// === impl NotifyReady ===

impl Credentials for NotifyReady {
fn set_certificate(
&mut self,
Expand All @@ -151,10 +158,6 @@ impl Credentials for NotifyReady {
// === impl Identity ===

impl Identity {
pub fn addr(&self) -> Addr {
self.addr.clone()
}

/// Returns a future that is satisfied once certificates have been provisioned.
pub fn ready(&self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
let mut ready = self.ready.clone();
Expand Down
4 changes: 0 additions & 4 deletions linkerd/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,6 @@ impl App {
self.identity.receiver().local_id().clone()

Check warning on line 345 in linkerd/app/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/lib.rs#L344-L345

Added lines #L344 - L345 were not covered by tests
}

pub fn identity_addr(&self) -> identity::Addr {
self.identity.addr()
}

pub fn opencensus_addr(&self) -> Option<&ControlAddr> {
match self.oc_collector {
oc_collector::OcCollector::Disabled { .. } => None,
Expand Down
7 changes: 4 additions & 3 deletions linkerd/app/src/spire.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
pub use linkerd_app_core::identity::spire_client;
use linkerd_app_core::{exp_backoff::ExponentialBackoff, Error};
use std::sync::Arc;
use tokio::net::UnixStream;
use tokio::sync::watch;
use tonic::transport::{Endpoint, Uri};

pub use linkerd_app_core::identity::client::spire as client;

const UNIX_PREFIX: &str = "unix:";
const TONIC_DEFAULT_URI: &str = "http://[::]:50051";

Expand All @@ -28,7 +29,7 @@ impl From<Config> for Client {
}

impl tower::Service<()> for Client {
type Response = tonic::Response<watch::Receiver<spire_client::SvidUpdate>>;
type Response = tonic::Response<watch::Receiver<client::SvidUpdate>>;
type Error = Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

Expand Down Expand Up @@ -58,7 +59,7 @@ impl tower::Service<()> for Client {
}))
.await?;

Check warning on line 60 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L60

Added line #L60 was not covered by tests

let api = spire_client::Api::watch(chan, backoff);
let api = client::Api::watch(chan, backoff);
let receiver = api.spawn_watch(()).await?;

Check warning on line 63 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L62-L63

Added lines #L62 - L63 were not covered by tests

Ok(receiver)

Check warning on line 65 in linkerd/app/src/spire.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/src/spire.rs#L65

Added line #L65 was not covered by tests
Expand Down
5 changes: 4 additions & 1 deletion linkerd/proxy/identity-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@
pub mod certify;
mod token;

pub use self::{certify::Certify, token::TokenSource};
pub use self::{
certify::{Certify, Config},
token::TokenSource,
};
17 changes: 1 addition & 16 deletions linkerd2-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ compile_error!(
"at least one of the following TLS implementations must be enabled: 'meshtls-boring', 'meshtls-rustls'"
);

use linkerd_app::{identity, trace, BindTcp, Config, BUILD_INFO};
use linkerd_app::{trace, BindTcp, Config, BUILD_INFO};
use linkerd_signal as signal;
use tokio::{sync::mpsc, time};
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -83,21 +83,6 @@ fn main() {
info!("SNI is {}", app.local_server_name());
info!("Local identity is {}", app.local_tls_id());

match app.identity_addr() {
identity::Addr::Linkerd(cp_addr) => match cp_addr.identity.value() {
None => info!("Identity verified via {}", cp_addr.addr),
Some(tls) => {
info!(
"Identity verified via Control Plane {} ({})",
cp_addr.addr, tls.server_id
);
}
},
identity::Addr::Spire(spire_addr) => {
info!("Identity obtained via Spire {}", spire_addr);
}
}

let dst_addr = app.dst_addr();
match dst_addr.identity.value() {
None => info!("Destinations resolved via {}", dst_addr.addr),
Expand Down

0 comments on commit c9bdbb7

Please sign in to comment.