Skip to content

Commit

Permalink
add latency tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
bcantrill committed Jan 18, 2024
1 parent fdfbf30 commit fc4efc9
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions dev-tools/omdb/src/bin/omdb/mgs/dashboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use ratatui::{
};

use crate::mgs::sensors::{
sensor_data, sensor_metadata, SensorId, SensorInput,
SensorMetadata, SensorValues, SensorsArgs,
sensor_data, sensor_metadata, SensorId, SensorInput, SensorMetadata,
SensorValues, SensorsArgs,
};
use crate::mgs::sp_to_string;
use clap::Args;
Expand Down Expand Up @@ -486,9 +486,7 @@ struct Dashboard {
}

impl Dashboard {
fn new(
metadata: &'static SensorMetadata,
) -> Result<Dashboard> {
fn new(metadata: &'static SensorMetadata) -> Result<Dashboard> {
let mut sps =
metadata.sensors_by_sp.keys().map(|m| *m).collect::<Vec<_>>();
let mut graphs = HashMap::new();
Expand Down
90 changes: 67 additions & 23 deletions dev-tools/omdb/src/bin/omdb/mgs/sensors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use gateway_client::types::SpType;
use multimap::MultiMap;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::time::SystemTime;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[derive(Debug, Args)]
pub(crate) struct SensorsArgs {
Expand Down Expand Up @@ -47,6 +47,10 @@ pub(crate) struct SensorsArgs {
#[clap(long, short)]
pub parseable: bool,

/// show latencies
#[clap(long)]
pub show_latencies: bool,

/// restrict sensors by type of sensor
#[clap(
long,
Expand Down Expand Up @@ -192,8 +196,15 @@ pub(crate) struct SensorMetadata {
pub end_time: Option<u64>,
}

struct SensorSpInfo {
info: Vec<(SpIdentifier, SpInfo)>,
time: u64,
latencies: Option<HashMap<SpIdentifier, Duration>>,
}

pub(crate) struct SensorValues {
pub values: HashMap<SensorId, Option<f32>>,
pub latencies: Option<HashMap<SpIdentifier, Duration>>,
pub time: u64,
}

Expand Down Expand Up @@ -279,8 +290,9 @@ async fn sp_info(
async fn sp_info_mgs(
mgs_client: &gateway_client::Client,
args: &SensorsArgs,
) -> Result<(Vec<(SpIdentifier, SpInfo)>, u64), anyhow::Error> {
) -> Result<SensorSpInfo, anyhow::Error> {
let mut rval = vec![];
let mut latencies = HashMap::new();

//
// First, get all of the SPs that we can see via Ignition
Expand Down Expand Up @@ -330,14 +342,16 @@ async fn sp_info_mgs(
for (sp_id, handle) in handles {
match handle.await.unwrap() {
Ok(info) => {
let l0 = info.timestamps[1].duration_since(info.timestamps[0]);
let l1 = info.timestamps[2].duration_since(info.timestamps[1]);

if args.verbose {
eprintln!(
"mgs: latencies for {sp_id:?}: {:.1?} {:.1?}",
info.timestamps[2].duration_since(info.timestamps[1]),
info.timestamps[1].duration_since(info.timestamps[0])
"mgs: latencies for {sp_id:?}: {l1:.1?} {l0:.1?}",
);
}

latencies.insert(sp_id, l0 + l1);
rval.push((sp_id, info));
}

Expand All @@ -351,17 +365,18 @@ async fn sp_info_mgs(
eprintln!("total discovery time {:?}", now.elapsed());
}

Ok((
rval,
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
))
Ok(SensorSpInfo {
info: rval,
time: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
latencies: Some(latencies),
})
}

fn sp_info_csv<R: std::io::Read>(
reader: &mut csv::Reader<R>,
position: &mut csv::Position,
args: &SensorsArgs,
) -> Result<(Vec<(SpIdentifier, SpInfo)>, u64), anyhow::Error> {
) -> Result<SensorSpInfo, anyhow::Error> {
let mut sps = vec![];
let headers = reader.headers()?;

Expand Down Expand Up @@ -522,7 +537,7 @@ fn sp_info_csv<R: std::io::Read>(
}
}

Ok((rval, time.unwrap()))
Ok(SensorSpInfo { info: rval, time: time.unwrap(), latencies: None })
}

pub(crate) async fn sensor_metadata<R: std::io::Read>(
Expand Down Expand Up @@ -550,7 +565,7 @@ pub(crate) async fn sensor_metadata<R: std::io::Read>(
None
};

let (info, time) = match input {
let info = match input {
SensorInput::MgsClient(ref mgs_client) => {
sp_info_mgs(mgs_client, args).await?
}
Expand All @@ -567,8 +582,9 @@ pub(crate) async fn sensor_metadata<R: std::io::Read>(
let mut work_by_sp = HashMap::new();

let mut current = 0;
let time = info.time;

for (sp_id, info) in info {
for (sp_id, info) in info.info {
let mut sp_work = vec![];

for (device, sensors) in info.devices {
Expand Down Expand Up @@ -633,18 +649,20 @@ pub(crate) async fn sensor_metadata<R: std::io::Read>(
},
},
},
SensorValues { values, time },
SensorValues { values, time, latencies: info.latencies },
))
}

async fn sp_read_sensors(
mgs_client: &gateway_client::Client,
id: &SpIdentifier,
metadata: &SensorMetadata,
) -> Result<Vec<(SensorId, Option<f32>)>, anyhow::Error> {
) -> Result<(Vec<(SensorId, Option<f32>)>, Duration), anyhow::Error> {
let work = metadata.work_by_sp.get(id).unwrap();
let mut rval = vec![];

let start = std::time::Instant::now();

for (component, ids) in work.iter() {
for (value, id) in mgs_client
.sp_component_get(id.type_, id.slot, component.device())
Expand All @@ -669,14 +687,15 @@ async fn sp_read_sensors(
}
}

Ok(rval)
Ok((rval, std::time::Instant::now().duration_since(start)))
}

async fn sp_data_mgs(
mgs_client: &gateway_client::Client,
metadata: &'static SensorMetadata,
) -> Result<SensorValues, anyhow::Error> {
let mut values = HashMap::new();
let mut latencies = HashMap::new();
let mut handles = vec![];

for sp_id in metadata.sensors_by_sp.keys() {
Expand All @@ -687,11 +706,13 @@ async fn sp_data_mgs(
sp_read_sensors(&mgs_client, &id, metadata).await
});

handles.push(handle);
handles.push((id, handle));
}

for handle in handles {
let rval = handle.await.unwrap()?;
for (id, handle) in handles {
let (rval, latency) = handle.await.unwrap()?;

latencies.insert(id, latency);

for (id, value) in rval {
values.insert(id, value);
Expand All @@ -700,9 +721,8 @@ async fn sp_data_mgs(

Ok(SensorValues {
values,
time: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs(),
latencies: Some(latencies),
time: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
})
}

Expand Down Expand Up @@ -770,7 +790,7 @@ fn sp_data_csv<R: std::io::Read + std::io::Seek>(
}
}

Ok(SensorValues { values, time: time.unwrap() })
Ok(SensorValues { values, latencies: None, time: time.unwrap() })
}

pub(crate) async fn sensor_data<R: std::io::Read + std::io::Seek>(
Expand Down Expand Up @@ -855,6 +875,14 @@ pub(crate) async fn cmd_mgs_sensors(
}
};

let print_latency = |now: u64| {
if !args.parseable {
print!("{:20} ", "LATENCY");
} else {
print!("{now},{},{}", "LATENCY", "latency");
}
};

let mut wakeup =
tokio::time::Instant::now() + tokio::time::Duration::from_millis(1000);

Expand Down Expand Up @@ -886,6 +914,22 @@ pub(crate) async fn cmd_mgs_sensors(
println!();
}

if args.show_latencies {
if let Some(latencies) = values.latencies {
print_latency(values.time);

for sp in &sps {
print_value(if let Some(latency) = latencies.get(sp) {
format!("{}ms", latency.as_millis())
} else {
"?".to_string()
});
}
}

println!();
}

if !args.sleep {
if args.input.is_none() {
break;
Expand Down

0 comments on commit fc4efc9

Please sign in to comment.