Skip to content

Commit

Permalink
Refactor metrics (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
kostekIV authored Feb 26, 2024
1 parent d520dd5 commit c983ce9
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 41 deletions.
15 changes: 7 additions & 8 deletions src/extensions/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use jsonrpsee::Methods;
use prometheus_endpoint::Registry;
use serde::ser::StdError;
use serde::Deserialize;
use std::collections::HashMap;

use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::{future::Future, net::SocketAddr};
use tower::layer::layer_fn;
use tower::ServiceBuilder;
Expand All @@ -25,7 +25,7 @@ use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF};
mod prometheus;
mod proxy_get_request;
mod ready_get_request;
use crate::extensions::server::prometheus::{MetricPair, PrometheusService, WsMetrics};
use crate::extensions::server::prometheus::{PrometheusService, Protocol, WsMetrics};
use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod};
use ready_get_request::ReadyProxyLayer;

Expand Down Expand Up @@ -114,7 +114,6 @@ impl SubwayServerBuilder {
let (stop_handle, server_handle) = stop_channel();
let handle = stop_handle.clone();
let rpc_module = rpc_module_builder().await?;
let metrics: Arc<Mutex<HashMap<String, MetricPair>>> = Default::default();
let ws_metrics = WsMetrics::new(prometheus_registry.as_ref());

// make_service handle each connection
Expand Down Expand Up @@ -143,12 +142,14 @@ impl SubwayServerBuilder {
let rate_limit_builder = rate_limit_builder.clone();
let rpc_method_weights = rpc_method_weights.clone();
let prometheus_registry = prometheus_registry.clone();
let metrics = metrics.clone();
let ws_metrics = ws_metrics.clone();

async move {
// service_fn handle each request
Ok::<_, Box<dyn StdError + Send + Sync>>(service_fn(move |req| {
let is_websocket = ws::is_upgrade_request(&req);
let protocol = if is_websocket { Protocol::Ws } else { Protocol::Http };

let mut socket_ip = socket_ip.clone();
let methods: Methods = rpc_module.clone().into();
let stop_handle = stop_handle.clone();
Expand All @@ -173,7 +174,7 @@ impl SubwayServerBuilder {
.option_layer(
prometheus_registry
.as_ref()
.map(|r| layer_fn(|s| PrometheusService::new(s, r, metrics.clone()))),
.map(|r| layer_fn(|s| PrometheusService::new(s, r, protocol))),
);

let service_builder = ServerBuilder::default()
Expand All @@ -185,8 +186,6 @@ impl SubwayServerBuilder {

let mut service = service_builder.build(methods, stop_handle);

let is_websocket = ws::is_upgrade_request(&req);

if is_websocket {
let on_ws_close = service.on_session_closed();
ws_metrics.ws_open();
Expand Down
84 changes: 51 additions & 33 deletions src/extensions/server/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ use futures::{future::BoxFuture, FutureExt};
use jsonrpsee::server::middleware::rpc::RpcServiceT;
use jsonrpsee::types::Request;
use jsonrpsee::MethodResponse;
use prometheus_endpoint::{register, Counter, Histogram, HistogramOpts, Registry, U64};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use prometheus_endpoint::{register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64};

pub type MetricPair = (Counter<U64>, Histogram);
use std::fmt::Display;

#[derive(Clone)]
pub enum WsMetrics {
Expand Down Expand Up @@ -62,43 +60,55 @@ impl InnerMetrics {
}
}

#[derive(Clone, Copy)]
pub enum Protocol {
Ws,
Http,
}

impl Display for Protocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let str = match self {
Self::Ws => "ws".to_string(),
Self::Http => "http".to_string(),
};
write!(f, "{}", str)
}
}

#[derive(Clone)]
pub struct PrometheusService<S> {
inner: S,
registry: Registry,
call_metrics: Arc<Mutex<HashMap<String, MetricPair>>>,
protocol: Protocol,
call_times: HistogramVec,
calls_started: CounterVec<U64>,
calls_finished: CounterVec<U64>,
}

impl<S> PrometheusService<S> {
pub fn new(inner: S, registry: &Registry, call_metrics: Arc<Mutex<HashMap<String, MetricPair>>>) -> Self {
pub fn new(inner: S, registry: &Registry, protocol: Protocol) -> Self {
let call_times =
HistogramVec::new(HistogramOpts::new("rpc_calls_time", "No help"), &["protocol", "method"]).unwrap();
let calls_started_counter =
CounterVec::new(Opts::new("rpc_calls_started", "No help"), &["protocol", "method"]).unwrap();
let calls_finished_counter = CounterVec::new(
Opts::new("rpc_calls_finished", "No help"),
&["protocol", "method", "is_error"],
)
.unwrap();

let call_times = register(call_times, registry).unwrap();
let calls_started = register(calls_started_counter, registry).unwrap();
let calls_finished = register(calls_finished_counter, registry).unwrap();

Self {
inner,
call_metrics,
registry: registry.clone(),
protocol,
calls_started,
calls_finished,
call_times,
}
}

fn register_metrics_for(&self, method: String) -> MetricPair {
let counter_name = format!("{}_count", method);
let histogram_name = format!("{}_histogram", method);

let counter = Counter::new(counter_name, "No help").unwrap();
let histogram = Histogram::with_opts(HistogramOpts::new(histogram_name, "No help")).unwrap();

let counter = register(counter, &self.registry).unwrap();
let histogram = register(histogram, &self.registry).unwrap();

(counter, histogram)
}

fn metrics_for(&self, method: String) -> MetricPair {
let mut metrics = self.call_metrics.lock().unwrap();
let (counter, histogram) = metrics
.entry(method.clone())
.or_insert_with(|| self.register_metrics_for(method));

(counter.clone(), histogram.clone())
}
}

impl<'a, S> RpcServiceT<'a> for PrometheusService<S>
Expand All @@ -108,15 +118,23 @@ where
type Future = BoxFuture<'a, MethodResponse>;

fn call(&self, req: Request<'a>) -> Self::Future {
let (counter, histogram) = self.metrics_for(req.method.to_string());
let protocol = self.protocol.to_string();
let method = req.method.to_string();

let histogram = self.call_times.with_label_values(&[&protocol, &method]);
let started = self.calls_started.with_label_values(&[&protocol, &method]);
let finished = self.calls_finished.clone();

let service = self.inner.clone();
async move {
counter.inc();
started.inc();

let timer = histogram.start_timer();
let res = service.call(req).await;
timer.stop_and_record();
finished
.with_label_values(&[&protocol, &method, &res.is_error().to_string()])
.inc();

res
}
Expand Down

0 comments on commit c983ce9

Please sign in to comment.