Skip to content

Commit

Permalink
refactor(metrics): separate metrics recorder impl from the server (#2561
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kariy authored Oct 19, 2024
1 parent a20b53c commit fac93b0
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 256 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 4 additions & 10 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;

use anyhow::Context;
use clap::{ArgAction, Parser};
use dojo_metrics::{metrics_process, prometheus_exporter};
use dojo_metrics::exporters::prometheus::PrometheusRecorder;
use dojo_utils::parse::{parse_socket_address, parse_url};
use dojo_world::contracts::world::WorldContractReader;
use sqlx::sqlite::{
Expand Down Expand Up @@ -296,16 +296,10 @@ async fn main() -> anyhow::Result<()> {
}

if let Some(listen_addr) = args.metrics {
let prometheus_handle = prometheus_exporter::install_recorder("torii")?;

info!(target: LOG_TARGET, addr = %listen_addr, "Starting metrics endpoint.");
prometheus_exporter::serve(
listen_addr,
prometheus_handle,
metrics_process::Collector::default(),
Vec::new(),
)
.await?;
let prometheus_handle = PrometheusRecorder::install("torii")?;
let server = dojo_metrics::Server::new(prometheus_handle).with_process_metrics();
tokio::spawn(server.start(listen_addr));
}

let engine_handle = tokio::spawn(async move { engine.start().await });
Expand Down
30 changes: 13 additions & 17 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use anyhow::Result;
use config::metrics::MetricsConfig;
use config::rpc::{ApiKind, RpcConfig};
use config::{Config, SequencingConfig};
use dojo_metrics::prometheus_exporter::PrometheusHandle;
use dojo_metrics::{metrics_process, prometheus_exporter, Report};
use dojo_metrics::exporters::prometheus::PrometheusRecorder;
use dojo_metrics::{Report, Server as MetricsServer};
use hyper::{Method, Uri};
use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer;
use jsonrpsee::server::{AllowHosts, ServerBuilder, ServerHandle};
Expand Down Expand Up @@ -81,7 +81,6 @@ pub struct Node {
pub pool: TxPool,
pub db: Option<DbEnv>,
pub task_manager: TaskManager,
pub prometheus_handle: PrometheusHandle,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: BlockProducer<BlockifierFactory>,
pub rpc_config: RpcConfig,
Expand All @@ -98,23 +97,19 @@ impl Node {
let chain = self.backend.chain_spec.id;
info!(%chain, "Starting node.");

// TODO: maybe move this to the build stage
if let Some(ref cfg) = self.metrics_config {
let addr = cfg.addr;
let mut reports = Vec::new();
let mut reports: Vec<Box<dyn Report>> = Vec::new();

if let Some(ref db) = self.db {
reports.push(Box::new(db.clone()) as Box<dyn Report>);
}

prometheus_exporter::serve(
addr,
self.prometheus_handle.clone(),
metrics_process::Collector::default(),
reports,
)
.await?;
let exporter = PrometheusRecorder::current().expect("qed; should exist at this point");
let server = MetricsServer::new(exporter).with_process_metrics().with_reports(reports);

info!(%addr, "Metrics endpoint started.");
self.task_manager.task_spawner().build_task().spawn(server.start(cfg.addr));
info!(addr = %cfg.addr, "Metrics server started.");
}

let pool = self.pool.clone();
Expand Down Expand Up @@ -156,9 +151,11 @@ impl Node {
/// This returns a [`Node`] instance which can be launched with the all the necessary components
/// configured.
pub async fn build(mut config: Config) -> Result<Node> {
// Metrics recorder must be initialized before calling any of the metrics macros, in order
// for it to be registered.
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;
if config.metrics.is_some() {
// Metrics recorder must be initialized before calling any of the metrics macros, in order
// for it to be registered.
let _ = PrometheusRecorder::install("katana")?;
}

// --- build executor factory

Expand Down Expand Up @@ -223,7 +220,6 @@ pub async fn build(mut config: Config) -> Result<Node> {
pool,
backend,
block_producer,
prometheus_handle,
rpc_config: config.rpc,
metrics_config: config.metrics,
messaging_config: config.messaging,
Expand Down
1 change: 1 addition & 0 deletions crates/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ metrics-derive = "0.1"
metrics-exporter-prometheus = "0.15.3"
metrics-process = "2.1.0"
metrics-util = "0.17.0"
tokio-util.workspace = true

[target.'cfg(not(windows))'.dependencies]
jemalloc-ctl = { version = "0.5.0", optional = true }
Expand Down
7 changes: 7 additions & 0 deletions crates/metrics/src/exporters/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub mod prometheus;

/// Trait for metrics recorder whose metrics can be exported.
pub trait Exporter: Clone + Send + Sync {
/// Export the metrics that have been recorded by the metrics thus far.
fn export(&self) -> String;
}
52 changes: 52 additions & 0 deletions crates/metrics/src/exporters/prometheus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Prometheus exporter
use std::sync::OnceLock;

use metrics_exporter_prometheus::PrometheusBuilder;
pub use metrics_exporter_prometheus::PrometheusHandle as Prometheus;
use metrics_util::layers::{PrefixLayer, Stack};
use tracing::info;

use crate::exporters::Exporter;
use crate::Error;

static PROMETHEUS_HANDLE: OnceLock<Prometheus> = OnceLock::new();

/// Prometheus exporter recorder.
#[derive(Debug)]
pub struct PrometheusRecorder;

impl PrometheusRecorder {
/// Installs Prometheus as the metrics recorder.
///
/// ## Arguments
///
/// * `prefix` - Apply a prefix to all metrics keys.
pub fn install(prefix: &str) -> Result<Prometheus, Error> {
let recorder = PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();

// Build metrics stack and install the recorder
Stack::new(recorder)
.push(PrefixLayer::new(prefix))
.install()
.map_err(|_| Error::GlobalRecorderAlreadyInstalled)?;

info!(target: "metrics", %prefix, "Prometheus recorder installed.");

let _ = PROMETHEUS_HANDLE.set(handle.clone());

Ok(handle)
}

/// Get the handle to the installed Prometheus recorder (if any).
pub fn current() -> Option<Prometheus> {
PROMETHEUS_HANDLE.get().cloned()
}
}

impl Exporter for Prometheus {
fn export(&self) -> String {
self.render()
}
}
25 changes: 24 additions & 1 deletion crates/metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
pub mod prometheus_exporter;
pub mod exporters;
mod process;
mod server;

use std::net::SocketAddr;

#[cfg(all(feature = "jemalloc", unix))]
use jemallocator as _;
Expand All @@ -8,16 +12,35 @@ pub use metrics;
pub use metrics_derive::Metrics;
/// Re-export the metrics-process crate
pub use metrics_process;
pub use server::*;

// We use jemalloc for performance reasons
#[cfg(all(feature = "jemalloc", unix))]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("global metrics recorder already installed.")]
GlobalRecorderAlreadyInstalled,

#[error("could not bind to address: {addr}")]
FailedToBindAddress { addr: SocketAddr },

#[error(transparent)]
Server(#[from] hyper::Error),
}

/// A helper trait for reporting metrics.
///
/// This is meant for types that require a specific trigger to register their metrics.
pub trait Report: Send + Sync {
/// Report the metrics.
fn report(&self);
}

impl Report for ::metrics_process::Collector {
fn report(&self) {
self.collect();
}
}
122 changes: 122 additions & 0 deletions crates/metrics/src/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use metrics::{describe_gauge, gauge};

const LOG_TARGET: &str = "metrics";

#[cfg(all(feature = "jemalloc", unix))]
pub fn collect_memory_stats() {
use jemalloc_ctl::{epoch, stats};

if epoch::advance()
.map_err(|error| {
tracing::error!(
target: LOG_TARGET,
error = %error,
"Advance jemalloc epoch."
)
})
.is_err()
{
return;
}

if let Ok(value) = stats::active::read().map_err(|error| {
tracing::error!(
target: LOG_TARGET,
error = %error,
"Read jemalloc.stats.active."
)
}) {
gauge!("jemalloc.active").increment(value as f64);
}

if let Ok(value) = stats::allocated::read().map_err(|error| {
tracing::error!(
target: LOG_TARGET,
error = %error,
"Read jemalloc.stats.allocated."
)
}) {
gauge!("jemalloc.allocated").increment(value as f64);
}

if let Ok(value) = stats::mapped::read().map_err(|error| {
tracing::error!(
target: LOG_TARGET,
error = %error,
"Read jemalloc.stats.mapped."
)
}) {
gauge!("jemalloc.mapped").increment(value as f64);
}

if let Ok(value) = stats::metadata::read().map_err(|error| {
tracing::error!(
target: LOG_TARGET,
error = %error,
"Read jemalloc.stats.metadata."
)
}) {
gauge!("jemalloc.metadata").increment(value as f64);
}

if let Ok(value) = stats::resident::read().map_err(|error| {
tracing::error!(
target: LOG_TARGET,
error = %error,
"Read jemalloc.stats.resident."
)
}) {
gauge!("jemalloc.resident").increment(value as f64);
}

if let Ok(value) = stats::retained::read().map_err(|error| {
tracing::error!(
target: LOG_TARGET,
error = %error,
"Read jemalloc.stats.retained."
)
}) {
gauge!("jemalloc.retained").increment(value as f64);
}
}

#[cfg(all(feature = "jemalloc", unix))]
pub fn describe_memory_stats() {
describe_gauge!(
"jemalloc.active",
metrics::Unit::Bytes,
"Total number of bytes in active pages allocated by the application"
);
describe_gauge!(
"jemalloc.allocated",
metrics::Unit::Bytes,
"Total number of bytes allocated by the application"
);
describe_gauge!(
"jemalloc.mapped",
metrics::Unit::Bytes,
"Total number of bytes in active extents mapped by the allocator"
);
describe_gauge!(
"jemalloc.metadata",
metrics::Unit::Bytes,
"Total number of bytes dedicated to jemalloc metadata"
);
describe_gauge!(
"jemalloc.resident",
metrics::Unit::Bytes,
"Total number of bytes in physically resident data pages mapped by the allocator"
);
describe_gauge!(
"jemalloc.retained",
metrics::Unit::Bytes,
"Total number of bytes in virtual memory mappings that were retained rather than being \
returned to the operating system via e.g. munmap(2)"
);
}

#[cfg(not(all(feature = "jemalloc", unix)))]
pub fn collect_memory_stats() {}

#[cfg(not(all(feature = "jemalloc", unix)))]
pub fn describe_memory_stats() {}
Loading

0 comments on commit fac93b0

Please sign in to comment.