diff --git a/clients/nexus-client/src/lib.rs b/clients/nexus-client/src/lib.rs index ad8269e675..0a1a569f42 100644 --- a/clients/nexus-client/src/lib.rs +++ b/clients/nexus-client/src/lib.rs @@ -382,3 +382,35 @@ impl From } } } + +impl From + for omicron_common::api::internal::nexus::ProducerKind +{ + fn from(kind: types::ProducerKind) -> Self { + use omicron_common::api::internal::nexus::ProducerKind; + match kind { + types::ProducerKind::SledAgent => ProducerKind::SledAgent, + types::ProducerKind::Instance => ProducerKind::Instance, + types::ProducerKind::Service => ProducerKind::Service, + } + } +} + +impl TryFrom + for omicron_common::api::internal::nexus::ProducerEndpoint +{ + type Error = String; + + fn try_from(ep: types::ProducerEndpoint) -> Result { + let Ok(address) = ep.address.parse() else { + return Err(format!("Invalid IP address: {}", ep.address)); + }; + Ok(Self { + id: ep.id, + kind: ep.kind.into(), + address, + base_route: ep.base_route, + interval: ep.interval.into(), + }) + } +} diff --git a/dev-tools/omdb/src/bin/omdb/oximeter.rs b/dev-tools/omdb/src/bin/omdb/oximeter.rs index e0f20556a2..29491bb083 100644 --- a/dev-tools/omdb/src/bin/omdb/oximeter.rs +++ b/dev-tools/omdb/src/bin/omdb/oximeter.rs @@ -67,6 +67,11 @@ impl OximeterArgs { .with(tabled::settings::Padding::new(0, 1, 0, 0)) .to_string(); println!("Collector ID: {}\n", info.id); + let last_refresh = info + .last_refresh + .map(|r| r.to_string()) + .unwrap_or(String::from("Never")); + println!("Last refresh: {}\n", last_refresh); println!("{table}"); Ok(()) } diff --git a/nexus/db-model/src/producer_endpoint.rs b/nexus/db-model/src/producer_endpoint.rs index 55533690f1..1a38781ce5 100644 --- a/nexus/db-model/src/producer_endpoint.rs +++ b/nexus/db-model/src/producer_endpoint.rs @@ -2,6 +2,9 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+use std::net::SocketAddr; +use std::time::Duration; + use super::SqlU16; use crate::impl_enum_type; use crate::schema::metric_producer; @@ -44,6 +47,18 @@ impl From for internal::nexus::ProducerKind { } } +impl From for internal::nexus::ProducerEndpoint { + fn from(ep: ProducerEndpoint) -> Self { + internal::nexus::ProducerEndpoint { + id: ep.id(), + kind: ep.kind.into(), + address: SocketAddr::new(ep.ip.ip(), *ep.port), + base_route: ep.base_route.clone(), + interval: Duration::from_secs_f64(ep.interval), + } + } +} + /// Information announced by a metric server, used so that clients can contact it and collect /// available metric data from it. #[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)] diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index 116e8586b0..55e8e0f5f6 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -5,6 +5,7 @@ //! [`DataStore`] methods related to Oximeter. use super::DataStore; +use crate::context::OpContext; use crate::db; use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; @@ -24,12 +25,13 @@ impl DataStore { /// Lookup an oximeter instance by its ID. pub async fn oximeter_lookup( &self, + opctx: &OpContext, id: &Uuid, ) -> Result { use db::schema::oximeter::dsl; dsl::oximeter .find(*id) - .first_async(&*self.pool_connection_unauthorized().await?) + .first_async(&*self.pool_connection_authorized(opctx).await?) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } @@ -37,6 +39,7 @@ impl DataStore { /// Create a record for a new Oximeter instance pub async fn oximeter_create( &self, + opctx: &OpContext, info: &OximeterInfo, ) -> Result<(), Error> { use db::schema::oximeter::dsl; @@ -54,7 +57,7 @@ impl DataStore { dsl::ip.eq(info.ip), dsl::port.eq(info.port), )) - .execute_async(&*self.pool_connection_unauthorized().await?) 
+ .execute_async(&*self.pool_connection_authorized(opctx).await?) .await .map_err(|e| { public_error_from_diesel( @@ -71,12 +74,13 @@ impl DataStore { /// List the oximeter collector instances pub async fn oximeter_list( &self, + opctx: &OpContext, page_params: &DataPageParams<'_, Uuid>, ) -> ListResultVec { use db::schema::oximeter::dsl; paginated(dsl::oximeter, dsl::id, page_params) .load_async::( - &*self.pool_connection_unauthorized().await?, + &*self.pool_connection_authorized(opctx).await?, ) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) @@ -85,6 +89,7 @@ impl DataStore { /// Create a record for a new producer endpoint pub async fn producer_endpoint_create( &self, + opctx: &OpContext, producer: &ProducerEndpoint, ) -> Result<(), Error> { use db::schema::metric_producer::dsl; @@ -102,7 +107,7 @@ impl DataStore { dsl::interval.eq(producer.interval), dsl::base_route.eq(producer.base_route.clone()), )) - .execute_async(&*self.pool_connection_unauthorized().await?) + .execute_async(&*self.pool_connection_authorized(opctx).await?) .await .map_err(|e| { public_error_from_diesel( @@ -123,13 +128,14 @@ impl DataStore { /// returned. If there was no record, `None` is returned. pub async fn producer_endpoint_delete( &self, + opctx: &OpContext, id: &Uuid, ) -> Result, Error> { use db::schema::metric_producer::dsl; diesel::delete(dsl::metric_producer.find(*id)) .returning(dsl::oximeter_id) .get_result_async::( - &*self.pool_connection_unauthorized().await?, + &*self.pool_connection_authorized(opctx).await?, ) .await .optional() @@ -139,6 +145,7 @@ impl DataStore { /// List the producer endpoint records by the oximeter instance to which they're assigned. 
pub async fn producers_list_by_oximeter_id( &self, + opctx: &OpContext, oximeter_id: Uuid, pagparams: &DataPageParams<'_, Uuid>, ) -> ListResultVec { @@ -147,7 +154,7 @@ impl DataStore { .filter(dsl::oximeter_id.eq(oximeter_id)) .order_by((dsl::oximeter_id, dsl::id)) .select(ProducerEndpoint::as_select()) - .load_async(&*self.pool_connection_unauthorized().await?) + .load_async(&*self.pool_connection_authorized(opctx).await?) .await .map_err(|e| { public_error_from_diesel( diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 2300bd56f2..e29ed21192 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -1562,7 +1562,7 @@ impl super::Nexus { // an instance's state changes. // // Tracked in https://github.com/oxidecomputer/omicron/issues/3742. - self.unassign_producer(instance_id).await?; + self.unassign_producer(opctx, instance_id).await?; } // Write the new instance and VMM states back to CRDB. This needs to be diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index a168b35293..f178bffc8c 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -9,13 +9,12 @@ use crate::internal_api::params::OximeterInfo; use dropshot::PaginationParams; use internal_dns::resolver::{ResolveError, Resolver}; use internal_dns::ServiceName; +use nexus_db_queries::context::OpContext; use nexus_db_queries::db; -use nexus_db_queries::db::identity::Asset; use omicron_common::address::CLICKHOUSE_PORT; -use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; -use omicron_common::api::external::PaginationOrder; -use omicron_common::api::internal::nexus; +use omicron_common::api::external::{DataPageParams, ListResultVec}; +use omicron_common::api::internal::nexus::{self, ProducerEndpoint}; use omicron_common::backoff; use oximeter_client::Client as OximeterClient; use oximeter_db::query::Timestamp; @@ -73,77 +72,34 @@ impl super::Nexus { /// Insert a new record of an Oximeter collector 
server. pub(crate) async fn upsert_oximeter_collector( &self, + opctx: &OpContext, oximeter_info: &OximeterInfo, ) -> Result<(), Error> { // Insert the Oximeter instance into the DB. Note that this _updates_ the record, // specifically, the time_modified, ip, and port columns, if the instance has already been // registered. let db_info = db::model::OximeterInfo::new(&oximeter_info); - self.db_datastore.oximeter_create(&db_info).await?; + self.db_datastore.oximeter_create(opctx, &db_info).await?; info!( self.log, "registered new oximeter metric collection server"; "collector_id" => ?oximeter_info.collector_id, "address" => oximeter_info.address, ); + Ok(()) + } - // Regardless, notify the collector of any assigned metric producers. - // - // This should be empty if this Oximeter collector is registering for - // the first time, but may not be if the service is re-registering after - // failure. - let client = self.build_oximeter_client( - &oximeter_info.collector_id, - oximeter_info.address, - ); - let mut last_producer_id = None; - loop { - let pagparams = DataPageParams { - marker: last_producer_id.as_ref(), - direction: PaginationOrder::Ascending, - limit: std::num::NonZeroU32::new(100).unwrap(), - }; - let producers = self - .db_datastore - .producers_list_by_oximeter_id( - oximeter_info.collector_id, - &pagparams, - ) - .await?; - if producers.is_empty() { - return Ok(()); - } - debug!( - self.log, - "re-assigning existing metric producers to a collector"; - "n_producers" => producers.len(), - "collector_id" => ?oximeter_info.collector_id, - ); - // Be sure to continue paginating from the last producer. - // - // Safety: We check just above if the list is empty, so there is a - // last element. 
- last_producer_id.replace(producers.last().unwrap().id()); - for producer in producers.into_iter() { - let producer_info = oximeter_client::types::ProducerEndpoint { - id: producer.id(), - kind: nexus::ProducerKind::from(producer.kind).into(), - address: SocketAddr::new( - producer.ip.ip(), - producer.port.try_into().unwrap(), - ) - .to_string(), - base_route: producer.base_route, - interval: oximeter_client::types::Duration::from( - Duration::from_secs_f64(producer.interval), - ), - }; - client - .producers_post(&producer_info) - .await - .map_err(Error::from)?; - } - } + /// List the producers assigned to an oximeter collector. + pub(crate) async fn list_assigned_producers( + &self, + opctx: &OpContext, + collector_id: Uuid, + pagparams: &DataPageParams<'_, Uuid>, + ) -> ListResultVec { + self.db_datastore + .producers_list_by_oximeter_id(opctx, collector_id, pagparams) + .await + .map(|list| list.into_iter().map(ProducerEndpoint::from).collect()) } /// Register as a metric producer with the oximeter metric collection server. @@ -179,11 +135,12 @@ impl super::Nexus { /// Assign a newly-registered metric producer to an oximeter collector server. pub(crate) async fn assign_producer( &self, + opctx: &OpContext, producer_info: nexus::ProducerEndpoint, ) -> Result<(), Error> { - let (collector, id) = self.next_collector().await?; + let (collector, id) = self.next_collector(opctx).await?; let db_info = db::model::ProducerEndpoint::new(&producer_info, id); - self.db_datastore.producer_endpoint_create(&db_info).await?; + self.db_datastore.producer_endpoint_create(opctx, &db_info).await?; collector .producers_post(&oximeter_client::types::ProducerEndpoint::from( &producer_info, @@ -202,10 +159,11 @@ impl super::Nexus { /// Idempotently un-assign a producer from an oximeter collector. pub(crate) async fn unassign_producer( &self, + opctx: &OpContext, id: &Uuid, ) -> Result<(), Error> { if let Some(collector_id) = - self.db_datastore.producer_endpoint_delete(id).await? 
+ self.db_datastore.producer_endpoint_delete(opctx, id).await? { debug!( self.log, @@ -214,7 +172,7 @@ impl super::Nexus { "collector_id" => %collector_id, ); let oximeter_info = - self.db_datastore.oximeter_lookup(&collector_id).await?; + self.db_datastore.oximeter_lookup(opctx, &collector_id).await?; let address = SocketAddr::new(oximeter_info.ip.ip(), *oximeter_info.port); let client = self.build_oximeter_client(&id, address); @@ -380,14 +338,17 @@ impl super::Nexus { } // Return an oximeter collector to assign a newly-registered producer - async fn next_collector(&self) -> Result<(OximeterClient, Uuid), Error> { + async fn next_collector( + &self, + opctx: &OpContext, + ) -> Result<(OximeterClient, Uuid), Error> { // TODO-robustness Replace with a real load-balancing strategy. let page_params = DataPageParams { marker: None, direction: dropshot::PaginationOrder::Ascending, limit: std::num::NonZeroU32::new(1).unwrap(), }; - let oxs = self.db_datastore.oximeter_list(&page_params).await?; + let oxs = self.db_datastore.oximeter_list(opctx, &page_params).await?; let info = oxs.first().ok_or_else(|| Error::ServiceUnavailable { internal_message: String::from("no oximeter collectors available"), })?; diff --git a/nexus/src/internal_api/http_entrypoints.rs b/nexus/src/internal_api/http_entrypoints.rs index 0676ace70c..3758b5289b 100644 --- a/nexus/src/internal_api/http_entrypoints.rs +++ b/nexus/src/internal_api/http_entrypoints.rs @@ -83,6 +83,7 @@ pub(crate) fn internal_api() -> NexusApiDescription { api.register(cpapi_volume_remove_read_only_parent)?; api.register(cpapi_disk_remove_read_only_parent)?; api.register(cpapi_producers_post)?; + api.register(cpapi_assigned_producers_list)?; api.register(cpapi_collectors_post)?; api.register(cpapi_metrics_collect)?; api.register(cpapi_artifact_download)?; @@ -454,10 +455,12 @@ async fn cpapi_producers_post( producer_info: TypedBody, ) -> Result { let context = request_context.context(); - let nexus = &context.nexus; - let 
producer_info = producer_info.into_inner(); let handler = async { - nexus.assign_producer(producer_info).await?; + let nexus = &context.nexus; + let producer_info = producer_info.into_inner(); + let opctx = + crate::context::op_context_for_internal_api(&request_context).await; + nexus.assign_producer(&opctx, producer_info).await?; Ok(HttpResponseUpdatedNoContent()) }; context @@ -466,6 +469,52 @@ async fn cpapi_producers_post( .await } +#[derive( + Clone, + Copy, + Debug, + serde::Deserialize, + schemars::JsonSchema, + serde::Serialize, +)] +pub struct CollectorIdPathParams { + /// The ID of the oximeter collector. + pub collector_id: Uuid, +} + +/// List all metric producers assigned to an oximeter collector. +#[endpoint { + method = GET, + path = "/metrics/collectors/{collector_id}/producers", + }] +async fn cpapi_assigned_producers_list( + request_context: RequestContext>, + path_params: Path, + query_params: Query, +) -> Result>, HttpError> { + let context = request_context.context(); + let handler = async { + let nexus = &context.nexus; + let collector_id = path_params.into_inner().collector_id; + let query = query_params.into_inner(); + let pagparams = data_page_params_for(&request_context, &query)?; + let opctx = + crate::context::op_context_for_internal_api(&request_context).await; + let producers = nexus + .list_assigned_producers(&opctx, collector_id, &pagparams) + .await?; + Ok(HttpResponseOk(ScanById::results_page( + &query, + producers, + &|_, producer: &ProducerEndpoint| producer.id, + )?)) + }; + context + .internal_latencies + .instrument_dropshot_handler(&request_context, handler) + .await +} + /// Accept a notification of a new oximeter collection server. 
#[endpoint { method = POST, @@ -476,10 +525,12 @@ async fn cpapi_collectors_post( oximeter_info: TypedBody, ) -> Result { let context = request_context.context(); - let nexus = &context.nexus; - let oximeter_info = oximeter_info.into_inner(); let handler = async { - nexus.upsert_oximeter_collector(&oximeter_info).await?; + let nexus = &context.nexus; + let oximeter_info = oximeter_info.into_inner(); + let opctx = + crate::context::op_context_for_internal_api(&request_context).await; + nexus.upsert_oximeter_collector(&opctx, &oximeter_info).await?; Ok(HttpResponseUpdatedNoContent()) }; context diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index cc9c8c43df..e5616a4641 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -1411,6 +1411,7 @@ pub async fn start_oximeter( let config = oximeter_collector::Config { nexus_address: Some(nexus_address), db, + refresh_interval: oximeter_collector::default_refresh_interval(), log: ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Error }, }; let args = oximeter_collector::OximeterArguments { diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 9663e10fa0..20e098ec08 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -4,18 +4,11 @@ //! Integration tests for oximeter collectors and producers. 
-use dropshot::Method; -use http::StatusCode; use nexus_test_interface::NexusServer; use nexus_test_utils_macros::nexus_test; -use omicron_common::api::internal::nexus::ProducerEndpoint; -use omicron_common::api::internal::nexus::ProducerKind; use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError}; use oximeter_db::DbWrite; -use std::collections::BTreeSet; use std::net; -use std::net::Ipv6Addr; -use std::net::SocketAddr; use std::time::Duration; use uuid::Uuid; @@ -339,88 +332,3 @@ async fn test_oximeter_reregistration() { ); context.teardown().await; } - -// A regression test for https://github.com/oxidecomputer/omicron/issues/4498 -#[tokio::test] -async fn test_oximeter_collector_reregistration_gets_all_assignments() { - let mut context = nexus_test_utils::test_setup::( - "test_oximeter_collector_reregistration_gets_all_assignments", - ) - .await; - let oximeter_id = nexus_test_utils::OXIMETER_UUID.parse().unwrap(); - - // Create a bunch of producer records. - // - // Note that the actual count is arbitrary, but it should be larger than the - // internal pagination limit used in `Nexus::upsert_oximeter_collector()`, - // which is currently 100. - const N_PRODUCERS: usize = 150; - let mut ids = BTreeSet::new(); - for _ in 0..N_PRODUCERS { - let id = Uuid::new_v4(); - ids.insert(id); - let info = ProducerEndpoint { - id, - kind: ProducerKind::Service, - address: SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 12345), - base_route: String::from("/collect"), - interval: Duration::from_secs(1), - }; - context - .internal_client - .make_request( - Method::POST, - "/metrics/producers", - Some(&info), - StatusCode::NO_CONTENT, - ) - .await - .expect("failed to register test producer"); - } - - // Check that `oximeter` has these registered. 
- let producers = - context.oximeter.list_producers(None, N_PRODUCERS * 2).await; - let actual_ids: BTreeSet<_> = - producers.iter().map(|info| info.id).collect(); - - // There is an additional producer that's created as part of the normal test - // setup, so we'll check that all of the new producers exist, and that - // there's exactly 1 additional one. - assert!( - ids.is_subset(&actual_ids), - "oximeter did not get the right set of producers" - ); - assert_eq!( - ids.len(), - actual_ids.len() - 1, - "oximeter did not get the right set of producers" - ); - - // Drop and restart oximeter, which should result in the exact same set of - // producers again. - drop(context.oximeter); - context.oximeter = nexus_test_utils::start_oximeter( - context.logctx.log.new(o!("component" => "oximeter")), - context.server.get_http_server_internal_address().await, - context.clickhouse.port(), - oximeter_id, - ) - .await - .expect("failed to restart oximeter"); - - let producers = - context.oximeter.list_producers(None, N_PRODUCERS * 2).await; - let actual_ids: BTreeSet<_> = - producers.iter().map(|info| info.id).collect(); - assert!( - ids.is_subset(&actual_ids), - "oximeter did not get the right set of producers after re-registering" - ); - assert_eq!( - ids.len(), - actual_ids.len() - 1, - "oximeter did not get the right set of producers after re-registering" - ); - context.teardown().await; -} diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index cba8063b7e..db3199833e 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -775,6 +775,72 @@ } } }, + "/metrics/collectors/{collector_id}/producers": { + "get": { + "summary": "List all metric producers assigned to an oximeter collector.", + "operationId": "cpapi_assigned_producers_list", + "parameters": [ + { + "in": "path", + "name": "collector_id", + "description": "The ID of the oximeter collector.", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + }, + 
{ + "in": "query", + "name": "limit", + "description": "Maximum number of items returned by a single call", + "schema": { + "nullable": true, + "type": "integer", + "format": "uint32", + "minimum": 1 + } + }, + { + "in": "query", + "name": "page_token", + "description": "Token returned by previous call to retrieve the subsequent page", + "schema": { + "nullable": true, + "type": "string" + } + }, + { + "in": "query", + "name": "sort_by", + "schema": { + "$ref": "#/components/schemas/IdSortMode" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProducerEndpointResultsPage" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + }, + "x-dropshot-pagination": { + "required": [] + } + } + }, "/metrics/producers": { "post": { "summary": "Accept a registration from a new metric producer", @@ -6171,6 +6237,27 @@ "kind" ] }, + "ProducerEndpointResultsPage": { + "description": "A single page of results", + "type": "object", + "properties": { + "items": { + "description": "list of items on this page of results", + "type": "array", + "items": { + "$ref": "#/components/schemas/ProducerEndpoint" + } + }, + "next_page": { + "nullable": true, + "description": "token used to fetch the next page of results (if any)", + "type": "string" + } + }, + "required": [ + "items" + ] + }, "ProducerKind": { "description": "The kind of metric producer this is.", "oneOf": [ diff --git a/openapi/oximeter.json b/openapi/oximeter.json index f5c78d53cd..4c609474ca 100644 --- a/openapi/oximeter.json +++ b/openapi/oximeter.json @@ -142,6 +142,12 @@ "description": "The collector's UUID.", "type": "string", "format": "uuid" + }, + "last_refresh": { + "nullable": true, + "description": "Last time we refreshed our producer list with Nexus.", + "type": "string", + "format": "date-time" } }, "required": [ diff --git 
a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index 92c91ca101..b7dac716c6 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -13,7 +13,6 @@ clap.workspace = true dropshot.workspace = true futures.workspace = true internal-dns.workspace = true -nexus-client.workspace = true nexus-types.workspace = true omicron-common.workspace = true oximeter.workspace = true @@ -33,6 +32,7 @@ tokio.workspace = true toml.workspace = true uuid.workspace = true omicron-workspace-hack.workspace = true +nexus-client.workspace = true [dev-dependencies] expectorate.workspace = true diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 8fff44bb2d..33146b3579 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -11,9 +11,16 @@ use crate::DbConfig; use crate::Error; use crate::ProducerEndpoint; use anyhow::anyhow; +use chrono::DateTime; +use chrono::Utc; +use futures::TryStreamExt; use internal_dns::resolver::Resolver; use internal_dns::ServiceName; +use nexus_client::types::IdSortMode; use omicron_common::address::CLICKHOUSE_PORT; +use omicron_common::address::NEXUS_INTERNAL_PORT; +use omicron_common::backoff; +use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; use oximeter_db::Client; @@ -29,12 +36,15 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::net::SocketAddr; use std::net::SocketAddrV6; +use std::num::NonZeroU32; use std::ops::Bound; use std::sync::Arc; +use std::sync::Mutex as StdMutex; use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::Mutex; +use tokio::sync::MutexGuard; use tokio::task::JoinHandle; use tokio::time::interval; use uuid::Uuid; @@ -343,7 +353,7 @@ async fn results_sink( } /// The internal agent the oximeter server uses to collect metrics from producers. 
-#[derive(Debug)] +#[derive(Clone, Debug)] pub struct OximeterAgent { /// The collector ID for this agent pub id: Uuid, @@ -355,6 +365,12 @@ pub struct OximeterAgent { // The actual tokio tasks running the collection on a timer. collection_tasks: Arc>>, + // The interval on which we refresh our list of producers from Nexus + refresh_interval: Duration, + // Handle to the task used to periodically refresh the list of producers. + refresh_task: Arc>>>, + /// The last time we've refreshed our list of producers from Nexus. + pub last_refresh_time: Arc>>>, } impl OximeterAgent { @@ -362,6 +378,7 @@ impl OximeterAgent { pub async fn with_id( id: Uuid, address: SocketAddrV6, + refresh_interval: Duration, db_config: DbConfig, resolver: &Resolver, log: &Logger, @@ -435,13 +452,30 @@ impl OximeterAgent { ) .await }); - Ok(Self { + + let self_ = Self { id, log, collection_target, result_sender, collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), - }) + refresh_interval, + refresh_task: Arc::new(StdMutex::new(None)), + last_refresh_time: Arc::new(StdMutex::new(None)), + }; + + Ok(self_) + } + + /// Ensure the background task that polls Nexus periodically for our list of + /// assigned producers is running. + pub(crate) fn ensure_producer_refresh_task(&self, resolver: Resolver) { + let mut task = self.refresh_task.lock().unwrap(); + if task.is_none() { + let refresh_task = + tokio::spawn(refresh_producer_list(self.clone(), resolver)); + *task = Some(refresh_task); + } } /// Construct a new standalone `oximeter` collector. @@ -455,6 +489,7 @@ impl OximeterAgent { pub async fn new_standalone( id: Uuid, address: SocketAddrV6, + refresh_interval: Duration, db_config: Option, log: &Logger, ) -> Result { @@ -503,12 +538,21 @@ impl OximeterAgent { collector_ip: (*address.ip()).into(), collector_port: address.port(), }; + + // We don't spawn the task to periodically refresh producers when run + // in standalone mode. We can just pretend we registered once, and + // that's it. 
+ let last_refresh_time = Arc::new(StdMutex::new(Some(Utc::now()))); + Ok(Self { id, log, collection_target, result_sender, collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), + refresh_interval, + refresh_task: Arc::new(StdMutex::new(None)), + last_refresh_time, }) } @@ -517,8 +561,23 @@ impl OximeterAgent { &self, info: ProducerEndpoint, ) -> Result<(), Error> { + let mut tasks = self.collection_tasks.lock().await; + self.register_producer_locked(&mut tasks, info).await; + Ok(()) + } + + // Internal implementation that registers a producer, assuming the lock on + // the map is held. + async fn register_producer_locked( + &self, + tasks: &mut MutexGuard< + '_, + BTreeMap, + >, + info: ProducerEndpoint, + ) { let id = info.id; - match self.collection_tasks.lock().await.entry(id) { + match tasks.entry(id) { Entry::Vacant(value) => { debug!( self.log, @@ -557,7 +616,6 @@ impl OximeterAgent { .unwrap(); } } - Ok(()) } /// Forces a collection from all producers. @@ -607,12 +665,22 @@ impl OximeterAgent { /// Delete a producer by ID, stopping its collection task. pub async fn delete_producer(&self, id: Uuid) -> Result<(), Error> { - let (_info, task) = self - .collection_tasks - .lock() - .await - .remove(&id) - .ok_or_else(|| Error::NoSuchProducer(id))?; + let mut tasks = self.collection_tasks.lock().await; + self.delete_producer_locked(&mut tasks, id).await + } + + // Internal implementation that deletes a producer, assuming the lock on + // the map is held. + async fn delete_producer_locked( + &self, + tasks: &mut MutexGuard< + '_, + BTreeMap, + >, + id: Uuid, + ) -> Result<(), Error> { + let (_info, task) = + tasks.remove(&id).ok_or_else(|| Error::NoSuchProducer(id))?; debug!( self.log, "removed collection task from set"; @@ -633,6 +701,139 @@ impl OximeterAgent { } Ok(()) } + + // Ensure that exactly the set of producers is registered with `self`. 
+ // + // Errors are logged, but not returned, and an attempt to register all + // producers is made, even if an error is encountered part-way through. + // + // This returns the number of pruned tasks. + async fn ensure_producers( + &self, + expected_producers: BTreeMap, + ) -> usize { + let mut tasks = self.collection_tasks.lock().await; + + // First prune unwanted collection tasks. + // + // This is the set of all producers that we currently have, which are not + // in the new list from Nexus. + let ids_to_prune: Vec<_> = tasks + .keys() + .filter(|id| !expected_producers.contains_key(id)) + .copied() + .collect(); + let n_pruned = ids_to_prune.len(); + for id in ids_to_prune.into_iter() { + // This method only returns an error if the provided ID does not + // exist in the current tasks. That is impossible, because we hold + // the lock, and we've just computed this as the set that _is_ in + // the map, and not in the new set from Nexus. + self.delete_producer_locked(&mut tasks, id).await.unwrap(); + } + + // And then ensure everything in the list. + // + // This will insert new tasks, and update any that we already know + // about. + for info in expected_producers.into_values() { + self.register_producer_locked(&mut tasks, info).await; + } + n_pruned + } +} + +// A task which periodically updates our list of producers from Nexus. 
+async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) { + let mut interval = tokio::time::interval(agent.refresh_interval); + let page_size = Some(NonZeroU32::new(100).unwrap()); + loop { + interval.tick().await; + info!(agent.log, "refreshing list of producers from Nexus"); + let nexus_addr = + resolve_nexus_with_backoff(&agent.log, &resolver).await; + let url = format!("http://{}", nexus_addr); + let client = nexus_client::Client::new(&url, agent.log.clone()); + let mut stream = client.cpapi_assigned_producers_list_stream( + &agent.id, + page_size, + Some(IdSortMode::IdAscending), + ); + let mut expected_producers = BTreeMap::new(); + loop { + match stream.try_next().await { + Err(e) => { + error!( + agent.log, + "error fetching next assigned producer"; + "err" => ?e, + ); + } + Ok(Some(p)) => { + let endpoint = match ProducerEndpoint::try_from(p) { + Ok(ep) => ep, + Err(e) => { + error!( + agent.log, + "failed to convert producer description \ + from Nexus, skipping producer"; + "err" => e + ); + continue; + } + }; + let old = expected_producers.insert(endpoint.id, endpoint); + if let Some(ProducerEndpoint { id, .. 
}) = old { + error!( + agent.log, + "Nexus appears to have sent duplicate producer info"; + "producer_id" => %id, + ); + } + } + Ok(None) => break, + } + } + let n_current_tasks = expected_producers.len(); + let n_pruned_tasks = agent.ensure_producers(expected_producers).await; + *agent.last_refresh_time.lock().unwrap() = Some(Utc::now()); + info!( + agent.log, + "refreshed list of producers from Nexus"; + "n_pruned_tasks" => n_pruned_tasks, + "n_current_tasks" => n_current_tasks, + ); + } +} + +async fn resolve_nexus_with_backoff( + log: &Logger, + resolver: &Resolver, +) -> SocketAddr { + let log_failure = |error, delay| { + warn!( + log, + "failed to lookup Nexus IP, will retry"; + "delay" => ?delay, + "error" => ?error, + ); + }; + let do_lookup = || async { + resolver + .lookup_ipv6(ServiceName::Nexus) + .await + .map_err(|e| BackoffError::transient(e.to_string())) + .map(|ip| { + SocketAddr::V6(SocketAddrV6::new(ip, NEXUS_INTERNAL_PORT, 0, 0)) + }) + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + do_lookup, + log_failure, + ) + .await + .expect("Expected infinite retry loop resolving Nexus address") } #[cfg(test)] @@ -696,6 +897,7 @@ mod tests { let collector = OximeterAgent::new_standalone( Uuid::new_v4(), SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + crate::default_refresh_interval(), None, log, ) @@ -772,6 +974,7 @@ mod tests { let collector = OximeterAgent::new_standalone( Uuid::new_v4(), SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + crate::default_refresh_interval(), None, log, ) @@ -842,6 +1045,7 @@ mod tests { let collector = OximeterAgent::new_standalone( Uuid::new_v4(), SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + crate::default_refresh_interval(), None, log, ) diff --git a/oximeter/collector/src/http_entrypoints.rs b/oximeter/collector/src/http_entrypoints.rs index 493083a40d..e876ed047d 100644 --- a/oximeter/collector/src/http_entrypoints.rs +++ b/oximeter/collector/src/http_entrypoints.rs @@ -7,6 +7,8 @@ 
// Copyright 2023 Oxide Computer Company use crate::OximeterAgent; +use chrono::DateTime; +use chrono::Utc; use dropshot::endpoint; use dropshot::ApiDescription; use dropshot::EmptyScanParams; @@ -117,6 +119,8 @@ async fn producer_delete( pub struct CollectorInfo { /// The collector's UUID. pub id: Uuid, + /// Last time we refreshed our producer list with Nexus. + pub last_refresh: Option>, } // Return identifying information about this collector @@ -128,6 +132,8 @@ async fn collector_info( request_context: RequestContext>, ) -> Result, HttpError> { let agent = request_context.context(); - let info = CollectorInfo { id: agent.id }; + let id = agent.id; + let last_refresh = *agent.last_refresh_time.lock().unwrap(); + let info = CollectorInfo { id, last_refresh }; Ok(HttpResponseOk(info)) } diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index f3c793d5c2..596c0dc785 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -31,6 +31,7 @@ use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::Path; use std::sync::Arc; +use std::time::Duration; use thiserror::Error; use uuid::Uuid; @@ -114,6 +115,11 @@ impl DbConfig { } } +/// Default interval on which we refresh our list of producers from Nexus. +pub const fn default_refresh_interval() -> Duration { + Duration::from_secs(60 * 10) +} + /// Configuration used to initialize an oximeter server #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Config { @@ -123,6 +129,11 @@ pub struct Config { #[serde(default, skip_serializing_if = "Option::is_none")] pub nexus_address: Option, + /// The interval on which we periodically refresh our list of producers from + /// Nexus. 
+ #[serde(default = "default_refresh_interval")] + pub refresh_interval: Duration, + /// Configuration for working with ClickHouse pub db: DbConfig, @@ -202,6 +213,7 @@ impl Oximeter { OximeterAgent::with_id( args.id, args.address, + config.refresh_interval, config.db, &resolver, &log, @@ -239,7 +251,10 @@ impl Oximeter { .start(); // Notify Nexus that this oximeter instance is available. - let client = reqwest::Client::new(); + let our_info = nexus_client::types::OximeterInfo { + address: server.local_addr().to_string(), + collector_id: agent.id, + }; let notify_nexus = || async { debug!(log, "contacting nexus"); let nexus_address = if let Some(address) = config.nexus_address { @@ -254,18 +269,25 @@ impl Oximeter { 0, )) }; - - client - .post(format!("http://{}/metrics/collectors", nexus_address,)) - .json(&nexus_client::types::OximeterInfo { - address: server.local_addr().to_string(), - collector_id: agent.id, - }) - .send() - .await - .map_err(|e| backoff::BackoffError::transient(e.to_string()))? - .error_for_status() - .map_err(|e| backoff::BackoffError::transient(e.to_string())) + let client = nexus_client::Client::new( + &format!("http://{nexus_address}"), + log.clone(), + ); + client.cpapi_collectors_post(&our_info).await.map_err(|e| { + match &e { + // Failures to reach nexus, or server errors on its side + // are retryable. Everything else is permanent. + nexus_client::Error::CommunicationError(_) => { + backoff::BackoffError::transient(e.to_string()) + } + nexus_client::Error::ErrorResponse(inner) + if inner.status().is_server_error() => + { + backoff::BackoffError::transient(e.to_string()) + } + _ => backoff::BackoffError::permanent(e.to_string()), + } + }) }; let log_notification_failure = |error, delay| { warn!( @@ -282,6 +304,10 @@ impl Oximeter { .await .expect("Expected an infinite retry loop contacting Nexus"); + // Now that we've successfully registered, we'll start periodically + // polling for our list of producers from Nexus. 
+ agent.ensure_producer_refresh_task(resolver); + info!(log, "oximeter registered with nexus"; "id" => ?agent.id); Ok(Self { agent, server }) } @@ -298,6 +324,7 @@ impl Oximeter { OximeterAgent::new_standalone( args.id, args.address, + crate::default_refresh_interval(), db_config, &log, )