Skip to content

Commit

Permalink
Move prometheus_metrics and remove dependency from lib
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-cattermole committed Mar 12, 2024
1 parent 63e609e commit cedac32
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 140 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions limitador-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ lazy_static = "1.4.0"
clap = "4.3"
sysinfo = "0.29.7"
openssl = { version = "0.10.57", features = ["vendored"] }
prometheus = "0.13.3"


[build-dependencies]
Expand Down
55 changes: 16 additions & 39 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use lazy_static::lazy_static;
use limitador::counter::Counter;
use limitador::errors::LimitadorError;
use limitador::limit::Limit;
use limitador::prometheus_metrics::PrometheusMetrics;
use limitador::storage::disk::DiskStorage;
#[cfg(feature = "infinispan")]
use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
Expand All @@ -37,6 +36,7 @@ use notify::{Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher
use opentelemetry::KeyValue;
use opentelemetry_otlp::{Protocol, WithExportConfig};
use opentelemetry_sdk::{trace, Resource};
use prometheus_metrics::PrometheusMetrics;
use std::env::VarError;
use std::fs;
use std::path::Path;
Expand All @@ -56,6 +56,7 @@ mod http_api;

mod config;
mod metrics;
pub mod prometheus_metrics;

const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION");
const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE");
Expand All @@ -73,7 +74,6 @@ pub enum LimitadorServerError {
}

lazy_static! {
// TODO: this should be using config.limit_name_in_labels to initialize
pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default();
}

Expand All @@ -91,29 +91,19 @@ impl From<LimitadorError> for LimitadorServerError {
impl Limiter {
pub async fn new(config: Configuration) -> Result<Self, LimitadorServerError> {
let rate_limiter = match config.storage {
StorageConfiguration::Redis(cfg) => {
Self::redis_limiter(cfg, config.limit_name_in_labels).await
}
StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await,
#[cfg(feature = "infinispan")]
StorageConfiguration::Infinispan(cfg) => {
Self::infinispan_limiter(cfg, config.limit_name_in_labels).await
}
StorageConfiguration::InMemory(cfg) => {
Self::in_memory_limiter(cfg, config.limit_name_in_labels)
}
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels),
StorageConfiguration::Infinispan(cfg) => Self::infinispan_limiter(cfg).await,
StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg),
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg),
};

Ok(rate_limiter)
}

async fn redis_limiter(cfg: RedisStorageConfiguration, limit_name_labels: bool) -> Self {
async fn redis_limiter(cfg: RedisStorageConfiguration) -> Self {
let storage = Self::storage_using_redis(cfg).await;
let mut rate_limiter_builder = AsyncRateLimiterBuilder::new(storage);

if limit_name_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}
let rate_limiter_builder = AsyncRateLimiterBuilder::new(storage);

Self::Async(rate_limiter_builder.build())
}
Expand Down Expand Up @@ -170,10 +160,7 @@ impl Limiter {
}

#[cfg(feature = "infinispan")]
async fn infinispan_limiter(
cfg: InfinispanStorageConfiguration,
limit_name_labels: bool,
) -> Self {
async fn infinispan_limiter(cfg: InfinispanStorageConfiguration) -> Self {
use url::Url;

let parsed_url = Url::parse(&cfg.url).unwrap();
Expand Down Expand Up @@ -209,42 +196,30 @@ impl Limiter {
None => builder.build().await,
};

let mut rate_limiter_builder =
let rate_limiter_builder =
AsyncRateLimiterBuilder::new(AsyncStorage::with_counter_storage(Box::new(storage)));

if limit_name_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Self::Async(rate_limiter_builder.build())
}

fn disk_limiter(cfg: DiskStorageConfiguration, limit_name_in_labels: bool) -> Self {
fn disk_limiter(cfg: DiskStorageConfiguration) -> Self {
let storage = match DiskStorage::open(cfg.path.as_str(), cfg.optimization) {
Ok(storage) => storage,
Err(err) => {
eprintln!("Failed to open DB at {}: {err}", cfg.path);
process::exit(1)
}
};
let mut rate_limiter_builder =
let rate_limiter_builder =
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));

if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Self::Blocking(rate_limiter_builder.build())
}

fn in_memory_limiter(cfg: InMemoryStorageConfiguration, limit_name_in_labels: bool) -> Self {
let mut rate_limiter_builder =
fn in_memory_limiter(cfg: InMemoryStorageConfiguration) -> Self {
let rate_limiter_builder =
RateLimiterBuilder::new(cfg.cache_size.or_else(guess_cache_size).unwrap());

if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Self::Blocking(rate_limiter_builder.build())
}

Expand Down Expand Up @@ -344,6 +319,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.init();
};

PROMETHEUS_METRICS.set_use_limit_name_in_label(config.limit_name_in_labels);

info!("Version: {}", version);
info!("Using config: {:?}", config);
config
Expand Down
13 changes: 2 additions & 11 deletions limitador-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,8 @@ where
parent.extensions_mut().replace(span_state.clone());
}
// IF we are aggregator call consume function
match self.groups.get(name) {
Some(metrics_group) => {
(metrics_group.consumer)(*span_state.group_times.get(name).unwrap())
}
_ => (),
if let Some(metrics_group) = self.groups.get(name) {
(metrics_group.consumer)(*span_state.group_times.get(name).unwrap())
}
}
}
Expand Down Expand Up @@ -298,9 +295,3 @@ mod tests {
assert_eq!(ml.groups.get("group").unwrap().records, vec!["record"]);
}
}

// [X] 1. Use prometheus metrics in main
// [X] 2. Try to use consume method from the prometheus metrics in main
// [X] 3. Invoke the server using the PrometheusMetrics defined in main not the limiter
// [ ] 4. Record the authorized/limited calls
// [ ] 5. Burn the old prometheus instance and move inside the server + old timing stuff
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::limit::Namespace;
use lazy_static::lazy_static;
use limitador::limit::Namespace;
use prometheus::{
Encoder, Histogram, HistogramOpts, IntCounterVec, IntGauge, Opts, Registry, TextEncoder,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

const NAMESPACE_LABEL: &str = "limitador_namespace";
Expand Down Expand Up @@ -36,7 +38,7 @@ pub struct PrometheusMetrics {
authorized_calls: IntCounterVec,
limited_calls: IntCounterVec,
counter_latency: Histogram,
use_limit_name_label: bool,
use_limit_name_label: AtomicBool,
}

impl Default for PrometheusMetrics {
Expand All @@ -58,6 +60,11 @@ impl PrometheusMetrics {
Self::new_with_options(true)
}

pub fn set_use_limit_name_in_label(&self, use_limit_name_in_label: bool) {
self.use_limit_name_label
.store(use_limit_name_in_label, Ordering::SeqCst)
}

pub fn incr_authorized_calls(&self, namespace: &Namespace) {
self.authorized_calls
.with_label_values(&[namespace.as_ref()])
Expand All @@ -70,7 +77,7 @@ impl PrometheusMetrics {
{
let mut labels = vec![namespace.as_ref()];

if self.use_limit_name_label {
if self.use_limit_name_label.load(Ordering::Relaxed) {
// If we have configured the metric to accept 2 labels we need to
// set values for them.
labels.push(limit_name.into().unwrap_or(""));
Expand Down Expand Up @@ -124,7 +131,7 @@ impl PrometheusMetrics {
authorized_calls: authorized_calls_counter,
limited_calls: limited_calls_counter,
counter_latency,
use_limit_name_label,
use_limit_name_label: AtomicBool::new(use_limit_name_label),
}
}

Expand Down Expand Up @@ -297,39 +304,6 @@ mod tests {
)
}

// #[test]
// fn collects_latencies() {
// let metrics = PrometheusMetrics::new();
// assert_eq!(metrics.counter_latency.get_sample_count(), 0);
// {
// let _access = metrics.counter_accesses();
// }
// assert_eq!(metrics.counter_latency.get_sample_count(), 0);
// {
// let mut access = metrics.counter_accesses();
// access.observe(Duration::from_millis(12));
// }
// assert_eq!(metrics.counter_latency.get_sample_count(), 1);
// assert_eq!(
// metrics.counter_latency.get_sample_sum(),
// Duration::from_millis(12).as_secs_f64()
// );
// {
// let mut access = metrics.counter_accesses();
// access.observe(Duration::from_millis(5));
// assert_eq!(metrics.counter_latency.get_sample_count(), 1);
// assert_eq!(
// metrics.counter_latency.get_sample_sum(),
// Duration::from_millis(12).as_secs_f64()
// );
// }
// assert_eq!(metrics.counter_latency.get_sample_count(), 2);
// assert_eq!(
// metrics.counter_latency.get_sample_sum(),
// Duration::from_millis(17).as_secs_f64()
// );
// }

fn formatted_counter_with_namespace_and_limit(
metric_name: &str,
count: i32,
Expand Down
1 change: 0 additions & 1 deletion limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ thiserror = "1"
futures = "0.3"
async-trait = "0.1"
cfg-if = "1"
prometheus = "0.13"
lazy_static = "1"
tracing = "0.1.40"

Expand Down
Loading

0 comments on commit cedac32

Please sign in to comment.