Skip to content

Commit

Permalink
new thing
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Apr 25, 2024
1 parent 6fa1d7e commit 9ea71c5
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
4 changes: 0 additions & 4 deletions nexus-config/src/nexus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,6 @@ pub struct InstanceWatcherConfig {
/// period (in seconds) for periodic activations of this background task
#[serde_as(as = "DurationSeconds<u64>")]
pub period_secs: Duration,

/// maximum number of retries to attempt before considering a sled-agent
/// dead.
pub max_retries: NonZeroU32,
}

/// Configuration for a nexus server
Expand Down
67 changes: 56 additions & 11 deletions nexus/src/app/background/instance_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! Background task for pulling instance state from sled-agents.
use super::common::BackgroundTask;
use crate::app::instance::InstanceUpdated;
use crate::Error;
use futures::{future::BoxFuture, FutureExt};
use nexus_db_model::{Sled, SledInstance};
Expand Down Expand Up @@ -33,7 +34,6 @@ impl InstanceWatcher {
pub(crate) fn new(
datastore: Arc<DataStore>,
resolver: internal_dns::resolver::Resolver,
max_retries: NonZeroU32,
) -> Self {
Self { datastore, resolver }
}
Expand All @@ -43,7 +43,10 @@ impl InstanceWatcher {
opctx: &OpContext,
client: &SledAgentClient,
instance: SledInstance,
) -> impl Future<Output = Result<(), Error>> + Send + 'static {
) -> impl Future<
Output = Result<crate::app::instance::InstanceUpdated, CheckError>,
> + Send
+ 'static {
let instance_id = instance.instance_id();
let watcher = self.clone();
let opctx = opctx.child(
Expand All @@ -66,15 +69,15 @@ impl InstanceWatcher {
&& rsp.as_ref().error_code.as_deref()
== Some("NO_SUCH_INSTANCE") =>
{
slog::info!(opctx.log, "instance is wayyyyy gone");
slog::debug!(opctx.log, "instance is wayyyyy gone");
todo!();
}
Err(e) => {
slog::warn!(
opctx.log,
"error checking up on instance: {e}"
);
return Err(e.into());
return Err(CheckError::SledAgent);
}
};

Expand All @@ -89,11 +92,17 @@ impl InstanceWatcher {
&state.into(),
)
.await
.map_err(|_| CheckError::Update)?
.ok_or(CheckError::NotFound)
}
}
}

struct CheckResult {}
enum CheckError {
SledAgent,
Update,
NotFound,
}

type ClientError = sled_agent_client::Error<sled_agent_client::types::Error>;

Expand Down Expand Up @@ -158,26 +167,62 @@ impl BackgroundTask for InstanceWatcher {
}

// All requests fired off, let's wait for them to come back.
let mut ok = 0;
let mut total = 0;
let mut instances_updated = 0;
let mut vmms_updated = 0;
let mut no_change = 0;
let mut not_found = 0;
let mut sled_agent_errors = 0;
let mut update_errors = 0;
while let Some(result) = tasks.join_next().await {
total += 1;
match result {
Ok(Ok(())) => {
ok += 1;
Ok(Ok(InstanceUpdated {
vmm_updated,
instance_updated,
})) => {
if instance_updated {
instances_updated += 1;
}

if vmm_updated {
vmms_updated += 1;
}

if !(vmm_updated || instance_updated) {
no_change += 1;
}
}
Ok(Err(CheckError::NotFound)) => not_found += 1,
Ok(Err(CheckError::SledAgent)) => sled_agent_errors += 1,
Ok(Err(CheckError::Update)) => update_errors += 1,
Err(e) => unreachable!(
"a `JoinError` is returned if a spawned task \
panics, or if the task is aborted. we never abort \
tasks on this `JoinSet`, and nexus is compiled with \
`panic=\"abort\"`, so neither of these cases should \
ever occur: {e}",
),
Ok(Err(e)) => {}
}
}

slog::trace!(opctx.log, "all instance checks complete");
slog::info!(opctx.log, "all instance checks complete";
"total_instances" => ?total,
"instances_updated" => ?instances_updated,
"vmms_updated" => ?vmms_updated,
"no_change" => ?no_change,
"not_found" => ?not_found,
"sled_agent_errors" => ?sled_agent_errors,
"update_errors" => ?update_errors,
);
serde_json::json!({
"num_ok": ok,
"total_instances": total,
"instances_updated": instances_updated,
"vmms_updated": vmms_updated,
"no_change": no_change,
"not_found": not_found,
"sled_agent_errors": sled_agent_errors,
"update_errors": update_errors,
})
}
.boxed()
Expand Down
19 changes: 15 additions & 4 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,8 @@ impl super::Nexus {
instance_id,
new_runtime_state,
)
.await
.await?;
Ok(())
}

/// Returns the requested range of serial console output bytes,
Expand Down Expand Up @@ -1952,6 +1953,16 @@ impl super::Nexus {
}
}

/// Records what aspects of an instance's state were actually changed in a
/// [`notify_instance_updated`] call.
///
/// This is (presently) used for debugging purposes only.
#[derive(Copy, Clone)]
pub(crate) struct InstanceUpdated {
pub instance_updated: bool,
pub vmm_updated: bool,
}

/// Invoked by a sled agent to publish an updated runtime state for an
/// Instance.
pub(crate) async fn notify_instance_updated(
Expand All @@ -1962,7 +1973,7 @@ pub(crate) async fn notify_instance_updated(
log: &slog::Logger,
instance_id: &Uuid,
new_runtime_state: &nexus::SledInstanceState,
) -> Result<(), Error> {
) -> Result<Option<InstanceUpdated>, Error> {
let propolis_id = new_runtime_state.propolis_id;

info!(log, "received new runtime state from sled agent";
Expand Down Expand Up @@ -2103,7 +2114,7 @@ pub(crate) async fn notify_instance_updated(
"propolis_id" => %propolis_id,
"instance_updated" => instance_updated,
"vmm_updated" => vmm_updated);
Ok(())
Ok(Some(InstanceUpdated { instance_updated, vmm_updated }))
}

// The update command should swallow object-not-found errors and
Expand All @@ -2114,7 +2125,7 @@ pub(crate) async fn notify_instance_updated(
an object not found error";
"instance_id" => %instance_id,
"propolis_id" => %propolis_id);
Ok(())
Ok(None)
}

// If the datastore is unavailable, propagate that to the caller.
Expand Down
1 change: 0 additions & 1 deletion smf/nexus/single-sled/config-partial.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ sync_service_zone_nat.period_secs = 30
switch_port_settings_manager.period_secs = 30
region_replacement.period_secs = 30
instance_watcher.period_secs = 30
instance_watcher.max_retries = 5

[default_region_allocation_strategy]
# by default, allocate without requirement for distinct sleds.
Expand Down

0 comments on commit 9ea71c5

Please sign in to comment.