Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics exporter to Prometheus with OTLP #1438

Merged
merged 17 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ ipa-metrics = { path = "../ipa-metrics" }
ipa-metrics-tracing = { optional = true, path = "../ipa-metrics-tracing" }
ipa-step = { version = "*", path = "../ipa-step" }
ipa-step-derive = { version = "*", path = "../ipa-step-derive" }
ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" }

aes = "0.8.3"
async-trait = "0.1.79"
Expand Down Expand Up @@ -177,6 +178,7 @@ rustls = { version = "0.23" }
tempfile = "3"
ipa-metrics-tracing = { path = "../ipa-metrics-tracing" }
ipa-metrics = { path = "../ipa-metrics", features = ["partitions"] }
ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" }


[lib]
Expand Down
9 changes: 9 additions & 0 deletions ipa-core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Weak;
use async_trait::async_trait;

use crate::{
cli::LoggingHandle,
executor::IpaRuntime,
helpers::{
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput},
Expand Down Expand Up @@ -65,6 +66,7 @@ struct Inner {
/// the flamegraph
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
logging_handle: LoggingHandle,
}

impl Setup {
Expand Down Expand Up @@ -96,11 +98,13 @@ impl Setup {
self,
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
logging_handle: LoggingHandle,
) -> HelperApp {
let app = Arc::new(Inner {
query_processor: self.query_processor,
mpc_transport,
shard_transport,
logging_handle,
});
self.mpc_handler
.set_handler(Arc::downgrade(&app) as Weak<dyn RequestHandler<HelperIdentity>>);
Expand Down Expand Up @@ -278,6 +282,11 @@ impl RequestHandler<HelperIdentity> for Inner {
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.kill(query_id)?)
}
RouteId::Metrics => {
let logging_handler = &self.logging_handle;
let metrics_handle = &logging_handler.metrics_handle;
HelperResponse::from(metrics_handle.scrape_metrics())
}
})
}
}
2 changes: 1 addition & 1 deletion ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B
Some(shard_handler),
);

let _app = setup.connect(transport.clone(), shard_transport.clone());
let _app = setup.connect(transport.clone(), shard_transport.clone(), logging_handle);

let listener = create_listener(args.server_socket_fd)?;
let shard_listener = create_listener(args.shard_server_socket_fd)?;
Expand Down
18 changes: 16 additions & 2 deletions ipa-core/src/cli/metric_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::{io, thread, thread::JoinHandle};
use ipa_metrics::{
MetricChannelType, MetricsCollectorController, MetricsCurrentThreadContext, MetricsProducer,
};
use ipa_metrics_prometheus::PrometheusMetricsExporter;
use tokio::runtime::Builder;

/// Holds a reference to metrics controller and producer
pub struct CollectorHandle {
thread_handle: JoinHandle<()>,
/// This will be used once we start consuming metrics
_controller: MetricsCollectorController,
controller: MetricsCollectorController,
producer: MetricsProducer,
}

Expand All @@ -26,7 +27,7 @@ pub fn install_collector() -> io::Result<CollectorHandle> {

Ok(CollectorHandle {
thread_handle: handle,
_controller: controller,
controller,
producer,
})
}
Expand All @@ -53,4 +54,17 @@ impl CollectorHandle {
.on_thread_stop(flush_fn)
.on_thread_park(flush_fn)
}

/// Export the metrics to be consumed by metrics scraper, e.g. Prometheus
///
/// # Panics
/// If metrics is not initialized
#[must_use]
pub fn scrape_metrics(&self) -> Vec<u8> {
let mut store = self.controller.snapshot().expect("Metrics must be set up");
let mut buff = Vec::new();
store.export(&mut buff);

buff
}
}
6 changes: 6 additions & 0 deletions ipa-core/src/helpers/transport/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ impl<R: AsRef<dyn ProtocolResult>> From<R> for HelperResponse {
}
}

impl From<Vec<u8>> for HelperResponse {
fn from(value: Vec<u8>) -> Self {
Self { body: value }
}
}

/// Union of error types returned by API operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/helpers/transport/in_memory/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ impl<I: TransportIdentity> InMemoryTransport<I> {
| RouteId::QueryInput
| RouteId::QueryStatus
| RouteId::CompleteQuery
| RouteId::KillQuery => {
| RouteId::KillQuery
| RouteId::Metrics => {
handler
.as_ref()
.expect("Handler is set")
Expand Down
20 changes: 20 additions & 0 deletions ipa-core/src/helpers/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,26 @@ where
fn extra(&self) -> Self::Params;
}

impl RouteParams<RouteId, NoQueryId, NoStep> for RouteId {
type Params = &'static str;

fn resource_identifier(&self) -> RouteId {
*self
}

fn query_id(&self) -> NoQueryId {
NoQueryId
}

fn gate(&self) -> NoStep {
NoStep
}

fn extra(&self) -> Self::Params {
""
}
}

impl RouteParams<NoResourceIdentifier, QueryId, Gate> for (QueryId, Gate) {
type Params = &'static str;

Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/helpers/transport/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub enum RouteId {
QueryStatus,
CompleteQuery,
KillQuery,
Metrics,
}

/// The header/metadata of the incoming request.
Expand Down
10 changes: 10 additions & 0 deletions ipa-core/src/net/http_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ pub mod echo {
pub const AXUM_PATH: &str = "/echo";
}

pub mod metrics {

use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Request {}

pub const AXUM_PATH: &str = "/metrics";
}

pub mod query {
use std::fmt::{Display, Formatter};

Expand Down
14 changes: 8 additions & 6 deletions ipa-core/src/net/server/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use axum::Router;
use crate::net::{http_serde, transport::MpcHttpTransport, ShardHttpTransport};

pub fn mpc_router(transport: MpcHttpTransport) -> Router {
echo::router().nest(
http_serde::query::BASE_AXUM_PATH,
Router::new()
.merge(query::query_router(transport.clone()))
.merge(query::h2h_router(transport)),
)
echo::router()
.nest(
http_serde::query::BASE_AXUM_PATH,
Router::new()
.merge(query::query_router(transport.clone()))
.merge(query::h2h_router(transport.clone())),
)
.merge(query::metric_router(transport))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Does this handler belong inside the query module? You can perhaps park it alongside echo

}

pub fn shard_router(transport: ShardHttpTransport) -> Router {
Expand Down
28 changes: 28 additions & 0 deletions ipa-core/src/net/server/handlers/query/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use axum::{routing::get, Extension, Router};
use hyper::StatusCode;

use crate::{
helpers::{routing::RouteId, BodyStream},
net::{
http_serde::{self},
Error, MpcHttpTransport,
},
};

/// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent
/// to the [`HttpTransport`].
async fn handler(transport: Extension<MpcHttpTransport>) -> Result<Vec<u8>, Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd encourage to write some tests here as it is often hard to figure out why HTTP server is not responding or responds with errors. There are some examples in this folder that exercise happy path and error path

match transport
.dispatch(RouteId::Metrics, BodyStream::empty())
.await
{
Ok(resp) => Ok(resp.into_body()),
Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)),
}
}

pub fn router(transport: MpcHttpTransport) -> Router {
Router::new()
.route(http_serde::metrics::AXUM_PATH, get(handler))
.layer(Extension(transport))
}
6 changes: 6 additions & 0 deletions ipa-core/src/net/server/handlers/query/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod create;
mod input;
mod kill;
mod metrics;
mod prepare;
mod results;
mod status;
Expand Down Expand Up @@ -64,6 +65,11 @@ pub fn s2s_router(transport: ShardHttpTransport) -> Router {
.layer(layer_fn(HelperAuthentication::<_, Shard>::new))
}

/// Construct router for exporting metrics to metrics backend (e.g. Prometheus scraper)
pub fn metric_router(transport: MpcHttpTransport) -> Router {
Router::new().merge(metrics::router(transport))
}

/// Returns HTTP 401 Unauthorized if the request does not have valid authentication.
///
/// Authentication information is carried via the `ClientIdentity` request extension. The extension
Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/net/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ impl<F: ConnectionFlavor> HttpTransport<F> {
evt @ (RouteId::QueryInput
| RouteId::ReceiveQuery
| RouteId::QueryStatus
| RouteId::KillQuery) => {
| RouteId::KillQuery
| RouteId::Metrics) => {
unimplemented!(
"attempting to send client-specific request {evt:?} to another helper"
)
Expand Down
12 changes: 10 additions & 2 deletions ipa-core/src/test_fixture/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use typenum::Unsigned;

use crate::{
app::AppConfig,
cli::{install_collector, LoggingHandle},
ff::Serializable,
helpers::{
query::{QueryConfig, QueryInput},
Expand Down Expand Up @@ -68,8 +69,15 @@ impl Default for TestApp {

let mpc_network = InMemoryMpcNetwork::new(handlers.map(Some));
let shard_network = InMemoryShardNetwork::with_shards(1);
let drivers = zip3(mpc_network.transports().each_ref(), setup)
.map(|(t, s)| s.connect(Clone::clone(t), shard_network.transport(t.identity(), 0)));
let drivers = zip3(mpc_network.transports().each_ref(), setup).map(|(t, s)| {
let metrics_handle = install_collector().unwrap();
let logging_handle = LoggingHandle { metrics_handle };
s.connect(
Clone::clone(t),
shard_network.transport(t.identity(), 0),
logging_handle,
)
});

Self {
drivers,
Expand Down
16 changes: 16 additions & 0 deletions ipa-metrics-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "ipa-metrics-prometheus"
version = "0.1.0"
edition = "2021"

[features]
default = []

[dependencies]
ipa-metrics = { path = "../ipa-metrics" }

# Open telemetry crates: opentelemetry-prometheus crate implementation is based on Opentelemetry API and SDK 0.23. (TBC)
opentelemetry = "0.24"
opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = { version = "0.17" }
prometheus = "0.13.3"
77 changes: 77 additions & 0 deletions ipa-metrics-prometheus/src/exporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::io;

use ipa_metrics::MetricsStore;
use opentelemetry::{metrics::MeterProvider, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{self, Encoder, TextEncoder};

pub trait PrometheusMetricsExporter {
fn export<W: io::Write>(&mut self, w: &mut W);
}

impl PrometheusMetricsExporter for MetricsStore {
fn export<W: io::Write>(&mut self, w: &mut W) {
// Setup prometheus registry and open-telemetry exporter
let registry = prometheus::Registry::new();

let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();

let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build();

// Convert the snapshot to otel struct
// TODO : We need to define a proper scope for the metrics
let meter = meter_provider.meter("ipa-helper");

let counters = self.counters();
counters.for_each(|(counter_name, counter_value)| {
let otlp_counter = meter.u64_counter(counter_name.key).init();

let attributes: Vec<KeyValue> = counter_name
.labels()
.map(|l| KeyValue::new(l.name, l.val.to_string()))
.collect();

otlp_counter.add(counter_value, &attributes[..]);
});

let encoder = TextEncoder::new();
let metric_families = registry.gather();
// TODO: Handle error?
encoder.encode(&metric_families, w).unwrap();
}
}

#[cfg(test)]
mod test {

use std::thread;

use ipa_metrics::{counter, install_new_thread, MetricChannelType};

use super::PrometheusMetricsExporter;

#[test]
fn export_to_prometheus() {
let (producer, controller, _) = install_new_thread(MetricChannelType::Rendezvous).unwrap();

thread::spawn(move || {
producer.install();
counter!("baz", 4);
counter!("bar", 1);
let _ = producer.drop_handle();
})
.join()
.unwrap();

let mut store = controller.snapshot().unwrap();

let mut buff = Vec::new();
store.export(&mut buff);

let result = String::from_utf8(buff).unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to validate the results somehow. Also consider removing the println.

println!("Export to Prometheus: {result}");
}
}
3 changes: 3 additions & 0 deletions ipa-metrics-prometheus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod exporter;

pub use exporter::PrometheusMetricsExporter;
1 change: 0 additions & 1 deletion ipa-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ hashbrown = "0.15"
rustc-hash = "2.0.0"
# logging
tracing = "0.1"

Loading
Loading