Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed the lazy static from the prometheus metrics #286

Merged
merged 2 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::envoy_rls::server::envoy::service::ratelimit::v3::rate_limit_service_
use crate::envoy_rls::server::envoy::service::ratelimit::v3::{
RateLimitRequest, RateLimitResponse,
};
use crate::{Limiter, PROMETHEUS_METRICS};
use crate::prometheus_metrics::PrometheusMetrics;
use crate::Limiter;

include!("envoy_types.rs");

Expand All @@ -32,13 +33,19 @@ pub enum RateLimitHeaders {
pub struct MyRateLimiter {
limiter: Arc<Limiter>,
rate_limit_headers: RateLimitHeaders,
metrics: Arc<PrometheusMetrics>,
}

impl MyRateLimiter {
pub fn new(limiter: Arc<Limiter>, rate_limit_headers: RateLimitHeaders) -> Self {
pub fn new(
limiter: Arc<Limiter>,
rate_limit_headers: RateLimitHeaders,
metrics: Arc<PrometheusMetrics>,
) -> Self {
Self {
limiter,
rate_limit_headers,
metrics,
}
}
}
Expand Down Expand Up @@ -124,11 +131,11 @@ impl RateLimitService for MyRateLimiter {

let mut rate_limited_resp = rate_limited_resp.unwrap();
let resp_code = if rate_limited_resp.limited {
PROMETHEUS_METRICS
self.metrics
.incr_limited_calls(&namespace, rate_limited_resp.limit_name.as_deref());
Code::OverLimit
} else {
PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
self.metrics.incr_authorized_calls(&namespace);
Code::Ok
};

Expand Down Expand Up @@ -237,9 +244,10 @@ pub async fn run_envoy_rls_server(
address: String,
limiter: Arc<Limiter>,
rate_limit_headers: RateLimitHeaders,
metrics: Arc<PrometheusMetrics>,
grpc_reflection_service: bool,
) -> Result<(), transport::Error> {
let rate_limiter = MyRateLimiter::new(limiter, rate_limit_headers);
let rate_limiter = MyRateLimiter::new(limiter, rate_limit_headers, metrics);
let svc = RateLimitServiceServer::new(rate_limiter);

let reflection_service = match grpc_reflection_service {
Expand Down Expand Up @@ -303,6 +311,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::Blocking(limiter)),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -361,6 +370,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::new(Configuration::default()).await.unwrap()),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -391,6 +401,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::new(Configuration::default()).await.unwrap()),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -433,6 +444,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::Blocking(limiter)),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -491,6 +503,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::Blocking(limiter)),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down Expand Up @@ -556,6 +569,7 @@ mod tests {
let rate_limiter = MyRateLimiter::new(
Arc::new(Limiter::Blocking(limiter)),
RateLimitHeaders::DraftVersion03,
Arc::new(PrometheusMetrics::default()),
);

let req = RateLimitRequest {
Expand Down
73 changes: 51 additions & 22 deletions limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::http_api::request_types::{CheckAndReportInfo, Counter, Limit};
use crate::{Limiter, PROMETHEUS_METRICS};
use crate::prometheus_metrics::PrometheusMetrics;
use crate::Limiter;
use actix_web::{http::StatusCode, ResponseError};
use actix_web::{App, HttpServer};
use paperclip::actix::{
Expand All @@ -13,6 +14,24 @@ use paperclip::actix::{
use std::fmt;
use std::sync::Arc;

struct RateLimitData {
limiter: Arc<Limiter>,
metrics: Arc<PrometheusMetrics>,
}

impl RateLimitData {
fn new(limiter: Arc<Limiter>, metrics: Arc<PrometheusMetrics>) -> Self {
Self { limiter, metrics }
}
fn limiter(&self) -> &Limiter {
self.limiter.as_ref()
}

fn metrics(&self) -> &PrometheusMetrics {
self.metrics.as_ref()
}
}

#[api_v2_errors(429, 500)]
#[derive(Debug)]
enum ErrorResponse {
Expand Down Expand Up @@ -44,20 +63,20 @@ async fn status() -> web::Json<()> {
Json(())
}

#[tracing::instrument(skip(_data))]
#[tracing::instrument(skip(data))]
#[api_v2_operation]
async fn metrics(_data: web::Data<Arc<Limiter>>) -> String {
PROMETHEUS_METRICS.gather_metrics()
async fn metrics(data: web::Data<RateLimitData>) -> String {
data.get_ref().metrics().gather_metrics()
}

#[api_v2_operation]
#[tracing::instrument(skip(data))]
async fn get_limits(
data: web::Data<Arc<Limiter>>,
data: web::Data<RateLimitData>,
namespace: web::Path<String>,
) -> Result<web::Json<Vec<Limit>>, ErrorResponse> {
let namespace = &namespace.into_inner().into();
let limits = match data.get_ref().as_ref() {
let limits = match data.get_ref().limiter() {
Limiter::Blocking(limiter) => limiter.get_limits(namespace),
Limiter::Async(limiter) => limiter.get_limits(namespace),
};
Expand All @@ -68,11 +87,11 @@ async fn get_limits(
#[tracing::instrument(skip(data))]
#[api_v2_operation]
async fn get_counters(
data: web::Data<Arc<Limiter>>,
data: web::Data<RateLimitData>,
namespace: web::Path<String>,
) -> Result<web::Json<Vec<Counter>>, ErrorResponse> {
let namespace = namespace.into_inner().into();
let get_counters_result = match data.get_ref().as_ref() {
let get_counters_result = match data.get_ref().limiter() {
Limiter::Blocking(limiter) => limiter.get_counters(&namespace),
Limiter::Async(limiter) => limiter.get_counters(&namespace).await,
};
Expand All @@ -92,7 +111,7 @@ async fn get_counters(
#[tracing::instrument(skip(state))]
#[api_v2_operation]
async fn check(
state: web::Data<Arc<Limiter>>,
state: web::Data<RateLimitData>,
request: web::Json<CheckAndReportInfo>,
) -> Result<web::Json<()>, ErrorResponse> {
let CheckAndReportInfo {
Expand All @@ -101,7 +120,7 @@ async fn check(
delta,
} = request.into_inner();
let namespace = namespace.into();
let is_rate_limited_result = match state.get_ref().as_ref() {
let is_rate_limited_result = match state.get_ref().limiter() {
Limiter::Blocking(limiter) => limiter.is_rate_limited(&namespace, &values, delta),
Limiter::Async(limiter) => limiter.is_rate_limited(&namespace, &values, delta).await,
};
Expand All @@ -121,7 +140,7 @@ async fn check(
#[tracing::instrument(skip(data))]
#[api_v2_operation]
async fn report(
data: web::Data<Arc<Limiter>>,
data: web::Data<RateLimitData>,
request: web::Json<CheckAndReportInfo>,
) -> Result<web::Json<()>, ErrorResponse> {
let CheckAndReportInfo {
Expand All @@ -130,7 +149,7 @@ async fn report(
delta,
} = request.into_inner();
let namespace = namespace.into();
let update_counters_result = match data.get_ref().as_ref() {
let update_counters_result = match data.get_ref().limiter() {
Limiter::Blocking(limiter) => limiter.update_counters(&namespace, &values, delta),
Limiter::Async(limiter) => limiter.update_counters(&namespace, &values, delta).await,
};
Expand All @@ -144,7 +163,7 @@ async fn report(
#[tracing::instrument(skip(data))]
#[api_v2_operation]
async fn check_and_report(
data: web::Data<Arc<Limiter>>,
data: web::Data<RateLimitData>,
request: web::Json<CheckAndReportInfo>,
) -> Result<web::Json<()>, ErrorResponse> {
let CheckAndReportInfo {
Expand All @@ -153,7 +172,8 @@ async fn check_and_report(
delta,
} = request.into_inner();
let namespace = namespace.into();
let rate_limited_and_update_result = match data.get_ref().as_ref() {
let rate_limit_data = data.get_ref();
let rate_limited_and_update_result = match rate_limit_data.limiter() {
Limiter::Blocking(limiter) => {
limiter.check_rate_limited_and_update(&namespace, &values, delta, false)
}
Expand All @@ -167,20 +187,25 @@ async fn check_and_report(
match rate_limited_and_update_result {
Ok(is_rate_limited) => {
if is_rate_limited.limited {
PROMETHEUS_METRICS
rate_limit_data
.metrics()
.incr_limited_calls(&namespace, is_rate_limited.limit_name.as_deref());
Err(ErrorResponse::TooManyRequests)
} else {
PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
rate_limit_data.metrics().incr_authorized_calls(&namespace);
Ok(Json(()))
}
}
Err(_) => Err(ErrorResponse::InternalServerError),
}
}

pub async fn run_http_server(address: &str, rate_limiter: Arc<Limiter>) -> std::io::Result<()> {
let data = web::Data::new(rate_limiter);
pub async fn run_http_server(
address: &str,
rate_limiter: Arc<Limiter>,
prometheus_metrics: Arc<PrometheusMetrics>,
) -> std::io::Result<()> {
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));

// This uses the paperclip crate to generate an OpenAPI spec.
// Ref: https://paperclip.waffles.space/actix-plugin.html
Expand Down Expand Up @@ -233,7 +258,8 @@ mod tests {
async fn test_metrics() {
let rate_limiter: Arc<Limiter> =
Arc::new(Limiter::new(Configuration::default()).await.unwrap());
let data = web::Data::new(rate_limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
Expand All @@ -257,7 +283,8 @@ mod tests {

let limit = create_test_limit(&limiter, namespace, 10).await;
let rate_limiter: Arc<Limiter> = Arc::new(limiter);
let data = web::Data::new(rate_limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
Expand All @@ -283,7 +310,8 @@ mod tests {
let namespace = "test_namespace";
let _limit = create_test_limit(&limiter, namespace, 1).await;
let rate_limiter: Arc<Limiter> = Arc::new(limiter);
let data = web::Data::new(rate_limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
Expand Down Expand Up @@ -327,7 +355,8 @@ mod tests {
let _limit = create_test_limit(&limiter, namespace, 1).await;

let rate_limiter: Arc<Limiter> = Arc::new(limiter);
let data = web::Data::new(rate_limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
Expand Down
21 changes: 10 additions & 11 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::http_api::server::run_http_server;
use crate::metrics::MetricsLayer;
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use lazy_static::lazy_static;
use limitador::counter::Counter;
use limitador::errors::LimitadorError;
use limitador::limit::Limit;
Expand Down Expand Up @@ -76,10 +75,6 @@ pub enum LimitadorServerError {
Internal(LimitadorError),
}

lazy_static! {
pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default();
}

pub enum Limiter {
Blocking(RateLimiter),
Async(AsyncRateLimiter),
Expand Down Expand Up @@ -262,7 +257,7 @@ fn find_first_negative_limit(limits: &[Limit]) -> Option<usize> {

#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = {
let (config, prometheus_metrics) = {
let (config, version) = create_config();
println!("{LIMITADOR_HEADER} {version}");
let level = config.log_level.unwrap_or_else(|| {
Expand All @@ -276,9 +271,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::layer()
};

let limit_name_in_metrics = config.limit_name_in_labels;
let prometheus_metrics =
Arc::new(PrometheusMetrics::new_with_options(limit_name_in_metrics));
let metrics = prometheus_metrics.clone();

let metrics_layer = MetricsLayer::new().gather(
"should_rate_limit",
|timings| PROMETHEUS_METRICS.counter_access(Duration::from(timings)),
move |timings| metrics.counter_access(Duration::from(timings)),
vec!["datastore"],
);

Expand Down Expand Up @@ -310,11 +310,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.init();
};

PROMETHEUS_METRICS.set_use_limit_name_in_label(config.limit_name_in_labels);

info!("Version: {}", version);
info!("Using config: {:?}", config);
config
(config, prometheus_metrics)
};

let limit_file = config.limits_file.clone();
Expand Down Expand Up @@ -410,11 +408,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
envoy_rls_address.to_string(),
rate_limiter.clone(),
rate_limit_headers,
prometheus_metrics.clone(),
grpc_reflection_service,
));

info!("HTTP server starting on {}", http_api_address);
run_http_server(&http_api_address, rate_limiter.clone()).await?;
run_http_server(&http_api_address, rate_limiter.clone(), prometheus_metrics).await?;

Ok(())
}
Expand Down
Loading
Loading