Skip to content

Commit

Permalink
enable hyper for otel-otlp
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Oct 31, 2024
1 parent a5e2061 commit ffe768d
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 99 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ bytes = { workspace = true }
http = { workspace = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true }
hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"], optional = true }
opentelemetry = { version = "0.26", path = "../opentelemetry", features = ["trace"] }
reqwest = { workspace = true, features = ["blocking"], optional = true }
tokio = { workspace = true, features = ["time"], optional = true }
40 changes: 23 additions & 17 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,47 +108,53 @@ pub mod hyper {
use http::HeaderValue;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body as HttpBody, Frame};
use hyper_util::client::legacy::{connect::Connect, Client};
use hyper_util::client::legacy::{
connect::{Connect, HttpConnector},
Client,
};
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use tokio::time;

#[derive(Debug, Clone)]
pub struct HyperClient<C> {
pub struct HyperClient<C = HttpConnector>
where
C: Connect + Clone + Send + Sync + 'static,
{
inner: Client<C, Body>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C, Body>, timeout: Duration) -> Self {
impl<C> HyperClient<C>
where
C: Connect + Clone + Send + Sync + 'static,
{
pub fn new(connector: C, timeout: Duration, authorization: Option<HeaderValue>) -> Self {
// TODO - support custom executor
let inner = Client::builder(hyper_util::rt::TokioExecutor::new()).build(connector);

Check warning on line 137 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L135-L137

Added lines #L135 - L137 were not covered by tests
Self {
inner,
timeout,
authorization: None,
authorization,

Check warning on line 141 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L141

Added line #L141 was not covered by tests
}
}
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C, Body>,
impl HyperClient<HttpConnector> {
/// Creates a new `HyperClient` with a default `HttpConnector`.
pub fn with_default_connector(

Check warning on line 148 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L148

Added line #L148 was not covered by tests
timeout: Duration,
authorization: HeaderValue,
authorization: Option<HeaderValue>,

Check warning on line 150 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L150

Added line #L150 was not covered by tests
) -> Self {
Self {
inner,
timeout,
authorization: Some(authorization),
}
Self::new(HttpConnector::new(), timeout, authorization)

Check warning on line 152 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L152

Added line #L152 was not covered by tests
}
}

#[async_trait]
impl<C> HttpClient for HyperClient<C>
where
C: Connect + Send + Sync + Clone + Debug + 'static,
{
impl HttpClient for HyperClient {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, Body(Full::from(body)));
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"]
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
reqwest-rustls = ["reqwest", "opentelemetry-http/reqwest-rustls"]
reqwest-rustls-webpki-roots = ["reqwest", "opentelemetry-http/reqwest-rustls-webpki-roots"]
hyper-client = ["opentelemetry-http/hyper"]

# test
integration-testing = ["tonic", "prost", "tokio/full", "trace"]
10 changes: 4 additions & 6 deletions opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@ publish = false
[features]
default = ["reqwest"]
reqwest = ["opentelemetry-otlp/reqwest-client"]
hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"]
hyper = ["opentelemetry-otlp/hyper-client"]


[dependencies]
once_cell = { workspace = true }
opentelemetry = { path = "../../../opentelemetry" }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"] }
opentelemetry-http = { path = "../../../opentelemetry-http", optional = true }
opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "reqwest-client", "logs"] }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"]}
opentelemetry-http = { path = "../../../opentelemetry-http", optional = true, default-features = false}
opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs"] , default-features = false}
opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" }

async-trait = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
http = { workspace = true, optional = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, features = ["client"], optional = true }
hyper-util = { workspace = true, features = ["client-legacy"], optional = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
Expand Down
49 changes: 0 additions & 49 deletions opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs

This file was deleted.

18 changes: 5 additions & 13 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/// To use hyper as the HTTP client - cargo run --features="hyper" --no-default-features
use once_cell::sync::Lazy;
use opentelemetry::{
global,
Expand All @@ -23,12 +24,6 @@ use tracing::info;
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;

#[cfg(feature = "hyper")]
use opentelemetry_otlp::WithHttpConfig;

#[cfg(feature = "hyper")]
mod hyper;

static RESOURCE: Lazy<Resource> = Lazy::new(|| {
Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
Expand All @@ -37,15 +32,11 @@ static RESOURCE: Lazy<Resource> = Lazy::new(|| {
});

fn init_logs() -> Result<sdklogs::LoggerProvider, opentelemetry::logs::LogError> {
let exporter_builder = LogExporter::builder()
let exporter = LogExporter::builder()
.with_http()
.with_endpoint("http://localhost:4318/v1/logs")
.with_protocol(Protocol::HttpBinary);

#[cfg(feature = "hyper")]
let exporter_builder = exporter_builder.with_http_client(hyper::HyperClient::default());

let exporter = exporter_builder.build()?;
.with_protocol(Protocol::HttpBinary)
.build()?;

Ok(LoggerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
Expand All @@ -59,6 +50,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {
.with_protocol(Protocol::HttpBinary) //can be changed to `Protocol::HttpJson` to export in JSON format
.with_endpoint("http://localhost:4318/v1/traces")
.build()?;

Ok(TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_config(Config::default().with_resource(RESOURCE.clone()))
Expand Down
49 changes: 36 additions & 13 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,20 @@ mod logs;
#[cfg(feature = "trace")]
mod trace;

#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "hyper-client"
))]
use opentelemetry_http::hyper::HyperClient;

/// Configuration of the http transport
#[derive(Debug)]
#[cfg_attr(
all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client")
not(feature = "reqwest-blocking-client"),
not(feature = "hyper-client")
),
derive(Default)
)]
Expand All @@ -50,19 +58,36 @@ pub struct HttpConfig {
headers: Option<HashMap<String, String>>,
}

#[cfg(any(feature = "reqwest-blocking-client", feature = "reqwest-client",))]
#[cfg(any(
feature = "reqwest-blocking-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
impl Default for HttpConfig {
fn default() -> Self {
#[cfg(feature = "reqwest-blocking-client")]
let default_client =
Some(Arc::new(reqwest::blocking::Client::new()) as Arc<dyn HttpClient>);
#[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))]
let default_client = Some(Arc::new(reqwest::Client::new()) as Arc<dyn HttpClient>);
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "hyper-client"
))]
// TODO - support configuring custom connector and executor
let default_client = Some(Arc::new(HyperClient::with_default_connector(
Duration::from_secs(10),
None,
)) as Arc<dyn HttpClient>);
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
not(feature = "hyper-client")
))]
let default_client = None;
HttpConfig {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Arc::new(reqwest::blocking::Client::new())),
#[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))]
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client")
))]
client: None,
client: default_client,
headers: None,
}
}
Expand Down Expand Up @@ -140,13 +165,11 @@ impl HttpExporterBuilder {
},
None => self.exporter_config.timeout,
};

let http_client = self
.http_config
.client
.take()
.ok_or(crate::Error::NoHttpClient)?;

#[allow(clippy::mutable_key_type)] // http headers are not mutated
let mut headers: HashMap<HeaderName, HeaderValue> = self
.http_config
Expand Down

0 comments on commit ffe768d

Please sign in to comment.