Skip to content

Commit

Permalink
Remove the LazyTimeseriesClient in Nexus (oxidecomputer#6925)
Browse files Browse the repository at this point in the history
The `LazyTimeseriesClient` is a vestigial type we've used to connect to
ClickHouse on demand. This is used in Nexus, which may be created and
start handling requests that require talking to ClickHouse before the
ClickHouse server is up or reachable.

That's obviated by the `qorb` connection pooling we've added in recent
commits. But since it's still around, that means we create connection
pools _each time_ Nexus handles a timeseries-related request. That leads
to exorbitant numbers of connections, almost all of which time out, but
which still leave ClickHouse unable to handle new connections. This
closes oxidecomputer#6923.
  • Loading branch information
bnaecker authored Oct 24, 2024
1 parent 0f8db77 commit 4c89247
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 92 deletions.
16 changes: 0 additions & 16 deletions nexus/src/app/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,6 @@ impl super::Nexus {
// resources they have access to.
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
self.timeseries_client
.get()
.await
.map_err(|e| {
Error::internal_error(&format!(
"Cannot access timeseries DB: {}",
e
))
})?
.timeseries_schema_list(&pagination.page, limit)
.await
.map_err(|e| match e {
Expand All @@ -145,14 +137,6 @@ impl super::Nexus {
// resources they have access to.
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
self.timeseries_client
.get()
.await
.map_err(|e| {
Error::internal_error(&format!(
"Cannot access timeseries DB: {}",
e
))
})?
.oxql_query(query)
.await
.map(|result| {
Expand Down
33 changes: 21 additions & 12 deletions nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use self::external_endpoints::NexusCertResolver;
use self::saga::SagaExecutor;
use crate::app::background::BackgroundTasksData;
use crate::app::background::SagaRecoveryHelpers;
use crate::app::oximeter::LazyTimeseriesClient;
use crate::populate::populate_start;
use crate::populate::PopulateArgs;
use crate::populate::PopulateStatus;
Expand All @@ -25,6 +24,7 @@ use nexus_db_queries::authn;
use nexus_db_queries::authz;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::address::DENDRITE_PORT;
use omicron_common::address::MGD_PORT;
use omicron_common::address::MGS_PORT;
Expand All @@ -36,6 +36,7 @@ use sagas::common_storage::make_pantry_connection_pool;
use sagas::common_storage::PooledPantryClient;
use slog::Logger;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::net::{IpAddr, Ipv6Addr};
use std::sync::Arc;
Expand Down Expand Up @@ -165,7 +166,7 @@ pub struct Nexus {
reqwest_client: reqwest::Client,

/// Client to the timeseries database.
timeseries_client: LazyTimeseriesClient,
timeseries_client: oximeter_db::Client,

/// Contents of the trusted root role for the TUF repository.
#[allow(dead_code)]
Expand Down Expand Up @@ -409,16 +410,24 @@ impl Nexus {
.build()
.map_err(|e| e.to_string())?;

// Connect to clickhouse - but do so lazily.
// Clickhouse may not be executing when Nexus starts.
let timeseries_client = if let Some(address) =
&config.pkg.timeseries_db.address
{
// If an address was provided, use it instead of DNS.
LazyTimeseriesClient::new_from_address(log.clone(), *address)
} else {
LazyTimeseriesClient::new_from_dns(log.clone(), resolver.clone())
};
// Client to the ClickHouse database.
let timeseries_client =
if let Some(http_address) = &config.pkg.timeseries_db.address {
let native_address =
SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT);
oximeter_db::Client::new(*http_address, native_address, &log)
} else {
// TODO-cleanup: Remove this when we remove the HTTP client.
let http_resolver =
qorb_resolver.for_service(ServiceName::Clickhouse);
let native_resolver =
qorb_resolver.for_service(ServiceName::ClickhouseNative);
oximeter_db::Client::new_with_pool(
http_resolver,
native_resolver,
&log,
)
};

// TODO-cleanup We may want to make the populator a first-class
// background task.
Expand Down
64 changes: 0 additions & 64 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@
use crate::external_api::params::ResourceMetrics;
use crate::internal_api::params::OximeterInfo;
use dropshot::PaginationParams;
use internal_dns_resolver::{ResolveError, Resolver};
use internal_dns_types::names::ServiceName;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use nexus_db_queries::db::DataStore;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::api::external::{DataPageParams, Error, ListResultVec};
use omicron_common::api::internal::nexus::{self, ProducerEndpoint};
use oximeter_client::Client as OximeterClient;
use oximeter_db::query::Timestamp;
use oximeter_db::Measurement;
use slog::Logger;
use std::convert::TryInto;
use std::net::SocketAddr;
use std::num::NonZeroU32;
Expand All @@ -31,58 +27,6 @@ use uuid::Uuid;
/// some interval of this overall duration.
pub const PRODUCER_LEASE_DURATION: Duration = Duration::from_secs(10 * 60);

/// A client which knows how to connect to Clickhouse, but does so
/// only when a request is actually made.
///
/// This allows callers to set up the mechanism of connection (by address
/// or DNS) separately from actually making that connection. This
/// is particularly useful in situations where configurations are parsed
/// prior to Clickhouse existing.
///
/// Nothing is cached in this type: every call to `get` constructs a new
/// `oximeter_db::Client`.
pub struct LazyTimeseriesClient {
/// Logger passed by reference to each `oximeter_db::Client` built by `get`.
log: Logger,
/// How the ClickHouse addresses are obtained when `get` is called.
source: ClientSource,
}

/// Where a `LazyTimeseriesClient` obtains the ClickHouse addresses from.
enum ClientSource {
/// Look up the `Clickhouse` and `ClickhouseNative` services through this
/// resolver on each call to `get`.
FromDns { resolver: Resolver },
/// A fixed socket address, used directly as the client's `http_address`;
/// the native address reuses its IP with `CLICKHOUSE_TCP_PORT`.
FromIp { address: SocketAddr },
}

impl LazyTimeseriesClient {
    /// Create a lazy client whose addresses are looked up through
    /// `resolver` each time `get` is called.
    pub fn new_from_dns(log: Logger, resolver: Resolver) -> Self {
        let source = ClientSource::FromDns { resolver };
        Self { log, source }
    }

    /// Create a lazy client that always uses the fixed `address`.
    pub fn new_from_address(log: Logger, address: SocketAddr) -> Self {
        let source = ClientSource::FromIp { address };
        Self { log, source }
    }

    /// Determine the ClickHouse addresses and construct a client for them.
    ///
    /// A new `oximeter_db::Client` is constructed on every call; nothing
    /// is reused between calls.
    ///
    /// # Errors
    ///
    /// Returns the `ResolveError` from the resolver if either the
    /// `Clickhouse` or the `ClickhouseNative` lookup fails. The fixed
    /// address variant has no failure path here.
    pub(crate) async fn get(
        &self,
    ) -> Result<oximeter_db::Client, ResolveError> {
        let (http, native) = match &self.source {
            ClientSource::FromDns { resolver } => {
                // Resolve the HTTP service first, then the native one; a
                // failure in either lookup is returned to the caller.
                let http_v6 =
                    resolver.lookup_socket_v6(ServiceName::Clickhouse).await?;
                let native_v6 = resolver
                    .lookup_socket_v6(ServiceName::ClickhouseNative)
                    .await?;
                (SocketAddr::from(http_v6), SocketAddr::from(native_v6))
            }
            ClientSource::FromIp { address } => {
                // The native address shares the configured IP and uses
                // `CLICKHOUSE_TCP_PORT` as its port.
                let native =
                    SocketAddr::new(address.ip(), CLICKHOUSE_TCP_PORT);
                (*address, native)
            }
        };

        Ok(oximeter_db::Client::new(http, native, &self.log))
    }
}

impl super::Nexus {
/// Insert a new record of an Oximeter collector server.
pub(crate) async fn upsert_oximeter_collector(
Expand Down Expand Up @@ -202,14 +146,6 @@ impl super::Nexus {

let timeseries_list = self
.timeseries_client
.get()
.await
.map_err(|e| {
Error::internal_error(&format!(
"Cannot access timeseries DB: {}",
e
))
})?
.select_timeseries_with(
timeseries_name,
criteria,
Expand Down

0 comments on commit 4c89247

Please sign in to comment.