Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
bnaecker committed Sep 18, 2023
1 parent c40de05 commit 21602a8
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 125 deletions.
24 changes: 14 additions & 10 deletions oximeter/collector/src/bin/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use omicron_common::cmd::CmdError;
use oximeter_collector::oximeter_api;
use oximeter_collector::standalone_nexus_api;
use oximeter_collector::Config;
use oximeter_collector::Error;
use oximeter_collector::Oximeter;
use oximeter_collector::OximeterArguments;
use oximeter_collector::StandaloneNexus;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::path::PathBuf;
use uuid::Uuid;
use oximeter_collector::StandaloneNexus;
use oximeter_collector::Error;

pub fn run_openapi() -> Result<(), String> {
oximeter_api()
Expand Down Expand Up @@ -136,26 +136,30 @@ async fn do_run() -> Result<(), CmdError> {
Args::Standalone { id, address, nexus, clickhouse } => {
// Start the standalone Nexus server, for registration of both the
// collector and producers.
let nexus_server = run_standalone_nexus(nexus).await.map_err(|e| CmdError::Failure(e.to_string()))?;
let nexus_server = run_standalone_nexus(nexus)
.await
.map_err(|e| CmdError::Failure(e.to_string()))?;
let args = OximeterArguments { id, address };
Oximeter::new_standalone(
nexus_server.log(),
&args,
nexus_server.local_addr(),
clickhouse
clickhouse,
)
.await
.unwrap()
.serve_forever()
.await
.map_err(|e| CmdError::Failure(e.to_string()))
.await
.unwrap()
.serve_forever()
.await
.map_err(|e| CmdError::Failure(e.to_string()))
}
Args::StandaloneOpenapi => {
run_standalone_openapi().map_err(CmdError::Failure)
}
}
}

async fn run_standalone_nexus(addr: SocketAddrV6) -> Result<StandaloneNexus, Error> {
async fn run_standalone_nexus(
addr: SocketAddrV6,
) -> Result<StandaloneNexus, Error> {
StandaloneNexus::new(addr.into())
}
29 changes: 19 additions & 10 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,25 @@
// Copyright 2023 Oxide Computer Company

use std::ops::Bound;
use anyhow::anyhow;
use anyhow::Context;
use dropshot::endpoint;
use dropshot::ApiDescription;
use dropshot::ConfigDropshot;
use dropshot::ConfigLogging;
use dropshot::EmptyScanParams;
use dropshot::HttpError;
use dropshot::HttpResponseDeleted;
use dropshot::HttpResponseOk;
use dropshot::HttpResponseUpdatedNoContent;
use dropshot::HttpServer;
use dropshot::HttpServerStarter;
use dropshot::RequestContext;
use dropshot::TypedBody;
use dropshot::PaginationParams;
use dropshot::ResultsPage;
use dropshot::Query;
use dropshot::RequestContext;
use dropshot::ResultsPage;
use dropshot::TypedBody;
use dropshot::WhichPage;
use dropshot::EmptyScanParams;
use internal_dns::resolver::ResolveError;
use internal_dns::resolver::Resolver;
use internal_dns::ServiceName;
Expand All @@ -52,6 +51,7 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::ops::Bound;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -64,8 +64,8 @@ use tokio::time::interval;
use uuid::Uuid;

mod standalone;
pub use standalone::Server as StandaloneNexus;
pub use standalone::standalone_nexus_api;
pub use standalone::Server as StandaloneNexus;

/// Errors collecting metric data
#[derive(Debug, Error)]
Expand Down Expand Up @@ -608,7 +608,11 @@ impl OximeterAgent {
}

/// List existing producers.
pub async fn list_producers(&self, start_id: Option<Uuid>, limit: usize) -> Vec<ProducerEndpoint> {
pub async fn list_producers(
&self,
start_id: Option<Uuid>,
limit: usize,
) -> Vec<ProducerEndpoint> {
let start = if let Some(id) = start_id {
Bound::Excluded(id)
} else {
Expand Down Expand Up @@ -894,7 +898,11 @@ impl Oximeter {
}

/// List producers.
pub async fn list_producers(&self, start: Option<Uuid>, limit: usize) -> Vec<ProducerEndpoint> {
pub async fn list_producers(
&self,
start: Option<Uuid>,
limit: usize,
) -> Vec<ProducerEndpoint> {
self.agent.list_producers(start, limit).await
}

Expand Down Expand Up @@ -949,7 +957,7 @@ struct ProducerPage {
}]
async fn producers_list(
request_context: RequestContext<Arc<OximeterAgent>>,
query: Query<PaginationParams<EmptyScanParams, ProducerPage>>
query: Query<PaginationParams<EmptyScanParams, ProducerPage>>,
) -> Result<HttpResponseOk<ResultsPage<ProducerEndpoint>>, HttpError> {
let agent = request_context.context();
let pagination = query.into_inner();
Expand All @@ -963,7 +971,8 @@ async fn producers_list(
producers,
&EmptyScanParams {},
|info: &ProducerEndpoint, _| ProducerPage { id: info.id },
).map(HttpResponseOk)
)
.map(HttpResponseOk)
}

#[derive(Clone, Copy, Debug, Deserialize, schemars::JsonSchema, Serialize)]
Expand Down
108 changes: 54 additions & 54 deletions oximeter/collector/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,31 @@
// Copyright 2023 Oxide Computer Company

use crate::Error;
use dropshot::endpoint;
use tokio::sync::Mutex;
use std::sync::Arc;
use std::collections::HashMap;
use dropshot::ApiDescription;
use dropshot::RequestContext;
use dropshot::TypedBody;
use dropshot::ConfigDropshot;
use dropshot::HttpError;
use dropshot::HttpResponseUpdatedNoContent;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use dropshot::HttpServer;
use dropshot::HttpServerStarter;
use dropshot::RequestContext;
use dropshot::TypedBody;
use nexus_types::internal_api::params::OximeterInfo;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_common::FileKv;
use oximeter_client::Client;
use rand::seq::IteratorRandom;
use uuid::Uuid;
use slog::Logger;
use slog::Drain;
use slog::o;
use slog::debug;
use slog::error;
use crate::Error;
use dropshot::HttpServer;
use slog::o;
use slog::Drain;
use slog::Logger;
use std::collections::HashMap;
use std::net::SocketAddr;
use dropshot::HttpServerStarter;
use dropshot::ConfigDropshot;
use omicron_common::FileKv;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

#[derive(Debug)]
struct ProducerAssignment {
Expand All @@ -47,8 +47,7 @@ struct Inner {

impl Inner {
fn random_collector(&self) -> Option<(Uuid, OximeterInfo)> {
self
.collectors
self.collectors
.iter()
.choose(&mut rand::thread_rng())
.map(|(id, info)| (*id, info.clone()))
Expand All @@ -68,33 +67,38 @@ impl StandaloneNexus {
inner: Mutex::new(Inner {
producers: HashMap::new(),
collectors: HashMap::new(),
})
}),
}
}

async fn register_producer(&self, info: &ProducerEndpoint) -> Result<(), HttpError> {
async fn register_producer(
&self,
info: &ProducerEndpoint,
) -> Result<(), HttpError> {
let mut inner = self.inner.lock().await;
let assignment = match inner.producers.get_mut(&info.id) {
None => {
// There is no record for this producer.
//
// Select a random collector, and assign it to the producer.
// We'll return the assignment from this match block.
let Some((collector_id, collector_info)) = inner.random_collector() else {
return Err(HttpError::for_unavail(None, String::from("No collectors available")));
let Some((collector_id, collector_info)) =
inner.random_collector()
else {
return Err(HttpError::for_unavail(
None,
String::from("No collectors available"),
));
};
let client = Client::new(
format!("http://{}", collector_info.address).as_str(),
self.log.clone(),
);
client
.producers_post(&info.into())
.await
.map_err(|e| HttpError::for_internal_error(e.to_string()))?;
let assignment = ProducerAssignment {
producer: info.clone(),
collector_id,
};
client.producers_post(&info.into()).await.map_err(|e| {
HttpError::for_internal_error(e.to_string())
})?;
let assignment =
ProducerAssignment { producer: info.clone(), collector_id };
assignment
}
Some(existing_assignment) => {
Expand All @@ -108,48 +112,45 @@ impl StandaloneNexus {
// changed its IP address. Re-register it with the collector to
// which it's already assigned.
let collector_id = existing_assignment.collector_id;
let collector_info = inner.collectors.get(&collector_id).unwrap();
let collector_info =
inner.collectors.get(&collector_id).unwrap();
let client = Client::new(
format!("http://{}", collector_info.address).as_str(),
self.log.clone(),
);
client
.producers_post(&info.into())
.await
.map_err(|e| HttpError::for_internal_error(e.to_string()))?;
ProducerAssignment {
producer: info.clone(),
collector_id,
}
client.producers_post(&info.into()).await.map_err(|e| {
HttpError::for_internal_error(e.to_string())
})?;
ProducerAssignment { producer: info.clone(), collector_id }
}
};
inner.producers.insert(info.id, assignment);
Ok(())
}

async fn register_collector(&self, info: OximeterInfo) -> Result<(), HttpError> {
async fn register_collector(
&self,
info: OximeterInfo,
) -> Result<(), HttpError> {
// If this is being registered again, send all its assignments again.
let mut inner = self.inner.lock().await;
if inner.collectors.insert(info.collector_id, info.clone()).is_some() {
let client = Client::new(
format!("http://{}", info.address).as_str(),
self.log.clone(),
);
for producer_info in inner
.producers
.values()
.filter_map(|assignment| {
for producer_info in
inner.producers.values().filter_map(|assignment| {
if assignment.collector_id == info.collector_id {
Some(&assignment.producer)
} else {
None
}
})
{
client
.producers_post(&producer_info.into())
.await
.map_err(|e| HttpError::for_internal_error(e.to_string()))?;
client.producers_post(&producer_info.into()).await.map_err(
|e| HttpError::for_internal_error(e.to_string()),
)?;
}
}
Ok(())
Expand Down Expand Up @@ -177,7 +178,8 @@ async fn cpapi_producers_post(
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let context = request_context.context();
let producer_info = producer_info.into_inner();
context.register_producer(&producer_info)
context
.register_producer(&producer_info)
.await
.map(|_| HttpResponseUpdatedNoContent())
.map_err(|e| HttpError::for_internal_error(e.to_string()))
Expand All @@ -194,7 +196,8 @@ async fn cpapi_collectors_post(
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let context = request_context.context();
let oximeter_info = oximeter_info.into_inner();
context.register_collector(oximeter_info)
context
.register_collector(oximeter_info)
.await
.map(|_| HttpResponseUpdatedNoContent())
.map_err(|e| HttpError::for_internal_error(e.to_string()))
Expand All @@ -220,13 +223,10 @@ impl Server {
}

let nexus = Arc::new(StandaloneNexus::new(
log.new(slog::o!("component" => "nexus-standalone"))
log.new(slog::o!("component" => "nexus-standalone")),
));
let server = HttpServerStarter::new(
&ConfigDropshot {
bind_address: address,
..Default::default()
},
&ConfigDropshot { bind_address: address, ..Default::default() },
standalone_nexus_api(),
Arc::clone(&nexus),
&log,
Expand Down
Loading

0 comments on commit 21602a8

Please sign in to comment.