Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qorb integration as connection pool for database #5876

Merged
merged 21 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 220 additions & 38 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,12 @@ api_identity = { path = "api_identity" }
approx = "0.5.1"
assert_matches = "1.5.0"
assert_cmd = "2.0.14"
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "ed7ab5ef0513ba303d33efd41d3e9e381169d59b" }
async-bb8-diesel = "0.2"
async-trait = "0.1.80"
atomicwrites = "0.4.3"
authz-macros = { path = "nexus/authz-macros" }
backoff = { version = "0.4.0", features = [ "tokio" ] }
base64 = "0.22.1"
bb8 = "0.8.3"
bcs = "0.1.6"
bincode = "1.3.3"
bootstore = { path = "bootstore" }
Expand Down Expand Up @@ -398,6 +397,7 @@ bhyve_api = { git = "https://github.com/oxidecomputer/propolis", rev = "6d7ed9a0
propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "6d7ed9a033babc054db9eff5b59dee978d2b0d76" }
propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "6d7ed9a033babc054db9eff5b59dee978d2b0d76" }
proptest = "1.4.0"
qorb = { git = "https://github.com/oxidecomputer/qorb", branch = "master" }
quote = "1.0"
rand = "0.8.5"
rand_core = "0.6.4"
Expand Down Expand Up @@ -475,7 +475,7 @@ textwrap = "0.16.1"
test-strategy = "0.3.1"
thiserror = "1.0"
tofino = { git = "http://github.com/oxidecomputer/tofino", branch = "main" }
tokio = "1.37.0"
tokio = "1.38.0"
tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4", "with-uuid-1" ] }
tokio-stream = "0.1.15"
tokio-tungstenite = "0.20"
Expand Down
13 changes: 7 additions & 6 deletions dev-tools/omdb/src/bin/omdb/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ impl DbUrlOptions {
eprintln!("note: using database URL {}", &db_url);

let db_config = db::Config { url: db_url.clone() };
let pool = Arc::new(db::Pool::new(&log.clone(), &db_config));
let pool =
Arc::new(db::Pool::new_single_host(&log.clone(), &db_config));

// Being a dev tool, we want to try this operation even if the schema
// doesn't match what we expect. So we use `DataStore::new_unchecked()`
Expand Down Expand Up @@ -3069,7 +3070,7 @@ async fn cmd_db_inventory(
}

async fn cmd_db_inventory_baseboard_ids(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
) -> Result<(), anyhow::Error> {
#[derive(Tabled)]
Expand Down Expand Up @@ -3106,7 +3107,7 @@ async fn cmd_db_inventory_baseboard_ids(
}

async fn cmd_db_inventory_cabooses(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
) -> Result<(), anyhow::Error> {
#[derive(Tabled)]
Expand Down Expand Up @@ -3147,7 +3148,7 @@ async fn cmd_db_inventory_cabooses(
}

async fn cmd_db_inventory_physical_disks(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
args: PhysicalDisksArgs,
) -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -3204,7 +3205,7 @@ async fn cmd_db_inventory_physical_disks(
}

async fn cmd_db_inventory_rot_pages(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
) -> Result<(), anyhow::Error> {
#[derive(Tabled)]
Expand Down Expand Up @@ -3239,7 +3240,7 @@ async fn cmd_db_inventory_rot_pages(
}

async fn cmd_db_inventory_collections_list(
conn: &DataStoreConnection<'_>,
conn: &DataStoreConnection,
limit: NonZeroU32,
) -> Result<(), anyhow::Error> {
#[derive(Tabled)]
Expand Down
24 changes: 24 additions & 0 deletions nexus-config/src/postgres_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! Common objects used for configuration

use std::fmt;
use std::net::SocketAddr;
use std::ops::Deref;
use std::str::FromStr;

Expand Down Expand Up @@ -32,6 +33,29 @@ impl PostgresConfigWithUrl {
pub fn url(&self) -> String {
self.url_raw.clone()
}

/// Accesses the first ip / port pair within the URL.
///
/// # Panics
///
/// This method makes the assumption that the hostname has at least one
/// "host IP / port" pair which can be extracted. If the supplied URL
/// does not have such a pair, this function will panic.
// Yes, panicking in the above scenario sucks. But this type is already
// pretty ubiquitous within Omicron, and integration with the qorb
// connection pooling library requires access to database by SocketAddr.
pub fn address(&self) -> SocketAddr {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love that this method makes this assumption, though this appears to be embedded mostly within tests.

It's necessary within nexus/db-queries/src/db/pool.rs to implement SingleHostResolver, which is our way of identifying backends as SocketAddr types.

I can try to make the "address" of the backend generic through qorb, so we could leave it as a URL?

let tokio_postgres::config::Host::Tcp(host) =
&self.config.get_hosts()[0]
else {
panic!("Non-TCP hostname");
};
let ip: std::net::IpAddr =
host.parse().expect("Failed to parse host as IP address");

let port = self.config.get_ports()[0];
SocketAddr::new(ip, port)
}
}

impl FromStr for PostgresConfigWithUrl {
Expand Down
3 changes: 2 additions & 1 deletion nexus/db-queries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@ omicron-rpaths.workspace = true
anyhow.workspace = true
async-bb8-diesel.workspace = true
async-trait.workspace = true
bb8.workspace = true
camino.workspace = true
chrono.workspace = true
const_format.workspace = true
diesel.workspace = true
diesel-dtrace.workspace = true
dropshot.workspace = true
futures.workspace = true
internal-dns.workspace = true
ipnetwork.workspace = true
macaddr.workspace = true
once_cell.workspace = true
oxnet.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
pq-sys = "*"
qorb = { workspace = true, features = [ "qtop" ] }
rand.workspace = true
ref-cast.workspace = true
schemars.workspace = true
Expand Down
28 changes: 13 additions & 15 deletions nexus/db-queries/src/db/collection_attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,9 +564,7 @@ where
mod test {
use super::*;
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{
AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager,
};
use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection};
use chrono::Utc;
use db_macros::Resource;
use diesel::expression_methods::ExpressionMethods;
Expand Down Expand Up @@ -603,8 +601,8 @@ mod test {

async fn setup_db(
pool: &crate::db::Pool,
) -> bb8::PooledConnection<ConnectionManager<DbConnection>> {
let connection = pool.pool().get().await.unwrap();
) -> crate::db::datastore::DataStoreConnection {
let connection = pool.claim().await.unwrap();
(*connection)
.batch_execute_async(
"CREATE SCHEMA IF NOT EXISTS test_schema; \
Expand Down Expand Up @@ -859,7 +857,7 @@ mod test {
dev::test_setup_log("test_attach_missing_collection_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -888,7 +886,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_missing_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -925,7 +923,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_once");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -973,7 +971,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_once_synchronous");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1022,7 +1020,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_multiple_times");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1078,7 +1076,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_beyond_capacity_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1142,7 +1140,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_while_already_attached");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1249,7 +1247,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_once");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1304,7 +1302,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_deleted_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -1349,7 +1347,7 @@ mod test {
let logctx = dev::test_setup_log("test_attach_without_update_filter");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down
20 changes: 9 additions & 11 deletions nexus/db-queries/src/db/collection_detach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,7 @@ mod test {
use super::*;
use crate::db::collection_attach::DatastoreAttachTarget;
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{
AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager,
};
use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection};
use chrono::Utc;
use db_macros::Resource;
use diesel::expression_methods::ExpressionMethods;
Expand Down Expand Up @@ -521,8 +519,8 @@ mod test {

async fn setup_db(
pool: &crate::db::Pool,
) -> bb8::PooledConnection<ConnectionManager<DbConnection>> {
let connection = pool.pool().get().await.unwrap();
) -> crate::db::datastore::DataStoreConnection {
let connection = pool.claim().await.unwrap();
(*connection)
.batch_execute_async(
"CREATE SCHEMA IF NOT EXISTS test_schema; \
Expand Down Expand Up @@ -786,7 +784,7 @@ mod test {
dev::test_setup_log("test_detach_missing_collection_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -814,7 +812,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_missing_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -850,7 +848,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_once");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -890,7 +888,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_while_already_detached");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -954,7 +952,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_deleted_resource_fails");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down Expand Up @@ -998,7 +996,7 @@ mod test {
let logctx = dev::test_setup_log("test_detach_without_update_filter");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&logctx.log, &cfg);
let pool = db::Pool::new_single_host(&logctx.log, &cfg);

let conn = setup_db(&pool).await;

Expand Down
Loading
Loading