Skip to content

Commit

Permalink
Merge branch 'main' into move-errors-to-sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Nov 1, 2024
2 parents 9cf5898 + 3742953 commit a047fad
Show file tree
Hide file tree
Showing 18 changed files with 323 additions and 240 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ jobs:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
with:
toolchain: nightly-2024-05-01
toolchain: nightly-2024-06-30
components: rustfmt
- name: external-type-check
run: |
cargo install cargo-check-external-types
cargo install cargo-check-external-types@0.1.13
cd ${{ matrix.example }}
cargo check-external-types --config allowed-external-types.toml
non-default-examples:
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ hyper-util = "0.1"
log = "0.4.21"
once_cell = "1.13"
ordered-float = "4.0"
pin-project-lite = "=0.2.14" # 0.2.15 is failing for cargo-check-external-types
pin-project-lite = "0.2"
prost = "0.13"
prost-build = "0.13"
prost-types = "0.13"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ observability tools.

*OpenTelemetry Rust is not introducing a new end user callable Logging API.
Instead, it provides [Logs Bridge
API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/bridge-api.md),
API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/api.md),
that allows one to write log appenders that can bridge existing logging
libraries to the OpenTelemetry log data model. The following log appenders are
available:
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ bytes = { workspace = true }
http = { workspace = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true }
hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"], optional = true }
opentelemetry = { version = "0.26", path = "../opentelemetry", features = ["trace"] }
reqwest = { workspace = true, features = ["blocking"], optional = true }
tokio = { workspace = true, features = ["time"], optional = true }
40 changes: 23 additions & 17 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,47 +108,53 @@ pub mod hyper {
use http::HeaderValue;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body as HttpBody, Frame};
use hyper_util::client::legacy::{connect::Connect, Client};
use hyper_util::client::legacy::{
connect::{Connect, HttpConnector},
Client,
};
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use tokio::time;

#[derive(Debug, Clone)]
pub struct HyperClient<C> {
pub struct HyperClient<C = HttpConnector>
where
C: Connect + Clone + Send + Sync + 'static,
{
inner: Client<C, Body>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C, Body>, timeout: Duration) -> Self {
impl<C> HyperClient<C>
where
C: Connect + Clone + Send + Sync + 'static,
{
pub fn new(connector: C, timeout: Duration, authorization: Option<HeaderValue>) -> Self {
// TODO - support custom executor
let inner = Client::builder(hyper_util::rt::TokioExecutor::new()).build(connector);
Self {
inner,
timeout,
authorization: None,
authorization,
}
}
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C, Body>,
impl HyperClient<HttpConnector> {
/// Creates a new `HyperClient` with a default `HttpConnector`.
pub fn with_default_connector(
timeout: Duration,
authorization: HeaderValue,
authorization: Option<HeaderValue>,
) -> Self {
Self {
inner,
timeout,
authorization: Some(authorization),
}
Self::new(HttpConnector::new(), timeout, authorization)
}
}

#[async_trait]
impl<C> HttpClient for HyperClient<C>
where
C: Connect + Send + Sync + Clone + Debug + 'static,
{
impl HttpClient for HyperClient {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, Body(Full::from(body)));
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ Released 2024-Sep-30
- `MetricsExporter` -> `MetricExporter`
- `MetricsExporterBuilder` -> `MetricExporterBuilder`

- [#2263](https://github.com/open-telemetry/opentelemetry-rust/pull/2263)
Support `hyper` client for opentelemetry-otlp. This can be enabled using flag `hyper-client`.
Refer example: https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-otlp/examples/basic-otlp-http

## v0.25.0

- Update `opentelemetry` dependency version to 0.25
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"]
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
reqwest-rustls = ["reqwest", "opentelemetry-http/reqwest-rustls"]
reqwest-rustls-webpki-roots = ["reqwest", "opentelemetry-http/reqwest-rustls-webpki-roots"]
hyper-client = ["opentelemetry-http/hyper"]

# test
integration-testing = ["tonic", "prost", "tokio/full", "trace"]
10 changes: 4 additions & 6 deletions opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@ publish = false
[features]
default = ["reqwest"]
reqwest = ["opentelemetry-otlp/reqwest-client"]
hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"]
hyper = ["opentelemetry-otlp/hyper-client"]


[dependencies]
once_cell = { workspace = true }
opentelemetry = { path = "../../../opentelemetry" }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"] }
opentelemetry-http = { path = "../../../opentelemetry-http", optional = true }
opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "reqwest-client", "logs"] }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"]}
opentelemetry-http = { path = "../../../opentelemetry-http", optional = true, default-features = false}
opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs"] , default-features = false}
opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" }

async-trait = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
http = { workspace = true, optional = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, features = ["client"], optional = true }
hyper-util = { workspace = true, features = ["client-legacy"], optional = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
Expand Down
49 changes: 0 additions & 49 deletions opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs

This file was deleted.

18 changes: 5 additions & 13 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/// To use hyper as the HTTP client - cargo run --features="hyper" --no-default-features
use once_cell::sync::Lazy;
use opentelemetry::{
global,
Expand All @@ -23,12 +24,6 @@ use tracing::info;
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;

#[cfg(feature = "hyper")]
use opentelemetry_otlp::WithHttpConfig;

#[cfg(feature = "hyper")]
mod hyper;

static RESOURCE: Lazy<Resource> = Lazy::new(|| {
Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
Expand All @@ -37,15 +32,11 @@ static RESOURCE: Lazy<Resource> = Lazy::new(|| {
});

fn init_logs() -> Result<sdklogs::LoggerProvider, opentelemetry::logs::LogError> {
let exporter_builder = LogExporter::builder()
let exporter = LogExporter::builder()
.with_http()
.with_endpoint("http://localhost:4318/v1/logs")
.with_protocol(Protocol::HttpBinary);

#[cfg(feature = "hyper")]
let exporter_builder = exporter_builder.with_http_client(hyper::HyperClient::default());

let exporter = exporter_builder.build()?;
.with_protocol(Protocol::HttpBinary)
.build()?;

Ok(LoggerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
Expand All @@ -59,6 +50,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {
.with_protocol(Protocol::HttpBinary) //can be changed to `Protocol::HttpJson` to export in JSON format
.with_endpoint("http://localhost:4318/v1/traces")
.build()?;

Ok(TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_config(Config::default().with_resource(RESOURCE.clone()))
Expand Down
49 changes: 36 additions & 13 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,20 @@ mod logs;
#[cfg(feature = "trace")]
mod trace;

#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "hyper-client"
))]
use opentelemetry_http::hyper::HyperClient;

/// Configuration of the http transport
#[derive(Debug)]
#[cfg_attr(
all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client")
not(feature = "reqwest-blocking-client"),
not(feature = "hyper-client")
),
derive(Default)
)]
Expand All @@ -50,19 +58,36 @@ pub struct HttpConfig {
headers: Option<HashMap<String, String>>,
}

#[cfg(any(feature = "reqwest-blocking-client", feature = "reqwest-client",))]
#[cfg(any(
feature = "reqwest-blocking-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
impl Default for HttpConfig {
fn default() -> Self {
#[cfg(feature = "reqwest-blocking-client")]
let default_client =
Some(Arc::new(reqwest::blocking::Client::new()) as Arc<dyn HttpClient>);
#[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))]
let default_client = Some(Arc::new(reqwest::Client::new()) as Arc<dyn HttpClient>);
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "hyper-client"
))]
// TODO - support configuring custom connector and executor
let default_client = Some(Arc::new(HyperClient::with_default_connector(
Duration::from_secs(10),
None,
)) as Arc<dyn HttpClient>);
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
not(feature = "hyper-client")
))]
let default_client = None;
HttpConfig {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Arc::new(reqwest::blocking::Client::new())),
#[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))]
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client")
))]
client: None,
client: default_client,
headers: None,
}
}
Expand Down Expand Up @@ -140,13 +165,11 @@ impl HttpExporterBuilder {
},
None => self.exporter_config.timeout,
};

let http_client = self
.http_config
.client
.take()
.ok_or(crate::Error::NoHttpClient)?;

#[allow(clippy::mutable_key_type)] // http headers are not mutated
let mut headers: HashMap<HeaderName, HeaderValue> = self
.http_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ impl<T: Number> ExpoHistogram<T> {
pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
let f_value = value.into_float();
// Ignore NaN and infinity.
// Only makes sense if T is f64, maybe this could be no-op for other cases?
if f_value.is_infinite() || f_value.is_nan() {
return;
}
Expand Down
Loading

0 comments on commit a047fad

Please sign in to comment.