From f464a88ccead90d1bf0f61dfc525cedea8b62064 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Mon, 14 Oct 2024 13:32:39 +0300 Subject: [PATCH 1/4] fix: filter out preflight requests --- api/src/metrics.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/api/src/metrics.rs b/api/src/metrics.rs index 612a3a28..9ad98eec 100644 --- a/api/src/metrics.rs +++ b/api/src/metrics.rs @@ -1,10 +1,12 @@ -use crate::errors::CoreError; use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec}; use prometheus::{Encoder, IntCounter, IntCounterVec, Opts, Registry, TextEncoder}; use rocket::fairing::{Fairing, Info, Kind}; +use rocket::http::Method; use rocket::{Data, Request, State}; use tracing::instrument; +use crate::errors::CoreError; + const NAMESPACE: &str = "zksync_api"; #[derive(Clone)] @@ -24,10 +26,21 @@ impl Fairing for Metrics { } async fn on_request(&self, req: &mut Request<'_>, _data: &mut Data<'_>) { + match req.method() { + Method::Options => {} + _ => self.update_metrics(req), + } + } +} + +impl Metrics { + fn update_metrics(&self, req: &mut Request<'_>) { if let Some(val) = req.client_ip() { - self.num_distinct_users - .with_label_values(&[val.to_string().as_str()]) - .inc(); + let ip = val.to_string(); + let ip = ip.as_str(); + info!("Plugin launched by: {}", ip); + + self.num_distinct_users.with_label_values(&[ip]).inc(); } match req.uri().path().as_str() { @@ -37,6 +50,7 @@ impl Fairing for Metrics { } } } + pub(crate) fn create_metrics(registry: Registry) -> Result { let opts = Opts::new("num_distinct_users", "Number of distinct users").namespace(NAMESPACE); let num_distinct_users = IntCounterVec::new(opts, &["ip"])?; From 845bcaa16c480e9886562c50e99553553a8774c9 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Mon, 14 Oct 2024 15:25:06 +0300 Subject: [PATCH 2/4] feat: added following metrics: Requests Per Second - all requests compilation error rate - counter to track the failed compilation actions latency - "time taken to compile", for each compilation process, once complete, record the time it took compilation success rate - counter to track the successful compilation actions Same for verify --- api/src/handlers/compile.rs | 32 +++++++++++---- api/src/handlers/mod.rs | 79 +++++++++++++++++++++++++++++-------- api/src/handlers/verify.rs | 34 ++++++++++++---- api/src/main.rs | 2 +- api/src/metrics.rs | 39 ++++++++++++++++-- api/src/worker.rs | 26 ++++++++---- 6 files changed, 168 insertions(+), 44 deletions(-) diff --git a/api/src/handlers/compile.rs b/api/src/handlers/compile.rs index 505bc092..ada636a1 100644 --- a/api/src/handlers/compile.rs +++ b/api/src/handlers/compile.rs @@ -4,6 +4,7 @@ use crate::handlers::types::{ ApiCommand, ApiCommandResult, CompilationRequest, CompileResponse, CompiledFile, }; use crate::handlers::SPAWN_SEMAPHORE; +use crate::metrics::Metrics; use crate::rate_limiter::RateLimited; use crate::utils::cleaner::AutoCleanUp; use crate::utils::hardhat_config::HardhatConfigBuilder; @@ -20,21 +21,26 @@ use std::process::Stdio; use tracing::instrument; use tracing::{error, info}; +pub(crate) const COMPILATION_LABEL_VALUE: &'static str = "compilation"; + #[instrument] #[post("/compile", format = "json", data = "")] pub async fn compile( request_json: Json, _rate_limited: RateLimited, + engine: &State, ) -> Json { info!("/compile/{:?}", request_json.config); - do_compile(request_json.0).await.unwrap_or_else(|e| { - Json(CompileResponse { - file_content: vec![], - message: e.to_string(), - status: "Error".to_string(), + do_compile(request_json.0, &engine.metrics) + .await + .unwrap_or_else(|e| { + Json(CompileResponse { + file_content: vec![], + message: e.to_string(), + status: "Error".to_string(), + }) }) - }) } #[instrument] @@ -62,7 +68,10 @@ pub async fn get_compile_result(process_id: String, engine: &State }) } -pub async fn do_compile(compilation_request: CompilationRequest) -> Result> { +pub async fn do_compile( + compilation_request: CompilationRequest, + metrics: &Metrics, +) -> Result> { let zksolc_version = compilation_request.config.version; // check if the version is supported @@ -161,6 +170,11 @@ pub async fn do_compile(compilation_request: CompilationRequest) -> Result Result HealthCheckResponse { +pub async fn health(engine: &State) -> HealthCheckResponse { info!("/health"); - let result = do_compile(generate_mock_compile_request()).await; + let result = do_compile(generate_mock_compile_request(), &engine.metrics).await; if result.is_ok() { HealthCheckResponse::ok() @@ -42,20 +47,60 @@ pub async fn who_is_this() -> &'static str { "Who are you?" } -pub async fn dispatch_command(command: ApiCommand) -> Result { +pub async fn dispatch_command( + command: ApiCommand, + metrics: &Metrics, +) -> Result { + let start_time = Instant::now(); + match command { ApiCommand::CompilerVersion => match do_compiler_version() { Ok(result) => Ok(ApiCommandResult::CompilerVersion(result)), Err(e) => Err(e), }, - ApiCommand::Compile(request) => match do_compile(request).await { - Ok(compile_response) => Ok(ApiCommandResult::Compile(compile_response.into_inner())), - Err(e) => Err(e), - }, - ApiCommand::Verify(request) => match do_verify(request).await { - Ok(verify_response) => Ok(ApiCommandResult::Verify(verify_response.into_inner())), - Err(e) => Err(e), - }, + ApiCommand::Compile(request) => { + let res = match do_compile(request, &metrics).await { + Ok(compile_response) => { + Ok(ApiCommandResult::Compile(compile_response.into_inner())) + } + Err(e) => { + metrics + .action_failures_total + .with_label_values(&[COMPILATION_LABEL_VALUE]) + .inc(); + Err(e) + } + }; + + let elapsed_time = start_time.elapsed().as_secs_f64(); + metrics + .action_duration_seconds + .with_label_values(&[COMPILATION_LABEL_VALUE]) + .set(elapsed_time); + + res + } + ApiCommand::Verify(request) => { + let res = match do_verify(request, &metrics).await { + Ok(verify_response) => Ok(ApiCommandResult::Verify(verify_response.into_inner())), + Err(e) => { + metrics + .action_failures_total + .with_label_values(&[VERIFICATION_LABEL_VALUE]) + .inc(); + + Err(e) + } + }; + + let elapsed_time = start_time.elapsed().as_secs_f64(); + metrics + .action_duration_seconds + .with_label_values(&[VERIFICATION_LABEL_VALUE]) + .set(elapsed_time); + + res + } ApiCommand::Shutdown => Ok(ApiCommandResult::Shutdown), } } diff --git a/api/src/handlers/verify.rs b/api/src/handlers/verify.rs index 85642f4c..09eaee57 100644 --- a/api/src/handlers/verify.rs +++ b/api/src/handlers/verify.rs @@ -1,7 +1,15 @@ +use rocket::serde::{json, json::Json}; +use rocket::{tokio, State}; +use std::path::Path; +use std::process::Stdio; +use tracing::info; +use tracing::instrument; + use crate::errors::{ApiError, Result}; use crate::handlers::process::{do_process_command, fetch_process_result}; use crate::handlers::types::{ApiCommand, ApiCommandResult, VerificationRequest, VerifyResponse}; use crate::handlers::SPAWN_SEMAPHORE; +use crate::metrics::Metrics; use crate::rate_limiter::RateLimited; use crate::utils::cleaner::AutoCleanUp; use crate::utils::hardhat_config::HardhatConfigBuilder; @@ -10,22 +18,19 @@ use crate::utils::lib::{ ZKSOLC_VERSIONS, }; use crate::worker::WorkerEngine; -use rocket::serde::{json, json::Json}; -use rocket::{tokio, State}; -use std::path::Path; -use std::process::Stdio; -use tracing::info; -use tracing::instrument; + +pub(crate) const VERIFICATION_LABEL_VALUE: &'static str = "compilation"; #[instrument] #[post("/verify", format = "json", data = "")] pub async fn verify( verification_request_json: Json, _rate_limited: RateLimited, + engine: &State, ) -> Json { info!("/verify"); - do_verify(verification_request_json.0) + do_verify(verification_request_json.0, &engine.metrics) .await .unwrap_or_else(|e| { Json(VerifyResponse { @@ -79,7 +84,10 @@ fn extract_verify_args(request: &VerificationRequest) -> Vec { args } -pub async fn do_verify(verification_request: VerificationRequest) -> Result> { +pub async fn do_verify( + verification_request: VerificationRequest, + metrics: &Metrics, +) -> Result> { let zksolc_version = verification_request.config.zksolc_version.clone(); // check if the version is supported @@ -170,12 +178,22 @@ pub async fn do_verify(verification_request: VerificationRequest) -> Result Rocket { }; // Launch the worker processes - let mut engine = WorkerEngine::new(number_of_workers, queue_size); + let mut engine = WorkerEngine::new(number_of_workers, queue_size, metrics.clone()); engine.start(); // Create a new scheduler diff --git a/api/src/metrics.rs b/api/src/metrics.rs index 9ad98eec..9046f9a4 100644 --- a/api/src/metrics.rs +++ b/api/src/metrics.rs @@ -1,5 +1,5 @@ -use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec}; -use prometheus::{Encoder, IntCounter, IntCounterVec, Opts, Registry, TextEncoder}; +use prometheus::core::{AtomicF64, AtomicU64, GenericCounter, GenericCounterVec, GenericGaugeVec}; +use prometheus::{Encoder, GaugeVec, IntCounter, IntCounterVec, Opts, Registry, TextEncoder}; use rocket::fairing::{Fairing, Info, Kind}; use rocket::http::Method; use rocket::{Data, Request, State}; @@ -9,11 +9,15 @@ use crate::errors::CoreError; const NAMESPACE: &str = "zksync_api"; -#[derive(Clone)] -pub(crate) struct Metrics { +#[derive(Clone, Debug)] +pub struct Metrics { pub num_distinct_users: GenericCounterVec, pub num_plugin_launches: GenericCounter, pub num_of_compilations: GenericCounter, + pub requests_total: GenericCounter, + pub action_failures_total: GenericCounterVec, + pub action_successes_total: GenericCounterVec, + pub action_duration_seconds: GenericGaugeVec, } #[rocket::async_trait] @@ -26,6 +30,8 @@ impl Fairing for Metrics { } async fn on_request(&self, req: &mut Request<'_>, _data: &mut Data<'_>) { + self.requests_total.inc(); + match req.method() { Method::Options => {} _ => self.update_metrics(req), @@ -52,6 +58,8 @@ impl Metrics { } pub(crate) fn create_metrics(registry: Registry) -> Result { + const ACTION_LABEL_NAME: &'static str = "action"; + let opts = Opts::new("num_distinct_users", "Number of distinct users").namespace(NAMESPACE); let num_distinct_users = IntCounterVec::new(opts, &["ip"])?; registry.register(Box::new(num_distinct_users.clone()))?; @@ -64,10 +72,33 @@ pub(crate) fn create_metrics(registry: Registry) -> Result { let num_of_compilations = IntCounter::with_opts(opts)?; registry.register(Box::new(num_of_compilations.clone()))?; + // Follow naming conventions for new metrics https://prometheus.io/docs/practices/naming/ + let opts = Opts::new("requests_total", "Number of requests").namespace(NAMESPACE); + let requests_total = IntCounter::with_opts(opts)?; + registry.register(Box::new(requests_total.clone()))?; + + let opts = Opts::new("action_failures_total", "Number of action failures").namespace(NAMESPACE); + let action_failures_total = IntCounterVec::new(opts, &[ACTION_LABEL_NAME])?; + registry.register(Box::new(action_failures_total.clone()))?; + + let opts = + Opts::new("action_successes_total", "Number of action successes").namespace(NAMESPACE); + let action_successes_total = IntCounterVec::new(opts, &[ACTION_LABEL_NAME])?; + registry.register(Box::new(action_successes_total.clone()))?; + + let opts = + Opts::new("action_duration_seconds", "Duration of action in seconds").namespace(NAMESPACE); + let action_duration_seconds = GaugeVec::new(opts, &[ACTION_LABEL_NAME])?; + registry.register(Box::new(action_duration_seconds.clone()))?; + Ok(Metrics { num_distinct_users, num_plugin_launches, num_of_compilations, + requests_total, + action_failures_total, + action_successes_total, + action_duration_seconds, }) } diff --git a/api/src/worker.rs b/api/src/worker.rs index 4874dffc..217ba2c3 100644 --- a/api/src/worker.rs +++ b/api/src/worker.rs @@ -1,7 +1,3 @@ -use crate::errors::ApiError; -use crate::handlers; -use crate::handlers::types::{ApiCommand, ApiCommandResult}; -use crate::utils::lib::DURATION_TO_PURGE; use crossbeam_queue::ArrayQueue; use crossbeam_skiplist::SkipMap; use rocket::tokio; @@ -14,6 +10,12 @@ use std::sync::Arc; use tracing::info; use uuid::Uuid; +use crate::errors::ApiError; +use crate::handlers; +use crate::handlers::types::{ApiCommand, ApiCommandResult}; +use crate::metrics::Metrics; +use crate::utils::lib::DURATION_TO_PURGE; + #[derive(Debug)] pub enum ProcessState { New, @@ -45,10 +47,11 @@ pub struct WorkerEngine { pub arc_timestamps_to_purge: Arc>, pub is_supervisor_enabled: Arc>, pub supervisor_thread: Arc>>, + pub metrics: Metrics, } impl WorkerEngine { - pub fn new(num_workers: u32, queue_capacity: usize) -> Self { + pub fn new(num_workers: u32, queue_capacity: usize, metrics: Metrics) -> Self { // Create a queue instance let queue: ArrayQueue<(Uuid, ApiCommand)> = ArrayQueue::new(queue_capacity); @@ -76,6 +79,7 @@ impl WorkerEngine { supervisor_thread: Arc::new(None), arc_timestamps_to_purge, is_supervisor_enabled, + metrics, } } @@ -85,8 +89,15 @@ impl WorkerEngine { let arc_clone = self.arc_command_queue.clone(); let arc_states = self.arc_process_states.clone(); let arc_timestamps_to_purge = self.arc_timestamps_to_purge.clone(); + let metrics_clone = self.metrics.clone(); self.worker_threads.push(tokio::spawn(async move { - WorkerEngine::worker(arc_clone, arc_states, arc_timestamps_to_purge).await; + WorkerEngine::worker( + arc_clone, + arc_states, + arc_timestamps_to_purge, + metrics_clone, + ) + .await; })); } @@ -183,6 +194,7 @@ impl WorkerEngine { arc_command_queue: Arc>, arc_process_states: Arc, arc_timestamps_to_purge: Arc>, + metrics: Metrics, ) { info!("Starting worker thread..."); 'worker_loop: loop { @@ -199,7 +211,7 @@ impl WorkerEngine { // update process state arc_process_states.insert(process_id, ProcessState::Running); - match handlers::dispatch_command(command).await { + match handlers::dispatch_command(command, &metrics).await { Ok(result) => { arc_process_states .insert(process_id, ProcessState::Completed(result)); From 4643f6cfa3d73bc33794e334d9563d51072a7b97 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Mon, 14 Oct 2024 17:44:03 +0300 Subject: [PATCH 3/4] fix: machete --- api/Cargo.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/Cargo.toml b/api/Cargo.toml index 2c67230d..668bc05f 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -16,12 +16,10 @@ tracing-subscriber = { version = "^0.3.17", features = [ "smallvec", ] } tracing-appender = "0.2" -tracing-log = "0.1.3" uuid = { version = "1.4.1", features = ["serde", "v4"] } solang-parser = "0.3.2" crossbeam-queue = "0.3.8" crossbeam-skiplist = "0.1.1" -fmt = "0.1.0" yansi = "0.5.1" thiserror = "1.0.49" chrono = "0.4.31" From 005f36e88a68a4143c29fe386ab6369dec48e9e4 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Mon, 14 Oct 2024 20:25:09 +0300 Subject: [PATCH 4/4] fix: clippy --- api/src/handlers/compile.rs | 2 +- api/src/handlers/mod.rs | 4 ++-- api/src/handlers/verify.rs | 2 +- api/src/metrics.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/api/src/handlers/compile.rs b/api/src/handlers/compile.rs index ada636a1..6e33bfa4 100644 --- a/api/src/handlers/compile.rs +++ b/api/src/handlers/compile.rs @@ -21,7 +21,7 @@ use std::process::Stdio; use tracing::instrument; use tracing::{error, info}; -pub(crate) const COMPILATION_LABEL_VALUE: &'static str = "compilation"; +pub(crate) const COMPILATION_LABEL_VALUE: &str = "compilation"; #[instrument] #[post("/compile", format = "json", data = "")] diff --git a/api/src/handlers/mod.rs b/api/src/handlers/mod.rs index 38373611..c6469a55 100644 --- a/api/src/handlers/mod.rs +++ b/api/src/handlers/mod.rs @@ -59,7 +59,7 @@ pub async fn dispatch_command( Err(e) => Err(e), }, ApiCommand::Compile(request) => { - let res = match do_compile(request, &metrics).await { + let res = match do_compile(request, metrics).await { Ok(compile_response) => { Ok(ApiCommandResult::Compile(compile_response.into_inner())) } @@ -81,7 +81,7 @@ pub async fn dispatch_command( res } ApiCommand::Verify(request) => { - let res = match do_verify(request, &metrics).await { + let res = match do_verify(request, metrics).await { Ok(verify_response) => Ok(ApiCommandResult::Verify(verify_response.into_inner())), Err(e) => { metrics diff --git a/api/src/handlers/verify.rs b/api/src/handlers/verify.rs index 09eaee57..6653f073 100644 --- a/api/src/handlers/verify.rs +++ b/api/src/handlers/verify.rs @@ -19,7 +19,7 @@ use crate::utils::lib::{ }; use crate::worker::WorkerEngine; -pub(crate) const VERIFICATION_LABEL_VALUE: &'static str = "compilation"; +pub(crate) const VERIFICATION_LABEL_VALUE: &str = "compilation"; #[instrument] #[post("/verify", format = "json", data = "")] diff --git a/api/src/metrics.rs b/api/src/metrics.rs index 9046f9a4..1b2c240c 100644 --- a/api/src/metrics.rs +++ b/api/src/metrics.rs @@ -58,7 +58,7 @@ impl Metrics { } pub(crate) fn create_metrics(registry: Registry) -> Result { - const ACTION_LABEL_NAME: &'static str = "action"; + const ACTION_LABEL_NAME: &str = "action"; let opts = Opts::new("num_distinct_users", "Number of distinct users").namespace(NAMESPACE); let num_distinct_users = IntCounterVec::new(opts, &["ip"])?;