diff --git a/Cargo.lock b/Cargo.lock index 879de461de..7ccd4d8ab4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4627,6 +4627,7 @@ dependencies = [ "metrics-util", "thiserror", "tokio", + "tokio-util", "tracing", ] diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 65ba340c16..c55da2f464 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -20,7 +20,7 @@ use std::time::Duration; use anyhow::Context; use clap::{ArgAction, Parser}; -use dojo_metrics::{metrics_process, prometheus_exporter}; +use dojo_metrics::exporters::prometheus::PrometheusRecorder; use dojo_utils::parse::{parse_socket_address, parse_url}; use dojo_world::contracts::world::WorldContractReader; use sqlx::sqlite::{ @@ -296,16 +296,10 @@ async fn main() -> anyhow::Result<()> { } if let Some(listen_addr) = args.metrics { - let prometheus_handle = prometheus_exporter::install_recorder("torii")?; - info!(target: LOG_TARGET, addr = %listen_addr, "Starting metrics endpoint."); - prometheus_exporter::serve( - listen_addr, - prometheus_handle, - metrics_process::Collector::default(), - Vec::new(), - ) - .await?; + let prometheus_handle = PrometheusRecorder::install("torii")?; + let server = dojo_metrics::Server::new(prometheus_handle).with_process_metrics(); + tokio::spawn(server.start(listen_addr)); } let engine_handle = tokio::spawn(async move { engine.start().await }); diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 253f54b3fb..361dc9baf8 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -12,8 +12,8 @@ use anyhow::Result; use config::metrics::MetricsConfig; use config::rpc::{ApiKind, RpcConfig}; use config::{Config, SequencingConfig}; -use dojo_metrics::prometheus_exporter::PrometheusHandle; -use dojo_metrics::{metrics_process, prometheus_exporter, Report}; +use dojo_metrics::exporters::prometheus::PrometheusRecorder; +use dojo_metrics::{Report, Server as MetricsServer}; use hyper::{Method, Uri}; use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer; use jsonrpsee::server::{AllowHosts, ServerBuilder, ServerHandle}; @@ -81,7 +81,6 @@ pub struct Node { pub pool: TxPool, pub db: Option, pub task_manager: TaskManager, - pub prometheus_handle: PrometheusHandle, pub backend: Arc>, pub block_producer: BlockProducer, pub rpc_config: RpcConfig, @@ -98,23 +97,19 @@ impl Node { let chain = self.backend.chain_spec.id; info!(%chain, "Starting node."); + // TODO: maybe move this to the build stage if let Some(ref cfg) = self.metrics_config { - let addr = cfg.addr; - let mut reports = Vec::new(); + let mut reports: Vec> = Vec::new(); if let Some(ref db) = self.db { reports.push(Box::new(db.clone()) as Box); } - prometheus_exporter::serve( - addr, - self.prometheus_handle.clone(), - metrics_process::Collector::default(), - reports, - ) - .await?; + let exporter = PrometheusRecorder::current().expect("qed; should exist at this point"); + let server = MetricsServer::new(exporter).with_process_metrics().with_reports(reports); - info!(%addr, "Metrics endpoint started."); + self.task_manager.task_spawner().build_task().spawn(server.start(cfg.addr)); + info!(addr = %cfg.addr, "Metrics server started."); } let pool = self.pool.clone(); @@ -156,9 +151,11 @@ impl Node { /// This returns a [`Node`] instance which can be launched with the all the necessary components /// configured. pub async fn build(mut config: Config) -> Result { - // Metrics recorder must be initialized before calling any of the metrics macros, in order - // for it to be registered. - let prometheus_handle = prometheus_exporter::install_recorder("katana")?; + if config.metrics.is_some() { + // Metrics recorder must be initialized before calling any of the metrics macros, in order + // for it to be registered. + let _ = PrometheusRecorder::install("katana")?; + } // --- build executor factory @@ -223,7 +220,6 @@ pub async fn build(mut config: Config) -> Result { pool, backend, block_producer, - prometheus_handle, rpc_config: config.rpc, metrics_config: config.metrics, messaging_config: config.messaging, diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml index 6aaace8d77..2faa0c043d 100644 --- a/crates/metrics/Cargo.toml +++ b/crates/metrics/Cargo.toml @@ -18,6 +18,7 @@ metrics-derive = "0.1" metrics-exporter-prometheus = "0.15.3" metrics-process = "2.1.0" metrics-util = "0.17.0" +tokio-util.workspace = true [target.'cfg(not(windows))'.dependencies] jemalloc-ctl = { version = "0.5.0", optional = true } diff --git a/crates/metrics/src/exporters/mod.rs b/crates/metrics/src/exporters/mod.rs new file mode 100644 index 0000000000..c4c8e150da --- /dev/null +++ b/crates/metrics/src/exporters/mod.rs @@ -0,0 +1,7 @@ +pub mod prometheus; + +/// Trait for metrics recorder whose metrics can be exported. +pub trait Exporter: Clone + Send + Sync { + /// Export the metrics that have been recorded by the metrics thus far. + fn export(&self) -> String; +} diff --git a/crates/metrics/src/exporters/prometheus.rs b/crates/metrics/src/exporters/prometheus.rs new file mode 100644 index 0000000000..e06bc63aee --- /dev/null +++ b/crates/metrics/src/exporters/prometheus.rs @@ -0,0 +1,52 @@ +//! Prometheus exporter + +use std::sync::OnceLock; + +use metrics_exporter_prometheus::PrometheusBuilder; +pub use metrics_exporter_prometheus::PrometheusHandle as Prometheus; +use metrics_util::layers::{PrefixLayer, Stack}; +use tracing::info; + +use crate::exporters::Exporter; +use crate::Error; + +static PROMETHEUS_HANDLE: OnceLock = OnceLock::new(); + +/// Prometheus exporter recorder. +#[derive(Debug)] +pub struct PrometheusRecorder; + +impl PrometheusRecorder { + /// Installs Prometheus as the metrics recorder. + /// + /// ## Arguments + /// + /// * `prefix` - Apply a prefix to all metrics keys. + pub fn install(prefix: &str) -> Result { + let recorder = PrometheusBuilder::new().build_recorder(); + let handle = recorder.handle(); + + // Build metrics stack and install the recorder + Stack::new(recorder) + .push(PrefixLayer::new(prefix)) + .install() + .map_err(|_| Error::GlobalRecorderAlreadyInstalled)?; + + info!(target: "metrics", %prefix, "Prometheus recorder installed."); + + let _ = PROMETHEUS_HANDLE.set(handle.clone()); + + Ok(handle) + } + + /// Get the handle to the installed Prometheus recorder (if any). + pub fn current() -> Option { + PROMETHEUS_HANDLE.get().cloned() + } +} + +impl Exporter for Prometheus { + fn export(&self) -> String { + self.render() + } +} diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index 8c0e84e00f..acdbb44891 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -1,4 +1,8 @@ -pub mod prometheus_exporter; +pub mod exporters; +mod process; +mod server; + +use std::net::SocketAddr; #[cfg(all(feature = "jemalloc", unix))] use jemallocator as _; @@ -8,12 +12,25 @@ pub use metrics; pub use metrics_derive::Metrics; /// Re-export the metrics-process crate pub use metrics_process; +pub use server::*; // We use jemalloc for performance reasons #[cfg(all(feature = "jemalloc", unix))] #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("global metrics recorder already installed.")] + GlobalRecorderAlreadyInstalled, + + #[error("could not bind to address: {addr}")] + FailedToBindAddress { addr: SocketAddr }, + + #[error(transparent)] + Server(#[from] hyper::Error), +} + /// A helper trait for reporting metrics. /// /// This is meant for types that require a specific trigger to register their metrics. @@ -21,3 +38,9 @@ pub trait Report: Send + Sync { /// Report the metrics. fn report(&self); } + +impl Report for ::metrics_process::Collector { + fn report(&self) { + self.collect(); + } +} diff --git a/crates/metrics/src/process.rs b/crates/metrics/src/process.rs new file mode 100644 index 0000000000..ab715b9413 --- /dev/null +++ b/crates/metrics/src/process.rs @@ -0,0 +1,122 @@ +use metrics::{describe_gauge, gauge}; + +const LOG_TARGET: &str = "metrics"; + +#[cfg(all(feature = "jemalloc", unix))] +pub fn collect_memory_stats() { + use jemalloc_ctl::{epoch, stats}; + + if epoch::advance() + .map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Advance jemalloc epoch." + ) + }) + .is_err() + { + return; + } + + if let Ok(value) = stats::active::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.active." + ) + }) { + gauge!("jemalloc.active").increment(value as f64); + } + + if let Ok(value) = stats::allocated::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.allocated." + ) + }) { + gauge!("jemalloc.allocated").increment(value as f64); + } + + if let Ok(value) = stats::mapped::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.mapped." + ) + }) { + gauge!("jemalloc.mapped").increment(value as f64); + } + + if let Ok(value) = stats::metadata::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.metadata." + ) + }) { + gauge!("jemalloc.metadata").increment(value as f64); + } + + if let Ok(value) = stats::resident::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.resident." + ) + }) { + gauge!("jemalloc.resident").increment(value as f64); + } + + if let Ok(value) = stats::retained::read().map_err(|error| { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Read jemalloc.stats.retained." + ) + }) { + gauge!("jemalloc.retained").increment(value as f64); + } +} + +#[cfg(all(feature = "jemalloc", unix))] +pub fn describe_memory_stats() { + describe_gauge!( + "jemalloc.active", + metrics::Unit::Bytes, + "Total number of bytes in active pages allocated by the application" + ); + describe_gauge!( + "jemalloc.allocated", + metrics::Unit::Bytes, + "Total number of bytes allocated by the application" + ); + describe_gauge!( + "jemalloc.mapped", + metrics::Unit::Bytes, + "Total number of bytes in active extents mapped by the allocator" + ); + describe_gauge!( + "jemalloc.metadata", + metrics::Unit::Bytes, + "Total number of bytes dedicated to jemalloc metadata" + ); + describe_gauge!( + "jemalloc.resident", + metrics::Unit::Bytes, + "Total number of bytes in physically resident data pages mapped by the allocator" + ); + describe_gauge!( + "jemalloc.retained", + metrics::Unit::Bytes, + "Total number of bytes in virtual memory mappings that were retained rather than being \ + returned to the operating system via e.g. munmap(2)" + ); +} + +#[cfg(not(all(feature = "jemalloc", unix)))] +pub fn collect_memory_stats() {} + +#[cfg(not(all(feature = "jemalloc", unix)))] +pub fn describe_memory_stats() {} diff --git a/crates/metrics/src/prometheus_exporter.rs b/crates/metrics/src/prometheus_exporter.rs deleted file mode 100644 index deaa9a0133..0000000000 --- a/crates/metrics/src/prometheus_exporter.rs +++ /dev/null @@ -1,228 +0,0 @@ -//! Prometheus exporter -//! Adapted from Paradigm's [`reth`](https://github.com/paradigmxyz/reth/blob/c1d7d2bde398bcf410c7e2df13fd7151fc2a58b9/bin/reth/src/prometheus_exporter.rs) - -use std::convert::Infallible; -use std::net::SocketAddr; -use std::sync::Arc; - -use anyhow::{Context, Result}; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; -use metrics::{describe_gauge, gauge}; -use metrics_exporter_prometheus::PrometheusBuilder; -pub use metrics_exporter_prometheus::PrometheusHandle; -use metrics_util::layers::{PrefixLayer, Stack}; - -use crate::Report; - -pub(crate) const LOG_TARGET: &str = "metrics::prometheus_exporter"; - -pub(crate) trait Hook: Fn() + Send + Sync {} -impl Hook for T {} - -/// Installs Prometheus as the metrics recorder. -/// -/// ## Arguments -/// * `prefix` - Apply a prefix to all metrics keys. -pub fn install_recorder(prefix: &str) -> Result { - let recorder = PrometheusBuilder::new().build_recorder(); - let handle = recorder.handle(); - - // Build metrics stack and install the recorder - Stack::new(recorder) - .push(PrefixLayer::new(prefix)) - .install() - .context("Couldn't set metrics recorder")?; - - Ok(handle) -} - -/// Serves Prometheus metrics over HTTP with database and process metrics. -pub async fn serve( - listen_addr: SocketAddr, - handle: PrometheusHandle, - process: metrics_process::Collector, - reports: Vec>, -) -> Result<()> { - // Clone `process` to move it into the hook and use the original `process` for describe below. - let cloned_process = process.clone(); - - let mut hooks: Vec>> = - vec![Box::new(move || cloned_process.collect()), Box::new(collect_memory_stats)]; - - let report_hooks = - reports.into_iter().map(|r| Box::new(move || r.report()) as Box>); - - hooks.extend(report_hooks); - - serve_with_hooks(listen_addr, handle, hooks).await?; - - process.describe(); - describe_memory_stats(); - - Ok(()) -} - -/// Serves Prometheus metrics over HTTP with hooks. -/// -/// The hooks are called every time the metrics are requested at the given endpoint, and can be used -/// to record values for pull-style metrics, i.e. metrics that are not automatically updated. -async fn serve_with_hooks( - listen_addr: SocketAddr, - handle: PrometheusHandle, - hooks: impl IntoIterator, -) -> Result<()> { - let hooks: Vec<_> = hooks.into_iter().collect(); - - // Start endpoint - start_endpoint(listen_addr, handle, Arc::new(move || hooks.iter().for_each(|hook| hook()))) - .await - .context("Could not start Prometheus endpoint")?; - - Ok(()) -} - -/// Starts an endpoint at the given address to serve Prometheus metrics. -async fn start_endpoint( - listen_addr: SocketAddr, - handle: PrometheusHandle, - hook: Arc, -) -> Result<()> { - let make_svc = make_service_fn(move |_| { - let handle = handle.clone(); - let hook = Arc::clone(&hook); - async move { - Ok::<_, Infallible>(service_fn(move |_: Request| { - (hook)(); - let metrics = handle.render(); - async move { Ok::<_, Infallible>(Response::new(Body::from(metrics))) } - })) - } - }); - let server = Server::try_bind(&listen_addr) - .context(format!("Could not bind to address: {listen_addr}"))? - .serve(make_svc); - - tokio::spawn(async move { server.await.expect("Metrics endpoint crashed") }); - - Ok(()) -} - -#[cfg(all(feature = "jemalloc", unix))] -fn collect_memory_stats() { - use jemalloc_ctl::{epoch, stats}; - - if epoch::advance() - .map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Advance jemalloc epoch." - ) - }) - .is_err() - { - return; - } - - if let Ok(value) = stats::active::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.active." - ) - }) { - gauge!("jemalloc.active").increment(value as f64); - } - - if let Ok(value) = stats::allocated::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.allocated." - ) - }) { - gauge!("jemalloc.allocated").increment(value as f64); - } - - if let Ok(value) = stats::mapped::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.mapped." - ) - }) { - gauge!("jemalloc.mapped").increment(value as f64); - } - - if let Ok(value) = stats::metadata::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.metadata." - ) - }) { - gauge!("jemalloc.metadata").increment(value as f64); - } - - if let Ok(value) = stats::resident::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.resident." - ) - }) { - gauge!("jemalloc.resident").increment(value as f64); - } - - if let Ok(value) = stats::retained::read().map_err(|error| { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Read jemalloc.stats.retained." - ) - }) { - gauge!("jemalloc.retained").increment(value as f64); - } -} - -#[cfg(all(feature = "jemalloc", unix))] -fn describe_memory_stats() { - describe_gauge!( - "jemalloc.active", - metrics::Unit::Bytes, - "Total number of bytes in active pages allocated by the application" - ); - describe_gauge!( - "jemalloc.allocated", - metrics::Unit::Bytes, - "Total number of bytes allocated by the application" - ); - describe_gauge!( - "jemalloc.mapped", - metrics::Unit::Bytes, - "Total number of bytes in active extents mapped by the allocator" - ); - describe_gauge!( - "jemalloc.metadata", - metrics::Unit::Bytes, - "Total number of bytes dedicated to jemalloc metadata" - ); - describe_gauge!( - "jemalloc.resident", - metrics::Unit::Bytes, - "Total number of bytes in physically resident data pages mapped by the allocator" - ); - describe_gauge!( - "jemalloc.retained", - metrics::Unit::Bytes, - "Total number of bytes in virtual memory mappings that were retained rather than being \ - returned to the operating system via e.g. munmap(2)" - ); -} - -#[cfg(not(all(feature = "jemalloc", unix)))] -fn collect_memory_stats() {} - -#[cfg(not(all(feature = "jemalloc", unix)))] -fn describe_memory_stats() {} diff --git a/crates/metrics/src/server.rs b/crates/metrics/src/server.rs new file mode 100644 index 0000000000..e49aea9032 --- /dev/null +++ b/crates/metrics/src/server.rs @@ -0,0 +1,101 @@ +use core::fmt; +use std::convert::Infallible; +use std::net::SocketAddr; +use std::sync::Arc; + +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response}; + +use crate::exporters::Exporter; +use crate::{Error, Report}; + +/// A helper trait for defining the type for hooks that are called when the metrics are being +/// collected by the server. +trait Hook: Fn() + Send + Sync {} +impl Hook for T {} + +/// A boxed [`Hook`]. +type BoxedHook = Box>; +/// A list of [BoxedHook]. +type Hooks = Vec; + +/// Server for serving metrics. +// TODO: allow configuring the server executor to allow cancelling on invidiual connection tasks. +// See, [hyper::server::server::Builder::executor] +pub struct Server { + /// Hooks or callable functions for collecting metrics in the cases where + /// the metrics are not being collected in the main program flow. + /// + /// These are called when metrics are being served through the server. + hooks: Hooks, + /// The exporter that is used to export the collected metrics. + exporter: MetricsExporter, +} + +impl Server +where + MetricsExporter: Exporter + 'static, +{ + /// Creates a new metrics server using the given exporter. + pub fn new(exporter: MetricsExporter) -> Self { + Self { exporter, hooks: Vec::new() } + } + + /// Add new metrics reporter to the server. + pub fn with_reports(mut self, reports: I) -> Self + where + I: IntoIterator>, + { + // convert the report types into callable hooks + let hooks = reports.into_iter().map(|r| Box::new(move || r.report()) as BoxedHook); + self.hooks.extend(hooks); + self + } + + pub fn with_process_metrics(mut self) -> Self { + use crate::process::{collect_memory_stats, describe_memory_stats}; + + let process = metrics_process::Collector::default(); + process.describe(); + describe_memory_stats(); + + let hooks: Hooks = + vec![Box::new(collect_memory_stats), Box::new(move || process.collect())]; + + self.hooks.extend(hooks); + self + } + + /// Starts an endpoint at the given address to serve Prometheus metrics. + pub async fn start(self, addr: SocketAddr) -> Result<(), Error> { + let hooks = Arc::new(move || self.hooks.iter().for_each(|hook| hook())); + + hyper::Server::try_bind(&addr) + .map_err(|_| Error::FailedToBindAddress { addr })? + .serve(make_service_fn(move |_| { + let hook = Arc::clone(&hooks); + let exporter = self.exporter.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |_: Request| { + // call the hooks to collect metrics before exporting them + (hook)(); + // export the metrics from the installed exporter and send as response + let metrics = Body::from(exporter.export()); + async move { Ok::<_, Infallible>(Response::new(metrics)) } + })) + } + })) + .await?; + + Ok(()) + } +} + +impl fmt::Debug for Server +where + MetricsExporter: fmt::Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Server").field("hooks", &"...").field("exporter", &self.exporter).finish() + } +}