diff --git a/Cargo.lock b/Cargo.lock index 852dc29ba2..11dd1839a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5402,6 +5402,7 @@ dependencies = [ "itertools 0.12.1", "macaddr", "mg-admin-client", + "nexus-client", "nexus-config", "nexus-db-model", "nexus-db-queries", diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index c81d36c73c..60925d0cb4 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -889,8 +889,95 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { ); } }; + } else if name == "instance_watcher" { + #[derive(Deserialize)] + struct TaskSuccess { + /// total number of instances checked + total_instances: usize, + + /// number of stale instance metrics that were deleted + pruned_instances: usize, + + /// instance states from completed checks. + /// + /// this is a mapping of stringified instance states to the number + /// of instances in that state. these stringified states correspond + /// to the `state` field recorded by the instance watcher's + /// `virtual_machine:check` timeseries with the `healthy` field set + /// to `true`. any changes to the instance state type which cause it + /// to print differently will be counted as a distinct state. + instance_states: BTreeMap, + + /// instance check failures. + /// + /// this is a mapping of stringified instance check failure reasons + /// to the number of instances with checks that failed for that + /// reason. these stringified failure reasons correspond to the + /// `state` field recorded by the instance watcher's + /// `virtual_machine:check` timeseries with the `healthy` field set + /// to `false`. any changes to the instance state type which cause + /// it to print differently will be counted as a distinct failure + /// reason. + failed_checks: BTreeMap, + + /// instance checks that could not be completed successfully. + /// + /// this is a mapping of stringified instance check errors + /// to the number of instance checks that were not completed due to + /// that error. these stringified errors correspond to the `reason ` + /// field recorded by the instance watcher's + /// `virtual_machine:incomplete_check` timeseries. any changes to + /// the check error type which cause it to print + /// differently will be counted as a distinct check error. 
+ incomplete_checks: BTreeMap, + } + + match serde_json::from_value::(details.clone()) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + Ok(TaskSuccess { + total_instances, + pruned_instances, + instance_states, + failed_checks, + incomplete_checks, + }) => { + let total_successes: usize = instance_states.values().sum(); + let total_failures: usize = failed_checks.values().sum(); + let total_incomplete: usize = incomplete_checks.values().sum(); + println!(" total instances checked: {total_instances}",); + println!( + " checks completed: {}", + total_successes + total_failures + ); + println!(" successful checks: {total_successes}",); + for (state, count) in &instance_states { + println!(" -> {count} instances {state}") + } + + println!(" failed checks: {total_failures}"); + for (failure, count) in &failed_checks { + println!(" -> {count} {failure}") + } + println!( + " checks that could not be completed: {total_incomplete}", + ); + for (error, count) in &incomplete_checks { + println!(" -> {count} {error} errors") + } + println!( + " stale instance metrics pruned: {pruned_instances}" + ); + } + }; } else if name == "service_firewall_rule_propagation" { match serde_json::from_value::(details.clone()) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), Ok(serde_json::Value::Object(map)) => { if !map.is_empty() { eprintln!( @@ -902,11 +989,7 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) { Ok(val) => { eprintln!(" unexpected return value from task: {:?}", val) } - Err(error) => eprintln!( - "warning: failed to interpret task details: {:?}: {:?}", - error, details - ), - } + }; } else { println!( "warning: unknown background task: {:?} \ diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index e69626a74b..a224155bf9 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -72,6 +72,10 @@ task: "external_endpoints" on each one +task: "instance_watcher" + periodically checks instance states + + task: "inventory_collection" collects hardware and software inventory data from the whole system @@ -179,6 +183,10 @@ task: "external_endpoints" on each one +task: "instance_watcher" + periodically checks instance states + + task: "inventory_collection" collects hardware and software inventory data from the whole system @@ -273,6 +281,10 @@ task: "external_endpoints" on each one +task: "instance_watcher" + periodically checks instance states + + task: "inventory_collection" collects hardware and software inventory data from the whole system diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index c178102784..d7711610bd 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -249,6 +249,10 @@ task: "external_endpoints" on each one +task: "instance_watcher" + periodically checks instance states + + task: "inventory_collection" collects hardware and software inventory data from the whole system @@ -396,6 +400,18 @@ task: "external_endpoints" TLS certificates: 0 +task: "instance_watcher" + configured period: every 30s + currently executing: no + last completed activation: , triggered by an explicit signal + started at (s ago) and ran for ms + total instances checked: 0 + checks completed: 0 + successful checks: 0 + failed checks: 0 + checks that could not be completed: 0 + stale instance metrics pruned: 0 + task: "inventory_collection" configured 
period: every 10m currently executing: no diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index 32180a77d8..01f642a36b 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -375,6 +375,8 @@ pub struct BackgroundTaskConfig { pub switch_port_settings_manager: SwitchPortSettingsManagerConfig, /// configuration for region replacement task pub region_replacement: RegionReplacementConfig, + /// configuration for instance watcher task + pub instance_watcher: InstanceWatcherConfig, /// configuration for service VPC firewall propagation task pub service_firewall_propagation: ServiceFirewallPropagationConfig, } @@ -521,6 +523,14 @@ pub struct RegionReplacementConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct InstanceWatcherConfig { + /// period (in seconds) for periodic activations of this background task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, +} + #[serde_as] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct ServiceFirewallPropagationConfig { @@ -765,6 +775,7 @@ mod test { sync_service_zone_nat.period_secs = 30 switch_port_settings_manager.period_secs = 30 region_replacement.period_secs = 30 + instance_watcher.period_secs = 30 service_firewall_propagation.period_secs = 300 [default_region_allocation_strategy] type = "random" @@ -894,6 +905,9 @@ mod test { region_replacement: RegionReplacementConfig { period_secs: Duration::from_secs(30), }, + instance_watcher: InstanceWatcherConfig { + period_secs: Duration::from_secs(30), + }, service_firewall_propagation: ServiceFirewallPropagationConfig { period_secs: Duration::from_secs(300), @@ -964,6 +978,7 @@ mod test { sync_service_zone_nat.period_secs = 30 switch_port_settings_manager.period_secs = 30 region_replacement.period_secs = 30 + instance_watcher.period_secs = 30 service_firewall_propagation.period_secs = 300 [default_region_allocation_strategy] type = "random" diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 2c1a3b21fe..9793c32bf8 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -42,6 +42,7 @@ itertools.workspace = true macaddr.workspace = true # Not under "dev-dependencies"; these also need to be implemented for # integration tests. +nexus-client.workspace = true nexus-config.workspace = true nexus-networking.workspace = true nexus-test-interface.workspace = true diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 4117079880..979395d907 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -1770,3 +1770,5 @@ allow_tables_to_appear_in_same_query!(volume, virtual_provisioning_resource); allow_tables_to_appear_in_same_query!(ssh_key, instance_ssh_key, instance); joinable!(instance_ssh_key -> ssh_key (ssh_key_id)); joinable!(instance_ssh_key -> instance (instance_id)); + +allow_tables_to_appear_in_same_query!(sled, sled_instance); diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index c4510c02be..a86d030e48 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. 
-pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(59, 0, 0); +pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(60, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy> = Lazy::new(|| { // | leaving the first copy as an example for the next person. // v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(60, "add-lookup-vmm-by-sled-id-index"), KnownVersion::new(59, "enforce-first-as-default"), KnownVersion::new(58, "insert-default-allowlist"), KnownVersion::new(57, "add-allowed-source-ips"), diff --git a/nexus/db-model/src/sled_instance.rs b/nexus/db-model/src/sled_instance.rs index e3a901264d..bbc92ddf18 100644 --- a/nexus/db-model/src/sled_instance.rs +++ b/nexus/db-model/src/sled_instance.rs @@ -41,3 +41,9 @@ impl From for views::SledInstance { } } } + +impl SledInstance { + pub fn instance_id(&self) -> Uuid { + self.identity.id + } +} diff --git a/nexus/db-queries/src/db/datastore/instance.rs b/nexus/db-queries/src/db/datastore/instance.rs index 731f7b4c06..ce40e20501 100644 --- a/nexus/db-queries/src/db/datastore/instance.rs +++ b/nexus/db-queries/src/db/datastore/instance.rs @@ -22,6 +22,7 @@ use crate::db::model::Instance; use crate::db::model::InstanceRuntimeState; use crate::db::model::Name; use crate::db::model::Project; +use crate::db::model::Sled; use crate::db::model::Vmm; use crate::db::pagination::paginated; use crate::db::update_and_check::UpdateAndCheck; @@ -29,11 +30,14 @@ use crate::db::update_and_check::UpdateStatus; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; +use nexus_db_model::ApplySledFilterExt; use nexus_db_model::Disk; use nexus_db_model::VmmRuntimeState; +use nexus_types::deployment::SledFilter; use omicron_common::api; use omicron_common::api::external::http_pagination::PaginatedBy; use omicron_common::api::external::CreateResult; +use omicron_common::api::external::DataPageParams; use omicron_common::api::external::DeleteResult; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; @@ -385,6 +389,58 @@ impl DataStore { Ok((instance_updated, vmm_updated)) } + /// Lists all instances on in-service sleds with active Propolis VMM + /// processes, returning the instance along with the VMM on which it's + /// running, the sled on which the VMM is running, and the project that owns + /// the instance. + /// + /// The query performed by this function is paginated by the sled's UUID. 
+ pub async fn instance_and_vmm_list_by_sled_agent( + &self, + opctx: &OpContext, + pagparams: &DataPageParams<'_, Uuid>, + ) -> ListResultVec<(Sled, Instance, Vmm, Project)> { + use crate::db::schema::{ + instance::dsl as instance_dsl, project::dsl as project_dsl, + sled::dsl as sled_dsl, vmm::dsl as vmm_dsl, + }; + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + let conn = self.pool_connection_authorized(opctx).await?; + + let result = paginated(sled_dsl::sled, sled_dsl::id, pagparams) + .filter(sled_dsl::time_deleted.is_null()) + .sled_filter(SledFilter::InService) + .inner_join( + vmm_dsl::vmm + .on(vmm_dsl::sled_id + .eq(sled_dsl::id) + .and(vmm_dsl::time_deleted.is_null())) + .inner_join( + instance_dsl::instance + .on(instance_dsl::id + .eq(vmm_dsl::instance_id) + .and(instance_dsl::time_deleted.is_null())) + .inner_join( + project_dsl::project.on(project_dsl::id + .eq(instance_dsl::project_id) + .and(project_dsl::time_deleted.is_null())), + ), + ), + ) + .sled_filter(SledFilter::InService) + .select(( + Sled::as_select(), + Instance::as_select(), + Vmm::as_select(), + Project::as_select(), + )) + .load_async::<(Sled, Instance, Vmm, Project)>(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + Ok(result) + } + pub async fn project_delete_instance( &self, opctx: &OpContext, diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index eac6c6a489..d3faf2459c 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -113,6 +113,8 @@ blueprints.period_secs_execute = 60 sync_service_zone_nat.period_secs = 30 switch_port_settings_manager.period_secs = 30 region_replacement.period_secs = 30 +# How frequently to query the status of active instances. +instance_watcher.period_secs = 30 service_firewall_propagation.period_secs = 300 [default_region_allocation_strategy] diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 8ceff3e2b8..9d9a65c23b 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -12,6 +12,7 @@ use super::dns_config; use super::dns_propagation; use super::dns_servers; use super::external_endpoints; +use super::instance_watcher; use super::inventory_collection; use super::metrics_producer_gc; use super::nat_cleanup; @@ -28,6 +29,7 @@ use nexus_config::DnsTasksConfig; use nexus_db_model::DnsGroup; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; +use oximeter::types::ProducerRegistry; use std::collections::BTreeMap; use std::sync::Arc; use tokio::sync::mpsc::Sender; @@ -92,6 +94,9 @@ pub struct BackgroundTasks { /// begins the process pub task_region_replacement: common::TaskHandle, + /// task handle for the task that polls sled agents for instance states. 
+ pub task_instance_watcher: common::TaskHandle, + /// task handle for propagation of VPC firewall rules for Omicron services /// with external network connectivity, pub task_service_firewall_propagation: common::TaskHandle, @@ -108,6 +113,7 @@ impl BackgroundTasks { nexus_id: Uuid, resolver: internal_dns::resolver::Resolver, saga_request: Sender, + producer_registry: &ProducerRegistry, ) -> BackgroundTasks { let mut driver = common::Driver::new(); @@ -346,6 +352,22 @@ impl BackgroundTasks { task }; + let task_instance_watcher = { + let watcher = instance_watcher::InstanceWatcher::new( + datastore.clone(), + resolver.clone(), + producer_registry, + instance_watcher::WatcherIdentity { nexus_id, rack_id }, + ); + driver.register( + "instance_watcher".to_string(), + "periodically checks instance states".to_string(), + config.instance_watcher.period_secs, + Box::new(watcher), + opctx.child(BTreeMap::new()), + vec![], + ) + }; // Background task: service firewall rule propagation let task_service_firewall_propagation = driver.register( String::from("service_firewall_rule_propagation"), @@ -355,7 +377,7 @@ impl BackgroundTasks { ), config.service_firewall_propagation.period_secs, Box::new(service_firewall_rules::ServiceRulePropagator::new( - datastore.clone(), + datastore, )), opctx.child(BTreeMap::new()), vec![], @@ -380,6 +402,7 @@ impl BackgroundTasks { task_service_zone_nat_tracker, task_switch_port_settings_manager, task_region_replacement, + task_instance_watcher, task_service_firewall_propagation, } } diff --git a/nexus/src/app/background/instance_watcher.rs b/nexus/src/app/background/instance_watcher.rs new file mode 100644 index 0000000000..4cdca3c4b7 --- /dev/null +++ b/nexus/src/app/background/instance_watcher.rs @@ -0,0 +1,617 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task for pulling instance state from sled-agents. + +use super::common::BackgroundTask; +use futures::{future::BoxFuture, FutureExt}; +use http::StatusCode; +use nexus_db_model::Instance; +use nexus_db_model::Project; +use nexus_db_model::Sled; +use nexus_db_model::Vmm; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::pagination::Paginator; +use nexus_db_queries::db::DataStore; +use nexus_types::identity::Asset; +use nexus_types::identity::Resource; +use omicron_common::api::external::InstanceState; +use omicron_common::api::internal::nexus::SledInstanceState; +use oximeter::types::ProducerRegistry; +use sled_agent_client::Client as SledAgentClient; +use std::borrow::Cow; +use std::collections::BTreeMap; +use std::future::Future; +use std::net::IpAddr; +use std::num::NonZeroU32; +use std::sync::Arc; +use std::sync::Mutex; +use uuid::Uuid; + +/// Background task that periodically checks instance states. +pub(crate) struct InstanceWatcher { + datastore: Arc, + resolver: internal_dns::resolver::Resolver, + metrics: Arc>, + id: WatcherIdentity, +} + +const MAX_SLED_AGENTS: NonZeroU32 = unsafe { + // Safety: last time I checked, 100 was greater than zero. 
+ NonZeroU32::new_unchecked(100) +}; + +impl InstanceWatcher { + pub(crate) fn new( + datastore: Arc, + resolver: internal_dns::resolver::Resolver, + producer_registry: &ProducerRegistry, + id: WatcherIdentity, + ) -> Self { + let metrics = Arc::new(Mutex::new(metrics::Metrics::default())); + producer_registry + .register_producer(metrics::Producer(metrics.clone())) + .unwrap(); + Self { datastore, resolver, metrics, id } + } + + fn check_instance( + &self, + opctx: &OpContext, + client: &SledAgentClient, + target: VirtualMachine, + ) -> impl Future + Send + 'static { + let datastore = self.datastore.clone(); + let resolver = self.resolver.clone(); + + let opctx = opctx.child( + std::iter::once(( + "instance_id".to_string(), + target.instance_id.to_string(), + )) + .collect(), + ); + let client = client.clone(); + + async move { + slog::trace!(opctx.log, "checking on instance..."); + let rsp = client.instance_get_state(&target.instance_id).await; + let mut check = + Check { target, outcome: Default::default(), result: Ok(()) }; + let state = match rsp { + Ok(rsp) => rsp.into_inner(), + Err(ClientError::ErrorResponse(rsp)) => { + let status = rsp.status(); + if status == StatusCode::NOT_FOUND + && rsp.as_ref().error_code.as_deref() + == Some("NO_SUCH_INSTANCE") + { + slog::info!(opctx.log, "instance is wayyyyy gone"); + // TODO(eliza): eventually, we should attempt to put the + // instance in the `Failed` state here. + check.outcome = + CheckOutcome::Failure(Failure::NoSuchInstance); + return check; + } + if status.is_client_error() { + slog::warn!(opctx.log, "check failed due to client error"; + "status" => ?status, "error" => ?rsp.into_inner()); + check.result = + Err(Incomplete::ClientHttpError(status.as_u16())); + } else { + slog::info!(opctx.log, "check failed due to server error"; + "status" => ?status, "error" => ?rsp.into_inner()); + } + + check.outcome = CheckOutcome::Failure( + Failure::SledAgentResponse(status.as_u16()), + ); + return check; + } + Err(ClientError::CommunicationError(e)) => { + // TODO(eliza): eventually, we may want to transition the + // instance to the `Failed` state if the sled-agent has been + // unreachable for a while. We may also want to take other + // corrective actions or alert an operator in this case. + // + // TODO(eliza): because we have the preported IP address + // of the instance's VMM from our databse query, we could + // also ask the VMM directly when the sled-agent is + // unreachable. We should start doing that here at some + // point. 
+ slog::info!(opctx.log, "sled agent is unreachable"; "error" => ?e); + check.outcome = + CheckOutcome::Failure(Failure::SledAgentUnreachable); + return check; + } + Err(e) => { + slog::warn!( + opctx.log, + "error checking up on instance"; + "error" => ?e, + "status" => ?e.status(), + ); + check.result = Err(Incomplete::ClientError); + return check; + } + }; + + let new_runtime_state: SledInstanceState = state.into(); + check.outcome = + CheckOutcome::Success(new_runtime_state.vmm_state.state); + slog::debug!( + opctx.log, + "updating instance state"; + "state" => ?new_runtime_state.vmm_state.state, + ); + check.result = crate::app::instance::notify_instance_updated( + &datastore, + &resolver, + &opctx, + &opctx, + &opctx.log, + &target.instance_id, + &new_runtime_state, + ) + .await + .map_err(|e| { + slog::warn!( + opctx.log, + "error updating instance"; + "error" => ?e, + "state" => ?new_runtime_state.vmm_state.state, + ); + Incomplete::UpdateFailed + }) + .and_then(|updated| { + updated.ok_or_else(|| { + slog::warn!( + opctx.log, + "error updating instance: not found in database"; + "state" => ?new_runtime_state.vmm_state.state, + ); + Incomplete::InstanceNotFound + }) + }) + .map(|updated| { + slog::debug!( + opctx.log, + "update successful"; + "instance_updated" => updated.instance_updated, + "vmm_updated" => updated.vmm_updated, + "state" => ?new_runtime_state.vmm_state.state, + ); + }); + + check + } + } +} + +/// The identity of the process performing the health check, for distinguishing +/// health check metrics emitted by different Nexus instances. +/// +/// This is a struct just to ensure that the two UUIDs are named arguments +/// (rather than positional arguments) and can't be swapped accidentally. +#[derive(Copy, Clone)] +pub struct WatcherIdentity { + pub nexus_id: Uuid, + pub rack_id: Uuid, +} + +#[derive( + Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, oximeter::Target, +)] +struct VirtualMachine { + /// The rack ID of the Nexus process which performed the health check. + rack_id: Uuid, + /// The ID of the Nexus process which performed the health check. + nexus_id: Uuid, + /// The instance's ID. + instance_id: Uuid, + /// The silo ID of the instance's silo. + silo_id: Uuid, + /// The project ID of the instance. + project_id: Uuid, + /// The VMM ID of the instance's virtual machine manager. + vmm_id: Uuid, + /// The sled-agent's ID. + sled_agent_id: Uuid, + /// The sled agent's IP address. + sled_agent_ip: IpAddr, + /// The sled agent's port. + sled_agent_port: u16, +} + +impl VirtualMachine { + fn new( + WatcherIdentity { rack_id, nexus_id }: WatcherIdentity, + sled: &Sled, + instance: &Instance, + vmm: &Vmm, + project: &Project, + ) -> Self { + let addr = sled.address(); + Self { + rack_id, + nexus_id, + instance_id: instance.id(), + silo_id: project.silo_id, + project_id: project.id(), + vmm_id: vmm.id, + sled_agent_id: sled.id(), + sled_agent_ip: (*addr.ip()).into(), + sled_agent_port: addr.port(), + } + } +} + +struct Check { + target: VirtualMachine, + + /// The outcome of performing this check. Either we were able to reach the + /// sled-agent that owns this instance and it told us the instance's state + /// and VMM, or we the health check failed in a way that suggests a + /// potential issue with the sled-agent or instance. + /// + /// If we were not able to perform the request at all due to an error on + /// *our* end, this will be `None`. + outcome: CheckOutcome, + + /// `Some` if the instance check was unsuccessful. 
+ /// + /// This indicates that something went wrong *while performing the check* that + /// does not necessarily indicate that the instance itself is in a bad + /// state. For example, the sled-agent client may have constructed an + /// invalid request, or an error may have occurred while updating the + /// instance in the database. + /// + /// Depending on when the error occurred, the `outcome` field may also + /// be populated. + result: Result<(), Incomplete>, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)] +enum CheckOutcome { + Success(InstanceState), + Failure(Failure), + #[default] + Unknown, +} + +impl Check { + fn state_str(&self) -> Cow<'static, str> { + match self.outcome { + CheckOutcome::Success(state) => state.label().into(), + CheckOutcome::Failure(_) => InstanceState::Failed.label().into(), + CheckOutcome::Unknown => "unknown".into(), + } + } + + fn reason_str(&self) -> Cow<'static, str> { + match self.outcome { + CheckOutcome::Success(_) => "success".into(), + CheckOutcome::Failure(reason) => reason.as_str(), + CheckOutcome::Unknown => match self.result { + Ok(()) => "unknown".into(), // this shouldn't happen, but there's no way to prevent it from happening, + Err(e) => e.as_str(), + }, + } + } +} + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, +)] +enum Failure { + /// The sled-agent for the sled on which the instance is running was + /// unreachable. + /// + /// This may indicate a network partition between us and that sled, that + /// the sled-agent process has crashed, or that the sled is down. + SledAgentUnreachable, + /// The sled-agent responded with an unexpected HTTP error. + SledAgentResponse(u16), + /// The sled-agent indicated that it doesn't know about an instance ID that + /// we believe it *should* know about. This probably means the sled-agent, + /// and potentially the whole sled, has been restarted. + NoSuchInstance, +} + +impl Failure { + fn as_str(&self) -> Cow<'static, str> { + match self { + Self::SledAgentUnreachable => "unreachable".into(), + Self::SledAgentResponse(status) => status.to_string().into(), + Self::NoSuchInstance => "no_such_instance".into(), + } + } +} + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, +)] +enum Incomplete { + /// The sled-agent responded with an HTTP client error, indicating that our + /// request as somehow malformed. + ClientHttpError(u16), + /// Something else went wrong while making an HTTP request. + ClientError, + /// We attempted to update the instance state in the database, but no + /// instance with that UUID existed. + /// + /// Because the instance UUIDs that we perform checks on come from querying + /// the instances table, this would probably indicate that the instance was + /// removed from the database between when we listed instances and when the + /// check completed. + InstanceNotFound, + /// Something went wrong while updating the state of the instance in the + /// database. 
+ UpdateFailed, +} + +impl Incomplete { + fn as_str(&self) -> Cow<'static, str> { + match self { + Self::ClientHttpError(status) => status.to_string().into(), + Self::ClientError => "client_error".into(), + Self::InstanceNotFound => "instance_not_found".into(), + Self::UpdateFailed => "update_failed".into(), + } + } +} + +type ClientError = sled_agent_client::Error; + +impl BackgroundTask for InstanceWatcher { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async { + let mut tasks = tokio::task::JoinSet::new(); + let mut paginator = Paginator::new(MAX_SLED_AGENTS); + let mk_client = |sled: &Sled| { + nexus_networking::sled_client_from_address( + sled.id(), + sled.address(), + &opctx.log, + ) + }; + + while let Some(p) = paginator.next() { + let maybe_batch = self + .datastore + .instance_and_vmm_list_by_sled_agent( + opctx, + &p.current_pagparams(), + ) + .await; + let batch = match maybe_batch { + Ok(batch) => batch, + Err(e) => { + slog::error!( + opctx.log, + "sled instances by sled agent query failed: {e}" + ); + return serde_json::json!({ "error": e.to_string() }); + } + }; + paginator = p.found_batch(&batch, &|(sled, _, _, _)| sled.id()); + + // When we iterate over the batch of sled instances, we pop the + // first sled from the batch before looping over the rest, to + // insure that the initial sled-agent client is created first, + // as we need the address of the first sled to construct it. + // We could, alternatively, make the sled-agent client an + // `Option`, but then every subsequent iteration would have to + // handle the case where it's `None`, and I thought this was a + // bit neater... + let mut batch = batch.into_iter(); + if let Some((mut curr_sled, instance, vmm, project)) = batch.next() { + let mut client = mk_client(&curr_sled); + let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project); + tasks.spawn(self.check_instance(opctx, &client, target)); + + for (sled, instance, vmm, project) in batch { + // We're now talking to a new sled agent; update the client. + if sled.id() != curr_sled.id() { + client = mk_client(&sled); + curr_sled = sled; + } + + let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project); + tasks.spawn(self.check_instance(opctx, &client, target)); + } + } + } + + // Now, wait for the check results to come back. + let mut total: usize = 0; + let mut instance_states: BTreeMap = + BTreeMap::new(); + let mut check_failures: BTreeMap = + BTreeMap::new(); + let mut check_errors: BTreeMap = BTreeMap::new(); + while let Some(result) = tasks.join_next().await { + total += 1; + let check = result.expect( + "a `JoinError` is returned if a spawned task \ + panics, or if the task is aborted. we never abort \ + tasks on this `JoinSet`, and nexus is compiled with \ + `panic=\"abort\"`, so neither of these cases should \ + ever occur", + ); + match check.outcome { + CheckOutcome::Success(state) => { + *instance_states + .entry(state.to_string()) + .or_default() += 1; + } + CheckOutcome::Failure(reason) => { + *check_failures.entry(reason.as_str().into_owned()).or_default() += 1; + } + CheckOutcome::Unknown => {} + } + if let Err(ref reason) = check.result { + *check_errors.entry(reason.as_str().into_owned()).or_default() += 1; + } + self.metrics.lock().unwrap().record_check(check); + } + + // All requests completed! 
Prune any old instance metrics for + // instances that we didn't check --- if we didn't spawn a check for + // something, that means it wasn't present in the most recent + // database query. + let pruned = self.metrics.lock().unwrap().prune(); + + slog::info!(opctx.log, "all instance checks complete"; + "total_instances" => total, + "total_completed" => instance_states.len() + check_failures.len(), + "total_failed" => check_failures.len(), + "total_incomplete" => check_errors.len(), + "pruned_instances" => pruned, + ); + serde_json::json!({ + "total_instances": total, + "instance_states": instance_states, + "failed_checks": check_failures, + "incomplete_checks": check_errors, + "pruned_instances": pruned, + }) + } + .boxed() + } +} + +mod metrics { + use super::{CheckOutcome, Incomplete, VirtualMachine}; + use oximeter::types::Cumulative; + use oximeter::Metric; + use oximeter::MetricsError; + use oximeter::Sample; + use std::borrow::Cow; + use std::collections::BTreeMap; + use std::sync::Arc; + use std::sync::Mutex; + + #[derive(Debug, Default)] + pub(super) struct Metrics { + instances: BTreeMap, + } + + #[derive(Debug)] + pub(super) struct Producer(pub(super) Arc>); + + #[derive(Debug, Default)] + struct Instance { + checks: BTreeMap, + check_errors: BTreeMap, + touched: bool, + } + + impl Metrics { + pub(crate) fn record_check(&mut self, check: super::Check) { + let instance = self.instances.entry(check.target).or_default(); + instance + .checks + .entry(check.outcome) + .or_insert_with(|| Check { + state: check.state_str(), + reason: check.reason_str(), + datum: Cumulative::default(), + }) + .datum += 1; + if let Err(error) = check.result { + instance + .check_errors + .entry(error) + .or_insert_with(|| IncompleteCheck { + reason: error.as_str(), + datum: Cumulative::default(), + }) + .datum += 1; + } + instance.touched = true; + } + + pub(super) fn prune(&mut self) -> usize { + let len = self.instances.len(); + self.instances.retain(|_, instance| { + std::mem::replace(&mut instance.touched, false) + }); + len - self.instances.len() + } + + fn len(&self) -> usize { + self.instances.values().map(Instance::len).sum() + } + } + + impl oximeter::Producer for Producer { + fn produce( + &mut self, + ) -> Result>, MetricsError> { + let metrics = self.0.lock().unwrap(); + let mut v = Vec::with_capacity(metrics.len()); + for (target, instance) in &metrics.instances { + instance.sample_into(target, &mut v)?; + } + Ok(Box::new(v.into_iter())) + } + } + + impl Instance { + fn len(&self) -> usize { + self.checks.len() + self.check_errors.len() + } + + fn sample_into( + &self, + target: &VirtualMachine, + dest: &mut Vec, + ) -> Result<(), MetricsError> { + for metric in self.checks.values() { + dest.push(Sample::new(target, metric)?); + } + for metric in self.check_errors.values() { + dest.push(Sample::new(target, metric)?); + } + Ok(()) + } + } + + /// The number of successful checks for a single instance, VMM, and sled agent. + #[derive(Clone, Debug, Metric)] + struct Check { + /// The string representation of the instance's state as understood by + /// the VMM. If the check failed, this will generally be "failed". + state: Cow<'static, str>, + /// `Why the instance was marked as being in this state. + /// + /// If an instance was marked as "failed" due to a check failure, this + /// will be a string representation of the failure reason. Otherwise, if + /// the check was successful, this will be "success". 
Note that this may + /// be "success" even if the instance's state is "failed", which + /// indicates that we successfully queried the instance's state from the + /// sled-agent, and the *sled-agent* reported that the instance has + /// failed --- which is distinct from the instance watcher marking an + /// instance as failed due to a failed check. + reason: Cow<'static, str>, + /// The number of checks for this instance and sled agent which recorded + /// this state for this reason. + datum: Cumulative, + } + + /// The number of unsuccessful checks for an instance and sled agent pair. + #[derive(Clone, Debug, Metric)] + struct IncompleteCheck { + /// The reason why the check was unsuccessful. + /// + /// This is generated from the [`Incomplete`] enum's `Display` implementation. + reason: Cow<'static, str>, + /// The number of failed checks for this instance and sled agent. + datum: Cumulative, + } +} diff --git a/nexus/src/app/background/mod.rs b/nexus/src/app/background/mod.rs index 526ed84e4f..512c782b2e 100644 --- a/nexus/src/app/background/mod.rs +++ b/nexus/src/app/background/mod.rs @@ -13,6 +13,7 @@ mod dns_propagation; mod dns_servers; mod external_endpoints; mod init; +mod instance_watcher; mod inventory_collection; mod metrics_producer_gc; mod nat_cleanup; diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index b64757b690..50b46c8e8d 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -1523,7 +1523,8 @@ impl super::Nexus { instance_id, new_runtime_state, ) - .await + .await?; + Ok(()) } /// Returns the requested range of serial console output bytes, @@ -1952,6 +1953,16 @@ impl super::Nexus { } } +/// Records what aspects of an instance's state were actually changed in a +/// [`notify_instance_updated`] call. +/// +/// This is (presently) used for debugging purposes only. +#[derive(Copy, Clone)] +pub(crate) struct InstanceUpdated { + pub instance_updated: bool, + pub vmm_updated: bool, +} + /// Invoked by a sled agent to publish an updated runtime state for an /// Instance. pub(crate) async fn notify_instance_updated( @@ -1962,7 +1973,7 @@ pub(crate) async fn notify_instance_updated( log: &slog::Logger, instance_id: &Uuid, new_runtime_state: &nexus::SledInstanceState, -) -> Result<(), Error> { +) -> Result, Error> { let propolis_id = new_runtime_state.propolis_id; info!(log, "received new runtime state from sled agent"; @@ -2103,7 +2114,7 @@ pub(crate) async fn notify_instance_updated( "propolis_id" => %propolis_id, "instance_updated" => instance_updated, "vmm_updated" => vmm_updated); - Ok(()) + Ok(Some(InstanceUpdated { instance_updated, vmm_updated })) } // The update command should swallow object-not-found errors and @@ -2114,7 +2125,7 @@ pub(crate) async fn notify_instance_updated( an object not found error"; "instance_id" => %instance_id, "propolis_id" => %propolis_id); - Ok(()) + Ok(None) } // If the datastore is unavailable, propagate that to the caller. 
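Outside the HTTP notify path, the value of the new `Result<Option<InstanceUpdated>, Error>` return type is the distinction between `Ok(None)` (the instance or VMM row was already gone when the write-back ran, a not-found that `notify_instance_updated` swallows) and `Ok(Some(InstanceUpdated { .. }))`, whose flags record which of the two rows actually changed; the instance watcher maps the `None` case to `Incomplete::InstanceNotFound`. A minimal consumer sketch, assuming it lives inside the nexus crate (the helper name `handle_update_result` is illustrative, not part of this change):

```rust
use crate::app::instance::InstanceUpdated;
use omicron_common::api::external::Error;
use uuid::Uuid;

/// Illustrative helper showing how a caller can consume the new
/// `Option<InstanceUpdated>` return value of `notify_instance_updated`.
fn handle_update_result(
    log: &slog::Logger,
    instance_id: Uuid,
    result: Result<Option<InstanceUpdated>, Error>,
) -> Result<(), Error> {
    match result? {
        // The sled agent's report was applied; the flags say which rows moved.
        Some(InstanceUpdated { instance_updated, vmm_updated }) => {
            slog::debug!(log, "instance update applied";
                "instance_id" => %instance_id,
                "instance_updated" => instance_updated,
                "vmm_updated" => vmm_updated);
        }
        // The instance or VMM row was deleted before the write-back; the
        // update was a no-op (the watcher reports this as an incomplete check).
        None => {
            slog::warn!(log, "instance no longer exists; update was a no-op";
                "instance_id" => %instance_id);
        }
    }
    Ok(())
}
```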
diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index f8cccd89a4..a22fad0c81 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -387,6 +387,7 @@ impl Nexus { config.deployment.id, resolver.clone(), saga_request, + producer_registry, ); let external_resolver = { diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index 8b3d20a024..25a6d97efc 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -109,6 +109,7 @@ blueprints.period_secs_execute = 600 sync_service_zone_nat.period_secs = 30 switch_port_settings_manager.period_secs = 30 region_replacement.period_secs = 30 +instance_watcher.period_secs = 30 service_firewall_propagation.period_secs = 300 [default_region_allocation_strategy] diff --git a/nexus/tests/integration_tests/instances.rs b/nexus/tests/integration_tests/instances.rs index 08cf195384..0b5947ef7e 100644 --- a/nexus/tests/integration_tests/instances.rs +++ b/nexus/tests/integration_tests/instances.rs @@ -116,7 +116,9 @@ fn default_vpc_subnets_url() -> String { format!("/v1/vpc-subnets?{}&vpc=default", get_project_selector()) } -async fn create_project_and_pool(client: &ClientTestContext) -> views::Project { +pub async fn create_project_and_pool( + client: &ClientTestContext, +) -> views::Project { create_default_ip_pool(client).await; create_project(client, PROJECT_NAME).await } diff --git a/nexus/tests/integration_tests/metrics.rs b/nexus/tests/integration_tests/metrics.rs index 62c24a73e3..ec44c3747a 100644 --- a/nexus/tests/integration_tests/metrics.rs +++ b/nexus/tests/integration_tests/metrics.rs @@ -4,6 +4,9 @@ use std::time::Duration; +use crate::integration_tests::instances::{ + create_project_and_pool, instance_post, instance_simulate, InstanceOp, +}; use chrono::Utc; use dropshot::test_util::ClientTestContext; use dropshot::ResultsPage; @@ -277,6 +280,289 @@ async fn test_timeseries_schema_list( .expect("Failed to find HTTP request latency histogram schema"); } +pub async fn timeseries_query( + cptestctx: &ControlPlaneTestContext, + query: impl ToString, +) -> Vec { + // first, make sure the latest timeseries have been collected. + cptestctx.oximeter.force_collect().await; + + // okay, do the query + let body = nexus_types::external_api::params::TimeseriesQuery { + query: query.to_string(), + }; + let query = &body.query; + let rsp = NexusRequest::new( + nexus_test_utils::http_testing::RequestBuilder::new( + &cptestctx.external_client, + http::Method::POST, + "/v1/timeseries/query", + ) + .body(Some(&body)), + ) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .unwrap_or_else(|e| { + panic!("timeseries query failed: {e:?}\nquery: {query}") + }); + rsp.parsed_body().unwrap_or_else(|e| { + panic!( + "could not parse timeseries query response: {e:?}\n\ + query: {query}\nresponse: {rsp:#?}" + ); + }) +} + +#[nexus_test] +async fn test_instance_watcher_metrics( + cptestctx: &ControlPlaneTestContext, +) { + use oximeter::types::FieldValue; + const INSTANCE_ID_FIELD: &str = "instance_id"; + const STATE_FIELD: &str = "state"; + const STATE_STARTING: &str = "starting"; + const STATE_RUNNING: &str = "running"; + const STATE_STOPPING: &str = "stopping"; + const OXQL_QUERY: &str = "get virtual_machine:check"; + + let client = &cptestctx.external_client; + let internal_client = &cptestctx.internal_client; + let nexus = &cptestctx.server.apictx().nexus; + + // TODO(eliza): consider factoring this out to a generic + // `activate_background_task` function in `nexus-test-utils` eventually? 
+ let activate_instance_watcher = || async { + use nexus_client::types::BackgroundTask; + use nexus_client::types::CurrentStatus; + use nexus_client::types::CurrentStatusRunning; + use nexus_client::types::LastResult; + use nexus_client::types::LastResultCompleted; + + fn most_recent_start_time( + task: &BackgroundTask, + ) -> Option> { + match task.current { + CurrentStatus::Idle => match task.last { + LastResult::Completed(LastResultCompleted { + start_time, + .. + }) => Some(start_time), + LastResult::NeverCompleted => None, + }, + CurrentStatus::Running(CurrentStatusRunning { + start_time, + .. + }) => Some(start_time), + } + } + + eprintln!("\n --- activating instance watcher ---\n"); + let task = NexusRequest::object_get( + internal_client, + "/bgtasks/view/instance_watcher", + ) + .execute_and_parse_unwrap::() + .await; + let last_start = most_recent_start_time(&task); + + internal_client + .make_request( + http::Method::POST, + "/bgtasks/activate", + Some(serde_json::json!({ + "bgtask_names": vec![String::from("instance_watcher")] + })), + http::StatusCode::NO_CONTENT, + ) + .await + .unwrap(); + // Wait for the instance watcher task to finish + wait_for_condition( + || async { + let task = NexusRequest::object_get( + internal_client, + "/bgtasks/view/instance_watcher", + ) + .execute_and_parse_unwrap::() + .await; + if matches!(&task.current, CurrentStatus::Idle) + && most_recent_start_time(&task) > last_start + { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &Duration::from_millis(500), + &Duration::from_secs(60), + ) + .await + .unwrap(); + }; + + #[track_caller] + fn count_state( + table: &oximeter_db::oxql::Table, + instance_id: Uuid, + state: &'static str, + ) -> i64 { + use oximeter_db::oxql::point::ValueArray; + let uuid = FieldValue::Uuid(instance_id); + let state = FieldValue::String(state.into()); + let mut timeserieses = table.timeseries().filter(|ts| { + ts.fields.get(INSTANCE_ID_FIELD) == Some(&uuid) + && ts.fields.get(STATE_FIELD) == Some(&state) + }); + let Some(timeseries) = timeserieses.next() else { + panic!( + "missing timeseries for instance {instance_id}, state {state}\n\ + found: {table:#?}" + ) + }; + if let Some(timeseries) = timeserieses.next() { + panic!( + "multiple timeseries for instance {instance_id}, state {state}: \ + {timeseries:?}, {timeseries:?}, ...\n\ + found: {table:#?}" + ) + } + match timeseries.points.values(0) { + Some(ValueArray::Integer(ref vals)) => { + vals.iter().filter_map(|&v| v).sum() + } + x => panic!( + "expected timeseries for instance {instance_id}, \ + state {state} to be an integer, but found: {x:?}" + ), + } + } + + // N.B. that we've gotta use the project name that this function hardcodes + // if we're going to use the `instance_post` test helper later. + let project = create_project_and_pool(&client).await; + let project_name = project.identity.name.as_str(); + // Wait until Nexus registers as a producer with Oximeter. + wait_for_producer( + &cptestctx.oximeter, + cptestctx.server.apictx().nexus.id(), + ) + .await; + + eprintln!("--- creating instance 1 ---"); + let instance1 = create_instance(&client, project_name, "i-1").await; + let instance1_uuid = instance1.identity.id; + + // activate the instance watcher background task. 
+ activate_instance_watcher().await; + + let metrics = timeseries_query(&cptestctx, OXQL_QUERY).await; + let checks = metrics + .iter() + .find(|t| t.name() == "virtual_machine:check") + .expect("missing virtual_machine:check"); + let ts = dbg!(count_state(&checks, instance1_uuid, STATE_STARTING)); + assert_eq!(ts, 1); + + // okay, make another instance + eprintln!("--- creating instance 2 ---"); + let instance2 = create_instance(&client, project_name, "i-2").await; + let instance2_uuid = instance2.identity.id; + + // activate the instance watcher background task. + activate_instance_watcher().await; + + let metrics = timeseries_query(&cptestctx, OXQL_QUERY).await; + let checks = metrics + .iter() + .find(|t| t.name() == "virtual_machine:check") + .expect("missing virtual_machine:check"); + let ts1 = dbg!(count_state(&checks, instance1_uuid, STATE_STARTING)); + let ts2 = dbg!(count_state(&checks, instance2_uuid, STATE_STARTING)); + assert_eq!(ts1, 2); + assert_eq!(ts2, 1); + + // poke instance 1 to get it into the running state + eprintln!("--- starting instance 1 ---"); + instance_simulate(nexus, &instance1_uuid).await; + + // activate the instance watcher background task. + activate_instance_watcher().await; + + let metrics = timeseries_query(&cptestctx, OXQL_QUERY).await; + let checks = metrics + .iter() + .find(|t| t.name() == "virtual_machine:check") + .expect("missing virtual_machine:check"); + let ts1_starting = + dbg!(count_state(&checks, instance1_uuid, STATE_STARTING)); + let ts1_running = dbg!(count_state(&checks, instance1_uuid, STATE_RUNNING)); + let ts2 = dbg!(count_state(&checks, instance2_uuid, STATE_STARTING)); + assert_eq!(ts1_starting, 2); + assert_eq!(ts1_running, 1); + assert_eq!(ts2, 2); + + // poke instance 2 to get it into the Running state. + eprintln!("--- starting instance 2 ---"); + instance_simulate(nexus, &instance2_uuid).await; + // stop instance 1 + eprintln!("--- start stopping instance 1 ---"); + instance_simulate(nexus, &instance1_uuid).await; + instance_post(&client, &instance1.identity.name.as_str(), InstanceOp::Stop) + .await; + + // activate the instance watcher background task. + activate_instance_watcher().await; + + let metrics = timeseries_query(&cptestctx, OXQL_QUERY).await; + let checks = metrics + .iter() + .find(|t| t.name() == "virtual_machine:check") + .expect("missing virtual_machine:check"); + + let ts1_starting = + dbg!(count_state(&checks, instance1_uuid, STATE_STARTING)); + let ts1_running = dbg!(count_state(&checks, instance1_uuid, STATE_RUNNING)); + let ts1_stopping = + dbg!(count_state(&checks, instance1_uuid, STATE_STOPPING)); + let ts2_starting = + dbg!(count_state(&checks, instance2_uuid, STATE_STARTING)); + let ts2_running = dbg!(count_state(&checks, instance2_uuid, STATE_RUNNING)); + assert_eq!(ts1_starting, 2); + assert_eq!(ts1_running, 1); + assert_eq!(ts1_stopping, 1); + assert_eq!(ts2_starting, 2); + assert_eq!(ts2_running, 1); + + // simulate instance 1 completing its stop, which will remove it from the + // set of active instances in CRDB. now, it won't be checked again. + + eprintln!("--- finish stopping instance 1 ---"); + instance_simulate(nexus, &instance1_uuid).await; + + // activate the instance watcher background task. 
+ activate_instance_watcher().await; + + let metrics = timeseries_query(&cptestctx, OXQL_QUERY).await; + let checks = metrics + .iter() + .find(|t| t.name() == "virtual_machine:check") + .expect("missing virtual_machine:check"); + let ts1_starting = + dbg!(count_state(&checks, instance1_uuid, STATE_STARTING)); + let ts1_running = dbg!(count_state(&checks, instance1_uuid, STATE_RUNNING)); + let ts1_stopping = + dbg!(count_state(&checks, instance1_uuid, STATE_STOPPING)); + let ts2_starting = + dbg!(count_state(&checks, instance2_uuid, STATE_STARTING)); + let ts2_running = dbg!(count_state(&checks, instance2_uuid, STATE_RUNNING)); + assert_eq!(ts1_starting, 2); + assert_eq!(ts1_running, 1); + assert_eq!(ts1_stopping, 1); + assert_eq!(ts2_starting, 2); + assert_eq!(ts2_running, 2); +} + /// Wait until a producer is registered with Oximeter. /// /// This blocks until the producer is registered, for up to 60s. It panics if diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index 77c4dd4170..fab597a761 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -468,6 +468,38 @@ } }, "/instances/{instance_id}/state": { + "get": { + "operationId": "instance_get_state", + "parameters": [ + { + "in": "path", + "name": "instance_id", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/SledInstanceState" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, "put": { "operationId": "instance_put_state", "parameters": [ diff --git a/schema/crdb/add-lookup-vmm-by-sled-id-index/up.sql b/schema/crdb/add-lookup-vmm-by-sled-id-index/up.sql new file mode 100644 index 0000000000..7f9262e4fe --- /dev/null +++ b/schema/crdb/add-lookup-vmm-by-sled-id-index/up.sql @@ -0,0 +1,3 @@ +CREATE INDEX IF NOT EXISTS lookup_vmms_by_sled_id ON omicron.public.vmm ( + sled_id +) WHERE time_deleted IS NULL; diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index e77b7b81ef..e7025f2499 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -3426,6 +3426,10 @@ CREATE TABLE IF NOT EXISTS omicron.public.vmm ( propolis_port INT4 NOT NULL CHECK (propolis_port BETWEEN 0 AND 65535) DEFAULT 12400 ); +CREATE INDEX IF NOT EXISTS lookup_vmms_by_sled_id ON omicron.public.vmm ( + sled_id +) WHERE time_deleted IS NULL; + /* * A special view of an instance provided to operators for insights into what's * running on a sled. 
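The sled-agent's new `GET /instances/{instance_id}/state` endpoint (registered in `openapi/sled-agent.json` above) is what each watcher check calls through the generated client. A rough usage sketch, assuming a client built the same way the watcher builds one (e.g. via `nexus_networking::sled_client_from_address`); the function name here is illustrative:

```rust
use sled_agent_client::Client as SledAgentClient;
use uuid::Uuid;

/// Illustrative only: ask a sled agent for the current state of one instance.
async fn print_instance_state(
    client: &SledAgentClient,
    instance_id: Uuid,
) -> Result<(), sled_agent_client::Error<sled_agent_client::types::Error>> {
    // Maps to `GET /instances/{instance_id}/state`. A 404 whose error code is
    // "NO_SUCH_INSTANCE" surfaces as an `ErrorResponse`, which the watcher
    // records as `Failure::NoSuchInstance`.
    let state = client.instance_get_state(&instance_id).await?.into_inner();
    println!("sled agent reports: {state:?}");
    Ok(())
}
```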
@@ -3842,7 +3846,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '59.0.0', NULL) + (TRUE, NOW(), NOW(), '60.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/sled-agent/src/http_entrypoints.rs b/sled-agent/src/http_entrypoints.rs index b457047ad6..99c7725fe3 100644 --- a/sled-agent/src/http_entrypoints.rs +++ b/sled-agent/src/http_entrypoints.rs @@ -51,6 +51,7 @@ pub fn api() -> SledApiDescription { api.register(instance_issue_disk_snapshot_request)?; api.register(instance_put_migration_ids)?; api.register(instance_put_state)?; + api.register(instance_get_state)?; api.register(instance_put_external_ip)?; api.register(instance_delete_external_ip)?; api.register(instance_register)?; @@ -473,6 +474,19 @@ async fn instance_put_state( )) } +#[endpoint { + method = GET, + path = "/instances/{instance_id}/state", +}] +async fn instance_get_state( + rqctx: RequestContext, + path_params: Path, +) -> Result, HttpError> { + let sa = rqctx.context(); + let instance_id = path_params.into_inner().instance_id; + Ok(HttpResponseOk(sa.instance_get_state(instance_id).await?)) +} + #[endpoint { method = PUT, path = "/instances/{instance_id}/migration-ids", diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs index 94ad8522c7..271eceb556 100644 --- a/sled-agent/src/instance.rs +++ b/sled-agent/src/instance.rs @@ -405,7 +405,7 @@ impl InstanceRunner { .map_err(|_| Error::FailedSendClientClosed) }, Some(CurrentState{ tx }) => { - tx.send(self.current_state().await) + tx.send(self.current_state()) .map_err(|_| Error::FailedSendClientClosed) }, Some(PutState{ state, tx }) => { @@ -1176,7 +1176,7 @@ impl InstanceRunner { } } - async fn current_state(&self) -> SledInstanceState { + fn current_state(&self) -> SledInstanceState { self.state.sled_instance_state() } diff --git a/sled-agent/src/instance_manager.rs b/sled-agent/src/instance_manager.rs index cf6563b117..ee1425f0d7 100644 --- a/sled-agent/src/instance_manager.rs +++ b/sled-agent/src/instance_manager.rs @@ -310,6 +310,19 @@ impl InstanceManager { pub fn reservoir_size(&self) -> ByteCount { self.inner.vmm_reservoir_manager.reservoir_size() } + + pub async fn get_instance_state( + &self, + instance_id: Uuid, + ) -> Result { + let (tx, rx) = oneshot::channel(); + self.inner + .tx + .send(InstanceManagerRequest::GetState { instance_id, tx }) + .await + .map_err(|_| Error::FailedSendInstanceManagerClosed)?; + rx.await? + } } // Most requests that can be sent to the "InstanceManagerRunner" task. @@ -365,6 +378,10 @@ enum InstanceManagerRequest { ip: InstanceExternalIpBody, tx: oneshot::Sender>, }, + GetState { + instance_id: Uuid, + tx: oneshot::Sender>, + }, } // Requests that the instance manager stop processing information about a @@ -467,6 +484,14 @@ impl InstanceManagerRunner { Some(InstanceDeleteExternalIp { instance_id, ip, tx }) => { self.delete_external_ip(tx, instance_id, &ip).await }, + Some(GetState { instance_id, tx }) => { + // TODO(eliza): it could potentially be nice to + // refactor this to use `tokio::sync::watch`, rather + // than having to force `GetState` requests to + // serialize with the requests that actually update + // the state... 
+ self.get_instance_state(tx, instance_id).await + }, None => { warn!(self.log, "InstanceManager's request channel closed; shutting down"); break; @@ -732,6 +757,22 @@ impl InstanceManagerRunner { instance.delete_external_ip(tx, ip).await?; Ok(()) } + + async fn get_instance_state( + &self, + tx: oneshot::Sender>, + instance_id: Uuid, + ) -> Result<(), Error> { + let Some(instance) = self.get_instance(instance_id) else { + return tx + .send(Err(Error::NoSuchInstance(instance_id))) + .map_err(|_| Error::FailedSendClientClosed); + }; + + let state = instance.current_state().await?; + tx.send(Ok(state)).map_err(|_| Error::FailedSendClientClosed)?; + Ok(()) + } } /// Represents membership of an instance in the [`InstanceManager`]. diff --git a/sled-agent/src/sim/http_entrypoints.rs b/sled-agent/src/sim/http_entrypoints.rs index 73e94e949b..6cddac6fb8 100644 --- a/sled-agent/src/sim/http_entrypoints.rs +++ b/sled-agent/src/sim/http_entrypoints.rs @@ -41,6 +41,7 @@ pub fn api() -> SledApiDescription { fn register_endpoints(api: &mut SledApiDescription) -> Result<(), String> { api.register(instance_put_migration_ids)?; api.register(instance_put_state)?; + api.register(instance_get_state)?; api.register(instance_register)?; api.register(instance_unregister)?; api.register(instance_put_external_ip)?; @@ -134,6 +135,19 @@ async fn instance_put_state( )) } +#[endpoint { + method = GET, + path = "/instances/{instance_id}/state", +}] +async fn instance_get_state( + rqctx: RequestContext>, + path_params: Path, +) -> Result, HttpError> { + let sa = rqctx.context(); + let instance_id = path_params.into_inner().instance_id; + Ok(HttpResponseOk(sa.instance_get_state(instance_id).await?)) +} + #[endpoint { method = PUT, path = "/instances/{instance_id}/migration-ids", diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs index 37086a8343..298a8adc34 100644 --- a/sled-agent/src/sim/sled_agent.rs +++ b/sled-agent/src/sim/sled_agent.rs @@ -477,6 +477,22 @@ impl SledAgent { Ok(InstancePutStateResponse { updated_runtime: Some(new_state) }) } + pub async fn instance_get_state( + &self, + instance_id: Uuid, + ) -> Result { + let instance = self + .instances + .sim_get_cloned_object(&instance_id) + .await + .map_err(|_| { + crate::sled_agent::Error::Instance( + crate::instance_manager::Error::NoSuchInstance(instance_id), + ) + })?; + Ok(instance.current()) + } + pub async fn set_instance_ensure_state_error(&self, error: Option) { *self.instance_ensure_state_error.lock().await = error; } diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs index 4216cd4b6a..39a5647420 100644 --- a/sled-agent/src/sled_agent.rs +++ b/sled-agent/src/sled_agent.rs @@ -171,16 +171,15 @@ impl From for omicron_common::api::external::Error { impl From for dropshot::HttpError { fn from(err: Error) -> Self { match err { - Error::Instance(instance_manager_error) => { - match instance_manager_error { - crate::instance_manager::Error::Instance( - instance_error, - ) => match instance_error { - crate::instance::Error::Propolis(propolis_error) => { - // Work around dropshot#693: HttpError::for_status - // only accepts client errors and asserts on server - // errors, so convert server errors by hand. 
- match propolis_error.status() { + Error::Instance(crate::instance_manager::Error::Instance( + instance_error, + )) => { + match instance_error { + crate::instance::Error::Propolis(propolis_error) => { + // Work around dropshot#693: HttpError::for_status + // only accepts client errors and asserts on server + // errors, so convert server errors by hand. + match propolis_error.status() { None => HttpError::for_internal_error( propolis_error.to_string(), ), @@ -196,18 +195,22 @@ impl From for dropshot::HttpError { HttpError::for_internal_error(propolis_error.to_string()), } } - } - crate::instance::Error::Transition(omicron_error) => { - // Preserve the status associated with the wrapped - // Omicron error so that Nexus will see it in the - // Progenitor client error it gets back. - HttpError::from(omicron_error) - } - e => HttpError::for_internal_error(e.to_string()), - }, + } + crate::instance::Error::Transition(omicron_error) => { + // Preserve the status associated with the wrapped + // Omicron error so that Nexus will see it in the + // Progenitor client error it gets back. + HttpError::from(omicron_error) + } e => HttpError::for_internal_error(e.to_string()), } } + Error::Instance( + e @ crate::instance_manager::Error::NoSuchInstance(_), + ) => HttpError::for_not_found( + Some("NO_SUCH_INSTANCE".to_string()), + e.to_string(), + ), Error::ZoneBundle(ref inner) => match inner { BundleError::NoStorage | BundleError::Unavailable { .. } => { HttpError::for_unavail(None, inner.to_string()) @@ -982,6 +985,18 @@ impl SledAgent { .map_err(|e| Error::Instance(e)) } + /// Returns the state of the instance with the provided ID. + pub async fn instance_get_state( + &self, + instance_id: Uuid, + ) -> Result { + self.inner + .instances + .get_instance_state(instance_id) + .await + .map_err(|e| Error::Instance(e)) + } + /// Idempotently ensures that the given virtual disk is attached (or not) as /// specified. /// diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index 62e8b51b07..696411966b 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -56,6 +56,7 @@ sync_service_zone_nat.period_secs = 30 switch_port_settings_manager.period_secs = 30 region_replacement.period_secs = 30 service_firewall_propagation.period_secs = 300 +instance_watcher.period_secs = 30 [default_region_allocation_strategy] # by default, allocate across 3 distinct sleds diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index d5a4b4eb77..206f716fa7 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -56,6 +56,7 @@ sync_service_zone_nat.period_secs = 30 switch_port_settings_manager.period_secs = 30 region_replacement.period_secs = 30 service_firewall_propagation.period_secs = 300 +instance_watcher.period_secs = 30 [default_region_allocation_strategy] # by default, allocate without requirement for distinct sleds. diff --git a/uuid-kinds/src/lib.rs b/uuid-kinds/src/lib.rs index 41d1bfc1f6..489e0da365 100644 --- a/uuid-kinds/src/lib.rs +++ b/uuid-kinds/src/lib.rs @@ -53,6 +53,7 @@ impl_typed_uuid_kind! { Downstairs => "downstairs", DownstairsRegion => "downstairs_region", ExternalIp => "external_ip", + Instance => "instance", LoopbackAddress => "loopback_address", OmicronZone => "service", PhysicalDisk => "physical_disk",
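Finally, the `uuid-kinds` hunk registers an `Instance` typed-UUID kind, even though the code in this diff still passes plain `Uuid`s for instance IDs. A small sketch of what that kind enables, assuming the crate's usual generated `InstanceUuid` alias and the `GenericUuid` conversion trait:

```rust
use omicron_uuid_kinds::{GenericUuid, InstanceUuid};
use uuid::Uuid;

/// Illustrative only: wrap a raw instance ID (e.g. one read from the
/// `instance` table) so it can't be confused with a sled or VMM UUID
/// at compile time.
fn typed_instance_id(raw: Uuid) -> InstanceUuid {
    InstanceUuid::from_untyped_uuid(raw)
}

/// Convert back at boundaries that still take a plain `Uuid`, such as the
/// sled-agent client's `instance_get_state`.
fn untyped_again(id: InstanceUuid) -> Uuid {
    id.into_untyped_uuid()
}
```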