Skip to content

Commit

Permalink
Add OpenTelemetry exporter implementation
Browse files Browse the repository at this point in the history
This duplicates most of the logic from the existing OpenCensus exporter to keep the functionality as similar as possible.

[#10111](linkerd/linkerd2#10111)

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Sep 23, 2024
1 parent 11ed0d0 commit 62604f3
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 0 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,24 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-opentelemetry"
version = "0.1.0"
dependencies = [
"futures",
"http",
"http-body",
"linkerd-error",
"linkerd-metrics",
"opentelemetry",
"opentelemetry-proto",
"opentelemetry_sdk",
"tokio",
"tokio-stream",
"tonic",
"tracing",
]

[[package]]
name = "linkerd-pool"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ members = [
"linkerd/meshtls/verifier",
"linkerd/metrics",
"linkerd/opencensus",
"linkerd/opentelemetry",
"linkerd/pool",
"linkerd/pool/mock",
"linkerd/pool/p2c",
Expand Down
24 changes: 24 additions & 0 deletions linkerd/opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "linkerd-opentelemetry"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
futures = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
linkerd-error = { path = "../error" }
linkerd-metrics = { path = "../metrics" }
opentelemetry = { version = "0.23", default-features = false, features = ["trace"] }
opentelemetry_sdk = { version = "0.23", default-features = false, features = ["trace"] }
opentelemetry-proto = { path = "../../opentelemetry-proto" }
tonic = { version = "0.10", default-features = false, features = [
"prost",
"codegen",
] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tracing = "0.1"
212 changes: 212 additions & 0 deletions linkerd/opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]

pub mod metrics;

use futures::stream::{Stream, StreamExt};
use http_body::Body as HttpBody;
use linkerd_error::Error;
use metrics::Registry;
pub use opentelemetry_proto as proto;
use opentelemetry_proto::proto::collector::trace::v1::trace_service_client::TraceServiceClient;
use opentelemetry_proto::proto::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::proto::trace::v1::ResourceSpans;
use opentelemetry_proto::transform::common::ResourceAttributesWithSchema;
use opentelemetry_proto::transform::trace::group_spans_by_resource_and_scope;
pub use opentelemetry_sdk::export::trace::SpanData;
use tokio::{sync::mpsc, time};
use tonic::{self as grpc, body::BoxBody, client::GrpcService};
use tracing::{debug, trace};

pub async fn export_spans<T, S>(
client: T,
spans: S,
resource: ResourceAttributesWithSchema,
metrics: Registry,
) where
T: GrpcService<BoxBody> + Clone,
T::Error: Into<Error>,
T::ResponseBody: Default + HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S: Stream<Item = SpanData> + Unpin,
{
debug!("Span exporter running");
SpanExporter::new(client, spans, resource, metrics)

Check warning on line 34 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L33-L34

Added lines #L33 - L34 were not covered by tests
.run()
.await

Check warning on line 36 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L36

Added line #L36 was not covered by tests
}

/// SpanExporter sends a Stream of spans to the given TraceService gRPC service.
struct SpanExporter<T, S> {
client: T,
spans: S,
resource: ResourceAttributesWithSchema,
metrics: Registry,
}

#[derive(Debug)]
struct SpanRxClosed;

// === impl SpanExporter ===

impl<T, S> SpanExporter<T, S>
where
T: GrpcService<BoxBody> + Clone,
T::Error: Into<Error>,
T::ResponseBody: Default + HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S: Stream<Item = SpanData> + Unpin,
{
const MAX_BATCH_SIZE: usize = 1000;
const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10);

fn new(client: T, spans: S, resource: ResourceAttributesWithSchema, metrics: Registry) -> Self {

Check warning on line 63 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L63

Added line #L63 was not covered by tests
Self {
client,
spans,
resource,
metrics,
}
}

async fn run(self) {
let Self {
client,
mut spans,
resource,
mut metrics,
} = self;

Check warning on line 78 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L72-L78

Added lines #L72 - L78 were not covered by tests

// Holds the batch of pending spans. Cleared as the spans are flushed.
// Contains no more than MAX_BATCH_SIZE spans.
let mut accum = Vec::new();

Check warning on line 82 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L82

Added line #L82 was not covered by tests

let mut svc = TraceServiceClient::new(client);
loop {
trace!("Establishing new TraceService::export request");
metrics.start_stream();
let (tx, mut rx) = mpsc::channel(1);

Check warning on line 88 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L84-L88

Added lines #L84 - L88 were not covered by tests

let recv_future = async {
while let Some(req) = rx.recv().await {
match svc.export(grpc::Request::new(req)).await {
Ok(rsp) => {
let Some(partial_success) = rsp.into_inner().partial_success else {
continue;

Check warning on line 95 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L90-L95

Added lines #L90 - L95 were not covered by tests
};

if !partial_success.error_message.is_empty() {
debug!(
%partial_success.error_message,
rejected_spans = partial_success.rejected_spans,

Check warning on line 101 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L98-L101

Added lines #L98 - L101 were not covered by tests
"Response partially successful",
);
}
}
Err(error) => {
debug!(%error, "Response future failed; restarting");

Check warning on line 107 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L106-L107

Added lines #L106 - L107 were not covered by tests
}
}
}
};

// Drive both the response future and the export stream
// simultaneously.
tokio::select! {
_ = recv_future => {}
res = Self::export(&tx, &mut spans, &resource, &mut accum) => match res {

Check warning on line 117 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L115-L117

Added lines #L115 - L117 were not covered by tests
// The export stream closed; reconnect.
Ok(()) => {},

Check warning on line 119 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L119

Added line #L119 was not covered by tests
// No more spans.
Err(SpanRxClosed) => return,

Check warning on line 121 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L121

Added line #L121 was not covered by tests
},
}
}
}

/// Accumulate spans and send them on the export stream.
///
/// Returns an error when the proxy has closed the span stream.
async fn export(
tx: &mpsc::Sender<ExportTraceServiceRequest>,
spans: &mut S,
resource: &ResourceAttributesWithSchema,
accum: &mut Vec<ResourceSpans>,
) -> Result<(), SpanRxClosed> {
loop {

Check warning on line 136 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L136

Added line #L136 was not covered by tests
// Collect spans into a batch.
let collect = Self::collect_batch(spans, resource, accum).await;

Check warning on line 138 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L138

Added line #L138 was not covered by tests

// If we collected spans, flush them.
if !accum.is_empty() {

Check warning on line 141 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L141

Added line #L141 was not covered by tests
// Once a batch has been accumulated, ensure that the
// request stream is ready to accept the batch.
match tx.reserve().await {
Ok(tx) => {

Check warning on line 145 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L144-L145

Added lines #L144 - L145 were not covered by tests
let msg = ExportTraceServiceRequest {
resource_spans: std::mem::take(accum),

Check warning on line 147 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L147

Added line #L147 was not covered by tests
};
trace!(spans = msg.resource_spans.len(), "Sending batch");
tx.send(msg);

Check warning on line 150 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L149-L150

Added lines #L149 - L150 were not covered by tests
}
Err(error) => {

Check warning on line 152 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L152

Added line #L152 was not covered by tests
// If the channel isn't open, start a new stream
// and retry sending the batch.
debug!(%error, "Request stream lost; restarting");
return Ok(());

Check warning on line 156 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L155-L156

Added lines #L155 - L156 were not covered by tests
}
}
}

// If the span source was closed, end the task.
if let Err(closed) = collect {
debug!("Span channel lost");
return Err(closed);

Check warning on line 164 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L162-L164

Added lines #L162 - L164 were not covered by tests
}
}
}

/// Collects spans from the proxy into `accum`.
///
/// Returns an error when the span stream has completed. An error may be
/// returned after accumulating spans.
async fn collect_batch(
span_stream: &mut S,
resource: &ResourceAttributesWithSchema,
accum: &mut Vec<ResourceSpans>,
) -> Result<(), SpanRxClosed> {
let mut input_accum: Vec<SpanData> = vec![];

Check warning on line 178 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L178

Added line #L178 was not covered by tests

let res = loop {
if input_accum.len() == Self::MAX_BATCH_SIZE {
trace!(capacity = Self::MAX_BATCH_SIZE, "Batch capacity reached");
break Ok(());

Check warning on line 183 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L180-L183

Added lines #L180 - L183 were not covered by tests
}

tokio::select! {
biased;

Check warning on line 187 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L186-L187

Added lines #L186 - L187 were not covered by tests

res = span_stream.next() => match res {
Some(span) => {
trace!(?span, "Adding to batch");
input_accum.push(span);

Check warning on line 192 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L189-L192

Added lines #L189 - L192 were not covered by tests
}
None => break Err(SpanRxClosed),

Check warning on line 194 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L194

Added line #L194 was not covered by tests
},

// Don't hold spans indefinitely. Return if we hit an idle
// timeout and spans have been collected.
_ = time::sleep(Self::MAX_BATCH_IDLE) => {
if !input_accum.is_empty() {
trace!(spans = input_accum.len(), "Flushing spans due to inactivitiy");
break Ok(());

Check warning on line 202 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L199-L202

Added lines #L199 - L202 were not covered by tests
}
}
}
};

*accum = group_spans_by_resource_and_scope(input_accum, resource);

Check warning on line 208 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L208

Added line #L208 was not covered by tests

res

Check warning on line 210 in linkerd/opentelemetry/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/lib.rs#L210

Added line #L210 was not covered by tests
}
}
58 changes: 58 additions & 0 deletions linkerd/opentelemetry/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use linkerd_metrics::{metrics, Counter, FmtMetrics};
use std::fmt;
use std::sync::Arc;

metrics! {
opentelemetry_span_export_streams: Counter { "Total count of opened span export streams" },
opentelemetry_span_export_requests: Counter { "Total count of span export request messages" },
opentelemetry_span_exports: Counter { "Total count of spans exported" }
}

#[derive(Debug)]
struct Metrics {
streams: Counter,
requests: Counter,
spans: Counter,
}

#[derive(Clone, Debug)]
pub struct Registry(Arc<Metrics>);

#[derive(Clone, Debug)]
pub struct Report(Arc<Metrics>);

pub fn new() -> (Registry, Report) {

Check warning on line 24 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L24

Added line #L24 was not covered by tests
let metrics = Metrics {
streams: Counter::default(),
requests: Counter::default(),
spans: Counter::default(),

Check warning on line 28 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L26-L28

Added lines #L26 - L28 were not covered by tests
};
let shared = Arc::new(metrics);
(Registry(shared.clone()), Report(shared))

Check warning on line 31 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L30-L31

Added lines #L30 - L31 were not covered by tests
}

impl Registry {
pub fn start_stream(&mut self) {
self.0.streams.incr()

Check warning on line 36 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L35-L36

Added lines #L35 - L36 were not covered by tests
}

pub fn send(&mut self, spans: u64) {
self.0.requests.incr();
self.0.spans.add(spans);

Check warning on line 41 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L39-L41

Added lines #L39 - L41 were not covered by tests
}
}

impl FmtMetrics for Report {
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
opentelemetry_span_export_streams.fmt_help(f)?;
opentelemetry_span_export_streams.fmt_metric(f, &self.0.streams)?;

Check warning on line 48 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L46-L48

Added lines #L46 - L48 were not covered by tests

opentelemetry_span_export_requests.fmt_help(f)?;
opentelemetry_span_export_requests.fmt_metric(f, &self.0.requests)?;

Check warning on line 51 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L50-L51

Added lines #L50 - L51 were not covered by tests

opentelemetry_span_exports.fmt_help(f)?;
opentelemetry_span_exports.fmt_metric(f, &self.0.spans)?;

Check warning on line 54 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L53-L54

Added lines #L53 - L54 were not covered by tests

Ok(())

Check warning on line 56 in linkerd/opentelemetry/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/opentelemetry/src/metrics.rs#L56

Added line #L56 was not covered by tests
}
}

0 comments on commit 62604f3

Please sign in to comment.