diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index ff1d5f0a72..95ba0613bc 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -20,7 +20,7 @@ bytes = { workspace = true } http = { workspace = true } http-body-util = { workspace = true, optional = true } hyper = { workspace = true, optional = true } -hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true } +hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"], optional = true } opentelemetry = { version = "0.26", path = "../opentelemetry", features = ["trace"] } reqwest = { workspace = true, features = ["blocking"], optional = true } tokio = { workspace = true, features = ["time"], optional = true } diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index bed95cd389..094fdb4664 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -108,7 +108,10 @@ pub mod hyper { use http::HeaderValue; use http_body_util::{BodyExt, Full}; use hyper::body::{Body as HttpBody, Frame}; - use hyper_util::client::legacy::{connect::Connect, Client}; + use hyper_util::client::legacy::{ + connect::{Connect, HttpConnector}, + Client, + }; use std::fmt::Debug; use std::pin::Pin; use std::task::{self, Poll}; @@ -116,39 +119,42 @@ pub mod hyper { use tokio::time; #[derive(Debug, Clone)] - pub struct HyperClient { + pub struct HyperClient + where + C: Connect + Clone + Send + Sync + 'static, + { inner: Client, timeout: Duration, authorization: Option, } - impl HyperClient { - pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { + impl HyperClient + where + C: Connect + Clone + Send + Sync + 'static, + { + pub fn new(connector: C, timeout: Duration, authorization: Option) -> Self { + // TODO - support custom executor + let inner = Client::builder(hyper_util::rt::TokioExecutor::new()).build(connector); Self { inner, timeout, - authorization: None, + authorization, } } + } - pub fn new_with_timeout_and_authorization_header( - inner: Client, + impl HyperClient { + /// Creates a new `HyperClient` with a default `HttpConnector`. + pub fn with_default_connector( timeout: Duration, - authorization: HeaderValue, + authorization: Option, ) -> Self { - Self { - inner, - timeout, - authorization: Some(authorization), - } + Self::new(HttpConnector::new(), timeout, authorization) } } #[async_trait] - impl HttpClient for HyperClient - where - C: Connect + Send + Sync + Clone + Debug + 'static, - { + impl HttpClient for HyperClient { async fn send(&self, request: Request>) -> Result, HttpError> { let (parts, body) = request.into_parts(); let mut request = Request::from_parts(parts, Body(Full::from(body))); diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index fce91cbb61..02d6b145cc 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -53,6 +53,10 @@ Released 2024-Sep-30 - `MetricsExporter` -> `MetricExporter` - `MetricsExporterBuilder` -> `MetricExporterBuilder` + - [#2263](https://github.com/open-telemetry/opentelemetry-rust/pull/2263) + Support `hyper` client for opentelemetry-otlp. This can be enabled using flag `hyper-client`. + Refer example: https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-otlp/examples/basic-otlp-http + ## v0.25.0 - Update `opentelemetry` dependency version to 0.25 diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml index 272481053e..9dcf9547c7 100644 --- a/opentelemetry-otlp/Cargo.toml +++ b/opentelemetry-otlp/Cargo.toml @@ -80,6 +80,7 @@ reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"] reqwest-client = ["reqwest", "opentelemetry-http/reqwest"] reqwest-rustls = ["reqwest", "opentelemetry-http/reqwest-rustls"] reqwest-rustls-webpki-roots = ["reqwest", "opentelemetry-http/reqwest-rustls-webpki-roots"] +hyper-client = ["opentelemetry-http/hyper"] # test integration-testing = ["tonic", "prost", "tokio/full", "trace"] diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index ccbe22e960..89df249f2d 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -8,15 +8,15 @@ publish = false [features] default = ["reqwest"] reqwest = ["opentelemetry-otlp/reqwest-client"] -hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"] +hyper = ["opentelemetry-otlp/hyper-client"] [dependencies] once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry" } -opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"] } -opentelemetry-http = { path = "../../../opentelemetry-http", optional = true } -opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "reqwest-client", "logs"] } +opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"]} +opentelemetry-http = { path = "../../../opentelemetry-http", optional = true, default-features = false} +opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs"] , default-features = false} opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" } @@ -24,8 +24,6 @@ async-trait = { workspace = true, optional = true } bytes = { workspace = true, optional = true } http = { workspace = true, optional = true } http-body-util = { workspace = true, optional = true } -hyper = { workspace = true, features = ["client"], optional = true } -hyper-util = { workspace = true, features = ["client-legacy"], optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["std"]} tracing-core = { workspace = true } diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs deleted file mode 100644 index 80a28ae62d..0000000000 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs +++ /dev/null @@ -1,49 +0,0 @@ -use async_trait::async_trait; -use bytes::Bytes; -use http::{Request, Response}; -use http_body_util::{BodyExt, Full}; -use hyper_util::{ - client::legacy::{ - connect::{Connect, HttpConnector}, - Client, - }, - rt::TokioExecutor, -}; -use opentelemetry_http::{HttpClient, HttpError, ResponseExt}; - -pub struct HyperClient { - inner: hyper_util::client::legacy::Client>, -} - -impl Default for HyperClient { - fn default() -> Self { - Self { - inner: Client::builder(TokioExecutor::new()).build_http(), - } - } -} - -impl std::fmt::Debug for HyperClient { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("HyperClient") - .field("inner", &self.inner) - .finish() - } -} - -#[async_trait] -impl HttpClient for HyperClient { - async fn send(&self, request: Request>) -> Result, HttpError> { - let request = request.map(|body| Full::new(Bytes::from(body))); - - let (parts, body) = self - .inner - .request(request) - .await? - .error_for_status()? - .into_parts(); - let body = body.collect().await?.to_bytes(); - - Ok(Response::from_parts(parts, body)) - } -} diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 1ff36c7549..c67d6b21c5 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -1,3 +1,4 @@ +/// To use hyper as the HTTP client - cargo run --features="hyper" --no-default-features use once_cell::sync::Lazy; use opentelemetry::{ global, @@ -23,12 +24,6 @@ use tracing::info; use tracing_subscriber::prelude::*; use tracing_subscriber::EnvFilter; -#[cfg(feature = "hyper")] -use opentelemetry_otlp::WithHttpConfig; - -#[cfg(feature = "hyper")] -mod hyper; - static RESOURCE: Lazy = Lazy::new(|| { Resource::new(vec![KeyValue::new( opentelemetry_semantic_conventions::resource::SERVICE_NAME, @@ -37,15 +32,11 @@ static RESOURCE: Lazy = Lazy::new(|| { }); fn init_logs() -> Result { - let exporter_builder = LogExporter::builder() + let exporter = LogExporter::builder() .with_http() .with_endpoint("http://localhost:4318/v1/logs") - .with_protocol(Protocol::HttpBinary); - - #[cfg(feature = "hyper")] - let exporter_builder = exporter_builder.with_http_client(hyper::HyperClient::default()); - - let exporter = exporter_builder.build()?; + .with_protocol(Protocol::HttpBinary) + .build()?; Ok(LoggerProvider::builder() .with_batch_exporter(exporter, runtime::Tokio) @@ -59,6 +50,7 @@ fn init_tracer_provider() -> Result { .with_protocol(Protocol::HttpBinary) //can be changed to `Protocol::HttpJson` to export in JSON format .with_endpoint("http://localhost:4318/v1/traces") .build()?; + Ok(TracerProvider::builder() .with_batch_exporter(exporter, runtime::Tokio) .with_config(Config::default().with_resource(RESOURCE.clone())) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 74ae46817d..154871887b 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -33,12 +33,20 @@ mod logs; #[cfg(feature = "trace")] mod trace; +#[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + feature = "hyper-client" +))] +use opentelemetry_http::hyper::HyperClient; + /// Configuration of the http transport #[derive(Debug)] #[cfg_attr( all( not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client") + not(feature = "reqwest-blocking-client"), + not(feature = "hyper-client") ), derive(Default) )] @@ -50,19 +58,36 @@ pub struct HttpConfig { headers: Option>, } -#[cfg(any(feature = "reqwest-blocking-client", feature = "reqwest-client",))] +#[cfg(any( + feature = "reqwest-blocking-client", + feature = "reqwest-client", + feature = "hyper-client" +))] impl Default for HttpConfig { fn default() -> Self { + #[cfg(feature = "reqwest-blocking-client")] + let default_client = + Some(Arc::new(reqwest::blocking::Client::new()) as Arc); + #[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))] + let default_client = Some(Arc::new(reqwest::Client::new()) as Arc); + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + feature = "hyper-client" + ))] + // TODO - support configuring custom connector and executor + let default_client = Some(Arc::new(HyperClient::with_default_connector( + Duration::from_secs(10), + None, + )) as Arc); + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + not(feature = "hyper-client") + ))] + let default_client = None; HttpConfig { - #[cfg(feature = "reqwest-blocking-client")] - client: Some(Arc::new(reqwest::blocking::Client::new())), - #[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))] - client: Some(Arc::new(reqwest::Client::new())), - #[cfg(all( - not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client") - ))] - client: None, + client: default_client, headers: None, } } @@ -140,13 +165,11 @@ impl HttpExporterBuilder { }, None => self.exporter_config.timeout, }; - let http_client = self .http_config .client .take() .ok_or(crate::Error::NoHttpClient)?; - #[allow(clippy::mutable_key_type)] // http headers are not mutated let mut headers: HashMap = self .http_config