Skip to content

Commit

Permalink
add license copyright and fmt code
Browse files Browse the repository at this point in the history
  • Loading branch information
raphamorim committed Jan 9, 2024
1 parent 01a8f18 commit a82ff42
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 112 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ opentelemetry-jaeger = { version = "0.20", features = [
"rt-tokio",
], optional = true }
opentelemetry-semantic-conventions = { version = "0.13", optional = true }
tracing-opentelemetry-instrumentation-sdk = "0.16.0"
tracing-opentelemetry-instrumentation-sdk = { version = "0.16.0", features = ["http"], optional = true }
tracing = { version = "0.1.40", default-features = false }
tracing-subscriber = { version = "0.3.18", default-features = false, features = [
"env-filter",
Expand All @@ -34,19 +34,20 @@ hyper = { version = "0.14.24", features = ["full"], optional = true }
tower = { version = "0.4", optional = true }
axum = { version = "0.7.3", optional = true }
pin-project-lite = { version = "0.2", optional = true }
futures-util = { version = "0.3", default_features = false, features = [], optional = true }

[dev-dependencies]
assert2 = "0.3"
rstest = "0.18"

[features]
full = ["integration_test"]
default = ["otlp", "jaeger"]
default = ["otlp", "jaeger", "axum"]
jaeger = ["dep:opentelemetry-jaeger", "tracer"]
otlp = ["opentelemetry-otlp/http-proto", "tracer"]
tracer = ["dep:opentelemetry-semantic-conventions"]
integration_test = ["dep:serde", "dep:serde_json", "dep:opentelemetry_api", "dep:rand", "dep:hyper"]
axum = ["dep:axum", "dep:tower"]
axum = ["dep:axum", "dep:tower", "dep:futures-util", "dep:pin-project-lite", "dep:tracing-opentelemetry-instrumentation-sdk"]

[profile.dev]
lto = false
Expand Down
10 changes: 8 additions & 2 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use opentelemetry::Context;
// Originally retired from davidB/tracing-opentelemetry-instrumentation-sdk
// which is licensed under CC0 1.0 Universal
// https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk/blob/d3609ac2cc699d3a24fbf89754053cc8e938e3bf/LICENSE

use opentelemetry::propagation::{Extractor, Injector};
use opentelemetry::Context;

// copy from crate opentelemetry-http (to not be dependants of on 3rd: http, ...)
pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);
Expand Down Expand Up @@ -43,5 +47,7 @@ pub fn inject_context(context: &Context, headers: &mut http::HeaderMap) {
#[must_use]
pub fn extract_context(headers: &http::HeaderMap) -> Context {
let extractor = HeaderExtractor(headers);
opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor))
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&extractor)
})
}
6 changes: 5 additions & 1 deletion src/jaeger.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Originally retired from davidB/tracing-opentelemetry-instrumentation-sdk
// which is licensed under CC0 1.0 Universal
// https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk/blob/d3609ac2cc699d3a24fbf89754053cc8e938e3bf/LICENSE

use opentelemetry::trace::TraceError;
use opentelemetry_jaeger::config::agent::AgentPipeline;
use opentelemetry_sdk::{
Expand Down Expand Up @@ -30,4 +34,4 @@ where
);
pipeline = transform(pipeline);
pipeline.install_batch(opentelemetry_sdk::runtime::Tokio)
}
}
60 changes: 33 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
use std::cmp::max;

use opentelemetry_sdk::{
resource::{OsResourceDetector, ResourceDetector},
propagation::{
BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator,
},
resource::{OsResourceDetector, ResourceDetector},
Resource,
};

use opentelemetry::{propagation::TextMapPropagator, trace::TraceError};
use tracing_subscriber::{
filter::LevelFilter,
fmt::{format::FmtSpan, writer::MakeWriterExt},
layer::SubscriberExt,
};
use opentelemetry::{propagation::TextMapPropagator, trace::TraceError};

pub mod middleware;
pub mod propagation;
Expand Down Expand Up @@ -47,7 +47,10 @@ impl DetectResource {
/// `service.name` is first extracted from environment variables
/// (in this order) `SERVICE_VERSION`, `APP_VERSION`.
/// But a default value can be provided with this method.
pub fn new(fallback_service_name: &'static str, fallback_service_version: &'static str) -> Self {
pub fn new(
fallback_service_name: &'static str,
fallback_service_version: &'static str,
) -> Self {
DetectResource {
fallback_service_name,
fallback_service_version,
Expand Down Expand Up @@ -90,17 +93,17 @@ impl ResourceDetector for ServiceInfoDetector {
.or_else(|_| std::env::var("SERVICE_NAME"))
.or_else(|_| std::env::var("APP_NAME"))
.ok()
.or_else(|| {
Some(self.fallback_service_name.to_string())
})
.map(|v| opentelemetry_semantic_conventions::resource::SERVICE_NAME.string(v));
.or_else(|| Some(self.fallback_service_name.to_string()))
.map(|v| {
opentelemetry_semantic_conventions::resource::SERVICE_NAME.string(v)
});
let service_version = std::env::var("SERVICE_VERSION")
.or_else(|_| std::env::var("APP_VERSION"))
.ok()
.or_else(|| {
Some(self.fallback_service_version.to_string())
})
.map(|v| opentelemetry_semantic_conventions::resource::SERVICE_VERSION.string(v));
.or_else(|| Some(self.fallback_service_version.to_string()))
.map(|v| {
opentelemetry_semantic_conventions::resource::SERVICE_VERSION.string(v)
});
Resource::new(vec![service_name, service_version].into_iter().flatten())
}
}
Expand All @@ -110,12 +113,14 @@ pub fn init_tracing_with_fallbacks(
fallback_service_name: &'static str,
fallback_service_version: &'static str,
) {
let otel_rsrc = DetectResource::new(fallback_service_name, fallback_service_version)
.build();
let otel_rsrc =
DetectResource::new(fallback_service_name, fallback_service_version).build();
let otel_tracer =
otlp::init_tracer(otel_rsrc, otlp::identity).expect("setup of Tracer");
init_propagator().expect("setup of propagator");
let otel_layer = tracing_opentelemetry::layer().with_error_records_to_exceptions(true).with_tracer(otel_tracer);
let otel_layer = tracing_opentelemetry::layer()
.with_error_records_to_exceptions(true)
.with_tracer(otel_tracer);

opentelemetry::global::set_text_map_propagator(
propagation::TextMapSplitPropagator::default(),
Expand Down Expand Up @@ -170,18 +175,19 @@ pub fn shutdown_signal() {
///
/// Will return `TraceError` if issue in reading or instanciate propagator.
pub fn init_propagator() -> Result<(), TraceError> {
let value_from_env =
std::env::var("OTEL_PROPAGATORS").unwrap_or_else(|_| "tracecontext,baggage".to_string());
let propagators: Vec<(Box<dyn TextMapPropagator + Send + Sync>, String)> = value_from_env
.split(',')
.map(|s| {
let name = s.trim().to_lowercase();
propagator_from_string(&name).map(|o| o.map(|b| (b, name)))
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect();
let value_from_env = std::env::var("OTEL_PROPAGATORS")
.unwrap_or_else(|_| "tracecontext,baggage".to_string());
let propagators: Vec<(Box<dyn TextMapPropagator + Send + Sync>, String)> =
value_from_env
.split(',')
.map(|s| {
let name = s.trim().to_lowercase();
propagator_from_string(&name).map(|o| o.map(|b| (b, name)))
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect();
if !propagators.is_empty() {
let (propagators_impl, propagators_name): (Vec<_>, Vec<_>) =
propagators.into_iter().unzip();
Expand Down Expand Up @@ -255,4 +261,4 @@ mod tests {
// dbg!(std::env::var("OTEL_PROPAGATORS"));
// let_assert!(Err(_) = init_tracing());
}
}
}
144 changes: 73 additions & 71 deletions src/middleware/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use tower::{Layer, Service};
use tracing::Span;
use tracing_opentelemetry_instrumentation_sdk::http as otel_http;

pub type Filter = fn(&str) -> bool;

#[derive(Default, Debug, Clone)]
pub struct OtelAxumLayer {
filter: Option<Filter>,
Expand Down Expand Up @@ -134,74 +136,74 @@ fn http_route<B>(req: &Request<B>) -> &str {
.map_or_else(|| "", |mp| mp.as_str())
}

#[cfg(test)]
mod tests {
use super::*;
use axum::{body::Body, routing::get, Router};
use http::{Request, StatusCode};
use rstest::rstest;
use testing_tracing_opentelemetry::{assert_trace, FakeEnvironment};
use tower::Service;

#[rstest]
#[case("filled_http_route_for_existing_route", "http://example.com/users/123", &[], false)]
#[case("empty_http_route_for_nonexisting_route", "/idontexist/123", &[], false)]
#[case("status_code_on_close_for_ok", "/users/123", &[], false)]
#[case("status_code_on_close_for_error", "/status/500", &[], false)]
#[case("filled_http_headers", "/users/123", &[("user-agent", "tests"), ("x-forwarded-for", "127.0.0.1")], false)]
#[case("call_with_w3c_trace", "/users/123", &[("traceparent", "00-b2611246a58fd7ea623d2264c5a1e226-b2c9b811f2f424af-01")], true)]
#[case("trace_id_in_child_span", "/with_child_span", &[], false)]
#[case("trace_id_in_child_span_for_remote", "/with_child_span", &[("traceparent", "00-b2611246a58fd7ea623d2264c5a1e226-b2c9b811f2f424af-01")], true)]
// failed to extract "http.route" before axum-0.6.15
// - https://github.com/davidB/axum-tracing-opentelemetry/pull/54 (reverted)
// - https://github.com/tokio-rs/axum/issues/1441#issuecomment-1272158039
#[case("extract_route_from_nested", "/nest/123", &[], false)]
#[tokio::test(flavor = "multi_thread")]
async fn check_span_event(
#[case] name: &str,
#[case] uri: &str,
#[case] headers: &[(&str, &str)],
#[case] is_trace_id_constant: bool,
) {
let fake_env = FakeEnvironment::setup().await;
{
let mut svc = Router::new()
.route("/users/:id", get(|| async { StatusCode::OK }))
.route(
"/status/500",
get(|| async { StatusCode::INTERNAL_SERVER_ERROR }),
)
.route(
"/with_child_span",
get(|| async {
let span = tracing::span!(tracing::Level::INFO, "my child span");
span.in_scope(|| {
// Any trace events in this closure or code called by it will occur within
// the span.
});
StatusCode::OK
}),
)
.nest(
"/nest",
Router::new()
.route("/:nest_id", get(|| async {}))
.fallback(|| async { (StatusCode::NOT_FOUND, "inner fallback") }),
)
.fallback(|| async { (StatusCode::NOT_FOUND, "outer fallback") })
.layer(opentelemetry_tracing_layer());
let mut builder = Request::builder();
for (key, value) in headers {
builder = builder.header(*key, *value);
}
let req = builder.uri(uri).body(Body::empty()).unwrap();
let _res = svc.call(req).await.unwrap();

// while res.data().await.is_some() {}
// res.trailers().await.unwrap();
// drop(res);
}
let (tracing_events, otel_spans) = fake_env.collect_traces().await;
assert_trace(name, tracing_events, otel_spans, is_trace_id_constant);
}
}
// #[cfg(test)]
// mod tests {
// use super::*;
// use axum::{body::Body, routing::get, Router};
// use http::{Request, StatusCode};
// use rstest::rstest;
// use testing_tracing_opentelemetry::{assert_trace, FakeEnvironment};
// use tower::Service;

// #[rstest]
// #[case("filled_http_route_for_existing_route", "http://example.com/users/123", &[], false)]
// #[case("empty_http_route_for_nonexisting_route", "/idontexist/123", &[], false)]
// #[case("status_code_on_close_for_ok", "/users/123", &[], false)]
// #[case("status_code_on_close_for_error", "/status/500", &[], false)]
// #[case("filled_http_headers", "/users/123", &[("user-agent", "tests"), ("x-forwarded-for", "127.0.0.1")], false)]
// #[case("call_with_w3c_trace", "/users/123", &[("traceparent", "00-b2611246a58fd7ea623d2264c5a1e226-b2c9b811f2f424af-01")], true)]
// #[case("trace_id_in_child_span", "/with_child_span", &[], false)]
// #[case("trace_id_in_child_span_for_remote", "/with_child_span", &[("traceparent", "00-b2611246a58fd7ea623d2264c5a1e226-b2c9b811f2f424af-01")], true)]
// // failed to extract "http.route" before axum-0.6.15
// // - https://github.com/davidB/axum-tracing-opentelemetry/pull/54 (reverted)
// // - https://github.com/tokio-rs/axum/issues/1441#issuecomment-1272158039
// #[case("extract_route_from_nested", "/nest/123", &[], false)]
// #[tokio::test(flavor = "multi_thread")]
// async fn check_span_event(
// #[case] name: &str,
// #[case] uri: &str,
// #[case] headers: &[(&str, &str)],
// #[case] is_trace_id_constant: bool,
// ) {
// let fake_env = FakeEnvironment::setup().await;
// {
// let mut svc = Router::new()
// .route("/users/:id", get(|| async { StatusCode::OK }))
// .route(
// "/status/500",
// get(|| async { StatusCode::INTERNAL_SERVER_ERROR }),
// )
// .route(
// "/with_child_span",
// get(|| async {
// let span = tracing::span!(tracing::Level::INFO, "my child span");
// span.in_scope(|| {
// // Any trace events in this closure or code called by it will occur within
// // the span.
// });
// StatusCode::OK
// }),
// )
// .nest(
// "/nest",
// Router::new()
// .route("/:nest_id", get(|| async {}))
// .fallback(|| async { (StatusCode::NOT_FOUND, "inner fallback") }),
// )
// .fallback(|| async { (StatusCode::NOT_FOUND, "outer fallback") })
// .layer(OtelAxumLayer::default());
// let mut builder = Request::builder();
// for (key, value) in headers {
// builder = builder.header(*key, *value);
// }
// let req = builder.uri(uri).body(Body::empty()).unwrap();
// let _res = svc.call(req).await.unwrap();

// // while res.data().await.is_some() {}
// // res.trailers().await.unwrap();
// // drop(res);
// }
// let (tracing_events, otel_spans) = fake_env.collect_traces().await;
// assert_trace(name, tracing_events, otel_spans, is_trace_id_constant);
// }
// }
2 changes: 1 addition & 1 deletion src/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#[cfg(feature = "axum")]
pub mod axum;
pub mod axum;
15 changes: 9 additions & 6 deletions src/otlp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Retired from https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk/blob/d3609ac2cc699d3a24fbf89754053cc8e938e3bf/init-tracing-opentelemetry/src/otlp.rs#L75
// Originally retired from davidB/tracing-opentelemetry-instrumentation-sdk
// which is licensed under CC0 1.0 Universal
// https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk/blob/d3609ac2cc699d3a24fbf89754053cc8e938e3bf/LICENSE

use std::{collections::HashMap, str::FromStr};

Expand Down Expand Up @@ -104,9 +106,9 @@ fn read_sampler_from_env() -> Sampler {
"traceidratio" => Sampler::TraceIdRatioBased(read_sampler_arg_from_env(1f64)),
"parentbased_always_on" => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
"parentbased_always_off" => Sampler::ParentBased(Box::new(Sampler::AlwaysOff)),
"parentbased_traceidratio" => Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
read_sampler_arg_from_env(1f64),
))),
"parentbased_traceidratio" => Sampler::ParentBased(Box::new(
Sampler::TraceIdRatioBased(read_sampler_arg_from_env(1f64)),
)),
"jaeger_remote" => todo!("unsupported: OTEL_TRACES_SAMPLER='jaeger_remote'"),
"xray" => todo!("unsupported: OTEL_TRACES_SAMPLER='xray'"),
_ => {
Expand Down Expand Up @@ -166,7 +168,8 @@ mod tests {
#[case(None, None, "http/protobuf", "http://localhost:4318")] //Devskim: ignore DS137138
#[case(Some("http/protobuf"), None, "http/protobuf", "http://localhost:4318")] //Devskim: ignore DS137138
#[case(Some("grpc"), None, "grpc", "http://localhost:4317")] //Devskim: ignore DS137138
#[case(None, Some("http://localhost:4317"), "grpc", "http://localhost:4317")] //Devskim: ignore DS137138
#[case(None, Some("http://localhost:4317"), "grpc", "http://localhost:4317")]
//Devskim: ignore DS137138
// #[cfg_attr(
// feature = "tls",
// case(
Expand Down Expand Up @@ -214,4 +217,4 @@ mod tests {
== (expected_protocol.to_string(), expected_endpoint.to_string())
);
}
}
}
Loading

0 comments on commit a82ff42

Please sign in to comment.