Skip to content

Commit

Permalink
Stop collecting Propolis metrics on instance stop (#4495)
Browse files Browse the repository at this point in the history
When Nexus responds to a sled-agent notification that the instance is
stopped and its Propolis server is gone, hard-delete the assignment
record and ask `oximeter` to stop collecting from it.
  • Loading branch information
bnaecker authored Nov 14, 2023
1 parent 2fc0dfd commit a8a49c3
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 7 deletions.
5 changes: 3 additions & 2 deletions nexus/db-model/src/oximeter_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use chrono::{DateTime, Utc};
use nexus_types::internal_api;
use uuid::Uuid;

/// Message used to notify Nexus that this oximeter instance is up and running.
/// A record representing a registered `oximeter` collector.
#[derive(Queryable, Insertable, Debug, Clone, Copy)]
#[diesel(table_name = oximeter)]
pub struct OximeterInfo {
Expand All @@ -18,8 +18,9 @@ pub struct OximeterInfo {
pub time_created: DateTime<Utc>,
/// When this resource was last modified.
pub time_modified: DateTime<Utc>,
/// The address on which this oximeter instance listens for requests
/// The address on which this `oximeter` instance listens for requests.
pub ip: ipnetwork::IpNetwork,
/// The port on which this `oximeter` instance listens for requests.
pub port: SqlU16,
}

Expand Down
41 changes: 37 additions & 4 deletions nexus/db-queries/src/db/datastore/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,20 @@ use omicron_common::api::external::ResourceType;
use uuid::Uuid;

impl DataStore {
// Create a record for a new Oximeter instance
/// Lookup an oximeter instance by its ID.
pub async fn oximeter_lookup(
&self,
id: &Uuid,
) -> Result<OximeterInfo, Error> {
use db::schema::oximeter::dsl;
dsl::oximeter
.find(*id)
.first_async(&*self.pool_connection_unauthorized().await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Create a record for a new Oximeter instance
pub async fn oximeter_create(
&self,
info: &OximeterInfo,
Expand Down Expand Up @@ -55,7 +68,7 @@ impl DataStore {
Ok(())
}

// List the oximeter collector instances
/// List the oximeter collector instances
pub async fn oximeter_list(
&self,
page_params: &DataPageParams<'_, Uuid>,
Expand All @@ -69,7 +82,7 @@ impl DataStore {
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

// Create a record for a new producer endpoint
/// Create a record for a new producer endpoint
pub async fn producer_endpoint_create(
&self,
producer: &ProducerEndpoint,
Expand Down Expand Up @@ -102,7 +115,27 @@ impl DataStore {
Ok(())
}

// List the producer endpoint records by the oximeter instance to which they're assigned.
/// Delete a record for a producer endpoint, by its ID.
///
/// This is idempotent, and deleting a record that is already removed is a
/// no-op. If the record existed, then the ID of the `oximeter` collector is
/// returned. If there was no record, `None` is returned.
pub async fn producer_endpoint_delete(
&self,
id: &Uuid,
) -> Result<Option<Uuid>, Error> {
use db::schema::metric_producer::dsl;
diesel::delete(dsl::metric_producer.find(*id))
.returning(dsl::oximeter_id)
.get_result_async::<Uuid>(
&*self.pool_connection_unauthorized().await?,
)
.await
.optional()
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// List the producer endpoint records by the oximeter instance to which they're assigned.
pub async fn producers_list_by_oximeter_id(
&self,
oximeter_id: Uuid,
Expand Down
19 changes: 18 additions & 1 deletion nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,9 @@ impl super::Nexus {
.await?;

// If the supplied instance state indicates that the instance no longer
// has an active VMM, attempt to delete the virtual provisioning record
// has an active VMM, attempt to delete the virtual provisioning record,
// and the assignment of the Propolis metric producer to an oximeter
// collector.
//
// As with updating networking state, this must be done before
// committing the new runtime state to the database: once the DB is
Expand All @@ -1338,6 +1340,21 @@ impl super::Nexus {
(&new_runtime_state.instance_state.gen).into(),
)
.await?;

// TODO-correctness: The `notify_instance_updated` method can run
// concurrently with itself in some situations, such as where a
// sled-agent attempts to update Nexus about a stopped instance;
// that times out; and it makes another request to a different
// Nexus. The call to `unassign_producer` is racy in those
// situations, and we may end with instances with no metrics.
//
// This unfortunate case should be handled as part of
// instance-lifecycle improvements, notably using a reliable
// persistent workflow to correctly update the oximete assignment as
// an instance's state changes.
//
// Tracked in https://github.com/oxidecomputer/omicron/issues/3742.
self.unassign_producer(instance_id).await?;
}

// Write the new instance and VMM states back to CRDB. This needs to be
Expand Down
52 changes: 52 additions & 0 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,58 @@ impl super::Nexus {
Ok(())
}

/// Idempotently un-assign a producer from an oximeter collector.
pub(crate) async fn unassign_producer(
&self,
id: &Uuid,
) -> Result<(), Error> {
if let Some(collector_id) =
self.db_datastore.producer_endpoint_delete(id).await?
{
debug!(
self.log,
"deleted metric producer assignment";
"producer_id" => %id,
"collector_id" => %collector_id,
);
let oximeter_info =
self.db_datastore.oximeter_lookup(&collector_id).await?;
let address =
SocketAddr::new(oximeter_info.ip.ip(), *oximeter_info.port);
let client = self.build_oximeter_client(&id, address);
if let Err(e) = client.producer_delete(&id).await {
error!(
self.log,
"failed to delete producer from collector";
"producer_id" => %id,
"collector_id" => %collector_id,
"address" => %address,
"error" => ?e,
);
return Err(Error::internal_error(
format!("failed to delete producer from collector: {e:?}")
.as_str(),
));
} else {
debug!(
self.log,
"successfully deleted producer from collector";
"producer_id" => %id,
"collector_id" => %collector_id,
"address" => %address,
);
Ok(())
}
} else {
trace!(
self.log,
"un-assigned non-existent metric producer";
"producer_id" => %id,
);
Ok(())
}
}

/// Returns a results from the timeseries DB based on the provided query
/// parameters.
///
Expand Down

0 comments on commit a8a49c3

Please sign in to comment.