diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b1533d4..d6eed664 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.22.3] - unreleased + +### Added + +- Added `encode_registry` and `encode_eof` functions to `text` module. + See [PR 205]. + + [PR 205]: https://github.com/prometheus/client_rust/pull/205 + +- Support all platforms with 32 bit atomics lacking 64 bit atomics. + See [PR 203]. + +[PR 203]: https://github.com/prometheus/client_rust/pull/203 + ## [0.22.2] ### Added diff --git a/Cargo.toml b/Cargo.toml index 5507fbb9..4f78e1ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "prometheus-client" -version = "0.22.2" +version = "0.22.3" authors = ["Max Inden "] edition = "2021" description = "Open Metrics client library allowing users to natively instrument applications." @@ -35,7 +35,9 @@ rand = "0.8.4" tide = "0.16" actix-web = "4" tokio = { version = "1", features = ["rt-multi-thread", "net", "macros", "signal"] } -hyper = { version = "0.14.16", features = ["server", "http1", "tcp"] } +hyper = { version = "1.3.1", features = ["server", "http1"] } +hyper-util = { version = "0.1.3", features = ["tokio"] } +http-body-util = "0.1.1" [build-dependencies] prost-build = { version = "0.11.0", optional = true } diff --git a/examples/actix-web.rs b/examples/actix-web.rs index d01a1027..a1ce8e79 100644 --- a/examples/actix-web.rs +++ b/examples/actix-web.rs @@ -1,5 +1,6 @@ use std::sync::Mutex; +use actix_web::middleware::Compress; use actix_web::{web, App, HttpResponse, HttpServer, Responder, Result}; use prometheus_client::encoding::text::encode; use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue}; @@ -61,6 +62,7 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { App::new() + .wrap(Compress::default()) .app_data(metrics.clone()) .app_data(state.clone()) .service(web::resource("/metrics").route(web::get().to(metrics_handler))) diff --git a/examples/hyper.rs b/examples/hyper.rs index f5a4009d..82ee121b 100644 --- a/examples/hyper.rs +++ b/examples/hyper.rs @@ -1,7 +1,11 @@ +use http_body_util::{combinators, BodyExt, Full}; use hyper::{ - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, + body::{Bytes, Incoming}, + server::conn::http1, + service::service_fn, + Request, Response, }; +use hyper_util::rt::TokioIo; use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry}; use std::{ future::Future, @@ -10,7 +14,11 @@ use std::{ pin::Pin, sync::Arc, }; -use tokio::signal::unix::{signal, SignalKind}; +use tokio::{ + net::TcpListener, + pin, + signal::unix::{signal, SignalKind}, +}; #[tokio::main] async fn main() { @@ -31,39 +39,48 @@ async fn main() { /// Start a HTTP server to report metrics. pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry) { - let mut shutdown_stream = signal(SignalKind::terminate()).unwrap(); - eprintln!("Starting metrics server on {metrics_addr}"); let registry = Arc::new(registry); - Server::bind(&metrics_addr) - .serve(make_service_fn(move |_conn| { - let registry = registry.clone(); - async move { - let handler = make_handler(registry); - Ok::<_, io::Error>(service_fn(handler)) + + let tcp_listener = TcpListener::bind(metrics_addr).await.unwrap(); + let server = http1::Builder::new(); + while let Ok((stream, _)) = tcp_listener.accept().await { + let mut shutdown_stream = signal(SignalKind::terminate()).unwrap(); + let io = TokioIo::new(stream); + let server_clone = server.clone(); + let registry_clone = registry.clone(); + tokio::task::spawn(async move { + let conn = server_clone.serve_connection(io, service_fn(make_handler(registry_clone))); + pin!(conn); + tokio::select! { + _ = conn.as_mut() => {} + _ = shutdown_stream.recv() => { + conn.as_mut().graceful_shutdown(); + } } - })) - .with_graceful_shutdown(async move { - shutdown_stream.recv().await; - }) - .await - .unwrap(); + }); + } } +/// Boxed HTTP body for responses +type BoxBody = combinators::BoxBody; + /// This function returns a HTTP handler (i.e. another function) pub fn make_handler( registry: Arc, -) -> impl Fn(Request) -> Pin>> + Send>> { +) -> impl Fn(Request) -> Pin>> + Send>> +{ // This closure accepts a request and responds with the OpenMetrics encoding of our metrics. - move |_req: Request| { + move |_req: Request| { let reg = registry.clone(); + Box::pin(async move { let mut buf = String::new(); encode(&mut buf, ®.clone()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) .map(|_| { - let body = Body::from(buf); + let body = full(Bytes::from(buf)); Response::builder() .header( hyper::header::CONTENT_TYPE, @@ -75,3 +92,8 @@ pub fn make_handler( }) } } + +/// helper function to build a full boxed body +pub fn full(body: Bytes) -> BoxBody { + Full::new(body).map_err(|never| match never {}).boxed() +} diff --git a/src/encoding/text.rs b/src/encoding/text.rs index 4acf1d51..42815269 100644 --- a/src/encoding/text.rs +++ b/src/encoding/text.rs @@ -1,7 +1,7 @@ //! Open Metrics text format implementation. //! //! ``` -//! # use prometheus_client::encoding::text::encode; +//! # use prometheus_client::encoding::text::{encode, encode_registry, encode_eof}; //! # use prometheus_client::metrics::counter::Counter; //! # use prometheus_client::registry::Registry; //! # @@ -15,13 +15,26 @@ //! # ); //! # counter.inc(); //! let mut buffer = String::new(); +//! +//! // Encode the complete OpenMetrics exposition into the message buffer //! encode(&mut buffer, ®istry).unwrap(); +//! let expected_msg = "# HELP my_counter This is my counter.\n".to_owned() + +//! "# TYPE my_counter counter\n" + +//! "my_counter_total 1\n" + +//! "# EOF\n"; +//! assert_eq!(expected_msg, buffer); +//! buffer.clear(); +//! +//! // Encode just the registry into the message buffer +//! encode_registry(&mut buffer, ®istry).unwrap(); +//! let expected_reg = "# HELP my_counter This is my counter.\n".to_owned() + +//! "# TYPE my_counter counter\n" + +//! "my_counter_total 1\n"; +//! assert_eq!(expected_reg, buffer); //! -//! let expected = "# HELP my_counter This is my counter.\n".to_owned() + -//! "# TYPE my_counter counter\n" + -//! "my_counter_total 1\n" + -//! "# EOF\n"; -//! assert_eq!(expected, buffer); +//! // Encode EOF marker into message buffer to complete the OpenMetrics exposition +//! encode_eof(&mut buffer).unwrap(); +//! assert_eq!(expected_msg, buffer); //! ``` use crate::encoding::{EncodeExemplarValue, EncodeLabelSet}; @@ -33,15 +46,140 @@ use std::borrow::Cow; use std::collections::HashMap; use std::fmt::Write; +/// Encode both the metrics registered with the provided [`Registry`] and the +/// EOF marker into the provided [`Write`]r using the OpenMetrics text format. +/// +/// Note: This function encodes the **complete** OpenMetrics exposition. +/// +/// Use [`encode_registry`] or [`encode_eof`] if partial encoding is needed. +/// +/// See [OpenMetrics exposition format](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format) +/// for additional details. +/// +/// # Examples +/// +/// ```no_run +/// # use prometheus_client::encoding::text::encode; +/// # use prometheus_client::metrics::counter::Counter; +/// # use prometheus_client::metrics::gauge::Gauge; +/// # use prometheus_client::registry::Registry; +/// # +/// // Initialize registry with metric families +/// let mut registry = Registry::default(); +/// let counter: Counter = Counter::default(); +/// registry.register( +/// "my_counter", +/// "This is my counter", +/// counter.clone(), +/// ); +/// let gauge: Gauge = Gauge::default(); +/// registry.register( +/// "my_gauge", +/// "This is my gauge", +/// gauge.clone(), +/// ); +/// +/// // Encode the complete OpenMetrics exposition into the buffer +/// let mut buffer = String::new(); +/// encode(&mut buffer, ®istry)?; +/// # Ok::<(), std::fmt::Error>(()) +/// ``` +pub fn encode(writer: &mut W, registry: &Registry) -> Result<(), std::fmt::Error> +where + W: Write, +{ + encode_registry(writer, registry)?; + encode_eof(writer) +} + /// Encode the metrics registered with the provided [`Registry`] into the /// provided [`Write`]r using the OpenMetrics text format. -pub fn encode(writer: &mut W, registry: &Registry) -> Result<(), std::fmt::Error> +/// +/// Note: The OpenMetrics exposition requires that a complete message must end +/// with an EOF marker. +/// +/// This function may be called repeatedly for the HTTP scrape response until +/// [`encode_eof`] signals the end of the response. +/// +/// This may also be used to compose a partial message with metrics assembled +/// from multiple registries. +/// +/// # Examples +/// +/// ```no_run +/// # use prometheus_client::encoding::text::encode_registry; +/// # use prometheus_client::metrics::counter::Counter; +/// # use prometheus_client::metrics::gauge::Gauge; +/// # use prometheus_client::registry::Registry; +/// # +/// // Initialize registry with a counter +/// let mut reg_counter = Registry::default(); +/// let counter: Counter = Counter::default(); +/// reg_counter.register( +/// "my_counter", +/// "This is my counter", +/// counter.clone(), +/// ); +/// +/// // Encode the counter registry into the buffer +/// let mut buffer = String::new(); +/// encode_registry(&mut buffer, ®_counter)?; +/// +/// // Initialize another registry but with a gauge +/// let mut reg_gauge = Registry::default(); +/// let gauge: Gauge = Gauge::default(); +/// reg_gauge.register( +/// "my_gauge", +/// "This is my gauge", +/// gauge.clone(), +/// ); +/// +/// // Encode the gauge registry into the buffer +/// encode_registry(&mut buffer, ®_gauge)?; +/// # Ok::<(), std::fmt::Error>(()) +/// ``` +pub fn encode_registry(writer: &mut W, registry: &Registry) -> Result<(), std::fmt::Error> where W: Write, { - registry.encode(&mut DescriptorEncoder::new(writer).into())?; - writer.write_str("# EOF\n")?; - Ok(()) + registry.encode(&mut DescriptorEncoder::new(writer).into()) +} + +/// Encode the EOF marker into the provided [`Write`]r using the OpenMetrics +/// text format. +/// +/// Note: This function is used to mark/signal the end of the exposition. +/// +/// # Examples +/// +/// ```no_run +/// # use prometheus_client::encoding::text::{encode_registry, encode_eof}; +/// # use prometheus_client::metrics::counter::Counter; +/// # use prometheus_client::metrics::gauge::Gauge; +/// # use prometheus_client::registry::Registry; +/// # +/// // Initialize registry with a counter +/// let mut registry = Registry::default(); +/// let counter: Counter = Counter::default(); +/// registry.register( +/// "my_counter", +/// "This is my counter", +/// counter.clone(), +/// ); +/// +/// // Encode registry into the buffer +/// let mut buffer = String::new(); +/// encode_registry(&mut buffer, ®istry)?; +/// +/// // Encode EOF marker to complete the message +/// encode_eof(&mut buffer)?; +/// # Ok::<(), std::fmt::Error>(()) +/// ``` +pub fn encode_eof(writer: &mut W) -> Result<(), std::fmt::Error> +where + W: Write, +{ + writer.write_str("# EOF\n") } pub(crate) struct DescriptorEncoder<'a> { @@ -915,6 +1053,52 @@ mod tests { parse_with_python_client(encoded); } + #[test] + fn encode_registry_eof() { + let mut orders_registry = Registry::default(); + + let total_orders: Counter = Default::default(); + orders_registry.register("orders", "Total orders received", total_orders.clone()); + total_orders.inc(); + + let processing_times = Histogram::new(exponential_buckets(1.0, 2.0, 10)); + orders_registry.register_with_unit( + "processing_times", + "Order times", + Unit::Seconds, + processing_times.clone(), + ); + processing_times.observe(2.4); + + let mut user_auth_registry = Registry::default(); + + let successful_logins: Counter = Default::default(); + user_auth_registry.register( + "successful_logins", + "Total successful logins", + successful_logins.clone(), + ); + successful_logins.inc(); + + let failed_logins: Counter = Default::default(); + user_auth_registry.register( + "failed_logins", + "Total failed logins", + failed_logins.clone(), + ); + + let mut response = String::new(); + + encode_registry(&mut response, &orders_registry).unwrap(); + assert_eq!(&response[response.len() - 20..], "bucket{le=\"+Inf\"} 1\n"); + + encode_registry(&mut response, &user_auth_registry).unwrap(); + assert_eq!(&response[response.len() - 20..], "iled_logins_total 0\n"); + + encode_eof(&mut response).unwrap(); + assert_eq!(&response[response.len() - 20..], "ogins_total 0\n# EOF\n"); + } + fn parse_with_python_client(input: String) { pyo3::prepare_freethreaded_python(); diff --git a/src/metrics/counter.rs b/src/metrics/counter.rs index c1c5e511..63d84ffc 100644 --- a/src/metrics/counter.rs +++ b/src/metrics/counter.rs @@ -6,7 +6,7 @@ use crate::encoding::{EncodeMetric, MetricEncoder}; use super::{MetricType, TypedMetric}; use std::marker::PhantomData; -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] use std::sync::atomic::AtomicU64; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -40,7 +40,7 @@ use std::sync::Arc; /// counter.inc(); /// let _value: f64 = counter.get(); /// ``` -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] #[derive(Debug)] pub struct Counter { value: Arc, @@ -48,7 +48,7 @@ pub struct Counter { } /// Open Metrics [`Counter`] to measure discrete events. -#[cfg(any(target_arch = "mips", target_arch = "powerpc"))] +#[cfg(not(target_has_atomic = "64"))] #[derive(Debug)] pub struct Counter { value: Arc, @@ -114,7 +114,7 @@ pub trait Atomic { fn get(&self) -> N; } -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] impl Atomic for AtomicU64 { fn inc(&self) -> u64 { self.inc_by(1) @@ -143,7 +143,7 @@ impl Atomic for AtomicU32 { } } -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] impl Atomic for AtomicU64 { fn inc(&self) -> f64 { self.inc_by(1.0) @@ -231,7 +231,7 @@ mod tests { assert_eq!(1, counter.get()); } - #[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] + #[cfg(target_has_atomic = "64")] #[test] fn f64_stored_in_atomic_u64() { fn prop(fs: Vec) { diff --git a/src/metrics/exemplar.rs b/src/metrics/exemplar.rs index c8478c6a..e61f4d5a 100644 --- a/src/metrics/exemplar.rs +++ b/src/metrics/exemplar.rs @@ -11,9 +11,9 @@ use super::histogram::Histogram; use super::{MetricType, TypedMetric}; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; use std::collections::HashMap; -#[cfg(any(target_arch = "mips", target_arch = "powerpc"))] +#[cfg(not(target_has_atomic = "64"))] use std::sync::atomic::AtomicU32; -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -65,7 +65,7 @@ pub struct Exemplar { /// }), /// ); /// ``` -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] #[derive(Debug)] pub struct CounterWithExemplar { pub(crate) inner: Arc>>, @@ -77,7 +77,7 @@ impl TypedMetric for CounterWithExemplar { /// Open Metrics [`Counter`] with an [`Exemplar`] to both measure discrete /// events and track references to data outside of the metric set. -#[cfg(any(target_arch = "mips", target_arch = "powerpc"))] +#[cfg(not(target_has_atomic = "64"))] #[derive(Debug)] pub struct CounterWithExemplar { pub(crate) inner: Arc>>, diff --git a/src/metrics/gauge.rs b/src/metrics/gauge.rs index 7b268427..fcf7e6a7 100644 --- a/src/metrics/gauge.rs +++ b/src/metrics/gauge.rs @@ -7,7 +7,7 @@ use crate::encoding::{EncodeGaugeValue, EncodeMetric, MetricEncoder}; use super::{MetricType, TypedMetric}; use std::marker::PhantomData; use std::sync::atomic::{AtomicI32, AtomicU32, Ordering}; -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] use std::sync::atomic::{AtomicI64, AtomicU64}; use std::sync::Arc; @@ -40,7 +40,7 @@ use std::sync::Arc; /// gauge.set(42.0); /// let _value: f64 = gauge.get(); /// ``` -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] #[derive(Debug)] pub struct Gauge { value: Arc, @@ -48,7 +48,7 @@ pub struct Gauge { } /// Open Metrics [`Gauge`] to record current measurements. -#[cfg(any(target_arch = "mips", target_arch = "powerpc"))] +#[cfg(not(target_has_atomic = "64"))] #[derive(Debug)] pub struct Gauge { value: Arc, @@ -186,7 +186,7 @@ impl Atomic for AtomicU32 { } } -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] impl Atomic for AtomicI64 { fn inc(&self) -> i64 { self.inc_by(1) @@ -213,7 +213,7 @@ impl Atomic for AtomicI64 { } } -#[cfg(not(any(target_arch = "mips", target_arch = "powerpc")))] +#[cfg(target_has_atomic = "64")] impl Atomic for AtomicU64 { fn inc(&self) -> f64 { self.inc_by(1.0)