Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics exporter to Prometheus with OTLP #1438

Merged
merged 17 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ ipa-metrics = { path = "../ipa-metrics" }
ipa-metrics-tracing = { optional = true, path = "../ipa-metrics-tracing" }
ipa-step = { version = "*", path = "../ipa-step" }
ipa-step-derive = { version = "*", path = "../ipa-step-derive" }
ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" }

aes = "0.8.3"
async-trait = "0.1.79"
Expand Down Expand Up @@ -176,6 +177,7 @@ rustls = { version = "0.23" }
tempfile = "3"
ipa-metrics-tracing = { path = "../ipa-metrics-tracing" }
ipa-metrics = { path = "../ipa-metrics", features = ["partitions"] }
ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" }


[lib]
Expand Down
9 changes: 9 additions & 0 deletions ipa-core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use async_trait::async_trait;

use crate::{
cli::LoggingHandle,
executor::IpaRuntime,
helpers::{
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput},
Expand Down Expand Up @@ -65,6 +66,7 @@
/// the flamegraph
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
logging_handle: LoggingHandle,
}

impl Setup {
Expand Down Expand Up @@ -96,11 +98,13 @@
self,
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
logging_handle: LoggingHandle,
) -> HelperApp {
let app = Arc::new(Inner {
query_processor: self.query_processor,
mpc_transport,
shard_transport,
logging_handle,
});
self.mpc_handler
.set_handler(Arc::downgrade(&app) as Weak<dyn RequestHandler<HelperIdentity>>);
Expand Down Expand Up @@ -277,6 +281,11 @@
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.kill(query_id)?)
}
RouteId::Metrics => {
let logging_handler = &self.logging_handle;
let metrics_handle = &logging_handler.metrics_handle;
HelperResponse::from(metrics_handle.scrape_metrics())

Check warning on line 287 in ipa-core/src/app.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/app.rs#L285-L287

Added lines #L285 - L287 were not covered by tests
}
})
}
}
2 changes: 1 addition & 1 deletion ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@
Some(shard_handler),
);

let _app = setup.connect(transport.clone(), shard_transport.clone());
let _app = setup.connect(transport.clone(), shard_transport.clone(), logging_handle);

Check warning on line 274 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L274

Added line #L274 was not covered by tests

let listener = create_listener(args.server_socket_fd)?;
let shard_listener = create_listener(args.shard_server_socket_fd)?;
Expand Down
18 changes: 16 additions & 2 deletions ipa-core/src/cli/metric_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
use ipa_metrics::{
MetricChannelType, MetricsCollectorController, MetricsCurrentThreadContext, MetricsProducer,
};
use ipa_metrics_prometheus::PrometheusMetricsExporter;
use tokio::runtime::Builder;

/// Holds a reference to metrics controller and producer
pub struct CollectorHandle {
thread_handle: JoinHandle<()>,
/// This will be used once we start consuming metrics
_controller: MetricsCollectorController,
controller: MetricsCollectorController,
producer: MetricsProducer,
}

Expand All @@ -26,7 +27,7 @@

Ok(CollectorHandle {
thread_handle: handle,
_controller: controller,
controller,
producer,
})
}
Expand All @@ -53,4 +54,17 @@
.on_thread_stop(flush_fn)
.on_thread_park(flush_fn)
}

/// Export the metrics to be consumed by metrics scraper, e.g. Prometheus
///
/// # Panics
/// If metrics is not initialized
#[must_use]
pub fn scrape_metrics(&self) -> Vec<u8> {
let mut store = self.controller.snapshot().expect("Metrics must be set up");
let mut buff = Vec::new();
store.export(&mut buff);

buff
}

Check warning on line 69 in ipa-core/src/cli/metric_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/cli/metric_collector.rs#L63-L69

Added lines #L63 - L69 were not covered by tests
}
6 changes: 6 additions & 0 deletions ipa-core/src/helpers/transport/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ impl<R: AsRef<dyn ProtocolResult>> From<R> for HelperResponse {
}
}

impl From<Vec<u8>> for HelperResponse {
fn from(value: Vec<u8>) -> Self {
Self { body: value }
}
}

/// Union of error types returned by API operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/helpers/transport/in_memory/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ impl<I: TransportIdentity> InMemoryTransport<I> {
| RouteId::QueryInput
| RouteId::QueryStatus
| RouteId::CompleteQuery
| RouteId::KillQuery => {
| RouteId::KillQuery
| RouteId::Metrics => {
handler
.as_ref()
.expect("Handler is set")
Expand Down
20 changes: 20 additions & 0 deletions ipa-core/src/helpers/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,26 @@ where
fn extra(&self) -> Self::Params;
}

impl RouteParams<RouteId, NoQueryId, NoStep> for RouteId {
type Params = &'static str;

fn resource_identifier(&self) -> RouteId {
*self
}

fn query_id(&self) -> NoQueryId {
NoQueryId
}

fn gate(&self) -> NoStep {
NoStep
}

fn extra(&self) -> Self::Params {
""
}
}

impl RouteParams<NoResourceIdentifier, QueryId, Gate> for (QueryId, Gate) {
type Params = &'static str;

Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/helpers/transport/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum RouteId {
QueryStatus,
CompleteQuery,
KillQuery,
Metrics,
}

/// The header/metadata of the incoming request.
Expand Down
10 changes: 10 additions & 0 deletions ipa-core/src/net/http_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ pub mod echo {
pub const AXUM_PATH: &str = "/echo";
}

pub mod metrics {

use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Request {}

pub const AXUM_PATH: &str = "/metrics";
}

pub mod query {
use std::fmt::{Display, Formatter};

Expand Down
62 changes: 62 additions & 0 deletions ipa-core/src/net/server/handlers/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use axum::{routing::get, Extension, Router};
use hyper::StatusCode;

use crate::{
helpers::{routing::RouteId, BodyStream},
net::{
http_serde::{self},
Error, MpcHttpTransport,
},
};

/// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent
/// to the [`HttpTransport`].
async fn handler(transport: Extension<MpcHttpTransport>) -> Result<Vec<u8>, Error> {
match transport
.dispatch(RouteId::Metrics, BodyStream::empty())
.await
{
Ok(resp) => Ok(resp.into_body()),
Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)),

Check warning on line 20 in ipa-core/src/net/server/handlers/metrics.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/net/server/handlers/metrics.rs#L20

Added line #L20 was not covered by tests
}
}

pub fn router(transport: MpcHttpTransport) -> Router {
Router::new()
.route(http_serde::metrics::AXUM_PATH, get(handler))
.layer(Extension(transport))
}

#[cfg(all(test, unit_test))]
mod tests {
use axum::{
body::Body,
http::uri::{self, Authority, Scheme},
};

use super::*;
use crate::{
helpers::{make_owned_handler, routing::Addr, HelperIdentity, HelperResponse},
net::server::handlers::query::test_helpers::assert_success_with,
};

#[tokio::test]
async fn happy_case() {
let handler = make_owned_handler(
move |addr: Addr<HelperIdentity>, _data: BodyStream| async move {
let RouteId::Metrics = addr.route else {
panic!("unexpected call");
};
Ok(HelperResponse::from(Vec::new()))
},
);
let uri = uri::Builder::new()
.scheme(Scheme::HTTP)
.authority(Authority::from_static("localhost"))
.path_and_query(String::from("/metrics"))
.build()
.unwrap();
let req = hyper::Request::get(uri).body(Body::empty()).unwrap();
assert_success_with(req, handler).await;
}
}
15 changes: 9 additions & 6 deletions ipa-core/src/net/server/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod echo;
mod metrics;
mod query;

use axum::Router;
Expand All @@ -9,12 +10,14 @@ use crate::{
};

pub fn mpc_router(transport: MpcHttpTransport) -> Router {
echo::router().nest(
http_serde::query::BASE_AXUM_PATH,
Router::new()
.merge(query::query_router(transport.clone()))
.merge(query::h2h_router(transport.inner_transport)),
)
echo::router()
.merge(metrics::router(transport.clone()))
.nest(
http_serde::query::BASE_AXUM_PATH,
Router::new()
.merge(query::query_router(transport.clone()))
.merge(query::h2h_router(transport.inner_transport)),
)
}

pub fn shard_router(transport: Arc<HttpTransport<Shard>>) -> Router {
Expand Down
8 changes: 7 additions & 1 deletion ipa-core/src/net/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use once_cell::sync::Lazy;
use rustls_pki_types::CertificateDer;

use super::{ConnectionFlavor, HttpTransport, Shard};
#[cfg(all(test, web_test, descriptive_gate))]
use crate::cli::{install_collector, LoggingHandle};
use crate::{
config::{
ClientConfig, HpkeClientConfig, HpkeServerConfig, NetworkConfig, PeerConfig, ServerConfig,
Expand Down Expand Up @@ -263,7 +265,11 @@ impl TestApp {
shard_server.start_on(&IpaRuntime::current(), self.shard_server.socket.take(), ()),
)
.await;
setup.connect(transport, shard_transport)

let metrics_handle = install_collector().unwrap();
let logging_handle = LoggingHandle { metrics_handle };

setup.connect(transport, shard_transport, logging_handle)
}
}

Expand Down
5 changes: 4 additions & 1 deletion ipa-core/src/net/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@
let req = serde_json::from_str(route.extra().borrow())?;
self.clients[client_ix].status_match(req).await
}
evt @ (RouteId::QueryInput | RouteId::ReceiveQuery | RouteId::KillQuery) => {
evt @ (RouteId::QueryInput
| RouteId::ReceiveQuery
| RouteId::KillQuery
| RouteId::Metrics) => {

Check warning on line 121 in ipa-core/src/net/transport.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/net/transport.rs#L118-L121

Added lines #L118 - L121 were not covered by tests
unimplemented!(
"attempting to send client-specific request {evt:?} to another helper"
)
Expand Down
12 changes: 10 additions & 2 deletions ipa-core/src/test_fixture/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use typenum::Unsigned;

use crate::{
app::AppConfig,
cli::{install_collector, LoggingHandle},
ff::Serializable,
helpers::{
query::{QueryConfig, QueryInput},
Expand Down Expand Up @@ -68,8 +69,15 @@ impl Default for TestApp {

let mpc_network = InMemoryMpcNetwork::new(handlers.map(Some));
let shard_network = InMemoryShardNetwork::with_shards(1);
let drivers = zip3(mpc_network.transports().each_ref(), setup)
.map(|(t, s)| s.connect(Clone::clone(t), shard_network.transport(t.identity(), 0)));
let drivers = zip3(mpc_network.transports().each_ref(), setup).map(|(t, s)| {
let metrics_handle = install_collector().unwrap();
let logging_handle = LoggingHandle { metrics_handle };
s.connect(
Clone::clone(t),
shard_network.transport(t.identity(), 0),
logging_handle,
)
});

Self {
drivers,
Expand Down
16 changes: 16 additions & 0 deletions ipa-metrics-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "ipa-metrics-prometheus"
version = "0.1.0"
edition = "2021"

[features]
default = []

[dependencies]
ipa-metrics = { path = "../ipa-metrics" }

# Open telemetry crates: opentelemetry-prometheus crate implementation is based on Opentelemetry API and SDK 0.23. (TBC)
opentelemetry = "0.24"
opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = { version = "0.17" }
prometheus = "0.13.3"
Loading
Loading