Skip to content

Commit

Permalink
add metrics for every command (#31)
Browse files Browse the repository at this point in the history
* add metrics for every command

* style

* bump version

* rollback part of change
  • Loading branch information
LeoQuote authored Jun 1, 2023
1 parent 8b92218 commit ad7cae7
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "redarrow"
version = "0.17.0"
version = "0.18.0"
authors = ["everpcpc <[email protected]>"]
edition = "2018"
license = "BSD-3-Clause"
Expand Down
47 changes: 47 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
pub mod dispatcher;
pub mod webclient;

use prometheus::{TextEncoder, Encoder, Opts, Counter, Registry, Gauge};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
pub struct CommandParams {
pub chunked: Option<u8>,
pub argument: Option<String>,
pub format: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -69,4 +71,49 @@ impl CommandResult {
pub fn to_json(self: &Self) -> String {
serde_json::to_string(self).unwrap()
}

pub fn to_prometheus(self: &Self) -> String {
// Create metrics.
let command_success_opt = Opts::new("redarrow_command_success", "command result success, 1 for success, 0 for failed");
let command_success = Gauge::with_opts(command_success_opt).unwrap();
let command_return_code_opt = Opts::new("redarrow_command_return_code", "command return code");
let command_return_code = Gauge::with_opts(command_return_code_opt).unwrap();
let command_time_cost_opt = Opts::new("redarrow_command_time_cost", "command time cost");
let command_time_cost = Gauge::with_opts(command_time_cost_opt).unwrap();

let r = Registry::new();
r.register(Box::new(command_success.clone())).unwrap();
r.register(Box::new(command_return_code.clone())).unwrap();
r.register(Box::new(command_time_cost.clone())).unwrap();

// return code = 0 is considered success
// other case is considered failed
if self.error.is_some() {
command_success.set(0.0);
} else {
match self.exit_code {
Some(code) => {
if code == 0 {
command_success.set(1.0);
} else {
command_success.set(0.0);
}
command_return_code.set(code as f64);
command_time_cost.set(self.time_cost.unwrap_or(0.0))
}
None => {
command_success.set(0.0);
}
}
}

// Gather the metrics.
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metric_families = r.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();

// Output as string
String::from_utf8(buffer).unwrap()
}
}
31 changes: 25 additions & 6 deletions src/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::task::{Context, Poll};

use argh::FromArgs;
use futures::Stream;
use prometheus::{Registry, GaugeVec};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use warp::http::StatusCode;
Expand Down Expand Up @@ -118,6 +119,18 @@ async fn handlers_command(
None => Vec::new(),
Some(a) => a.split(" ").map(|x| x.to_string()).collect(),
};
let format: String = match &opts.format {
None => "json".to_string(),
Some(f) if f == "json" => "json".to_string(),
Some(f) if f == "prometheus" => "prometheus".to_string(),
Some(_f) => "json".to_string(),
};
if chunked && format != "json" {
return Ok(Box::new(warp::reply::with_status(
format!("0> {}\n", "chunked only support json format"),
StatusCode::BAD_REQUEST,
)));
}
match configs.get(&command) {
None => {
let err = CommandResult::err(format!("Unknown Command: {}", command));
Expand All @@ -137,14 +150,20 @@ async fn handlers_command(
if chunked {
handle_command_chunked(cmd.clone(), arguments)
} else {
let ret = match cmd.execute(arguments) {
Err(e) => warp::reply::with_status(
match cmd.execute(arguments) {
Err(e) => Ok(Box::new(warp::reply::with_status(
warp::reply::json(&CommandResult::err(format!("{}", e))),
StatusCode::INTERNAL_SERVER_ERROR,
),
Ok(r) => warp::reply::with_status(warp::reply::json(&r), StatusCode::OK),
};
Ok(Box::new(ret))
))),
Ok(r) => {
if format == "prometheus" {
Ok(Box::new(warp::reply::with_status(
warp::reply::with_header(r.to_prometheus(), "content-type", "text/plain"), StatusCode::OK)))
} else {
Ok(Box::new(warp::reply::with_status(warp::reply::json(&r), StatusCode::OK)))
}
}
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/webclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl Client {
let params = CommandParams {
chunked: None,
argument: self.get_arguments(),
format: None,
};
let body = reqwest::Client::builder()
.user_agent(self.user_agent.as_str())
Expand All @@ -77,6 +78,7 @@ impl Client {
let params = CommandParams {
chunked: Some(1),
argument: self.get_arguments(),
format: None,
};
let mut res = reqwest::Client::builder()
.user_agent(self.user_agent.as_str())
Expand Down

0 comments on commit ad7cae7

Please sign in to comment.