Skip to content

Commit

Permalink
Merge branch 'main' into add-resource-limits
Browse files Browse the repository at this point in the history
  • Loading branch information
zephraph committed Dec 12, 2023
2 parents 77f6d29 + 027c9b8 commit 760675d
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 73 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ itertools = "0.12.0"
key-manager = { path = "key-manager" }
kstat-rs = "0.2.3"
lazy_static = "1.4.0"
libc = "0.2.150"
libc = "0.2.151"
linear-map = "1.2.0"
macaddr = { version = "1.0.1", features = ["serde_std"] }
maplit = "1.0.2"
Expand Down
1 change: 1 addition & 0 deletions nexus/inventory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ gateway-messages.workspace = true
nexus-types.workspace = true
slog.workspace = true
strum.workspace = true
thiserror.workspace = true
uuid.workspace = true
omicron-workspace-hack.workspace = true

Expand Down
73 changes: 51 additions & 22 deletions nexus/inventory/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,40 @@ use nexus_types::inventory::ServiceProcessor;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;
use thiserror::Error;
use uuid::Uuid;

/// Describes an operational error encountered during the collection process
///
/// Examples include a down MGS instance, failure to parse a response from some
/// other service, etc. We currently don't need to distinguish these
/// programmatically.
#[derive(Debug, Error)]
pub struct InventoryError(#[from] anyhow::Error);

impl std::fmt::Display for InventoryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:#}", self.0)
}
}

/// Describes a mis-use of the [`CollectionBuilder`] object
///
/// Example: reporting information about a caboose when the caller has not
/// already reported information about the corresopnding baseboard.
///
/// Unlike `InventoryError`s, which can always happen in a real system, these
/// errors are not ever expected. Ideally, all of these problems would be
/// compile errors.
#[derive(Debug, Error)]
pub struct CollectorBug(#[from] anyhow::Error);

impl std::fmt::Display for CollectorBug {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:#}", self.0)
}
}

/// Build an inventory [`Collection`]
///
/// This interface is oriented around the interfaces used by an actual
Expand All @@ -37,7 +69,7 @@ use uuid::Uuid;
#[derive(Debug)]
pub struct CollectionBuilder {
// For field documentation, see the corresponding fields in `Collection`.
errors: Vec<anyhow::Error>,
errors: Vec<InventoryError>,
time_started: DateTime<Utc>,
collector: String,
baseboards: BTreeSet<Arc<BaseboardId>>,
Expand Down Expand Up @@ -76,11 +108,7 @@ impl CollectionBuilder {
pub fn build(self) -> Collection {
Collection {
id: Uuid::new_v4(),
errors: self
.errors
.into_iter()
.map(|e| format!("{:#}", e))
.collect(),
errors: self.errors.into_iter().map(|e| e.to_string()).collect(),
time_started: self.time_started,
time_done: now(),
collector: self.collector,
Expand Down Expand Up @@ -115,12 +143,12 @@ impl CollectionBuilder {
// can stick it into a u16 (which still seems generous). This will
// allow us to store it into an Int32 in the database.
let Ok(sp_slot) = u16::try_from(slot) else {
self.found_error(anyhow!(
self.found_error(InventoryError::from(anyhow!(
"MGS {:?}: SP {:?} slot {}: slot number did not fit into u16",
source,
sp_type,
slot
));
)));
return None;
};

Expand Down Expand Up @@ -177,12 +205,12 @@ impl CollectionBuilder {
gateway_client::types::RotState::CommunicationFailed {
message,
} => {
self.found_error(anyhow!(
self.found_error(InventoryError::from(anyhow!(
"MGS {:?}: reading RoT state for {:?}: {}",
source,
baseboard,
message
));
)));
}
}

Expand Down Expand Up @@ -218,7 +246,7 @@ impl CollectionBuilder {
which: CabooseWhich,
source: &str,
caboose: SpComponentCaboose,
) -> Result<(), anyhow::Error> {
) -> Result<(), CollectorBug> {
// Normalize the caboose contents: i.e., if we've seen this exact
// caboose contents before, use the same record from before. Otherwise,
// make a new one.
Expand All @@ -243,7 +271,7 @@ impl CollectionBuilder {
},
) {
let error = if *previous.caboose == *sw_caboose {
anyhow!("reported multiple times (same value)",)
anyhow!("reported multiple times (same value)")
} else {
anyhow!(
"reported caboose multiple times (previously {:?}, \
Expand All @@ -252,10 +280,10 @@ impl CollectionBuilder {
sw_caboose
)
};
Err(error.context(format!(
Err(CollectorBug::from(error.context(format!(
"baseboard {:?} caboose {:?}",
baseboard, which
)))
))))
} else {
Ok(())
}
Expand Down Expand Up @@ -290,7 +318,7 @@ impl CollectionBuilder {
which: RotPageWhich,
source: &str,
page: RotPage,
) -> Result<(), anyhow::Error> {
) -> Result<(), CollectorBug> {
// Normalize the page contents: i.e., if we've seen this exact page
// before, use the same record from before. Otherwise, make a new one.
let sw_rot_page = Self::normalize_item(&mut self.rot_pages, page);
Expand Down Expand Up @@ -321,10 +349,10 @@ impl CollectionBuilder {
sw_rot_page
)
};
Err(error.context(format!(
Err(CollectorBug::from(error.context(format!(
"baseboard {:?} rot page {:?}",
baseboard, which
)))
))))
} else {
Ok(())
}
Expand All @@ -351,11 +379,12 @@ impl CollectionBuilder {

/// Record a collection error
///
/// This is used for operational errors encountered during the collection
/// process (e.g., a down MGS instance). It's not intended for mis-uses of
/// this API, which are conveyed instead through returned errors (and should
/// probably cause the caller to stop collection altogether).
pub fn found_error(&mut self, error: anyhow::Error) {
/// See [`InventoryError`] for more on what kinds of errors are reported
/// this way. These errors are stored as part of the collection so that
/// future readers can see what problems might make the collection
/// incomplete. By contrast, [`CollectorBug`]s are not reported and stored
/// this way.
pub fn found_error(&mut self, error: InventoryError) {
self.errors.push(error);
}
}
Expand Down
11 changes: 7 additions & 4 deletions nexus/inventory/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! Collection of inventory from Omicron components
use crate::builder::CollectionBuilder;
use crate::builder::InventoryError;
use anyhow::Context;
use gateway_client::types::GetCfpaParams;
use gateway_client::types::RotCfpaSlot;
Expand Down Expand Up @@ -93,7 +94,7 @@ impl Collector {
// being able to identify this particular condition.
let sps = match ignition_result {
Err(error) => {
self.in_progress.found_error(error);
self.in_progress.found_error(InventoryError::from(error));
return;
}

Expand Down Expand Up @@ -129,7 +130,7 @@ impl Collector {
});
let sp_state = match result {
Err(error) => {
self.in_progress.found_error(error);
self.in_progress.found_error(InventoryError::from(error));
continue;
}
Ok(response) => response.into_inner(),
Expand Down Expand Up @@ -179,7 +180,8 @@ impl Collector {
});
let caboose = match result {
Err(error) => {
self.in_progress.found_error(error);
self.in_progress
.found_error(InventoryError::from(error));
continue;
}
Ok(response) => response.into_inner(),
Expand Down Expand Up @@ -257,7 +259,8 @@ impl Collector {

let page = match result {
Err(error) => {
self.in_progress.found_error(error);
self.in_progress
.found_error(InventoryError::from(error));
continue;
}
Ok(data_base64) => RotPage { data_base64 },
Expand Down
2 changes: 2 additions & 0 deletions nexus/inventory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ pub mod examples;

// only exposed for test code to construct collections
pub use builder::CollectionBuilder;
pub use builder::CollectorBug;
pub use builder::InventoryError;

pub use collector::Collector;
54 changes: 36 additions & 18 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ mod tests {
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::Instant;
Expand All @@ -667,7 +669,8 @@ mod tests {
// timers complete as expected.
const TICK_INTERVAL: Duration = Duration::from_millis(10);

// Total number of collection attempts.
// Total number of collection attempts, and the expected number of
// collections which fail in the "unreachability" test below.
const N_COLLECTIONS: u64 = 5;

// Period these tests wait using `tokio::time::advance()` before checking
Expand All @@ -677,6 +680,12 @@ mod tests {
+ COLLECTION_INTERVAL.as_millis() as u64 / 2,
);

// The number of actual successful test collections.
static N_SUCCESSFUL_COLLECTIONS: AtomicU64 = AtomicU64::new(0);

// The number of actual failed test collections.
static N_FAILED_COLLECTIONS: AtomicU64 = AtomicU64::new(0);

// Test that we count successful collections from a target correctly.
#[tokio::test]
async fn test_self_stat_collection_count() {
Expand All @@ -697,13 +706,11 @@ mod tests {
// will be no actual data here, but the sample counter will increment.
let addr =
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0));
async fn handler(
_: Request<Body>,
) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::from("[]")))
}
let make_svc = make_service_fn(|_conn| async {
Ok::<_, Infallible>(service_fn(handler))
Ok::<_, Infallible>(service_fn(|_: Request<Body>| async {
N_SUCCESSFUL_COLLECTIONS.fetch_add(1, Ordering::SeqCst);
Ok::<_, Infallible>(Response::new(Body::from("[]")))
}))
});
let server = Server::bind(&addr).serve(make_svc);
let address = server.local_addr();
Expand All @@ -722,7 +729,11 @@ mod tests {
.await
.expect("failed to register dummy producer");

// Step time until there has been exactly `N_COLLECTIONS` collections.
// Step time for a few collections.
//
// Due to scheduling variations, we don't verify the number of
// collections we expect based on time, but we instead check that every
// collection that _has_ occurred bumps the counter.
tokio::time::pause();
let now = Instant::now();
while now.elapsed() < TEST_WAIT_PERIOD {
Expand All @@ -744,7 +755,10 @@ mod tests {
.await
.expect("failed to request statistics from task");
let stats = rx.await.expect("failed to receive statistics from task");
assert_eq!(stats.collections.datum.value(), N_COLLECTIONS);
assert_eq!(
stats.collections.datum.value(),
N_SUCCESSFUL_COLLECTIONS.load(Ordering::SeqCst)
);
assert!(stats.failed_collections.is_empty());
logctx.cleanup_successful();
}
Expand Down Expand Up @@ -837,15 +851,13 @@ mod tests {
// And a dummy server that will always fail with a 500.
let addr =
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0));
async fn handler(
_: Request<Body>,
) -> Result<Response<Body>, Infallible> {
let mut res = Response::new(Body::from("im ded"));
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(res)
}
let make_svc = make_service_fn(|_conn| async {
Ok::<_, Infallible>(service_fn(handler))
Ok::<_, Infallible>(service_fn(|_: Request<Body>| async {
N_FAILED_COLLECTIONS.fetch_add(1, Ordering::SeqCst);
let mut res = Response::new(Body::from("im ded"));
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok::<_, Infallible>(res)
}))
});
let server = Server::bind(&addr).serve(make_svc);
let address = server.local_addr();
Expand All @@ -865,6 +877,12 @@ mod tests {
.expect("failed to register flaky producer");

// Step time until there has been exactly `N_COLLECTIONS` collections.
//
// NOTE: This is technically still a bit racy, in that the server task
// may have made a different number of attempts than we expect. In
// practice, we've not seen this one fail, so basing the number of
// counts on time seems reasonable, especially since we don't have other
// low-cost options for verifying the behavior.
tokio::time::pause();
let now = Instant::now();
while now.elapsed() < TEST_WAIT_PERIOD {
Expand Down Expand Up @@ -894,7 +912,7 @@ mod tests {
.unwrap()
.datum
.value(),
N_COLLECTIONS,
N_FAILED_COLLECTIONS.load(Ordering::SeqCst),
);
assert_eq!(stats.failed_collections.len(), 1);
logctx.cleanup_successful();
Expand Down
Loading

0 comments on commit 760675d

Please sign in to comment.