From c156aa24d0d3a24a794974fc363af519bc99f98d Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 20 Dec 2023 12:29:08 +0000 Subject: [PATCH 01/13] identity: add spire identity client Signed-off-by: Zahari Dichev --- Cargo.lock | 56 ++++++ Cargo.toml | 3 + linkerd/app/core/Cargo.toml | 2 + linkerd/app/core/src/lib.rs | 1 + linkerd/app/src/identity.rs | 4 +- .../proxy/identity-client-metrics/Cargo.toml | 11 ++ .../src/lib.rs} | 7 +- linkerd/proxy/identity-client/Cargo.toml | 1 + linkerd/proxy/identity-client/src/certify.rs | 3 +- linkerd/proxy/identity-client/src/lib.rs | 3 +- linkerd/proxy/spire-client/Cargo.toml | 27 +++ linkerd/proxy/spire-client/src/api.rs | 151 ++++++++++++++++ linkerd/proxy/spire-client/src/client.rs | 64 +++++++ linkerd/proxy/spire-client/src/lib.rs | 167 ++++++++++++++++++ spiffe-proto/Cargo.toml | 25 +++ spiffe-proto/spiffe/proto/workload.proto | 55 ++++++ spiffe-proto/src/gen/spiffe.workloadapi.rs | 162 +++++++++++++++++ spiffe-proto/src/lib.rs | 11 ++ spiffe-proto/tests/bootstrap.rs | 47 +++++ 19 files changed, 793 insertions(+), 7 deletions(-) create mode 100644 linkerd/proxy/identity-client-metrics/Cargo.toml rename linkerd/proxy/{identity-client/src/metrics.rs => identity-client-metrics/src/lib.rs} (86%) create mode 100644 linkerd/proxy/spire-client/Cargo.toml create mode 100644 linkerd/proxy/spire-client/src/api.rs create mode 100644 linkerd/proxy/spire-client/src/client.rs create mode 100644 linkerd/proxy/spire-client/src/lib.rs create mode 100644 spiffe-proto/Cargo.toml create mode 100644 spiffe-proto/spiffe/proto/workload.proto create mode 100644 spiffe-proto/src/gen/spiffe.workloadapi.rs create mode 100644 spiffe-proto/src/lib.rs create mode 100644 spiffe-proto/tests/bootstrap.rs diff --git a/Cargo.lock b/Cargo.lock index 693a1690f6..644b3b1600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1141,8 +1141,10 @@ dependencies = [ "linkerd-proxy-dns-resolve", "linkerd-proxy-http", "linkerd-proxy-identity-client", + "linkerd-proxy-identity-client-metrics", "linkerd-proxy-resolve", "linkerd-proxy-server-policy", + "linkerd-proxy-spire-client", "linkerd-proxy-tap", "linkerd-proxy-tcp", "linkerd-proxy-transport", @@ -1807,6 +1809,7 @@ dependencies = [ "linkerd-error", "linkerd-identity", "linkerd-metrics", + "linkerd-proxy-identity-client-metrics", "linkerd-stack", "linkerd2-proxy-api", "parking_lot", @@ -1817,6 +1820,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-proxy-identity-client-metrics" +version = "0.1.0" +dependencies = [ + "linkerd-metrics", + "parking_lot", +] + [[package]] name = "linkerd-proxy-pool" version = "0.1.0" @@ -1866,6 +1877,28 @@ dependencies = [ "thiserror", ] +[[package]] +name = "linkerd-proxy-spire-client" +version = "0.1.0" +dependencies = [ + "futures", + "linkerd-error", + "linkerd-exp-backoff", + "linkerd-identity", + "linkerd-proxy-http", + "linkerd-proxy-identity-client-metrics", + "linkerd-stack", + "linkerd-tonic-watch", + "rcgen", + "simple_asn1", + "spiffe-proto", + "tokio", + "tonic", + "tower", + "tracing", + "x509-parser", +] + [[package]] name = "linkerd-proxy-tap" version = "0.1.0" @@ -2984,6 +3017,18 @@ dependencies = [ "libc", ] +[[package]] +name = "simple_asn1" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror", + "time", +] + [[package]] name = "slab" version = "0.4.8" @@ -3019,6 +3064,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "spiffe-proto" +version = "0.1.0" +dependencies = [ + "bytes", + "prost", + "prost-types", + "tonic", + "tonic-build", +] + [[package]] name = "spin" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index 5b23ede593..1e7918b916 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,8 @@ members = [ "linkerd/proxy/dns-resolve", "linkerd/proxy/http", "linkerd/proxy/identity-client", + "linkerd/proxy/identity-client-metrics", + "linkerd/proxy/spire-client", "linkerd/proxy/pool", "linkerd/proxy/resolve", "linkerd/proxy/server-policy", @@ -69,6 +71,7 @@ members = [ "linkerd/transport-metrics", "linkerd2-proxy", "opencensus-proto", + "spiffe-proto", "tools", ] diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 2a7090ae0b..077a3e687e 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -43,6 +43,8 @@ linkerd-proxy-client-policy = { path = "../../proxy/client-policy" } linkerd-proxy-dns-resolve = { path = "../../proxy/dns-resolve" } linkerd-proxy-http = { path = "../../proxy/http" } linkerd-proxy-identity-client = { path = "../../proxy/identity-client" } +linkerd-proxy-identity-client-metrics = { path = "../../proxy/identity-client-metrics" } +linkerd-proxy-spire-client = { path = "../../proxy/spire-client" } linkerd-proxy-resolve = { path = "../../proxy/resolve" } linkerd-proxy-server-policy = { path = "../../proxy/server-policy" } linkerd-proxy-tap = { path = "../../proxy/tap" } diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index b7c1779699..dc907de0d1 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -51,6 +51,7 @@ pub mod identity { pub use linkerd_identity::*; pub use linkerd_meshtls::*; pub use linkerd_proxy_identity_client as client; + pub use linkerd_proxy_identity_client_metrics as client_metrics; } pub const CANONICAL_DST_HEADER: &str = "l5d-dst-canonical"; diff --git a/linkerd/app/src/identity.rs b/linkerd/app/src/identity.rs index 1f61b0111e..9abde56cbf 100644 --- a/linkerd/app/src/identity.rs +++ b/linkerd/app/src/identity.rs @@ -6,8 +6,8 @@ use linkerd_app_core::{ control, dns, exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}, identity::{ - client::{Certify, Metrics as IdentityMetrics}, - creds, Credentials, DerX509, Mode, + client::Certify, client_metrics::Metrics as IdentityMetrics, creds, Credentials, DerX509, + Mode, }, metrics::{prom, ControlHttp as ClientMetrics}, Error, Result, diff --git a/linkerd/proxy/identity-client-metrics/Cargo.toml b/linkerd/proxy/identity-client-metrics/Cargo.toml new file mode 100644 index 0000000000..57fe9222f3 --- /dev/null +++ b/linkerd/proxy/identity-client-metrics/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "linkerd-proxy-identity-client-metrics" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +linkerd-metrics = { path = "../../metrics" } +parking_lot = "0.12" diff --git a/linkerd/proxy/identity-client/src/metrics.rs b/linkerd/proxy/identity-client-metrics/src/lib.rs similarity index 86% rename from linkerd/proxy/identity-client/src/metrics.rs rename to linkerd/proxy/identity-client-metrics/src/lib.rs index 0b3f499f78..de0a2f60af 100644 --- a/linkerd/proxy/identity-client/src/metrics.rs +++ b/linkerd/proxy/identity-client-metrics/src/lib.rs @@ -1,3 +1,6 @@ +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + use linkerd_metrics::{metrics, Counter, FmtMetrics, Gauge}; use parking_lot::Mutex; use std::{ @@ -12,7 +15,7 @@ metrics! { }, identity_cert_refresh_count: Counter { - "The total number of times this proxy's mTLS identity certificate has been refreshed by the Identity service." + "The total number of times this proxy's mTLS identity certificate has been refreshed by the Identity provider." } } @@ -32,7 +35,7 @@ impl Default for Metrics { } impl Metrics { - pub(crate) fn refresh(&self, expiry: SystemTime) { + pub fn refresh(&self, expiry: SystemTime) { self.refreshes.incr(); *self.expiry.lock() = expiry; } diff --git a/linkerd/proxy/identity-client/Cargo.toml b/linkerd/proxy/identity-client/Cargo.toml index 8e68cc3f18..4f8b81eec6 100644 --- a/linkerd/proxy/identity-client/Cargo.toml +++ b/linkerd/proxy/identity-client/Cargo.toml @@ -12,6 +12,7 @@ linkerd2-proxy-api = { version = "0.12", features = ["identity"] } linkerd-dns-name = { path = "../../dns/name" } linkerd-error = { path = "../../error" } linkerd-identity = { path = "../../identity" } +linkerd-proxy-identity-client-metrics = { path = "../identity-client-metrics" } linkerd-metrics = { path = "../../metrics" } linkerd-stack = { path = "../../stack" } parking_lot = "0.12" diff --git a/linkerd/proxy/identity-client/src/certify.rs b/linkerd/proxy/identity-client/src/certify.rs index 714c1def17..20710671d4 100644 --- a/linkerd/proxy/identity-client/src/certify.rs +++ b/linkerd/proxy/identity-client/src/certify.rs @@ -1,9 +1,10 @@ -use crate::{Metrics, TokenSource}; +use crate::TokenSource; use http_body::Body; use linkerd2_proxy_api::identity::{self as api, identity_client::IdentityClient}; use linkerd_dns_name::Name; use linkerd_error::{Error, Result}; use linkerd_identity::{Credentials, DerX509}; +use linkerd_proxy_identity_client_metrics::Metrics; use linkerd_stack::NewService; use std::{ path::PathBuf, diff --git a/linkerd/proxy/identity-client/src/lib.rs b/linkerd/proxy/identity-client/src/lib.rs index 76ce0c79fd..b2dfce139a 100644 --- a/linkerd/proxy/identity-client/src/lib.rs +++ b/linkerd/proxy/identity-client/src/lib.rs @@ -2,7 +2,6 @@ #![forbid(unsafe_code)] pub mod certify; -pub mod metrics; mod token; -pub use self::{certify::Certify, metrics::Metrics, token::TokenSource}; +pub use self::{certify::Certify, token::TokenSource}; diff --git a/linkerd/proxy/spire-client/Cargo.toml b/linkerd/proxy/spire-client/Cargo.toml new file mode 100644 index 0000000000..83ca98ddf2 --- /dev/null +++ b/linkerd/proxy/spire-client/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "linkerd-proxy-spire-client" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +futures = { version = "0.3", default-features = false } +linkerd-error = { path = "../../error" } +linkerd-proxy-http = { path = "../../proxy/http" } +linkerd-identity = { path = "../../identity" } +spiffe-proto = { path = "../../../spiffe-proto" } +linkerd-tonic-watch = { path = "../../tonic-watch" } +linkerd-exp-backoff = { path = "../../exp-backoff" } +linkerd-proxy-identity-client-metrics = { path = "../identity-client-metrics" } +linkerd-stack = { path = "../../stack" } +tokio = { version = "1", features = ["time", "sync"] } +tonic = "0.10" +tower = "0.4" +tracing = "0.1" +x509-parser = "0.15.1" +asn1 = { version = "0.6", package = "simple_asn1" } + +[dev-dependencies] +rcgen = "0.11.3" diff --git a/linkerd/proxy/spire-client/src/api.rs b/linkerd/proxy/spire-client/src/api.rs new file mode 100644 index 0000000000..c199996b48 --- /dev/null +++ b/linkerd/proxy/spire-client/src/api.rs @@ -0,0 +1,151 @@ +use futures::prelude::*; +use linkerd_error::{Error, Recover, Result}; +use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}; +use linkerd_identity::DerX509; +use linkerd_identity::Id; +use linkerd_proxy_http as http; +use linkerd_tonic_watch::StreamWatch; +use spiffe_proto::client::{ + self as api, spiffe_workload_api_client::SpiffeWorkloadApiClient as Client, +}; +use std::collections::HashMap; +use tower::Service; +use tracing::error; + +#[derive(Clone, Debug)] +pub struct Svid { + pub spiffe_id: Id, + pub leaf: DerX509, + pub private_key: Vec, + pub intermediates: Vec, +} + +#[derive(Clone, Debug)] +pub struct SvidUpdate { + pub svids: HashMap, +} + +#[derive(Clone, Debug)] +pub(crate) struct Api { + client: Client, +} + +#[derive(Clone)] +pub(crate) struct GrpcRecover(ExponentialBackoff); + +pub(crate) type Watch = StreamWatch>; + +// === impl Svid === + +impl TryFrom for Svid { + type Error = Error; + fn try_from(proto: api::X509svid) -> Result { + let cert_der_blocks = asn1::from_der(&proto.x509_svid)?; + let (leaf, intermediates) = match cert_der_blocks.split_first() { + None => return Err("empty cert chain".into()), + Some((leaf_block, intermediates_block)) => { + let leaf = DerX509(asn1::to_der(leaf_block)?); + let mut intermediates = vec![]; + for block in intermediates_block.iter() { + let cert_der = asn1::to_der(block)?; + intermediates.push(DerX509(cert_der)); + } + (leaf, intermediates) + } + }; + + let spiffe_id = Id::parse_uri(&proto.spiffe_id)?; + + Ok(Svid { + spiffe_id, + leaf: leaf.clone(), + private_key: proto.x509_svid_key, + intermediates: intermediates.to_vec(), + }) + } +} + +// === impl Api === + +impl Api +where + S: tonic::client::GrpcService + Clone, + S::Error: Into, + S::ResponseBody: Default + http::HttpBody + Send + 'static, + ::Error: Into + Send, +{ + pub(super) fn watch(client: S, backoff: ExponentialBackoff) -> Watch { + let client = Client::new(client); + StreamWatch::new(GrpcRecover(backoff), Self { client }) + } +} + +impl Service<()> for Api +where + S: tonic::client::GrpcService + Clone, + S: Clone + Send + Sync + 'static, + S::ResponseBody: Default + http::HttpBody + Send + 'static, + ::Error: Into + Send, + S::Future: Send + 'static, +{ + type Response = + tonic::Response>>; + type Error = tonic::Status; + type Future = futures::future::BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + let req = api::X509svidRequest {}; + let mut client = self.client.clone(); + Box::pin(async move { + let rsp = client.fetch_x509svid(tonic::Request::new(req)).await?; + Ok(rsp.map(|svids| { + svids + .map_ok(move |s| { + let svids = s + .svids + .into_iter() + .filter_map(|proto| { + let svid: Option = proto + .try_into() + .map_err(|err| error!("could not parse SVID: {}", err)) + .ok(); + + svid.map(|svid| (svid.spiffe_id.clone(), svid)) + }) + .collect(); + + SvidUpdate { svids } + }) + .boxed() + })) + }) + } +} + +// === impl GrpcRecover === + +impl Recover for GrpcRecover { + type Backoff = ExponentialBackoffStream; + + fn recover(&self, status: tonic::Status) -> Result { + if status.code() == tonic::Code::InvalidArgument + || status.code() == tonic::Code::FailedPrecondition + { + return Err(status); + } + + tracing::warn!( + grpc.status = %status.code(), + grpc.message = status.message(), + "Unexpected policy SPIRE Workload API response; retrying with a backoff", + ); + Ok(self.0.stream()) + } +} diff --git a/linkerd/proxy/spire-client/src/client.rs b/linkerd/proxy/spire-client/src/client.rs new file mode 100644 index 0000000000..25563e3ec5 --- /dev/null +++ b/linkerd/proxy/spire-client/src/client.rs @@ -0,0 +1,64 @@ +use crate::api::{Api, SvidUpdate}; +use linkerd_error::Error; +use linkerd_exp_backoff::ExponentialBackoff; +use std::sync::Arc; +use tokio::net::UnixStream; +use tokio::sync::watch; +use tonic::transport::{Endpoint, Uri}; + +const UNIX_PREFIX: &str = "unix:"; +const TONIC_DEFAULT_URI: &str = "http://[::]:50051"; + +#[derive(Clone, Debug)] +pub struct Client { + socket: Arc, + backoff: ExponentialBackoff, +} + +impl Client { + pub fn new(socket: Arc, backoff: ExponentialBackoff) -> Self { + Self { socket, backoff } + } +} + +// === impl Client === + +impl tower::Service<()> for Client { + type Response = watch::Receiver; + type Error = Error; + type Future = futures::future::BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: ()) -> Self::Future { + let socket = self.socket.clone(); + let backoff = self.backoff; + Box::pin(async move { + //spiffe::workload_api::client::WorkloadApiClient + // Strip the 'unix:' prefix for tonic compatibility. + let stripped_path = socket + .strip_prefix(UNIX_PREFIX) + .unwrap_or(socket.as_str()) + .to_string(); + + // We will ignore this uri because uds do not use it + // if your connector does use the uri it will be provided + // as the request to the `MakeConnection`. + let chan = Endpoint::try_from(TONIC_DEFAULT_URI)? + .connect_with_connector(tower::util::service_fn(move |_: Uri| { + UnixStream::connect(stripped_path.clone()) + })) + .await?; + + let api = Api::watch(chan, backoff); + let receiver = api.spawn_watch(()).await?.into_inner(); + + Ok(receiver) + }) + } +} diff --git a/linkerd/proxy/spire-client/src/lib.rs b/linkerd/proxy/spire-client/src/lib.rs new file mode 100644 index 0000000000..fcb55c0146 --- /dev/null +++ b/linkerd/proxy/spire-client/src/lib.rs @@ -0,0 +1,167 @@ +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + +mod api; +mod client; + +pub use self::client::Client; + +use api::SvidUpdate; +use linkerd_error::{Error, Result}; +use linkerd_identity::Credentials; +use linkerd_identity::Id; +use linkerd_proxy_identity_client_metrics::Metrics; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::watch; +use tower::Service; +use tracing::error; + +pub struct Spire { + id: Id, + metrics: Metrics, +} + +// === impl Spire === + +impl Spire { + pub fn new(id: Id, metrics: Metrics) -> Self { + Self { id, metrics } + } + + pub async fn run(self, credentials: C, mut new_client: N) + where + C: Credentials, + N: Service<(), Response = watch::Receiver>, + N::Error: Into, + { + match new_client.call(()).await { + Ok(rx) => consume_updates(&self.id, rx, credentials, &self.metrics).await, + Err(error) => error!("could not establish SVID stream: {}", error.into()), + } + } +} + +async fn consume_updates( + id: &Id, + mut updates: watch::Receiver, + mut credentials: C, + metrics: &Metrics, +) where + C: Credentials, +{ + loop { + let svid_update = updates.borrow_and_update().clone(); + match process_svid(&mut credentials, svid_update, id) { + Ok(expiration) => metrics.refresh(expiration), + Err(error) => tracing::error!("Error processing SVID update: {}", error), + } + + if let Err(error) = updates.changed().await { + tracing::debug!("SVID watch closed; terminating {}", error); + return; + } + } +} + +fn process_svid(credentials: &mut C, mut update: SvidUpdate, id: &Id) -> Result +where + C: Credentials, +{ + if let Some(svid) = update.svids.remove(id) { + use x509_parser::prelude::*; + + let (_, parsed_cert) = X509Certificate::from_der(&svid.leaf.0)?; + let exp: u64 = parsed_cert.validity().not_after.timestamp().try_into()?; + let exp = UNIX_EPOCH + Duration::from_secs(exp); + + credentials.set_certificate(svid.leaf, svid.intermediates, svid.private_key)?; + return Ok(exp); + } + + Err("could not find an SVID".into()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::api::Svid; + use linkerd_error::Result; + use linkerd_identity::{Credentials, DerX509, Id}; + use rcgen::{Certificate, CertificateParams, SanType, SerialNumber}; + use std::collections::HashMap; + use tokio::sync::watch; + + fn gen_cert(subject_alt_names: Vec, serial: SerialNumber) -> DerX509 { + let mut params = CertificateParams::default(); + params.subject_alt_names = subject_alt_names; + params.serial_number = Some(serial); + + DerX509( + Certificate::from_params(params) + .expect("should generate cert") + .serialize_der() + .expect("should serialize"), + ) + } + + struct MockCredentials { + tx: watch::Sender>, + } + + impl MockCredentials { + fn new() -> (Self, watch::Receiver>) { + let (tx, rx) = watch::channel(None); + (Self { tx }, rx) + } + } + + impl Credentials for MockCredentials { + fn set_certificate(&mut self, leaf: DerX509, _: Vec, _: Vec) -> Result<()> { + let (_, cert) = x509_parser::parse_x509_certificate(&leaf.0).unwrap(); + let serial = SerialNumber::from_slice(&cert.serial.to_bytes_be()); + self.tx.send(Some(serial)).unwrap(); + Ok(()) + } + } + + #[tokio::test(flavor = "current_thread")] + async fn valid_update() { + let serial = SerialNumber::from_slice("some-serial".as_bytes()); + let spiffe_san = "spiffe://some-domain/some-workload"; + let leaf = gen_cert(vec![SanType::URI(spiffe_san.into())], serial.clone()); + let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); + + let (mut creds, mut rx) = MockCredentials::new(); + let svid = Svid { + spiffe_id: spiffe_id.clone(), + leaf, + private_key: Vec::default(), + intermediates: Vec::default(), + }; + let mut svids = HashMap::default(); + svids.insert(svid.spiffe_id.clone(), svid); + let update = SvidUpdate { svids }; + + assert!(process_svid(&mut creds, update, &spiffe_id).is_ok()); + rx.changed().await.unwrap(); + assert!(*rx.borrow_and_update() == Some(serial)); + } + + #[tokio::test(flavor = "current_thread")] + async fn invalid_update() { + let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); + let (mut creds, mut rx) = MockCredentials::new(); + let svid = Svid { + spiffe_id: spiffe_id.clone(), + leaf: DerX509(Vec::default()), + private_key: Vec::default(), + intermediates: Vec::default(), + }; + let mut svids = HashMap::default(); + svids.insert(svid.spiffe_id.clone(), svid); + let update = SvidUpdate { svids }; + + assert!(process_svid(&mut creds, update, &spiffe_id).is_err()); + assert!(rx.borrow_and_update().is_none()); + } +} diff --git a/spiffe-proto/Cargo.toml b/spiffe-proto/Cargo.toml new file mode 100644 index 0000000000..9e1790e63c --- /dev/null +++ b/spiffe-proto/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "spiffe-proto" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +bytes = "1" +prost = "0.12" +prost-types = "0.12" + +[dependencies.tonic] +version = "0.10" +default-features = false +features = ["prost", "codegen"] + +[dev-dependencies.tonic-build] +version = "0.10" +default-features = false +features = ["prost"] + +[lib] +doctest = false diff --git a/spiffe-proto/spiffe/proto/workload.proto b/spiffe-proto/spiffe/proto/workload.proto new file mode 100644 index 0000000000..d0480f7ebd --- /dev/null +++ b/spiffe-proto/spiffe/proto/workload.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; + +package spiffe.workloadapi; + +service SpiffeWorkloadAPI { + // Fetch X.509-SVIDs for all SPIFFE identities the workload is entitled to, + // as well as related information like trust bundles and CRLs. As this + // information changes, subsequent messages will be streamed from the + // server. + rpc FetchX509SVID(X509SVIDRequest) returns (stream X509SVIDResponse); +} + +// The X509SVIDRequest message conveys parameters for requesting an X.509-SVID. +// There are currently no request parameters. +message X509SVIDRequest { } + +// The X509SVIDResponse message carries X.509-SVIDs and related information, +// including a set of global CRLs and a list of bundles the workload may use +// for federating with foreign trust domains. +message X509SVIDResponse { + // Required. A list of X509SVID messages, each of which includes a single + // X.509-SVID, its private key, and the bundle for the trust domain. + repeated X509SVID svids = 1; + + // Optional. ASN.1 DER encoded certificate revocation lists. + repeated bytes crl = 2; + + // Optional. CA certificate bundles belonging to foreign trust domains that + // the workload should trust, keyed by the SPIFFE ID of the foreign trust + // domain. Bundles are ASN.1 DER encoded. + map federated_bundles = 3; +} + +// The X509SVID message carries a single SVID and all associated information, +// including the X.509 bundle for the trust domain. +message X509SVID { + // Required. The SPIFFE ID of the SVID in this entry + string spiffe_id = 1; + + // Required. ASN.1 DER encoded certificate chain. MAY include + // intermediates, the leaf certificate (or SVID itself) MUST come first. + bytes x509_svid = 2; + + // Required. ASN.1 DER encoded PKCS#8 private key. MUST be unencrypted. + bytes x509_svid_key = 3; + + // Required. ASN.1 DER encoded X.509 bundle for the trust domain. + bytes bundle = 4; + + // Optional. An operator-specified string used to provide guidance on how this + // identity should be used by a workload when more than one SVID is returned. + // For example, `internal` and `external` to indicate an SVID for internal or + // external use, respectively. + string hint = 5; +} diff --git a/spiffe-proto/src/gen/spiffe.workloadapi.rs b/spiffe-proto/src/gen/spiffe.workloadapi.rs new file mode 100644 index 0000000000..4a382cad06 --- /dev/null +++ b/spiffe-proto/src/gen/spiffe.workloadapi.rs @@ -0,0 +1,162 @@ +/// The X509SVIDRequest message conveys parameters for requesting an X.509-SVID. +/// There are currently no request parameters. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct X509svidRequest {} +/// The X509SVIDResponse message carries X.509-SVIDs and related information, +/// including a set of global CRLs and a list of bundles the workload may use +/// for federating with foreign trust domains. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct X509svidResponse { + /// Required. A list of X509SVID messages, each of which includes a single + /// X.509-SVID, its private key, and the bundle for the trust domain. + #[prost(message, repeated, tag = "1")] + pub svids: ::prost::alloc::vec::Vec, + /// Optional. ASN.1 DER encoded certificate revocation lists. + #[prost(bytes = "vec", repeated, tag = "2")] + pub crl: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// Optional. CA certificate bundles belonging to foreign trust domains that + /// the workload should trust, keyed by the SPIFFE ID of the foreign trust + /// domain. Bundles are ASN.1 DER encoded. + #[prost(map = "string, bytes", tag = "3")] + pub federated_bundles: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::vec::Vec, + >, +} +/// The X509SVID message carries a single SVID and all associated information, +/// including the X.509 bundle for the trust domain. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct X509svid { + /// Required. The SPIFFE ID of the SVID in this entry + #[prost(string, tag = "1")] + pub spiffe_id: ::prost::alloc::string::String, + /// Required. ASN.1 DER encoded certificate chain. MAY include + /// intermediates, the leaf certificate (or SVID itself) MUST come first. + #[prost(bytes = "vec", tag = "2")] + pub x509_svid: ::prost::alloc::vec::Vec, + /// Required. ASN.1 DER encoded PKCS#8 private key. MUST be unencrypted. + #[prost(bytes = "vec", tag = "3")] + pub x509_svid_key: ::prost::alloc::vec::Vec, + /// Required. ASN.1 DER encoded X.509 bundle for the trust domain. + #[prost(bytes = "vec", tag = "4")] + pub bundle: ::prost::alloc::vec::Vec, + /// Optional. An operator-specified string used to provide guidance on how this + /// identity should be used by a workload when more than one SVID is returned. + /// For example, `internal` and `external` to indicate an SVID for internal or + /// external use, respectively. + #[prost(string, tag = "5")] + pub hint: ::prost::alloc::string::String, +} +/// Generated client implementations. +pub mod spiffe_workload_api_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct SpiffeWorkloadApiClient { + inner: tonic::client::Grpc, + } + impl SpiffeWorkloadApiClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SpiffeWorkloadApiClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + SpiffeWorkloadApiClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// Fetch X.509-SVIDs for all SPIFFE identities the workload is entitled to, + /// as well as related information like trust bundles and CRLs. As this + /// information changes, subsequent messages will be streamed from the + /// server. + pub async fn fetch_x509svid( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/spiffe.workloadapi.SpiffeWorkloadAPI/FetchX509SVID", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "spiffe.workloadapi.SpiffeWorkloadAPI", + "FetchX509SVID", + ), + ); + self.inner.server_streaming(req, path, codec).await + } + } +} diff --git a/spiffe-proto/src/lib.rs b/spiffe-proto/src/lib.rs new file mode 100644 index 0000000000..1223fc17f0 --- /dev/null +++ b/spiffe-proto/src/lib.rs @@ -0,0 +1,11 @@ +//! gRPC bindings for SPIFFE workload api. +//! +//! Vendored from . + +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![allow(clippy::derive_partial_eq_without_eq)] +#![forbid(unsafe_code)] + +pub mod client { + include!("gen/spiffe.workloadapi.rs"); +} diff --git a/spiffe-proto/tests/bootstrap.rs b/spiffe-proto/tests/bootstrap.rs new file mode 100644 index 0000000000..498b3941fb --- /dev/null +++ b/spiffe-proto/tests/bootstrap.rs @@ -0,0 +1,47 @@ +//! A test that regenerates the Rust protobuf bindings. +//! +//! It can be run via: +//! +//! ```no_run +//! cargo test -p spiffe-proto --test=bootstrap +//! ``` + +/// Generates protobuf bindings into src/gen and fails if the generated files do +/// not match those that are already checked into git +#[test] +fn bootstrap() { + let out_dir = std::path::PathBuf::from(std::env!("CARGO_MANIFEST_DIR")) + .join("src") + .join("gen"); + generate(&out_dir); + if changed(&out_dir) { + panic!("protobuf interfaces do not match generated sources"); + } +} + +/// Generates protobuf bindings into the given directory +fn generate(out_dir: &std::path::Path) { + let iface_files = &["spiffe/proto/workload.proto"]; + if let Err(error) = tonic_build::configure() + .build_client(true) + .build_server(false) + .emit_rerun_if_changed(false) + .out_dir(out_dir) + .compile(iface_files, &["."]) + { + panic!("failed to compile protobuf: {error}") + } +} + +/// Returns true if the given path contains files that have changed since the +/// last Git commit +fn changed(path: &std::path::Path) -> bool { + let status = std::process::Command::new("git") + .arg("diff") + .arg("--exit-code") + .arg("--") + .arg(path) + .status() + .expect("failed to run git"); + !status.success() +} From 189c7770350cc16ebe30391e2e5c2f9d3ea0bd5c Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 21 Dec 2023 11:01:14 +0000 Subject: [PATCH 02/13] alter tests Signed-off-by: Zahari Dichev --- Cargo.lock | 1 + linkerd/proxy/spire-client/Cargo.toml | 1 + linkerd/proxy/spire-client/src/client.rs | 1 - linkerd/proxy/spire-client/src/lib.rs | 137 ++++++++++++++++++----- 4 files changed, 109 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 644b3b1600..23ec884397 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1893,6 +1893,7 @@ dependencies = [ "simple_asn1", "spiffe-proto", "tokio", + "tokio-test", "tonic", "tower", "tracing", diff --git a/linkerd/proxy/spire-client/Cargo.toml b/linkerd/proxy/spire-client/Cargo.toml index 83ca98ddf2..bc4ded5662 100644 --- a/linkerd/proxy/spire-client/Cargo.toml +++ b/linkerd/proxy/spire-client/Cargo.toml @@ -25,3 +25,4 @@ asn1 = { version = "0.6", package = "simple_asn1" } [dev-dependencies] rcgen = "0.11.3" +tokio-test = "0.4" diff --git a/linkerd/proxy/spire-client/src/client.rs b/linkerd/proxy/spire-client/src/client.rs index 25563e3ec5..d294515e3f 100644 --- a/linkerd/proxy/spire-client/src/client.rs +++ b/linkerd/proxy/spire-client/src/client.rs @@ -39,7 +39,6 @@ impl tower::Service<()> for Client { let socket = self.socket.clone(); let backoff = self.backoff; Box::pin(async move { - //spiffe::workload_api::client::WorkloadApiClient // Strip the 'unix:' prefix for tonic compatibility. let stripped_path = socket .strip_prefix(UNIX_PREFIX) diff --git a/linkerd/proxy/spire-client/src/lib.rs b/linkerd/proxy/spire-client/src/lib.rs index fcb55c0146..5b8ed96189 100644 --- a/linkerd/proxy/spire-client/src/lib.rs +++ b/linkerd/proxy/spire-client/src/lib.rs @@ -91,17 +91,61 @@ mod tests { use std::collections::HashMap; use tokio::sync::watch; - fn gen_cert(subject_alt_names: Vec, serial: SerialNumber) -> DerX509 { + fn gen_svid(id: Id, subject_alt_names: Vec, serial: SerialNumber) -> Svid { let mut params = CertificateParams::default(); params.subject_alt_names = subject_alt_names; params.serial_number = Some(serial); - DerX509( - Certificate::from_params(params) - .expect("should generate cert") - .serialize_der() - .expect("should serialize"), - ) + Svid { + spiffe_id: id, + leaf: DerX509( + Certificate::from_params(params) + .expect("should generate cert") + .serialize_der() + .expect("should serialize"), + ), + private_key: Vec::default(), + intermediates: Vec::default(), + } + } + + fn svid_update(svids: Vec) -> SvidUpdate { + let mut svids_map = HashMap::default(); + for svid in svids.into_iter() { + svids_map.insert(svid.spiffe_id.clone(), svid); + } + + SvidUpdate { svids: svids_map } + } + + // TODO: use a service_fn for this mock + struct MockNewClient { + rx: watch::Receiver, + } + + impl MockNewClient { + fn new(init: SvidUpdate) -> (Self, watch::Sender) { + let (tx, rx) = watch::channel(init); + (Self { rx }, tx) + } + } + + impl tower::Service<()> for MockNewClient { + type Response = watch::Receiver; + type Error = Error; + // type Future = futures::future::BoxFuture<'static, Result>; + type Future = futures::future::BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: ()) -> Self::Future { + Box::pin(futures::future::ready(Ok(self.rx.clone()))) + } } struct MockCredentials { @@ -125,43 +169,76 @@ mod tests { } #[tokio::test(flavor = "current_thread")] - async fn valid_update() { - let serial = SerialNumber::from_slice("some-serial".as_bytes()); + async fn valid_updates() { let spiffe_san = "spiffe://some-domain/some-workload"; - let leaf = gen_cert(vec![SanType::URI(spiffe_san.into())], serial.clone()); let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); - let (mut creds, mut rx) = MockCredentials::new(); - let svid = Svid { - spiffe_id: spiffe_id.clone(), - leaf, - private_key: Vec::default(), - intermediates: Vec::default(), - }; - let mut svids = HashMap::default(); - svids.insert(svid.spiffe_id.clone(), svid); - let update = SvidUpdate { svids }; + let (creds, mut creds_rx) = MockCredentials::new(); + + let spire = Spire::new(spiffe_id.clone(), Metrics::default()); + + let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); + let update_1 = svid_update(vec![gen_svid( + spiffe_id.clone(), + vec![SanType::URI(spiffe_san.into())], + serial_1.clone(), + )]); + + let (client, svid_tx) = MockNewClient::new(update_1); + tokio::spawn(spire.run(creds, client)); + + creds_rx.changed().await.unwrap(); + assert!(*creds_rx.borrow_and_update() == Some(serial_1)); + + let serial_2 = SerialNumber::from_slice("some-serial-2".as_bytes()); + let update_2 = svid_update(vec![gen_svid( + spiffe_id.clone(), + vec![SanType::URI(spiffe_san.into())], + serial_2.clone(), + )]); + + svid_tx.send(update_2).expect("should send"); - assert!(process_svid(&mut creds, update, &spiffe_id).is_ok()); - rx.changed().await.unwrap(); - assert!(*rx.borrow_and_update() == Some(serial)); + creds_rx.changed().await.unwrap(); + assert!(*creds_rx.borrow_and_update() == Some(serial_2)); } #[tokio::test(flavor = "current_thread")] async fn invalid_update() { + let spiffe_san = "spiffe://some-domain/some-workload"; let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); - let (mut creds, mut rx) = MockCredentials::new(); - let svid = Svid { + + let (creds, mut creds_rx) = MockCredentials::new(); + + let spire = Spire::new(spiffe_id.clone(), Metrics::default()); + + let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); + let update_1 = svid_update(vec![gen_svid( + spiffe_id.clone(), + vec![SanType::URI(spiffe_san.into())], + serial_1.clone(), + )]); + + let (client, svid_tx) = MockNewClient::new(update_1); + tokio::spawn(spire.run(creds, client)); + + creds_rx.changed().await.unwrap(); + assert!(*creds_rx.borrow_and_update() == Some(serial_1.clone())); + + let invalid_svid = Svid { spiffe_id: spiffe_id.clone(), leaf: DerX509(Vec::default()), private_key: Vec::default(), intermediates: Vec::default(), }; - let mut svids = HashMap::default(); - svids.insert(svid.spiffe_id.clone(), svid); - let update = SvidUpdate { svids }; - assert!(process_svid(&mut creds, update, &spiffe_id).is_err()); - assert!(rx.borrow_and_update().is_none()); + let mut update_sent = svid_tx.subscribe(); + let update_2 = svid_update(vec![invalid_svid]); + svid_tx.send(update_2).expect("should send"); + + update_sent.changed().await.unwrap(); + + assert!(!creds_rx.has_changed().unwrap()); + assert!(*creds_rx.borrow_and_update() == Some(serial_1)); } } From d1665b337a32ebecaf60391c164cc788ae13d96e Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 4 Jan 2024 12:08:46 +0000 Subject: [PATCH 03/13] move grpc client to app/src Signed-off-by: Zahari Dichev --- linkerd/app/core/src/lib.rs | 1 + linkerd/app/src/env.rs | 16 ++-- linkerd/app/src/identity.rs | 96 +++++++++++++------ linkerd/app/src/lib.rs | 7 +- .../src/client.rs => app/src/spire.rs} | 34 ++++--- linkerd/proxy/identity-client/src/certify.rs | 4 + linkerd/proxy/spire-client/src/api.rs | 87 ++++++++++++++--- linkerd/proxy/spire-client/src/lib.rs | 76 +++++++++++---- linkerd2-proxy/src/main.rs | 23 +++-- spiffe-proto/spiffe/proto/workload.proto | 6 -- spiffe-proto/src/gen/spiffe.workloadapi.rs | 6 -- 11 files changed, 251 insertions(+), 105 deletions(-) rename linkerd/{proxy/spire-client/src/client.rs => app/src/spire.rs} (67%) diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index dc907de0d1..cb880128a0 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -52,6 +52,7 @@ pub mod identity { pub use linkerd_meshtls::*; pub use linkerd_proxy_identity_client as client; pub use linkerd_proxy_identity_client_metrics as client_metrics; + pub use linkerd_proxy_spire_client as spire_client; } pub const CANONICAL_DST_HEADER: &str = "l5d-dst-canonical"; diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 2303c52e47..9a95535a7c 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -793,13 +793,15 @@ pub fn parse_config(strings: &S) -> Result outbound.http_request_queue.failfast_timeout }; identity::Config { - certify, - control: ControlConfig { - addr, - connect, - buffer: QueueConfig { - capacity: DEFAULT_CONTROL_QUEUE_CAPACITY, - failfast_timeout, + provider: identity::Provider::ControlPlane { + certify, + control: ControlConfig { + addr, + connect, + buffer: QueueConfig { + capacity: DEFAULT_CONTROL_QUEUE_CAPACITY, + failfast_timeout, + }, }, }, params, diff --git a/linkerd/app/src/identity.rs b/linkerd/app/src/identity.rs index 9abde56cbf..772121bd27 100644 --- a/linkerd/app/src/identity.rs +++ b/linkerd/app/src/identity.rs @@ -1,5 +1,7 @@ +use crate::spire; pub use linkerd_app_core::identity::{ client::{certify, TokenSource}, + spire_client::Spire, Id, }; use linkerd_app_core::{ @@ -12,14 +14,23 @@ use linkerd_app_core::{ metrics::{prom, ControlHttp as ClientMetrics}, Error, Result, }; -use std::{future::Future, pin::Pin}; +use std::{future::Future, pin::Pin, sync::Arc}; use tokio::sync::watch; use tracing::Instrument; +#[derive(Clone, Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Provider { + ControlPlane { + control: control::Config, + certify: certify::Config, + }, + Spire(spire::Config), +} + #[derive(Clone, Debug)] pub struct Config { - pub control: control::Config, - pub certify: certify::Config, + pub provider: Provider, pub params: TlsParams, } @@ -30,8 +41,14 @@ pub struct TlsParams { pub trust_anchors_pem: String, } +#[derive(Clone)] +pub enum Addr { + Spire(Arc), + Linkerd(control::ControlAddr), +} + pub struct Identity { - addr: control::ControlAddr, + addr: Addr, receiver: creds::Receiver, ready: watch::Receiver, metrics: IdentityMetrics, @@ -66,33 +83,52 @@ impl Config { &self.params.trust_anchors_pem, )?; - let certify = Certify::from(self.certify); - let metrics = certify.metrics(); - - let addr = self.control.addr.clone(); - + let metrics = IdentityMetrics::default(); let (tx, ready) = watch::channel(false); + let cred = NotifyReady { store, tx }; + + let identity = match self.provider { + Provider::ControlPlane { control, certify } => { + let certify = Certify::new(certify, metrics.clone()); + let addr = control.addr.clone(); + + let task = Box::pin({ + let addr = addr.clone(); + let svc = control.build(dns, client_metrics, registry, receiver.new_client()); + + certify.run(name, cred, svc).instrument( + tracing::debug_span!("identity", server.addr = %addr).or_current(), + ) + }); + Identity { + addr: Addr::Linkerd(addr), + receiver, + metrics, + ready, + task, + } + } + Provider::Spire(cfg) => { + let addr = cfg.socket_addr.clone(); + let spire = Spire::new(self.params.server_id.clone(), metrics.clone()); + let task = Box::pin({ + let client = spire::Client::from(cfg); + spire.run(cred, client).instrument( + tracing::debug_span!("spire identity", server.addr = %addr).or_current(), + ) + }); + + Identity { + addr: Addr::Spire(addr), + receiver, + metrics, + ready, + task, + } + } + }; - // Save to be spawned on an auxiliary runtime. - let task = Box::pin({ - let addr = addr.clone(); - let svc = self - .control - .build(dns, client_metrics, registry, receiver.new_client()); - - let cred = NotifyReady { store, tx }; - certify - .run(name, cred, svc) - .instrument(tracing::debug_span!("identity", server.addr = %addr).or_current()) - }); - - Ok(Identity { - addr, - receiver, - metrics, - ready, - task, - }) + Ok(identity) } } @@ -107,7 +143,7 @@ impl Credentials for NotifyReady { // === impl Identity === impl Identity { - pub fn addr(&self) -> control::ControlAddr { + pub fn addr(&self) -> Addr { self.addr.clone() } diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 7b91fd8a82..39e89f2fef 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -9,6 +9,7 @@ pub mod env; pub mod identity; pub mod oc_collector; pub mod policy; +pub mod spire; pub mod tap; pub use self::metrics::Metrics; @@ -341,7 +342,11 @@ impl App { self.identity.receiver().server_name().clone() } - pub fn identity_addr(&self) -> ControlAddr { + pub fn local_tls_id(&self) -> identity::Id { + self.identity.receiver().local_id().clone() + } + + pub fn identity_addr(&self) -> identity::Addr { self.identity.addr() } diff --git a/linkerd/proxy/spire-client/src/client.rs b/linkerd/app/src/spire.rs similarity index 67% rename from linkerd/proxy/spire-client/src/client.rs rename to linkerd/app/src/spire.rs index d294515e3f..a4653aa192 100644 --- a/linkerd/proxy/spire-client/src/client.rs +++ b/linkerd/app/src/spire.rs @@ -1,6 +1,5 @@ -use crate::api::{Api, SvidUpdate}; -use linkerd_error::Error; -use linkerd_exp_backoff::ExponentialBackoff; +pub use linkerd_app_core::identity::spire_client; +use linkerd_app_core::{exp_backoff::ExponentialBackoff, Error}; use std::sync::Arc; use tokio::net::UnixStream; use tokio::sync::watch; @@ -10,21 +9,26 @@ const UNIX_PREFIX: &str = "unix:"; const TONIC_DEFAULT_URI: &str = "http://[::]:50051"; #[derive(Clone, Debug)] -pub struct Client { - socket: Arc, - backoff: ExponentialBackoff, +pub struct Config { + pub(crate) socket_addr: Arc, + pub(crate) backoff: ExponentialBackoff, } -impl Client { - pub fn new(socket: Arc, backoff: ExponentialBackoff) -> Self { - Self { socket, backoff } - } +// Connects to SPIRE workload API via Unix Domain Socket +pub struct Client { + config: Config, } // === impl Client === +impl From for Client { + fn from(config: Config) -> Self { + Self { config } + } +} + impl tower::Service<()> for Client { - type Response = watch::Receiver; + type Response = tonic::Response>; type Error = Error; type Future = futures::future::BoxFuture<'static, Result>; @@ -36,8 +40,8 @@ impl tower::Service<()> for Client { } fn call(&mut self, _req: ()) -> Self::Future { - let socket = self.socket.clone(); - let backoff = self.backoff; + let socket = self.config.socket_addr.clone(); + let backoff = self.config.backoff; Box::pin(async move { // Strip the 'unix:' prefix for tonic compatibility. let stripped_path = socket @@ -54,8 +58,8 @@ impl tower::Service<()> for Client { })) .await?; - let api = Api::watch(chan, backoff); - let receiver = api.spawn_watch(()).await?.into_inner(); + let api = spire_client::Api::watch(chan, backoff); + let receiver = api.spawn_watch(()).await?; Ok(receiver) }) diff --git a/linkerd/proxy/identity-client/src/certify.rs b/linkerd/proxy/identity-client/src/certify.rs index 20710671d4..1fc32f3391 100644 --- a/linkerd/proxy/identity-client/src/certify.rs +++ b/linkerd/proxy/identity-client/src/certify.rs @@ -93,6 +93,10 @@ impl From for Certify { } impl Certify { + pub fn new(config: Config, metrics: Metrics) -> Self { + Self { config, metrics } + } + pub fn metrics(&self) -> Metrics { self.metrics.clone() } diff --git a/linkerd/proxy/spire-client/src/api.rs b/linkerd/proxy/spire-client/src/api.rs index c199996b48..d14db9a13c 100644 --- a/linkerd/proxy/spire-client/src/api.rs +++ b/linkerd/proxy/spire-client/src/api.rs @@ -12,34 +12,41 @@ use std::collections::HashMap; use tower::Service; use tracing::error; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct Svid { - pub spiffe_id: Id, - pub leaf: DerX509, - pub private_key: Vec, - pub intermediates: Vec, + pub(super) spiffe_id: Id, + pub(super) leaf: DerX509, + pub(super) private_key: Vec, + pub(super) intermediates: Vec, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct SvidUpdate { - pub svids: HashMap, + pub(super) svids: HashMap, } #[derive(Clone, Debug)] -pub(crate) struct Api { +pub struct Api { client: Client, } #[derive(Clone)] -pub(crate) struct GrpcRecover(ExponentialBackoff); +pub struct GrpcRecover(ExponentialBackoff); -pub(crate) type Watch = StreamWatch>; +pub type Watch = StreamWatch>; // === impl Svid === impl TryFrom for Svid { + // TODO: Use bundles from response to compare against + // what is provided at bootstrap time + type Error = Error; fn try_from(proto: api::X509svid) -> Result { + if proto.x509_svid_key.is_empty() { + return Err("empty private key".into()); + } + let cert_der_blocks = asn1::from_der(&proto.x509_svid)?; let (leaf, intermediates) = match cert_der_blocks.split_first() { None => return Err("empty cert chain".into()), @@ -74,7 +81,7 @@ where S::ResponseBody: Default + http::HttpBody + Send + 'static, ::Error: Into + Send, { - pub(super) fn watch(client: S, backoff: ExponentialBackoff) -> Watch { + pub fn watch(client: S, backoff: ExponentialBackoff) -> Watch { let client = Client::new(client); StreamWatch::new(GrpcRecover(backoff), Self { client }) } @@ -135,17 +142,67 @@ impl Recover for GrpcRecover { type Backoff = ExponentialBackoffStream; fn recover(&self, status: tonic::Status) -> Result { - if status.code() == tonic::Code::InvalidArgument - || status.code() == tonic::Code::FailedPrecondition - { + // Non retriable conditions described in: + // https://github.com/spiffe/spiffe/blob/a5b6456ff1bcdb6935f61ed7f83e8ee533a325a3/standards/SPIFFE_Workload_API.md#client-state-machine + if status.code() == tonic::Code::InvalidArgument { return Err(status); } tracing::warn!( grpc.status = %status.code(), grpc.message = status.message(), - "Unexpected policy SPIRE Workload API response; retrying with a backoff", + "Unexpected SPIRE Workload API response; retrying with a backoff", ); Ok(self.0.stream()) } } + +#[cfg(test)] +mod tests { + use crate::api::Svid; + use rcgen::{Certificate, CertificateParams, SanType}; + use spiffe_proto::client as api; + + fn gen_svid_pb(id: String, subject_alt_names: Vec) -> api::X509svid { + let mut params = CertificateParams::default(); + params.subject_alt_names = subject_alt_names; + let cert = Certificate::from_params(params).expect("should generate cert"); + + api::X509svid { + spiffe_id: id, + x509_svid: cert.serialize_der().expect("should serialize"), + x509_svid_key: cert.serialize_private_key_der(), + bundle: Vec::default(), + } + } + + #[test] + fn can_parse_valid_proto() { + let id = "spiffe://some-domain/some-workload"; + let svid_pb = gen_svid_pb(id.into(), vec![SanType::URI(id.into())]); + assert!(Svid::try_from(svid_pb).is_ok()); + } + + #[test] + fn cannot_parse_non_spiffe_id() { + let id = "some-domain.some-workload"; + let svid_pb = gen_svid_pb(id.into(), vec![SanType::DnsName(id.into())]); + assert!(Svid::try_from(svid_pb).is_err()); + } + + #[test] + fn cannot_parse_empty_cert() { + let id = "spiffe://some-domain/some-workload"; + let mut svid_pb = gen_svid_pb(id.into(), vec![SanType::URI(id.into())]); + svid_pb.x509_svid = Vec::default(); + assert!(Svid::try_from(svid_pb).is_err()); + } + + #[test] + fn cannot_parse_empty_key() { + let id = "spiffe://some-domain/some-workload"; + let mut svid_pb = gen_svid_pb(id.into(), vec![SanType::URI(id.into())]); + svid_pb.x509_svid_key = Vec::default(); + assert!(Svid::try_from(svid_pb).is_err()); + } +} diff --git a/linkerd/proxy/spire-client/src/lib.rs b/linkerd/proxy/spire-client/src/lib.rs index 5b8ed96189..bd43a63f9a 100644 --- a/linkerd/proxy/spire-client/src/lib.rs +++ b/linkerd/proxy/spire-client/src/lib.rs @@ -2,11 +2,8 @@ #![forbid(unsafe_code)] mod api; -mod client; -pub use self::client::Client; - -use api::SvidUpdate; +pub use api::{Api, SvidUpdate}; use linkerd_error::{Error, Result}; use linkerd_identity::Credentials; use linkerd_identity::Id; @@ -28,14 +25,16 @@ impl Spire { Self { id, metrics } } - pub async fn run(self, credentials: C, mut new_client: N) + pub async fn run(self, credentials: C, mut client: S) where C: Credentials, - N: Service<(), Response = watch::Receiver>, - N::Error: Into, + S: Service<(), Response = tonic::Response>>, + S::Error: Into, { - match new_client.call(()).await { - Ok(rx) => consume_updates(&self.id, rx, credentials, &self.metrics).await, + match client.call(()).await { + Ok(rsp) => { + consume_updates(&self.id, rsp.into_inner(), credentials, &self.metrics).await + } Err(error) => error!("could not establish SVID stream: {}", error.into()), } } @@ -118,20 +117,19 @@ mod tests { SvidUpdate { svids: svids_map } } - // TODO: use a service_fn for this mock - struct MockNewClient { + struct MockClient { rx: watch::Receiver, } - impl MockNewClient { + impl MockClient { fn new(init: SvidUpdate) -> (Self, watch::Sender) { let (tx, rx) = watch::channel(init); (Self { rx }, tx) } } - impl tower::Service<()> for MockNewClient { - type Response = watch::Receiver; + impl tower::Service<()> for MockClient { + type Response = tonic::Response>; type Error = Error; // type Future = futures::future::BoxFuture<'static, Result>; type Future = futures::future::BoxFuture<'static, Result>; @@ -144,7 +142,8 @@ mod tests { } fn call(&mut self, _req: ()) -> Self::Future { - Box::pin(futures::future::ready(Ok(self.rx.clone()))) + let rsp = tonic::Response::new(self.rx.clone()); + Box::pin(futures::future::ready(Ok(rsp))) } } @@ -184,7 +183,7 @@ mod tests { serial_1.clone(), )]); - let (client, svid_tx) = MockNewClient::new(update_1); + let (client, svid_tx) = MockClient::new(update_1); tokio::spawn(spire.run(creds, client)); creds_rx.changed().await.unwrap(); @@ -204,7 +203,7 @@ mod tests { } #[tokio::test(flavor = "current_thread")] - async fn invalid_update() { + async fn invalid_update_empty_cert() { let spiffe_san = "spiffe://some-domain/some-workload"; let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); @@ -219,7 +218,7 @@ mod tests { serial_1.clone(), )]); - let (client, svid_tx) = MockNewClient::new(update_1); + let (client, svid_tx) = MockClient::new(update_1); tokio::spawn(spire.run(creds, client)); creds_rx.changed().await.unwrap(); @@ -241,4 +240,45 @@ mod tests { assert!(!creds_rx.has_changed().unwrap()); assert!(*creds_rx.borrow_and_update() == Some(serial_1)); } + + #[tokio::test(flavor = "current_thread")] + async fn invalid_valid_update_non_matching_id() { + let spiffe_san = "spiffe://some-domain/some-workload"; + let spiffe_san_wrong = "spiffe://some-domain/wrong"; + + let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); + let spiffe_id_wrong = Id::parse_uri("spiffe://some-domain/wrong").expect("should parse"); + + let (creds, mut creds_rx) = MockCredentials::new(); + + let spire = Spire::new(spiffe_id.clone(), Metrics::default()); + + let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); + let update_1 = svid_update(vec![gen_svid( + spiffe_id.clone(), + vec![SanType::URI(spiffe_san.into())], + serial_1.clone(), + )]); + + let (client, svid_tx) = MockClient::new(update_1); + tokio::spawn(spire.run(creds, client)); + + creds_rx.changed().await.unwrap(); + assert!(*creds_rx.borrow_and_update() == Some(serial_1.clone())); + + let serial_2 = SerialNumber::from_slice("some-serial-2".as_bytes()); + let mut update_sent = svid_tx.subscribe(); + let update_2 = svid_update(vec![gen_svid( + spiffe_id_wrong, + vec![SanType::URI(spiffe_san_wrong.into())], + serial_2.clone(), + )]); + + svid_tx.send(update_2).expect("should send"); + + update_sent.changed().await.unwrap(); + + assert!(!creds_rx.has_changed().unwrap()); + assert!(*creds_rx.borrow_and_update() == Some(serial_1)); + } } diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index ea3836829e..d13755721b 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -11,7 +11,7 @@ compile_error!( "at least one of the following TLS implementations must be enabled: 'meshtls-boring', 'meshtls-rustls'" ); -use linkerd_app::{trace, BindTcp, Config, BUILD_INFO}; +use linkerd_app::{identity, trace, BindTcp, Config, BUILD_INFO}; use linkerd_signal as signal; use tokio::{sync::mpsc, time}; use tracing::{debug, info, warn}; @@ -80,12 +80,21 @@ fn main() { } // TODO distinguish ServerName and Identity. - info!("Local identity is {}", app.local_server_name()); - let addr = app.identity_addr(); - match addr.identity.value() { - None => info!("Identity verified via {}", addr.addr), - Some(tls) => { - info!("Identity verified via {} ({})", addr.addr, tls.server_id); + info!("SNI is {}", app.local_server_name()); + info!("Local identity is {}", app.local_tls_id()); + + match app.identity_addr() { + identity::Addr::Linkerd(cp_addr) => match cp_addr.identity.value() { + None => info!("Identity verified via {}", cp_addr.addr), + Some(tls) => { + info!( + "Identity verified via Control Plane {} ({})", + cp_addr.addr, tls.server_id + ); + } + }, + identity::Addr::Spire(spire_addr) => { + info!("Identity obtained via Spire {}", spire_addr); } } diff --git a/spiffe-proto/spiffe/proto/workload.proto b/spiffe-proto/spiffe/proto/workload.proto index d0480f7ebd..75a4b0922f 100644 --- a/spiffe-proto/spiffe/proto/workload.proto +++ b/spiffe-proto/spiffe/proto/workload.proto @@ -46,10 +46,4 @@ message X509SVID { // Required. ASN.1 DER encoded X.509 bundle for the trust domain. bytes bundle = 4; - - // Optional. An operator-specified string used to provide guidance on how this - // identity should be used by a workload when more than one SVID is returned. - // For example, `internal` and `external` to indicate an SVID for internal or - // external use, respectively. - string hint = 5; } diff --git a/spiffe-proto/src/gen/spiffe.workloadapi.rs b/spiffe-proto/src/gen/spiffe.workloadapi.rs index 4a382cad06..352c1a7c8a 100644 --- a/spiffe-proto/src/gen/spiffe.workloadapi.rs +++ b/spiffe-proto/src/gen/spiffe.workloadapi.rs @@ -43,12 +43,6 @@ pub struct X509svid { /// Required. ASN.1 DER encoded X.509 bundle for the trust domain. #[prost(bytes = "vec", tag = "4")] pub bundle: ::prost::alloc::vec::Vec, - /// Optional. An operator-specified string used to provide guidance on how this - /// identity should be used by a workload when more than one SVID is returned. - /// For example, `internal` and `external` to indicate an SVID for internal or - /// external use, respectively. - #[prost(string, tag = "5")] - pub hint: ::prost::alloc::string::String, } /// Generated client implementations. pub mod spiffe_workload_api_client { From ba38986e3543b5855f26ebc1004e2aed406ab079 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 4 Jan 2024 13:11:17 +0000 Subject: [PATCH 04/13] address nits Signed-off-by: Zahari Dichev --- linkerd/proxy/spire-client/src/api.rs | 2 +- linkerd/proxy/spire-client/src/lib.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/linkerd/proxy/spire-client/src/api.rs b/linkerd/proxy/spire-client/src/api.rs index d14db9a13c..cbe2e3d692 100644 --- a/linkerd/proxy/spire-client/src/api.rs +++ b/linkerd/proxy/spire-client/src/api.rs @@ -65,7 +65,7 @@ impl TryFrom for Svid { Ok(Svid { spiffe_id, - leaf: leaf.clone(), + leaf, private_key: proto.x509_svid_key, intermediates: intermediates.to_vec(), }) diff --git a/linkerd/proxy/spire-client/src/lib.rs b/linkerd/proxy/spire-client/src/lib.rs index bd43a63f9a..1744340640 100644 --- a/linkerd/proxy/spire-client/src/lib.rs +++ b/linkerd/proxy/spire-client/src/lib.rs @@ -8,6 +8,7 @@ use linkerd_error::{Error, Result}; use linkerd_identity::Credentials; use linkerd_identity::Id; use linkerd_proxy_identity_client_metrics::Metrics; +use std::fmt::Display; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::watch; use tower::Service; @@ -29,13 +30,13 @@ impl Spire { where C: Credentials, S: Service<(), Response = tonic::Response>>, - S::Error: Into, + S::Error: Into + Display, { match client.call(()).await { Ok(rsp) => { consume_updates(&self.id, rsp.into_inner(), credentials, &self.metrics).await } - Err(error) => error!("could not establish SVID stream: {}", error.into()), + Err(error) => error!(%error, "could not establish SVID stream"), } } } From c9bdbb78934037cc2dbe1a9318ebe29243c56c36 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 9 Jan 2024 14:28:37 +0000 Subject: [PATCH 05/13] feedback from @olix0r Signed-off-by: Zahari Dichev --- linkerd/app/core/src/lib.rs | 6 +- linkerd/app/src/env.rs | 47 ++++---- linkerd/app/src/identity.rs | 133 ++++++++++++----------- linkerd/app/src/lib.rs | 4 - linkerd/app/src/spire.rs | 7 +- linkerd/proxy/identity-client/src/lib.rs | 5 +- linkerd2-proxy/src/main.rs | 17 +-- 7 files changed, 108 insertions(+), 111 deletions(-) diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index 87f7002394..668e04de9c 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -50,8 +50,10 @@ pub use linkerd_transport_header as transport_header; pub mod identity { pub use linkerd_identity::*; pub use linkerd_meshtls::*; - pub use linkerd_proxy_identity_client as client; - pub use linkerd_proxy_spire_client as spire_client; + pub mod client { + pub use linkerd_proxy_identity_client as linkerd; + pub use linkerd_proxy_spire_client as spire; + } } pub const CANONICAL_DST_HEADER: &str = "l5d-dst-canonical"; diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 271869c59c..f896ef0cd3 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -800,7 +800,7 @@ pub fn parse_config(strings: &S) -> Result .unwrap_or(super::tap::Config::Disabled); let identity = { - let (addr, certify, params) = identity_config?; + let (addr, certify, tls) = identity_config?; // If the address doesn't have a server identity, then we're on localhost. let connect = if addr.addr.is_loopback() { inbound.proxy.connect.clone() @@ -812,19 +812,17 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.http_request_queue.failfast_timeout }; - identity::Config { - provider: identity::Provider::ControlPlane { - certify, - control: ControlConfig { - addr, - connect, - buffer: QueueConfig { - capacity: DEFAULT_CONTROL_QUEUE_CAPACITY, - failfast_timeout, - }, + identity::Config::Linkerd { + certify, + tls, + client: ControlConfig { + addr, + connect, + buffer: QueueConfig { + capacity: DEFAULT_CONTROL_QUEUE_CAPACITY, + failfast_timeout, }, }, - params, } }; @@ -1224,7 +1222,14 @@ pub fn parse_control_addr( pub fn parse_identity_config( strings: &S, -) -> Result<(ControlAddr, identity::certify::Config, identity::TlsParams), EnvError> { +) -> Result< + ( + ControlAddr, + identity::client::linkerd::Config, + identity::TlsParams, + ), + EnvError, +> { let control = parse_control_addr(strings, ENV_IDENTITY_SVC_BASE); let ta = parse(strings, ENV_IDENTITY_TRUST_ANCHORS, |s| { if s.is_empty() { @@ -1234,7 +1239,7 @@ pub fn parse_identity_config( }); let dir = parse(strings, ENV_IDENTITY_DIR, |ref s| Ok(PathBuf::from(s))); let tok = parse(strings, ENV_IDENTITY_TOKEN_FILE, |ref s| { - identity::TokenSource::if_nonempty_file(s.to_string()).map_err(|e| { + identity::client::linkerd::TokenSource::if_nonempty_file(s.to_string()).map_err(|e| { error!("Could not read {}: {}", ENV_IDENTITY_TOKEN_FILE, e); ParseError::InvalidTokenSource }) @@ -1265,17 +1270,19 @@ pub fn parse_identity_config( min_refresh, max_refresh, ) => { - let certify = identity::certify::Config { + let certify = identity::client::linkerd::Config { token, min_refresh: min_refresh.unwrap_or(DEFAULT_IDENTITY_MIN_REFRESH), max_refresh: max_refresh.unwrap_or(DEFAULT_IDENTITY_MAX_REFRESH), - documents: identity::certify::Documents::load(dir).map_err(|error| { - error!(%error, "Failed to read identity documents"); - EnvError::InvalidEnvVar - })?, + documents: identity::client::linkerd::certify::Documents::load(dir).map_err( + |error| { + error!(%error, "Failed to read identity documents"); + EnvError::InvalidEnvVar + }, + )?, }; let params = identity::TlsParams { - server_id: identity::Id::Dns(local_name.clone()), + id: identity::Id::Dns(local_name.clone()), server_name: local_name, trust_anchors_pem, }; diff --git a/linkerd/app/src/identity.rs b/linkerd/app/src/identity.rs index 4f3863d5bd..9ec2eafd9b 100644 --- a/linkerd/app/src/identity.rs +++ b/linkerd/app/src/identity.rs @@ -1,51 +1,41 @@ use crate::spire; -pub use linkerd_app_core::identity::{ - client::{certify, TokenSource}, - spire_client::Spire, - Id, -}; + +pub use linkerd_app_core::identity::{client, Id}; use linkerd_app_core::{ control, dns, exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}, - identity::{client::Certify, creds, CertMetrics, Credentials, DerX509, Mode, WithCertMetrics}, + identity::{ + client::linkerd::Certify, creds, CertMetrics, Credentials, DerX509, Mode, WithCertMetrics, + }, metrics::{prom, ControlHttp as ClientMetrics}, Error, Result, }; -use std::{future::Future, pin::Pin, sync::Arc, time::SystemTime}; +use std::{future::Future, pin::Pin, time::SystemTime}; use tokio::sync::watch; use tracing::Instrument; #[derive(Clone, Debug)] #[allow(clippy::large_enum_variant)] -pub enum Provider { - ControlPlane { - control: control::Config, - certify: certify::Config, +pub enum Config { + Linkerd { + client: control::Config, + certify: client::linkerd::Config, + tls: TlsParams, + }, + Spire { + client: spire::Config, + tls: TlsParams, }, - Spire(spire::Config), -} - -#[derive(Clone, Debug)] -pub struct Config { - pub provider: Provider, - pub params: TlsParams, } #[derive(Clone, Debug)] pub struct TlsParams { - pub server_id: Id, + pub id: Id, pub server_name: dns::Name, pub trust_anchors_pem: String, } -#[derive(Clone)] -pub enum Addr { - Spire(Arc), - Linkerd(control::ControlAddr), -} - pub struct Identity { - addr: Addr, receiver: creds::Receiver, ready: watch::Receiver, task: Task, @@ -72,68 +62,85 @@ impl Config { client_metrics: ClientMetrics, registry: &mut prom::Registry, ) -> Result { - let name = self.params.server_name.clone(); - let (store, receiver) = Mode::default().watch( - name.clone().into(), - name.clone(), - &self.params.trust_anchors_pem, - )?; - - let (tx, ready) = watch::channel(false); let cert_metrics = CertMetrics::register(registry.sub_registry_with_prefix("identity_cert")); - let cred = WithCertMetrics::new(cert_metrics, NotifyReady { store, tx }); - let identity = match self.provider { - Provider::ControlPlane { control, certify } => { - let certify = Certify::from(certify); - let addr = control.addr.clone(); + Ok(match self { + Self::Linkerd { + client, + certify, + tls, + } => { + // TODO: move this validation into env.rs + let name = match (&tls.id, &tls.server_name) { + (Id::Dns(id), sni) if id == sni => id.clone(), + (_id, _sni) => { + return Err( + "Linkerd identity requires a TLS Id and server name to be the same" + .into(), + ); + } + }; - let task = Box::pin({ - let addr = addr.clone(); + let certify = Certify::from(certify); + let (store, receiver, ready) = watch(tls, cert_metrics)?; - let svc = control.build( + let task = { + let addr = client.addr.clone(); + let svc = client.build( dns, client_metrics, registry.sub_registry_with_prefix("control_identity"), receiver.new_client(), ); - certify.run(name, cred, svc).instrument( - tracing::debug_span!("identity", server.addr = %addr).or_current(), - ) - }); - + Box::pin(certify.run(name, store, svc).instrument( + tracing::info_span!("identity", server.addr = %addr).or_current(), + )) + }; Identity { - addr: Addr::Linkerd(addr), receiver, ready, task, } } - Provider::Spire(cfg) => { - let addr = cfg.socket_addr.clone(); - let spire = Spire::new(self.params.server_id.clone()); - let task = Box::pin({ - let client = spire::Client::from(cfg); - spire.run(cred, client).instrument( - tracing::debug_span!("spire identity", server.addr = %addr).or_current(), - ) - }); + Self::Spire { client, tls } => { + let addr = client.socket_addr.clone(); + let spire = spire::client::Spire::new(tls.id.clone()); + + let (store, receiver, ready) = watch(tls, cert_metrics)?; + let task = + Box::pin(spire.run(store, spire::Client::from(client)).instrument( + tracing::info_span!("spire", server.addr = %addr).or_current(), + )); Identity { - addr: Addr::Spire(addr), receiver, ready, task, } } - }; - - Ok(identity) + }) } } +fn watch( + tls: TlsParams, + metrics: CertMetrics, +) -> Result<( + WithCertMetrics, + creds::Receiver, + watch::Receiver, +)> { + let (tx, ready) = watch::channel(false); + let (store, receiver) = + Mode::default().watch(tls.id, tls.server_name, &tls.trust_anchors_pem)?; + let cred = WithCertMetrics::new(metrics, NotifyReady { store, tx }); + Ok((cred, receiver, ready)) +} + +// === impl NotifyReady === + impl Credentials for NotifyReady { fn set_certificate( &mut self, @@ -151,10 +158,6 @@ impl Credentials for NotifyReady { // === impl Identity === impl Identity { - pub fn addr(&self) -> Addr { - self.addr.clone() - } - /// Returns a future that is satisfied once certificates have been provisioned. pub fn ready(&self) -> Pin + Send + 'static>> { let mut ready = self.ready.clone(); diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 83d639d60e..42413772de 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -345,10 +345,6 @@ impl App { self.identity.receiver().local_id().clone() } - pub fn identity_addr(&self) -> identity::Addr { - self.identity.addr() - } - pub fn opencensus_addr(&self) -> Option<&ControlAddr> { match self.oc_collector { oc_collector::OcCollector::Disabled { .. } => None, diff --git a/linkerd/app/src/spire.rs b/linkerd/app/src/spire.rs index a4653aa192..f1f449c2e6 100644 --- a/linkerd/app/src/spire.rs +++ b/linkerd/app/src/spire.rs @@ -1,10 +1,11 @@ -pub use linkerd_app_core::identity::spire_client; use linkerd_app_core::{exp_backoff::ExponentialBackoff, Error}; use std::sync::Arc; use tokio::net::UnixStream; use tokio::sync::watch; use tonic::transport::{Endpoint, Uri}; +pub use linkerd_app_core::identity::client::spire as client; + const UNIX_PREFIX: &str = "unix:"; const TONIC_DEFAULT_URI: &str = "http://[::]:50051"; @@ -28,7 +29,7 @@ impl From for Client { } impl tower::Service<()> for Client { - type Response = tonic::Response>; + type Response = tonic::Response>; type Error = Error; type Future = futures::future::BoxFuture<'static, Result>; @@ -58,7 +59,7 @@ impl tower::Service<()> for Client { })) .await?; - let api = spire_client::Api::watch(chan, backoff); + let api = client::Api::watch(chan, backoff); let receiver = api.spawn_watch(()).await?; Ok(receiver) diff --git a/linkerd/proxy/identity-client/src/lib.rs b/linkerd/proxy/identity-client/src/lib.rs index b2dfce139a..c1c95ac1af 100644 --- a/linkerd/proxy/identity-client/src/lib.rs +++ b/linkerd/proxy/identity-client/src/lib.rs @@ -4,4 +4,7 @@ pub mod certify; mod token; -pub use self::{certify::Certify, token::TokenSource}; +pub use self::{ + certify::{Certify, Config}, + token::TokenSource, +}; diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index d13755721b..51c60b0b8d 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -11,7 +11,7 @@ compile_error!( "at least one of the following TLS implementations must be enabled: 'meshtls-boring', 'meshtls-rustls'" ); -use linkerd_app::{identity, trace, BindTcp, Config, BUILD_INFO}; +use linkerd_app::{trace, BindTcp, Config, BUILD_INFO}; use linkerd_signal as signal; use tokio::{sync::mpsc, time}; use tracing::{debug, info, warn}; @@ -83,21 +83,6 @@ fn main() { info!("SNI is {}", app.local_server_name()); info!("Local identity is {}", app.local_tls_id()); - match app.identity_addr() { - identity::Addr::Linkerd(cp_addr) => match cp_addr.identity.value() { - None => info!("Identity verified via {}", cp_addr.addr), - Some(tls) => { - info!( - "Identity verified via Control Plane {} ({})", - cp_addr.addr, tls.server_id - ); - } - }, - identity::Addr::Spire(spire_addr) => { - info!("Identity obtained via Spire {}", spire_addr); - } - } - let dst_addr = app.dst_addr(); match dst_addr.identity.value() { None => info!("Destinations resolved via {}", dst_addr.addr), From f3f0469f403f12163b2d26c459c17e5dbcd6a194 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 9 Jan 2024 15:28:42 +0000 Subject: [PATCH 06/13] add test to verify that creds propagation fails when tls is Store is configured with wrong identity Signed-off-by: Zahari Dichev --- Cargo.lock | 1 + linkerd/meshtls/Cargo.toml | 1 + linkerd/meshtls/tests/boring.rs | 5 ++++ linkerd/meshtls/tests/rustls.rs | 5 ++++ linkerd/meshtls/tests/util.rs | 44 ++++++++++++++++++++++++++++++++- 5 files changed, 55 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index f0d883e700..cf1cbda245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1594,6 +1594,7 @@ dependencies = [ "linkerd-tls-test-util", "linkerd-tracing", "pin-project", + "rcgen", "tokio", "tracing", ] diff --git a/linkerd/meshtls/Cargo.toml b/linkerd/meshtls/Cargo.toml index a087bd1841..41b7495324 100644 --- a/linkerd/meshtls/Cargo.toml +++ b/linkerd/meshtls/Cargo.toml @@ -29,6 +29,7 @@ linkerd-tls = { path = "../tls" } [dev-dependencies] tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] } tracing = "0.1" +rcgen = "0.11.3" linkerd-conditional = { path = "../conditional" } linkerd-proxy-transport = { path = "../proxy/transport" } diff --git a/linkerd/meshtls/tests/boring.rs b/linkerd/meshtls/tests/boring.rs index 839e3dc87a..0aecc52ae9 100644 --- a/linkerd/meshtls/tests/boring.rs +++ b/linkerd/meshtls/tests/boring.rs @@ -6,6 +6,11 @@ mod util; use linkerd_meshtls::Mode; +#[test] +fn fails_processing_cert_when_wrong_id_configured() { + util::fails_processing_cert_when_wrong_id_configured(Mode::Boring); +} + #[tokio::test(flavor = "current_thread")] async fn plaintext() { util::plaintext(Mode::Boring).await; diff --git a/linkerd/meshtls/tests/rustls.rs b/linkerd/meshtls/tests/rustls.rs index bd9eed32c7..ac8eff9177 100644 --- a/linkerd/meshtls/tests/rustls.rs +++ b/linkerd/meshtls/tests/rustls.rs @@ -6,6 +6,11 @@ mod util; use linkerd_meshtls::Mode; +#[test] +fn fails_processing_cert_when_wrong_id_configured() { + util::fails_processing_cert_when_wrong_id_configured(Mode::Rustls); +} + #[tokio::test(flavor = "current_thread")] async fn plaintext() { util::plaintext(Mode::Rustls).await; diff --git a/linkerd/meshtls/tests/util.rs b/linkerd/meshtls/tests/util.rs index 5380af1145..79d98af6be 100644 --- a/linkerd/meshtls/tests/util.rs +++ b/linkerd/meshtls/tests/util.rs @@ -3,8 +3,9 @@ use futures::prelude::*; use linkerd_conditional::Conditional; +use linkerd_dns_name::Name; use linkerd_error::Infallible; -use linkerd_identity::{Credentials, DerX509}; +use linkerd_identity::{Credentials, DerX509, Id}; use linkerd_io::{self as io, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use linkerd_meshtls as meshtls; use linkerd_proxy_transport::{ @@ -17,6 +18,8 @@ use linkerd_stack::{ }; use linkerd_tls as tls; use linkerd_tls_test_util as test_util; +use rcgen::{BasicConstraints, Certificate, CertificateParams, IsCa, SanType}; +use std::str::FromStr; use std::{ future::Future, net::SocketAddr, @@ -26,6 +29,45 @@ use std::{ use tokio::net::TcpStream; use tracing::Instrument; +fn generate_cert_with_name(subject_alt_names: Vec) -> (Vec, Vec, String) { + let mut root_params = CertificateParams::default(); + root_params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained); + let root_cert = Certificate::from_params(root_params).expect("should generate root"); + + let mut params = CertificateParams::default(); + params.subject_alt_names = subject_alt_names; + + let cert = Certificate::from_params(params).expect("should generate cert"); + + ( + cert.serialize_der_with_signer(&root_cert) + .expect("should serialize"), + cert.serialize_private_key_der(), + root_cert.serialize_pem().expect("should serialize"), + ) +} + +pub fn fails_processing_cert_when_wrong_id_configured(mode: meshtls::Mode) { + let server_name = Name::from_str("system.local").expect("should parse"); + let id = Id::Dns(server_name.clone()); + + let (cert, key, roots) = + generate_cert_with_name(vec![SanType::URI("spiffe://system/local".into())]); + let (mut store, _) = mode + .watch(id, server_name.clone(), &roots) + .expect("should construct"); + + let err = store + .set_certificate(DerX509(cert), vec![], key, SystemTime::now()) + .err() + .expect("error"); + + assert_eq!( + "certificate does not match TLS identity", + format!("{}", err), + ); +} + pub async fn plaintext(mode: meshtls::Mode) { let (_foo, _, server_tls) = load(mode, &test_util::FOO_NS1); let (_bar, client_tls, _) = load(mode, &test_util::BAR_NS1); From c2ca723a23f96bec9a8baad4099888a8701bd2db Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 10 Jan 2024 20:02:20 +0000 Subject: [PATCH 07/13] feedback Signed-off-by: Zahari Dichev --- Cargo.lock | 1 + linkerd/meshtls/tests/util.rs | 3 +- linkerd/proxy/spire-client/Cargo.toml | 1 + linkerd/proxy/spire-client/src/api.rs | 70 ++++++++++++++++++---- linkerd/proxy/spire-client/src/lib.rs | 83 +++++++++------------------ 5 files changed, 91 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cf1cbda245..7caa997776 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1929,6 +1929,7 @@ dependencies = [ "rcgen", "simple_asn1", "spiffe-proto", + "thiserror", "tokio", "tokio-test", "tonic", diff --git a/linkerd/meshtls/tests/util.rs b/linkerd/meshtls/tests/util.rs index 79d98af6be..65fbecf4a0 100644 --- a/linkerd/meshtls/tests/util.rs +++ b/linkerd/meshtls/tests/util.rs @@ -59,8 +59,7 @@ pub fn fails_processing_cert_when_wrong_id_configured(mode: meshtls::Mode) { let err = store .set_certificate(DerX509(cert), vec![], key, SystemTime::now()) - .err() - .expect("error"); + .expect_err("should error"); assert_eq!( "certificate does not match TLS identity", diff --git a/linkerd/proxy/spire-client/Cargo.toml b/linkerd/proxy/spire-client/Cargo.toml index 07782e29e3..76c8c6fd41 100644 --- a/linkerd/proxy/spire-client/Cargo.toml +++ b/linkerd/proxy/spire-client/Cargo.toml @@ -21,6 +21,7 @@ tower = "0.4" tracing = "0.1" x509-parser = "0.15.1" asn1 = { version = "0.6", package = "simple_asn1" } +thiserror = "1" [dev-dependencies] rcgen = "0.11.3" diff --git a/linkerd/proxy/spire-client/src/api.rs b/linkerd/proxy/spire-client/src/api.rs index cbe2e3d692..df3735e1db 100644 --- a/linkerd/proxy/spire-client/src/api.rs +++ b/linkerd/proxy/spire-client/src/api.rs @@ -2,27 +2,32 @@ use futures::prelude::*; use linkerd_error::{Error, Recover, Result}; use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}; use linkerd_identity::DerX509; -use linkerd_identity::Id; +use linkerd_identity::{Credentials, Id}; use linkerd_proxy_http as http; use linkerd_tonic_watch::StreamWatch; use spiffe_proto::client::{ self as api, spiffe_workload_api_client::SpiffeWorkloadApiClient as Client, }; use std::collections::HashMap; +use std::time::{Duration, UNIX_EPOCH}; use tower::Service; use tracing::error; +#[derive(Debug, thiserror::Error)] +#[error("no matching SVID found")] +pub struct NoMatchingSVIDFound(()); + #[derive(Clone)] pub struct Svid { pub(super) spiffe_id: Id, - pub(super) leaf: DerX509, - pub(super) private_key: Vec, - pub(super) intermediates: Vec, + leaf: DerX509, + private_key: Vec, + intermediates: Vec, } #[derive(Clone)] pub struct SvidUpdate { - pub(super) svids: HashMap, + svids: HashMap, } #[derive(Clone, Debug)] @@ -37,6 +42,36 @@ pub type Watch = StreamWatch>; // === impl Svid === +impl SvidUpdate { + pub(super) fn new(svids: Vec) -> Self { + let mut svids_map = HashMap::default(); + for svid in svids.into_iter() { + svids_map.insert(svid.spiffe_id.clone(), svid); + } + + SvidUpdate { svids: svids_map } + } +} + +// === impl Svid === + +impl Svid { + #[cfg(test)] + pub(super) fn new( + spiffe_id: Id, + leaf: DerX509, + private_key: Vec, + intermediates: Vec, + ) -> Self { + Self { + spiffe_id, + leaf, + private_key, + intermediates, + } + } +} + impl TryFrom for Svid { // TODO: Use bundles from response to compare against // what is provided at bootstrap time @@ -119,16 +154,14 @@ where .svids .into_iter() .filter_map(|proto| { - let svid: Option = proto + proto .try_into() .map_err(|err| error!("could not parse SVID: {}", err)) - .ok(); - - svid.map(|svid| (svid.spiffe_id.clone(), svid)) + .ok() }) .collect(); - SvidUpdate { svids } + SvidUpdate::new(svids) }) .boxed() })) @@ -157,6 +190,23 @@ impl Recover for GrpcRecover { } } +pub(super) fn process_svid(credentials: &mut C, mut update: SvidUpdate, id: &Id) -> Result<()> +where + C: Credentials, +{ + if let Some(svid) = update.svids.remove(id) { + use x509_parser::prelude::*; + + let (_, parsed_cert) = X509Certificate::from_der(&svid.leaf.0)?; + let exp: u64 = parsed_cert.validity().not_after.timestamp().try_into()?; + let exp = UNIX_EPOCH + Duration::from_secs(exp); + + return credentials.set_certificate(svid.leaf, svid.intermediates, svid.private_key, exp); + } + + Err(NoMatchingSVIDFound(()).into()) +} + #[cfg(test)] mod tests { use crate::api::Svid; diff --git a/linkerd/proxy/spire-client/src/lib.rs b/linkerd/proxy/spire-client/src/lib.rs index 72ab0ec80d..7acc16290e 100644 --- a/linkerd/proxy/spire-client/src/lib.rs +++ b/linkerd/proxy/spire-client/src/lib.rs @@ -4,13 +4,12 @@ mod api; pub use api::{Api, SvidUpdate}; -use linkerd_error::{Error, Result}; +use linkerd_error::Error; use linkerd_identity::Credentials; use linkerd_identity::Id; -use std::fmt::Display; -use std::time::{Duration, UNIX_EPOCH}; +use std::fmt::{Debug, Display}; use tokio::sync::watch; -use tower::Service; +use tower::{util::ServiceExt, Service}; use tracing::error; pub struct Spire { @@ -28,8 +27,9 @@ impl Spire { where C: Credentials, S: Service<(), Response = tonic::Response>>, - S::Error: Into + Display, + S::Error: Into + Display + Debug, { + let client = client.ready().await.expect("should be ready"); match client.call(()).await { Ok(rsp) => consume_updates(&self.id, rsp.into_inner(), credentials).await, Err(error) => error!(%error, "could not establish SVID stream"), @@ -46,34 +46,16 @@ async fn consume_updates( { loop { let svid_update = updates.borrow_and_update().clone(); - if let Err(error) = process_svid(&mut credentials, svid_update, id) { - tracing::error!("Error processing SVID update: {}", error); + if let Err(error) = api::process_svid(&mut credentials, svid_update, id) { + tracing::error!(%error, "Error processing SVID update"); } - - if let Err(error) = updates.changed().await { - tracing::debug!("SVID watch closed; terminating {}", error); + if updates.changed().await.is_err() { + tracing::debug!("SVID watch closed; terminating"); return; } } } -fn process_svid(credentials: &mut C, mut update: SvidUpdate, id: &Id) -> Result<()> -where - C: Credentials, -{ - if let Some(svid) = update.svids.remove(id) { - use x509_parser::prelude::*; - - let (_, parsed_cert) = X509Certificate::from_der(&svid.leaf.0)?; - let exp: u64 = parsed_cert.validity().not_after.timestamp().try_into()?; - let exp = UNIX_EPOCH + Duration::from_secs(exp); - - return credentials.set_certificate(svid.leaf, svid.intermediates, svid.private_key, exp); - } - - Err("could not find an SVID".into()) -} - #[cfg(test)] mod tests { use super::*; @@ -81,7 +63,7 @@ mod tests { use linkerd_error::Result; use linkerd_identity::{Credentials, DerX509, Id}; use rcgen::{Certificate, CertificateParams, SanType, SerialNumber}; - use std::{collections::HashMap, time::SystemTime}; + use std::time::SystemTime; use tokio::sync::watch; fn gen_svid(id: Id, subject_alt_names: Vec, serial: SerialNumber) -> Svid { @@ -89,26 +71,17 @@ mod tests { params.subject_alt_names = subject_alt_names; params.serial_number = Some(serial); - Svid { - spiffe_id: id, - leaf: DerX509( + Svid::new( + id, + DerX509( Certificate::from_params(params) .expect("should generate cert") .serialize_der() .expect("should serialize"), ), - private_key: Vec::default(), - intermediates: Vec::default(), - } - } - - fn svid_update(svids: Vec) -> SvidUpdate { - let mut svids_map = HashMap::default(); - for svid in svids.into_iter() { - svids_map.insert(svid.spiffe_id.clone(), svid); - } - - SvidUpdate { svids: svids_map } + Vec::default(), + Vec::default(), + ) } struct MockClient { @@ -177,7 +150,7 @@ mod tests { let spire = Spire::new(spiffe_id.clone()); let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); - let update_1 = svid_update(vec![gen_svid( + let update_1 = SvidUpdate::new(vec![gen_svid( spiffe_id.clone(), vec![SanType::URI(spiffe_san.into())], serial_1.clone(), @@ -190,7 +163,7 @@ mod tests { assert!(*creds_rx.borrow_and_update() == Some(serial_1)); let serial_2 = SerialNumber::from_slice("some-serial-2".as_bytes()); - let update_2 = svid_update(vec![gen_svid( + let update_2 = SvidUpdate::new(vec![gen_svid( spiffe_id.clone(), vec![SanType::URI(spiffe_san.into())], serial_2.clone(), @@ -212,7 +185,7 @@ mod tests { let spire = Spire::new(spiffe_id.clone()); let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); - let update_1 = svid_update(vec![gen_svid( + let update_1 = SvidUpdate::new(vec![gen_svid( spiffe_id.clone(), vec![SanType::URI(spiffe_san.into())], serial_1.clone(), @@ -224,15 +197,15 @@ mod tests { creds_rx.changed().await.unwrap(); assert!(*creds_rx.borrow_and_update() == Some(serial_1.clone())); - let invalid_svid = Svid { - spiffe_id: spiffe_id.clone(), - leaf: DerX509(Vec::default()), - private_key: Vec::default(), - intermediates: Vec::default(), - }; + let invalid_svid = Svid::new( + spiffe_id.clone(), + DerX509(Vec::default()), + Vec::default(), + Vec::default(), + ); let mut update_sent = svid_tx.subscribe(); - let update_2 = svid_update(vec![invalid_svid]); + let update_2 = SvidUpdate::new(vec![invalid_svid]); svid_tx.send(update_2).expect("should send"); update_sent.changed().await.unwrap(); @@ -254,7 +227,7 @@ mod tests { let spire = Spire::new(spiffe_id.clone()); let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); - let update_1 = svid_update(vec![gen_svid( + let update_1 = SvidUpdate::new(vec![gen_svid( spiffe_id.clone(), vec![SanType::URI(spiffe_san.into())], serial_1.clone(), @@ -268,7 +241,7 @@ mod tests { let serial_2 = SerialNumber::from_slice("some-serial-2".as_bytes()); let mut update_sent = svid_tx.subscribe(); - let update_2 = svid_update(vec![gen_svid( + let update_2 = SvidUpdate::new(vec![gen_svid( spiffe_id_wrong, vec![SanType::URI(spiffe_san_wrong.into())], serial_2.clone(), From 27d4888482ebbd2e254777dd50deb68312441197 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 11 Jan 2024 10:03:14 +0000 Subject: [PATCH 08/13] panic on error Signed-off-by: Zahari Dichev --- linkerd/app/src/identity.rs | 9 +++++---- linkerd/proxy/spire-client/src/lib.rs | 10 +++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/linkerd/app/src/identity.rs b/linkerd/app/src/identity.rs index 9ec2eafd9b..6bab7dd0cd 100644 --- a/linkerd/app/src/identity.rs +++ b/linkerd/app/src/identity.rs @@ -14,6 +14,10 @@ use std::{future::Future, pin::Pin, time::SystemTime}; use tokio::sync::watch; use tracing::Instrument; +#[derive(Debug, thiserror::Error)] +#[error("linkerd identity requires a TLS Id and server name to be the same")] +pub struct TlsIdAndServerNameNotMatching(()); + #[derive(Clone, Debug)] #[allow(clippy::large_enum_variant)] pub enum Config { @@ -75,10 +79,7 @@ impl Config { let name = match (&tls.id, &tls.server_name) { (Id::Dns(id), sni) if id == sni => id.clone(), (_id, _sni) => { - return Err( - "Linkerd identity requires a TLS Id and server name to be the same" - .into(), - ); + return Err(TlsIdAndServerNameNotMatching(()).into()); } }; diff --git a/linkerd/proxy/spire-client/src/lib.rs b/linkerd/proxy/spire-client/src/lib.rs index 7acc16290e..4987209147 100644 --- a/linkerd/proxy/spire-client/src/lib.rs +++ b/linkerd/proxy/spire-client/src/lib.rs @@ -10,7 +10,6 @@ use linkerd_identity::Id; use std::fmt::{Debug, Display}; use tokio::sync::watch; use tower::{util::ServiceExt, Service}; -use tracing::error; pub struct Spire { id: Id, @@ -30,10 +29,11 @@ impl Spire { S::Error: Into + Display + Debug, { let client = client.ready().await.expect("should be ready"); - match client.call(()).await { - Ok(rsp) => consume_updates(&self.id, rsp.into_inner(), credentials).await, - Err(error) => error!(%error, "could not establish SVID stream"), - } + let rsp = client + .call(()) + .await + .expect("spire client must gracefully handle errors"); + consume_updates(&self.id, rsp.into_inner(), credentials).await } } From cd3dcc5fdff0dc40d20eeb1bb3c28a67112bc69a Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 11 Jan 2024 12:10:18 +0000 Subject: [PATCH 09/13] do not emit proto package in client impl Signed-off-by: Zahari Dichev --- spiffe-proto/src/gen/spiffe.workloadapi.rs | 9 ++------- spiffe-proto/tests/bootstrap.rs | 1 + 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/spiffe-proto/src/gen/spiffe.workloadapi.rs b/spiffe-proto/src/gen/spiffe.workloadapi.rs index 352c1a7c8a..b052c5308d 100644 --- a/spiffe-proto/src/gen/spiffe.workloadapi.rs +++ b/spiffe-proto/src/gen/spiffe.workloadapi.rs @@ -140,16 +140,11 @@ pub mod spiffe_workload_api_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/spiffe.workloadapi.SpiffeWorkloadAPI/FetchX509SVID", + "/SpiffeWorkloadAPI/FetchX509SVID", ); let mut req = request.into_request(); req.extensions_mut() - .insert( - GrpcMethod::new( - "spiffe.workloadapi.SpiffeWorkloadAPI", - "FetchX509SVID", - ), - ); + .insert(GrpcMethod::new("SpiffeWorkloadAPI", "FetchX509SVID")); self.inner.server_streaming(req, path, codec).await } } diff --git a/spiffe-proto/tests/bootstrap.rs b/spiffe-proto/tests/bootstrap.rs index 498b3941fb..3aa90b3d3c 100644 --- a/spiffe-proto/tests/bootstrap.rs +++ b/spiffe-proto/tests/bootstrap.rs @@ -26,6 +26,7 @@ fn generate(out_dir: &std::path::Path) { .build_client(true) .build_server(false) .emit_rerun_if_changed(false) + .disable_package_emission() .out_dir(out_dir) .compile(iface_files, &["."]) { From 3757c91d1f75c9a68aec9f750b5b80bbf52c082f Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 11 Jan 2024 12:48:24 +0000 Subject: [PATCH 10/13] update lock file Signed-off-by: Zahari Dichev --- Cargo.lock | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3489012f22..27c34601d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1593,7 +1593,7 @@ dependencies = [ "linkerd-tls-test-util", "linkerd-tracing", "pin-project", - "rcgen", + "rcgen 0.11.3", "tokio", "tracing", ] @@ -1647,7 +1647,7 @@ version = "0.1.0" dependencies = [ "linkerd-error", "linkerd-identity", - "rcgen", + "rcgen 0.12.0", "tracing", "x509-parser", ] @@ -1925,7 +1925,7 @@ dependencies = [ "linkerd-proxy-http", "linkerd-stack", "linkerd-tonic-watch", - "rcgen", + "rcgen 0.11.3", "simple_asn1", "spiffe-proto", "thiserror", @@ -2800,6 +2800,18 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "977b1e897f9d764566891689e642653e5ed90c6895106acd005eb4c1d0203991" +[[package]] +name = "rcgen" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c4f3084aa3bc7dfbba4eff4fab2a54db4324965d8872ab933565e6fbd83bc6" +dependencies = [ + "pem", + "ring 0.16.20", + "time", + "yasna", +] + [[package]] name = "rcgen" version = "0.12.0" From a8620b34ce2daaa27a06b31954a97b2957c52935 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 11 Jan 2024 13:26:15 +0000 Subject: [PATCH 11/13] spire: add required header to request Signed-off-by: Zahari Dichev --- linkerd/proxy/spire-client/src/api.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/linkerd/proxy/spire-client/src/api.rs b/linkerd/proxy/spire-client/src/api.rs index df3735e1db..c16f79e689 100644 --- a/linkerd/proxy/spire-client/src/api.rs +++ b/linkerd/proxy/spire-client/src/api.rs @@ -9,10 +9,14 @@ use spiffe_proto::client::{ self as api, spiffe_workload_api_client::SpiffeWorkloadApiClient as Client, }; use std::collections::HashMap; +use std::str::FromStr; use std::time::{Duration, UNIX_EPOCH}; use tower::Service; use tracing::error; +const SPIFFE_HEADER_KEY: &str = "workload.spiffe.io"; +const SPIFFE_HEADER_VALUE: &str = "true"; + #[derive(Debug, thiserror::Error)] #[error("no matching SVID found")] pub struct NoMatchingSVIDFound(()); @@ -146,7 +150,14 @@ where let req = api::X509svidRequest {}; let mut client = self.client.clone(); Box::pin(async move { - let rsp = client.fetch_x509svid(tonic::Request::new(req)).await?; + let parsed_header = SPIFFE_HEADER_VALUE + .parse() + .map_err(|e| tonic::Status::internal(format!("Failed to parse header: {}", e)))?; + + let mut req = tonic::Request::new(req); + req.metadata_mut().insert(SPIFFE_HEADER_KEY, parsed_header); + + let rsp = client.fetch_x509svid(req).await?; Ok(rsp.map(|svids| { svids .map_ok(move |s| { From b07e30c3c7337ab97cd7182828e29b8c15fcd3ed Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 11 Jan 2024 13:40:07 +0000 Subject: [PATCH 12/13] lint Signed-off-by: Zahari Dichev --- linkerd/proxy/spire-client/src/api.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/linkerd/proxy/spire-client/src/api.rs b/linkerd/proxy/spire-client/src/api.rs index c16f79e689..3c3ec16d4f 100644 --- a/linkerd/proxy/spire-client/src/api.rs +++ b/linkerd/proxy/spire-client/src/api.rs @@ -9,7 +9,6 @@ use spiffe_proto::client::{ self as api, spiffe_workload_api_client::SpiffeWorkloadApiClient as Client, }; use std::collections::HashMap; -use std::str::FromStr; use std::time::{Duration, UNIX_EPOCH}; use tower::Service; use tracing::error; From f6e0e815cc71e1d5982f33b06ae5df47df89c876 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 11 Jan 2024 14:12:19 +0000 Subject: [PATCH 13/13] fix logging Signed-off-by: Zahari Dichev --- linkerd/app/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 42413772de..bc559263cf 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -390,7 +390,7 @@ impl App { // Kick off the identity so that the process can become ready. let local = identity.receiver(); - let local_name = local.server_name().clone(); + let local_id = local.local_id().clone(); let ready = identity.ready(); tokio::spawn( identity @@ -403,7 +403,7 @@ impl App { ready .map(move |()| { latch.release(); - info!(id = %local_name, "Certified identity"); + info!(id = %local_id, "Certified identity"); }) .instrument(info_span!("identity").or_current()), );