Skip to content

Commit

Permalink
hackety hack: add a future timeout that does not use tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
scottgerring committed Dec 18, 2024
1 parent 6ad1d5c commit 0254bc1
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 5 deletions.
1 change: 1 addition & 0 deletions opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"],
opentelemetry = { version = "0.27", path = "../opentelemetry", features = ["trace"] }
reqwest = { workspace = true, features = ["blocking"], optional = true }
tokio = { workspace = true, features = ["time"], optional = true }
futures-timer = "3.0.3"
46 changes: 42 additions & 4 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use async_trait::async_trait;
use std::fmt::Debug;

use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
#[doc(no_inline)]
pub use bytes::Bytes;
#[doc(no_inline)]
pub use http::{Request, Response};
use opentelemetry::Context;
use opentelemetry::propagation::{Extractor, Injector};

/// Helper for injecting headers into HTTP Requests. This is used for OpenTelemetry context
Expand Down Expand Up @@ -102,7 +106,7 @@ mod reqwest {

#[cfg(feature = "hyper")]
pub mod hyper {
use crate::ResponseExt;
use crate::{timeout, ResponseExt};

use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use http::HeaderValue;
Expand All @@ -116,7 +120,7 @@ pub mod hyper {
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use tokio::time;


#[derive(Debug, Clone)]
pub struct HyperClient<C = HttpConnector>
Expand Down Expand Up @@ -163,7 +167,7 @@ pub mod hyper {
.headers_mut()
.insert(http::header::AUTHORIZATION, authorization.clone());
}
let mut response = time::timeout(self.timeout, self.inner.request(request)).await??;
let mut response = timeout(self.timeout, self.inner.request(request)).await??;

Check warning on line 170 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L170

Added line #L170 was not covered by tests
let headers = std::mem::take(response.headers_mut());

let mut http_response = Response::builder()
Expand Down Expand Up @@ -218,6 +222,40 @@ impl<T> ResponseExt for Response<T> {
}
}

struct Timeout<F> {
future: F,
delay: Pin<Box<dyn Future<Output = ()> + Send>>,
}

impl<F: Future + std::marker::Unpin> Future for Timeout<F> where
F::Output: Send,
{
type Output = Result<F::Output, &'static str>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

// Poll the delay future
if this.delay.as_mut().poll(cx).is_ready() {
return Poll::Ready(Err("timeout"));
}

// Poll the main future
match Pin::new(&mut this.future).poll(cx) {
Poll::Ready(output) => Poll::Ready(Ok(output)),
Poll::Pending => Poll::Pending,

Check warning on line 246 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L235-L246

Added lines #L235 - L246 were not covered by tests
}
}

Check warning on line 248 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L248

Added line #L248 was not covered by tests
}

fn timeout<F: Future + Send>(duration: Duration, future: F) -> Timeout<F> {
Timeout {
future,
delay: Box::pin(futures_timer::Delay::new(duration)),
}
}

Check warning on line 256 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L251-L256

Added lines #L251 - L256 were not covered by tests


#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};

Expand All @@ -21,6 +22,8 @@ impl MetricsClient for OtlpHttpClient {
_ => Err(MetricError::Other("exporter is already shut down".into())),
})?;

otel_debug!(name: "MetricsClientExport");

Check warning on line 25 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L25

Added line #L25 was not covered by tests

let (body, content_type) = self.build_metrics_export_body(metrics)?;
let mut request = http::Request::builder()
.method(Method::POST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static INIT_TRACING: Once = Once::new();
fn init_tracing() {
INIT_TRACING.call_once(|| {
let subscriber = FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.with_max_level(tracing::Level::TRACE)
.finish();

tracing::subscriber::set_global_default(subscriber)
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ impl PeriodicReaderInner {
return Err(e);
}

otel_debug!(name: "PeriodicReaderMetricsExported");

Ok(())
}

Expand Down

0 comments on commit 0254bc1

Please sign in to comment.