Skip to content

Commit

Permalink
WIP, with standlone nexus
Browse files Browse the repository at this point in the history
  • Loading branch information
bnaecker committed Sep 17, 2023
1 parent e4369c3 commit c40de05
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 60 deletions.
57 changes: 46 additions & 11 deletions oximeter/collector/src/bin/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ use clap::Parser;
use omicron_common::cmd::fatal;
use omicron_common::cmd::CmdError;
use oximeter_collector::oximeter_api;
use oximeter_collector::standalone_nexus_api;
use oximeter_collector::Config;
use oximeter_collector::Oximeter;
use oximeter_collector::OximeterArguments;
use oximeter_collector::ProducerInfo;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::path::PathBuf;
use uuid::Uuid;
use oximeter_collector::StandaloneNexus;
use oximeter_collector::Error;

pub fn run_openapi() -> Result<(), String> {
oximeter_api()
Expand All @@ -30,6 +32,16 @@ pub fn run_openapi() -> Result<(), String> {
.map_err(|e| e.to_string())
}

pub fn run_standalone_openapi() -> Result<(), String> {
standalone_nexus_api()
.openapi("Oxide Nexus API", "0.0.1")
.description("API for interacting with Nexus")
.contact_url("https://oxide.computer")
.contact_email("[email protected]")
.write(&mut std::io::stdout())
.map_err(|e| e.to_string())
}

/// Run an oximeter metric collection server in the Oxide Control Plane.
#[derive(Parser)]
#[clap(name = "oximeter", about = "See README.adoc for more information")]
Expand Down Expand Up @@ -60,13 +72,6 @@ enum Args {
/// producers to collect from, and optionally the location of a ClickHouse
/// database in which to insert records.
Standalone {
/// A list of producers to start collecting data from automatically.
///
/// Producers are listed as a set of comma-delimited strings. Each item
/// is formatted like `<uuid>@<address>`.
#[arg(value_delimiter = ',', num_args = 0..)]
producers: Vec<ProducerInfo>,

/// The ID for the collector.
///
/// Default is to generate a new, random UUID.
Expand All @@ -83,13 +88,28 @@ enum Args {
)]
address: SocketAddrV6,

/// The address for the fake Nexus server used to register.
///
/// This program starts a fake version of Nexus, which is used only to
/// register the producers and collectors. This allows them to operate
/// as they usually would, registering each other with Nexus so that an
/// assignment between them can be made.
#[arg(
long,
default_value_t = SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0)
)]
nexus: SocketAddrV6,

/// The address for ClickHouse.
///
/// If not provided, `oximeter` will not attempt to insert records into
/// the database at all.
#[arg(short, long)]
#[arg(long)]
clickhouse: Option<SocketAddr>,
},

/// Print the fake Nexus's standalone API.
StandaloneOpenapi,
}

#[tokio::main]
Expand All @@ -113,14 +133,29 @@ async fn do_run() -> Result<(), CmdError> {
.await
.map_err(|e| CmdError::Failure(e.to_string()))
}
Args::Standalone { producers, id, address, clickhouse } => {
Args::Standalone { id, address, nexus, clickhouse } => {
// Start the standalone Nexus server, for registration of both the
// collector and producers.
let nexus_server = run_standalone_nexus(nexus).await.map_err(|e| CmdError::Failure(e.to_string()))?;
let args = OximeterArguments { id, address };
Oximeter::new_standalone(&args, clickhouse, &producers)
Oximeter::new_standalone(
nexus_server.log(),
&args,
nexus_server.local_addr(),
clickhouse
)
.await
.unwrap()
.serve_forever()
.await
.map_err(|e| CmdError::Failure(e.to_string()))
}
Args::StandaloneOpenapi => {
run_standalone_openapi().map_err(CmdError::Failure)
}
}
}

async fn run_standalone_nexus(addr: SocketAddrV6) -> Result<StandaloneNexus, Error> {
StandaloneNexus::new(addr.into())
}
59 changes: 34 additions & 25 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ use tokio::time::interval;
use uuid::Uuid;

mod standalone;
pub use standalone::StandaloneNexus;
pub use standalone::Server as StandaloneNexus;
pub use standalone::standalone_nexus_api;

/// Errors collecting metric data
#[derive(Debug, Error)]
Expand Down Expand Up @@ -816,27 +817,16 @@ impl Oximeter {

/// Create a new `oximeter` collector running in standalone mode.
pub async fn new_standalone(
log: &Logger,
args: &OximeterArguments,
nexus: SocketAddr,
clickhouse: Option<SocketAddr>,
producers: &[ProducerInfo],
) -> Result<Self, Error> {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let (drain, registration) = slog_dtrace::with_drain(drain);
let log = slog::Logger::root(drain.fuse(), o!(FileKv));
if let slog_dtrace::ProbeRegistration::Failed(e) = registration {
let msg = format!("failed to register DTrace probes: {}", e);
error!(log, "{}", msg);
return Err(Error::Server(msg));
} else {
debug!(log, "registered DTrace probes");
}

let db_config = clickhouse.map(DbConfig::with_address);
let agent = Arc::new(
OximeterAgent::new_standalone(args.id, db_config, &log).await?,
);

let dropshot_log = log.new(o!("component" => "dropshot"));
let server = HttpServerStarter::new(
&ConfigDropshot {
Expand All @@ -849,20 +839,39 @@ impl Oximeter {
)
.map_err(|e| Error::Server(e.to_string()))?
.start();
info!(log, "started oximeter standalone server");

for producer in producers.iter() {
agent
.register_producer(ProducerEndpoint {
id: producer.id,
address: producer.address,
base_route: "/collect".to_string(),
interval: Duration::from_secs(10),
// Notify the standalone nexus.
let client = reqwest::Client::new();
let notify_nexus = || async {
debug!(log, "contacting nexus");
client
.post(format!("http://{}/metrics/collectors", nexus))
.json(&nexus_client::types::OximeterInfo {
address: server.local_addr().to_string(),
collector_id: agent.id,
})
.send()
.await
.context("failed to register standalone producers")?;
}
.map_err(|e| backoff::BackoffError::transient(e.to_string()))?
.error_for_status()
.map_err(|e| backoff::BackoffError::transient(e.to_string()))
};
let log_notification_failure = |error, delay| {
warn!(
log,
"failed to contact nexus, will retry in {:?}", delay;
"error" => ?error
);
};
backoff::retry_notify(
backoff::retry_policy_internal_service(),
notify_nexus,
log_notification_failure,
)
.await
.expect("Expected an infinite retry loop contacting Nexus");

info!(log, "started oximeter standalone server");
Ok(Self { agent, server })
}

Expand Down
Loading

0 comments on commit c40de05

Please sign in to comment.