Skip to content

Commit

Permalink
Qorb integration as connection pool for database (#5876)
Browse files Browse the repository at this point in the history
Replaces all usage of bb8 with a new connection pooling library called
[qorb](https://github.com/oxidecomputer/qorb).

qorb, detailed in RFD 477, provides the following benefits over bb8:
- It allows lookup of multiple backends via DNS SRV records
- It dynamically adjusts the number of connections to each bakend based
on their health, and prioritizes vending out connections to healthy
backends
- It should be re-usable for both our database and progenitor clients
(using a different "backend connector", but the same core library and
DNS resolution mechanism).

Fixes #4192
Part of #3763 (fixes CRDB
portion)
  • Loading branch information
smklein authored Aug 27, 2024
1 parent e3ec364 commit dd85331
Show file tree
Hide file tree
Showing 29 changed files with 591 additions and 334 deletions.
210 changes: 163 additions & 47 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,12 @@ api_identity = { path = "api_identity" }
approx = "0.5.1"
assert_matches = "1.5.0"
assert_cmd = "2.0.16"
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "ed7ab5ef0513ba303d33efd41d3e9e381169d59b" }
async-bb8-diesel = "0.2"
async-trait = "0.1.81"
atomicwrites = "0.4.3"
authz-macros = { path = "nexus/authz-macros" }
backoff = { version = "0.4.0", features = [ "tokio" ] }
base64 = "0.22.1"
bb8 = "0.8.5"
bcs = "0.1.6"
bincode = "1.3.3"
bootstore = { path = "bootstore" }
Expand Down Expand Up @@ -497,6 +496,7 @@ bhyve_api = { git = "https://github.com/oxidecomputer/propolis", rev = "24a74d0c
propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "24a74d0c76b6a63961ecef76acb1516b6e66c5c9" }
propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "24a74d0c76b6a63961ecef76acb1516b6e66c5c9" }
proptest = "1.5.0"
qorb = { git = "https://github.com/oxidecomputer/qorb", branch = "master" }
quote = "1.0"
rand = "0.8.5"
rand_core = "0.6.4"
Expand Down
13 changes: 7 additions & 6 deletions dev-tools/omdb/src/bin/omdb/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ impl DbUrlOptions {
eprintln!("note: using database URL {}", &db_url);

let db_config = db::Config { url: db_url.clone() };
let pool = Arc::new(db::Pool::new(&log.clone(), &db_config));
let pool =
Arc::new(db::Pool::new_single_host(&log.clone(), &db_config));

// Being a dev tool, we want to try this operation even if the schema
// doesn't match what we expect. So we use `DataStore::new_unchecked()`
Expand Down Expand Up @@ -4224,7 +4225,7 @@ async fn cmd_db_inventory(
}

async fn cmd_db_inventory_baseboard_ids(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
) -> Result<(), anyhow::Error> {
#[derive(Tabled)]
Expand Down Expand Up @@ -4261,7 +4262,7 @@ async fn cmd_db_inventory_baseboard_ids(
}

async fn cmd_db_inventory_cabooses(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
) -> Result<(), anyhow::Error> {
#[derive(Tabled)]
Expand Down Expand Up @@ -4302,7 +4303,7 @@ async fn cmd_db_inventory_cabooses(
}

async fn cmd_db_inventory_physical_disks(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
args: InvPhysicalDisksArgs,
) -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -4359,7 +4360,7 @@ async fn cmd_db_inventory_physical_disks(
}

async fn cmd_db_inventory_rot_pages(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
) -> Result<(), anyhow::Error> {
#[derive(Tabled)]
Expand Down Expand Up @@ -4394,7 +4395,7 @@ async fn cmd_db_inventory_rot_pages(
}

async fn cmd_db_inventory_collections_list(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
) -> Result<(), anyhow::Error> {
#[derive(Tabled)]
Expand Down
24 changes: 24 additions & 0 deletions nexus-config/src/postgres_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! Common objects used for configuration
use std::fmt;
use std::net::SocketAddr;
use std::ops::Deref;
use std::str::FromStr;

Expand Down Expand Up @@ -32,6 +33,29 @@ impl PostgresConfigWithUrl {
pub fn url(&self) -> String {
self.url_raw.clone()
}

/// Accesses the first ip / port pair within the URL.
///
/// # Panics
///
/// This method makes the assumption that the hostname has at least one
/// "host IP / port" pair which can be extracted. If the supplied URL
/// does not have such a pair, this function will panic.
// Yes, panicking in the above scenario sucks. But this type is already
// pretty ubiquitous within Omicron, and integration with the qorb
// connection pooling library requires access to database by SocketAddr.
pub fn address(&self) -> SocketAddr {
let tokio_postgres::config::Host::Tcp(host) =
&self.config.get_hosts()[0]
else {
panic!("Non-TCP hostname");
};
let ip: std::net::IpAddr =
host.parse().expect("Failed to parse host as IP address");

let port = self.config.get_ports()[0];
SocketAddr::new(ip, port)
}
}

impl FromStr for PostgresConfigWithUrl {
Expand Down
6 changes: 4 additions & 2 deletions nexus/db-queries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@ omicron-rpaths.workspace = true
anyhow.workspace = true
async-bb8-diesel.workspace = true
async-trait.workspace = true
bb8.workspace = true
camino.workspace = true
chrono.workspace = true
const_format.workspace = true
diesel.workspace = true
diesel-dtrace.workspace = true
dropshot.workspace = true
futures.workspace = true
internal-dns.workspace = true
ipnetwork.workspace = true
macaddr.workspace = true
once_cell.workspace = true
oxnet.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
pq-sys = "*"
qorb = { workspace = true, features = [ "qtop" ] }
rand.workspace = true
ref-cast.workspace = true
schemars.workspace = true
Expand All @@ -45,8 +46,9 @@ strum.workspace = true
swrite.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
uuid.workspace = true
url.workspace = true
usdt.workspace = true
uuid.workspace = true

db-macros.workspace = true
nexus-auth.workspace = true
Expand Down
28 changes: 13 additions & 15 deletions nexus/db-queries/src/db/collection_attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,7 @@ where
mod test {
use super::*;
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{
AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager,
};
use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection};
use chrono::Utc;
use db_macros::Resource;
use diesel::expression_methods::ExpressionMethods;
Expand Down Expand Up @@ -617,8 +615,8 @@ mod test {

async fn setup_db(
pool: &crate::db::Pool,
) -> bb8::PooledConnection<ConnectionManager<DbConnection>> {
let connection = pool.pool().get().await.unwrap();
) -> crate::db::datastore::DataStoreConnection {
let connection = pool.claim().await.unwrap();
(*connection)
.batch_execute_async(
"CREATE SCHEMA IF NOT EXISTS test_schema; \
Expand Down Expand Up @@ -873,7 +871,7 @@ mod test {
dev::test_setup_log("test_attach_missing_collection_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -902,7 +900,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_missing_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -939,7 +937,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_once");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -987,7 +985,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_once_synchronous");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1036,7 +1034,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_multiple_times");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1092,7 +1090,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_beyond_capacity_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1156,7 +1154,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_while_already_attached");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1263,7 +1261,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_once");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1318,7 +1316,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_deleted_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1363,7 +1361,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_without_update_filter");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down
20 changes: 9 additions & 11 deletions nexus/db-queries/src/db/collection_detach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,7 @@ mod test {
use super::*;
use crate::db::collection_attach::DatastoreAttachTarget;
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{
AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager,
};
use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection};
use chrono::Utc;
use db_macros::Resource;
use diesel::expression_methods::ExpressionMethods;
Expand Down Expand Up @@ -521,8 +519,8 @@ mod test {

async fn setup_db(
pool: &crate::db::Pool,
) -> bb8::PooledConnection<ConnectionManager<DbConnection>> {
let connection = pool.pool().get().await.unwrap();
) -> crate::db::datastore::DataStoreConnection {
let connection = pool.claim().await.unwrap();
(*connection)
.batch_execute_async(
"CREATE SCHEMA IF NOT EXISTS test_schema; \
Expand Down Expand Up @@ -786,7 +784,7 @@ mod test {
dev::test_setup_log("test_detach_missing_collection_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -814,7 +812,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_missing_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -850,7 +848,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_once");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -890,7 +888,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_while_already_detached");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -954,7 +952,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_deleted_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -998,7 +996,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_without_update_filter");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down
Loading

0 comments on commit dd85331

Please sign in to comment.