diff --git a/.github/workflows/format-code.yml b/.github/workflows/format-code.yml index 42693b3..45cbff1 100644 --- a/.github/workflows/format-code.yml +++ b/.github/workflows/format-code.yml @@ -8,7 +8,7 @@ on: jobs: format-code: runs-on: "ubuntu-latest" - container: rust:1.79 + container: rust:1.82 steps: - name: Checkout the code on merge diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 9e0d1d5..5cfc852 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,7 +9,7 @@ on: jobs: lint: runs-on: "ubuntu-latest" - container: rust:1.79 + container: rust:1.82 steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a6364d1..f58e658 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -13,7 +13,7 @@ jobs: matrix: features: [""] runs-on: "ubuntu-latest" - container: rust:1.79 + container: rust:1.82 steps: - uses: actions/checkout@v2 @@ -26,7 +26,7 @@ jobs: matrix: features: [""] runs-on: "ubuntu-latest" - container: rust:1.79 + container: rust:1.82 services: consul: image: consul:1.11.11 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 9786c24..0551364 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -6,7 +6,7 @@ on: jobs: test: runs-on: ubuntu-latest - container: rust:1.79 + container: rust:1.82 services: consul: image: consul:1.11.11 @@ -25,7 +25,7 @@ jobs: dry-run: runs-on: ubuntu-latest - container: rust:1.79 + container: rust:1.82 steps: - uses: actions/checkout@v2 @@ -36,7 +36,7 @@ jobs: publish: needs: [test, dry-run] runs-on: ubuntu-latest - container: rust:1.79 + container: rust:1.82 environment: crates-publish steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index c7df596..5359871 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased +## 0.8.0 - 2024-11-20 + +### Changed + +- `opentelemetry` upgraded to version `0.27` from `0.24`. +- Removed `prometheus` dependency in the `metrics` feature, replacing it with a `metrics_receiver` in a metrics library agnostic fashion. + ## 0.7.0 - 2024-06-25 ### Changed diff --git a/Cargo.toml b/Cargo.toml index 678f5c9..a2c1af7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rs-consul" -version = "0.7.0" +version = "0.8.0" authors = ["Roblox"] edition = "2021" description = "This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/)" @@ -11,7 +11,7 @@ license-file = "LICENSE" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = [] -metrics = ["prometheus", "lazy_static"] +metrics = [] trace = ["dep:opentelemetry"] # keep this list sorted! @@ -22,9 +22,7 @@ http-body-util = "0.1" hyper = { version = "1", features = ["full"] } hyper-rustls = { version = "0.27", default-features = false, features = ["webpki-roots", "ring", "http1"] } hyper-util = { version = "0.1", features = ["client", "client-legacy", "tokio", "http2"] } -lazy_static = { version = "1", optional = true } -opentelemetry = { version = "0.24", optional = true } -prometheus = { version = "0.13", optional = true } +opentelemetry = { version = "0.27", optional = true } quick-error = "2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/rust-toolchain b/rust-toolchain index c408301..a98f408 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.79 \ No newline at end of file +1.82 \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index c2af1a0..7ab53cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,11 +26,14 @@ SOFTWARE. //! //! This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/) #![deny(missing_docs)] - use http_body_util::BodyExt; + +#[cfg(feature = "metrics")] +use metrics::MetricInfoWrapper; + use std::collections::HashMap; use std::convert::Infallible; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::{env, str::Utf8Error}; use base64::Engine; @@ -39,13 +42,14 @@ use http_body_util::{Empty, Full}; use hyper::body::Bytes; use hyper::{body::Buf, Method}; use hyper_util::client::legacy::{connect::HttpConnector, Builder, Client}; -#[cfg(feature = "metrics")] -use lazy_static::lazy_static; use quick_error::quick_error; use serde::{Deserialize, Serialize}; use slog_scope::{error, info}; use tokio::time::timeout; +#[cfg(feature = "metrics")] +use http::StatusCode; + #[cfg(feature = "trace")] use opentelemetry::global; #[cfg(feature = "trace")] @@ -55,10 +59,15 @@ use opentelemetry::trace::Span; #[cfg(feature = "trace")] use opentelemetry::trace::Status; +#[cfg(feature = "metrics")] +pub use metrics::MetricInfo; +pub use metrics::{Function, HttpMethod}; pub use types::*; #[cfg(feature = "trace")] mod hyper_wrapper; +/// Types exposed for metrics on the consuming application without taking a dependency on a metrics library or a specific implementation. +mod metrics; /// The strongly typed data structures representing canonical consul objects. pub mod types; @@ -115,44 +124,6 @@ quick_error! { } } -#[cfg(feature = "metrics")] -lazy_static! { - static ref CONSUL_REQUESTS_TOTAL: prometheus::CounterVec = prometheus::register_counter_vec!( - prometheus::opts!("consul_requests_total", "Total requests made to consul"), - &["method", "function"] - ) - .unwrap(); - static ref CONSUL_REQUESTS_FAILED_TOTAL: prometheus::CounterVec = - prometheus::register_counter_vec!( - prometheus::opts!( - "consul_requests_failed_total", - "Total requests made to consul that failed" - ), - &["method", "function"] - ) - .unwrap(); - static ref CONSUL_REQUESTS_DURATION_MS: prometheus::HistogramVec = - prometheus::register_histogram_vec!( - prometheus::histogram_opts!( - "consul_requests_duration_milliseconds", - "Time it takes for a consul request to complete" - ), - &["method", "function"] - ) - .unwrap(); -} - -const READ_KEY_METHOD_NAME: &str = "read_key"; -const CREATE_OR_UPDATE_KEY_METHOD_NAME: &str = "create_or_update_key"; -const CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME: &str = "create_or_update_key_sync"; -const DELETE_KEY_METHOD_NAME: &str = "delete_key"; -const GET_LOCK_METHOD_NAME: &str = "get_lock"; -const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity"; -const DEREGISTER_ENTITY_METHOD_NAME: &str = "deregister_entity"; -const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names"; -const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes"; -const GET_SESSION_METHOD_NAME: &str = "get_session"; - pub(crate) type Result = std::result::Result; /// The config necessary to create a new consul client. @@ -256,6 +227,10 @@ pub struct Consul { config: Config, #[cfg(feature = "trace")] tracer: BoxedTracer, + #[cfg(feature = "metrics")] + metrics_tx: std::sync::mpsc::Sender, + #[cfg(feature = "metrics")] + metrics_rx: Option>, } fn https_connector() -> hyper_rustls::HttpsConnector { @@ -318,14 +293,26 @@ impl Consul { /// - [Config](consul::Config) /// - [HttpsClient](consul::HttpsClient) pub fn new_with_client(config: Config, https_client: HttpsClient) -> Self { + #[cfg(feature = "metrics")] + let (tx, rx) = std::sync::mpsc::channel::(); Consul { https_client, config, #[cfg(feature = "trace")] tracer: global::tracer("consul"), + #[cfg(feature = "metrics")] + metrics_tx: tx, + #[cfg(feature = "metrics")] + metrics_rx: Some(rx), } } + #[cfg(feature = "metrics")] + /// Returns the metrics receiver for the consul client. + pub fn metrics_receiver(&mut self) -> Option> { + self.metrics_rx.take() + } + /// Reads a key from Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. /// # Arguments: /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) @@ -341,7 +328,7 @@ impl Consul { req, BoxBody::new(http_body_util::Empty::::new()), None, - READ_KEY_METHOD_NAME, + Function::ReadKey, ) .await?; Ok(ResponseMeta { @@ -386,7 +373,7 @@ impl Consul { req, BoxBody::new(Full::::new(Bytes::from(value))), None, - CREATE_OR_UPDATE_KEY_METHOD_NAME, + Function::CreateOrUpdateKey, ) .await?; Ok(( @@ -413,9 +400,13 @@ impl Consul { // TODO: Emit OpenTelemetry span for this request let url = self.build_create_or_update_url(request); - - record_request_metric_if_enabled(&Method::PUT, CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME); - let step_start_instant = Instant::now(); + #[cfg(feature = "metrics")] + let mut metrics_info_wrapper = MetricInfoWrapper::new( + HttpMethod::Put, + Function::CreateOrUpdateKey, + None, + self.metrics_tx.clone(), + ); let result = ureq::put(&url) .set( "X-Consul-Token", @@ -423,33 +414,36 @@ impl Consul { ) .send_bytes(&value); - record_duration_metric_if_enabled( - &Method::PUT, - CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME, - step_start_instant.elapsed().as_millis() as f64, - ); - let response = result.map_err(|e| { - record_failure_metric_if_enabled(&Method::PUT, CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME); - match e { - ureq::Error::Status(code, response) => ConsulError::UnexpectedResponseCode( - hyper::StatusCode::from_u16(code).unwrap_or_default(), + let response = result.map_err(|e| match e { + ureq::Error::Status(code, response) => { + let code = hyper::StatusCode::from_u16(code).unwrap_or_default(); + #[cfg(feature = "metrics")] + { + metrics_info_wrapper.set_status(code); + drop(metrics_info_wrapper.clone()); + } + ConsulError::UnexpectedResponseCode( + code, response.into_string().unwrap_or_default(), - ), - ureq::Error::Transport(t) => ConsulError::TransportError( - t.kind(), - t.message().unwrap_or_default().to_string(), - ), + ) + } + ureq::Error::Transport(t) => { + ConsulError::TransportError(t.kind(), t.message().unwrap_or_default().to_string()) } })?; let status = response.status(); if status == 200 { let val = response.into_string()?; let response: bool = std::str::FromStr::from_str(val.trim())?; + #[cfg(feature = "metrics")] + { + metrics_info_wrapper.set_status(StatusCode::OK); + drop(metrics_info_wrapper.clone()); + } return Ok(response); } let body = response.into_string()?; - record_failure_metric_if_enabled(&Method::PUT, CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME); Err(ConsulError::SyncUnexpectedResponseCode(status, body)) } @@ -476,7 +470,7 @@ impl Consul { req, BoxBody::new(Empty::::new()), None, - DELETE_KEY_METHOD_NAME, + Function::DeleteKey, ) .await?; serde_json::from_reader(response_body.reader()) @@ -525,7 +519,7 @@ impl Consul { lock_index_req, BoxBody::new(http_body_util::Empty::::new()), None, - GET_LOCK_METHOD_NAME, + Function::ReadKey, ) .await?; Err(ConsulError::LockAcquisitionFailure(index)) @@ -567,7 +561,7 @@ impl Consul { request, BoxBody::new(Full::::new(Bytes::from(payload.into_bytes()))), Some(Duration::from_secs(5)), - REGISTER_ENTITY_METHOD_NAME, + Function::RegisterEntity, ) .await?; Ok(()) @@ -587,7 +581,7 @@ impl Consul { request, BoxBody::new(Full::::new(Bytes::from(payload.into_bytes()))), Some(Duration::from_secs(5)), - DEREGISTER_ENTITY_METHOD_NAME, + Function::DeregisterEntity, ) .await?; Ok(()) @@ -615,7 +609,7 @@ impl Consul { request, BoxBody::new(Empty::::new()), query_opts.timeout, - GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME, + Function::GetAllRegisteredServices, ) .await?; let service_tags_by_name = @@ -647,7 +641,7 @@ impl Consul { req, BoxBody::new(Empty::::new()), query_opts.timeout, - GET_SERVICE_NODES_METHOD_NAME, + Function::GetServiceNodes, ) .await?; let response = @@ -767,7 +761,7 @@ impl Consul { create_session_json.into_bytes(), ))), None, - GET_SESSION_METHOD_NAME, + Function::GetSession, ) .await?; serde_json::from_reader(response_body.reader()) @@ -801,7 +795,7 @@ impl Consul { req: http::request::Builder, body: BoxBody, duration: Option, - request_name: &str, + _function: Function, ) -> Result<(Box, u64)> { let req = req .header( @@ -813,37 +807,45 @@ impl Consul { #[cfg(feature = "trace")] let mut span = crate::hyper_wrapper::span_for_request(&self.tracer, &req); - let method = req.method().clone(); - record_request_metric_if_enabled(&method, request_name); + #[cfg(feature = "metrics")] + let mut metrics_info_wrapper = MetricInfoWrapper::new( + req.method().clone().into(), + _function, + None, + self.metrics_tx.clone(), + ); let future = self.https_client.request(req); - - let step_start_instant = Instant::now(); let response = if let Some(dur) = duration { match timeout(dur, future).await { Ok(resp) => resp.map_err(ConsulError::ResponseError), - Err(_) => Err(ConsulError::TimeoutExceeded(dur)), + Err(_) => { + #[cfg(feature = "metrics")] + { + metrics_info_wrapper.set_status(StatusCode::REQUEST_TIMEOUT); + drop(metrics_info_wrapper.clone()); + } + Err(ConsulError::TimeoutExceeded(dur)) + } } } else { future.await.map_err(ConsulError::ResponseError) }; - record_duration_metric_if_enabled( - &method, - request_name, - step_start_instant.elapsed().as_millis() as f64, - ); - if response.is_err() { - record_failure_metric_if_enabled(&method, request_name); - } - - let response = response?; + let response = response.inspect_err(|_| { + #[cfg(feature = "metrics")] + drop(metrics_info_wrapper.clone()); + })?; #[cfg(feature = "trace")] crate::hyper_wrapper::annotate_span_for_response(&mut span, &response); let status = response.status(); if status != hyper::StatusCode::OK { - record_failure_metric_if_enabled(&method, request_name); + #[cfg(feature = "metrics")] + { + metrics_info_wrapper.set_status(status); + drop(metrics_info_wrapper); + } let mut response_body = response .into_body() @@ -867,8 +869,6 @@ impl Consul { match response.into_body().collect().await.map(|b| b.aggregate()) { Ok(body) => Ok((Box::new(body), index)), Err(e) => { - record_failure_metric_if_enabled(&method, request_name); - #[cfg(feature = "trace")] span.set_status(Status::error(e.to_string())); Err(ConsulError::InvalidResponse(e)) @@ -955,33 +955,6 @@ fn add_query_param_separator(mut url: String, already_added: bool) -> String { url } -fn record_request_metric_if_enabled(_method: &Method, _function: &str) { - #[cfg(feature = "metrics")] - { - CONSUL_REQUESTS_TOTAL - .with_label_values(&[_method.as_str(), _function]) - .inc(); - } -} - -fn record_failure_metric_if_enabled(_method: &Method, _function: &str) { - #[cfg(feature = "metrics")] - { - CONSUL_REQUESTS_FAILED_TOTAL - .with_label_values(&[_method.as_str(), _function]) - .inc(); - } -} - -fn record_duration_metric_if_enabled(_method: &Method, _function: &str, _duration: f64) { - #[cfg(feature = "metrics")] - { - CONSUL_REQUESTS_DURATION_MS - .with_label_values(&[_method.as_str(), _function]) - .observe(_duration); - } -} - #[cfg(test)] mod tests { use std::time::Duration; diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..fa599d7 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,178 @@ +#[cfg(feature = "metrics")] +use http::StatusCode; + +#[cfg(feature = "metrics")] +use std::time::Duration; + +/// A struct to hold information about a calls to consul for metrics. +#[cfg(feature = "metrics")] +#[derive(Debug, Clone, Copy)] +pub struct MetricInfo { + /// The HTTP method used in the call. + pub method: HttpMethod, + /// The function called in the consul client. + pub function: Function, + /// The status code returned by the call if any. + pub status: Option, + /// The duration of the call. + pub duration: Option, +} + +#[cfg(feature = "metrics")] +impl MetricInfo { + fn new(method: HttpMethod, function: Function, status: Option) -> Self { + Self { + method, + function, + status, + duration: None, + } + } + + /// Get the labels for the metric as an array of `&str`. + pub fn labels(&self) -> [&str; 3] { + if let Some(status) = self.status.and_then(|o| o.canonical_reason()) { + [self.method.as_str(), self.function.as_str(), status] + } else { + [self.method.as_str(), self.function.as_str(), "unknown"] + } + } +} + +#[cfg(feature = "metrics")] +#[derive(Debug, Clone)] +pub(crate) struct MetricInfoWrapper { + metrics: MetricInfo, + sender: Option>, + start: std::time::Instant, +} + +#[cfg(feature = "metrics")] +impl MetricInfoWrapper { + pub fn new( + method: HttpMethod, + function: Function, + status: Option, + sender: std::sync::mpsc::Sender, + ) -> Self { + Self { + metrics: MetricInfo::new(method, function, status), + sender: Some(sender), + start: std::time::Instant::now(), + } + } + + pub fn set_status(&mut self, status: StatusCode) { + self.metrics.status = Some(status); + } +} + +#[cfg(feature = "metrics")] +impl Drop for MetricInfoWrapper { + fn drop(&mut self) { + if let Some(sender) = self.sender.take() { + let mut metrics = self.metrics; + metrics.duration = Some(self.start.elapsed()); + let _ = sender.send(self.metrics); + } + } +} + +/// The HTTP methods supported by the consul API. +#[derive(Debug, Clone, Copy)] +pub enum HttpMethod { + /// The OPTIONS method. + Options, + /// The GET method. + Get, + /// The POST method. + Post, + /// The PUT method. + Put, + /// The DELETE method. + Delete, + /// The HEAD method. + Head, + /// The TRACE method. + Trace, + /// The CONNECT method. + Connect, + /// The PATCH method. + Patch, + /// Extensions to the HTTP methods. + Extensions, +} + +impl HttpMethod { + #[cfg(feature = "metrics")] + fn as_str(&self) -> &'static str { + match self { + HttpMethod::Options => "options", + HttpMethod::Get => "get", + HttpMethod::Post => "post", + HttpMethod::Put => "put", + HttpMethod::Delete => "delete", + HttpMethod::Head => "head", + HttpMethod::Trace => "trace", + HttpMethod::Connect => "connect", + HttpMethod::Patch => "patch", + HttpMethod::Extensions => "extensions", + } + } +} + +#[cfg(feature = "metrics")] +impl From for HttpMethod { + fn from(method: http::Method) -> Self { + match method { + http::Method::OPTIONS => HttpMethod::Options, + http::Method::GET => HttpMethod::Get, + http::Method::POST => HttpMethod::Post, + http::Method::PUT => HttpMethod::Put, + http::Method::DELETE => HttpMethod::Delete, + http::Method::HEAD => HttpMethod::Head, + http::Method::TRACE => HttpMethod::Trace, + http::Method::CONNECT => HttpMethod::Connect, + http::Method::PATCH => HttpMethod::Patch, + _ => HttpMethod::Extensions, + } + } +} + +/// The functions supported by the consul client. +#[derive(Debug, Clone, Copy)] +pub enum Function { + /// The read_key function. + ReadKey, + /// The create_or_update_key function. + CreateOrUpdateKey, + /// The delete_key function. + DeleteKey, + /// The register_entity function. + RegisterEntity, + /// The deregister_entity function. + DeregisterEntity, + /// The get_service_nodes function. + GetServiceNodes, + /// The get_all_registered_services function. + GetAllRegisteredServices, + /// The get_session function. + GetSession, +} + +impl Function { + /// Get the function as a string. + #[cfg(feature = "metrics")] + pub fn as_str(&self) -> &'static str { + match self { + Function::ReadKey => "read_key", + Function::CreateOrUpdateKey => "create_or_update_key", + Function::DeleteKey => "delete_key", + Function::RegisterEntity => "register_entity", + Function::DeregisterEntity => "deregister_entity", + Function::GetServiceNodes => "get_service_nodes", + Function::GetAllRegisteredServices => "get_all_registered_services", + Function::GetSession => "get_session", + } + } +}