From dd853311ff9a654537973fa3b830b70894cb50fa Mon Sep 17 00:00:00 2001
From: Sean Klein
Date: Tue, 27 Aug 2024 14:44:17 -0700
Subject: [PATCH] Qorb integration as connection pool for database (#5876)

Replaces all usage of bb8 with a new connection pooling library called
[qorb](https://github.com/oxidecomputer/qorb).

qorb, detailed in RFD 477, provides the following benefits over bb8:

- It allows lookup of multiple backends via DNS SRV records
- It dynamically adjusts the number of connections to each backend based
  on their health, and prioritizes vending out connections to healthy
  backends
- It should be reusable for both our database and progenitor clients
  (using a different "backend connector", but the same core library and
  DNS resolution mechanism).

Fixes https://github.com/oxidecomputer/omicron/issues/4192
Part of https://github.com/oxidecomputer/omicron/issues/3763 (fixes CRDB portion)
---
 Cargo.lock                                    | 210 ++++++++++++++----
 Cargo.toml                                    |   4 +-
 dev-tools/omdb/src/bin/omdb/db.rs             |  13 +-
 nexus-config/src/postgres_config.rs           |  24 ++
 nexus/db-queries/Cargo.toml                   |   6 +-
 nexus/db-queries/src/db/collection_attach.rs  |  28 ++-
 nexus/db-queries/src/db/collection_detach.rs  |  20 +-
 .../src/db/collection_detach_many.rs          |  24 +-
 nexus/db-queries/src/db/collection_insert.rs  |  12 +-
 .../src/db/datastore/db_metadata.rs           |  10 +-
 .../db-queries/src/db/datastore/inventory.rs  |   2 +-
 nexus/db-queries/src/db/datastore/mod.rs      |  15 +-
 nexus/db-queries/src/db/datastore/probe.rs    |  34 +--
 .../src/db/datastore/pub_test_utils.rs        |   2 +-
 nexus/db-queries/src/db/explain.rs            |  11 +-
 nexus/db-queries/src/db/pagination.rs         |  12 +-
 nexus/db-queries/src/db/pool.rs               | 203 ++++++++++-------
 nexus/db-queries/src/db/pool_connection.rs    | 133 +++++++++--
 .../db-queries/src/db/queries/external_ip.rs  |   3 +-
 nexus/db-queries/src/db/queries/next_item.rs  |  12 +-
 .../src/db/queries/region_allocation.rs       |   4 +-
 .../virtual_provisioning_collection_update.rs |  16 +-
 nexus/db-queries/src/db/queries/vpc_subnet.rs |  11 +-
 .../src/app/background/tasks/saga_recovery.rs |   2 +-
 nexus/src/bin/schema-updater.rs               |   2 +-
 nexus/src/context.rs                          |  88 +++-----
 nexus/src/populate.rs                         |  16 +-
 nexus/tests/integration_tests/schema.rs       |   4 +-
 workspace-hack/Cargo.toml                     |   4 +-
 29 files changed, 591 insertions(+), 334 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4b8f8cdf6f..a3a963d030 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -256,12 +256,14 @@ checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
 
 [[package]]
 name = "async-bb8-diesel"
-version = "0.1.0"
-source = "git+https://github.com/oxidecomputer/async-bb8-diesel?rev=ed7ab5ef0513ba303d33efd41d3e9e381169d59b#ed7ab5ef0513ba303d33efd41d3e9e381169d59b"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ebc03a2806f66f36513d65e0a7f34200382230250cadcf8a8397cfbe3f26b795"
 dependencies = [
  "async-trait",
  "bb8",
  "diesel",
+ "futures",
  "thiserror",
  "tokio",
 ]
@@ -703,7 +705,7 @@ dependencies = [
 name = "bootstrap-agent-api"
 version = "0.1.0"
 dependencies = [
- "dropshot",
+ "dropshot 0.10.2-dev",
  "nexus-client",
  "omicron-common",
  "omicron-uuid-kinds",
@@ -973,7 +975,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "clap",
- "dropshot",
+ "dropshot 0.10.2-dev",
  "futures",
  "libc",
  "omicron-rpaths",
@@ -1117,7 +1119,7 @@ checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce"
 name = "clickhouse-admin-api"
 version = "0.1.0"
 dependencies = [
- "dropshot",
+ "dropshot 0.10.2-dev",
  "omicron-common",
  "omicron-uuid-kinds",
"omicron-workspace-hack", @@ -1160,7 +1162,7 @@ name = "cockroach-admin-api" version = "0.1.0" dependencies = [ "cockroach-admin-types", - "dropshot", + "dropshot 0.10.2-dev", "omicron-common", "omicron-uuid-kinds", "omicron-workspace-hack", @@ -1387,7 +1389,7 @@ name = "crdb-seed" version = "0.1.0" dependencies = [ "anyhow", - "dropshot", + "dropshot 0.10.2-dev", "omicron-test-utils", "omicron-workspace-hack", "slog", @@ -1539,7 +1541,7 @@ dependencies = [ "anyhow", "atty", "crucible-workspace-hack", - "dropshot", + "dropshot 0.10.2-dev", "nix 0.28.0", "rusqlite", "rustls-pemfile 1.0.4", @@ -2104,7 +2106,7 @@ dependencies = [ "clap", "dns-server-api", "dns-service-client", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "hickory-client", "hickory-proto", @@ -2137,7 +2139,7 @@ name = "dns-server-api" version = "0.1.0" dependencies = [ "chrono", - "dropshot", + "dropshot 0.10.2-dev", "omicron-workspace-hack", "schemars", "serde", @@ -2210,6 +2212,52 @@ dependencies = [ "uuid", ] +[[package]] +name = "dropshot" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a391eeedf8a75a188eb670327c704b7ab10eb2bb890e2ec0880dd21d609fb6e8" +dependencies = [ + "async-stream", + "async-trait", + "base64 0.22.1", + "bytes", + "camino", + "chrono", + "debug-ignore", + "dropshot_endpoint 0.10.1", + "form_urlencoded", + "futures", + "hostname 0.4.0", + "http 0.2.12", + "hyper 0.14.30", + "indexmap 2.4.0", + "multer", + "openapiv3", + "paste", + "percent-encoding", + "rustls 0.22.4", + "rustls-pemfile 2.1.3", + "schemars", + "scopeguard", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "slog", + "slog-async", + "slog-bunyan", + "slog-json", + "slog-term", + "tokio", + "tokio-rustls 0.25.0", + "toml 0.8.19", + "uuid", + "version_check", + "waitgroup", +] + [[package]] name = "dropshot" version = "0.10.2-dev" @@ -2222,7 +2270,7 @@ dependencies = [ "camino", "chrono", "debug-ignore", - "dropshot_endpoint", + "dropshot_endpoint 0.10.2-dev", "form_urlencoded", "futures", "hostname 0.4.0", @@ -2256,6 +2304,19 @@ dependencies = [ "waitgroup", ] +[[package]] +name = "dropshot_endpoint" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9058c9c7e4a6b378cd12e71dc155bb15d0d4f8e1e6039ce2cf0a7c0c81043e33" +dependencies = [ + "proc-macro2", + "quote", + "serde", + "serde_tokenstream", + "syn 2.0.74", +] + [[package]] name = "dropshot_endpoint" version = "0.10.2-dev" @@ -2875,7 +2936,7 @@ dependencies = [ name = "gateway-api" version = "0.1.0" dependencies = [ - "dropshot", + "dropshot 0.10.2-dev", "gateway-types", "omicron-common", "omicron-uuid-kinds", @@ -2979,7 +3040,7 @@ name = "gateway-test-utils" version = "0.1.0" dependencies = [ "camino", - "dropshot", + "dropshot 0.10.2-dev", "gateway-messages", "gateway-types", "omicron-gateway", @@ -3982,7 +4043,7 @@ name = "installinator-api" version = "0.1.0" dependencies = [ "anyhow", - "dropshot", + "dropshot 0.10.2-dev", "hyper 0.14.30", "installinator-common", "omicron-common", @@ -4050,7 +4111,7 @@ dependencies = [ "chrono", "dns-server", "dns-service-client", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "futures", "hickory-resolver", @@ -4077,7 +4138,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "dropshot", + "dropshot 0.10.2-dev", "hickory-resolver", "internal-dns", "omicron-common", @@ -4912,7 +4973,7 @@ dependencies = [ "base64 0.22.1", "chrono", "cookie 0.18.1", - "dropshot", + "dropshot 0.10.2-dev", 
"futures", "headers", "http 0.2.12", @@ -4969,7 +5030,7 @@ version = "0.1.0" dependencies = [ "anyhow", "camino", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "libc", "omicron-common", @@ -5052,7 +5113,6 @@ dependencies = [ "assert_matches", "async-bb8-diesel", "async-trait", - "bb8", "camino", "camino-tempfile", "chrono", @@ -5060,7 +5120,7 @@ dependencies = [ "db-macros", "diesel", "diesel-dtrace", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "futures", "gateway-client", @@ -5097,6 +5157,7 @@ dependencies = [ "pq-sys", "predicates", "pretty_assertions", + "qorb", "rand", "rcgen", "ref-cast", @@ -5118,6 +5179,7 @@ dependencies = [ "term", "thiserror", "tokio", + "url", "usdt", "uuid", ] @@ -5139,7 +5201,7 @@ dependencies = [ name = "nexus-internal-api" version = "0.1.0" dependencies = [ - "dropshot", + "dropshot 0.10.2-dev", "nexus-types", "omicron-common", "omicron-uuid-kinds", @@ -5395,7 +5457,7 @@ dependencies = [ "crucible-agent-client", "dns-server", "dns-service-client", - "dropshot", + "dropshot 0.10.2-dev", "futures", "gateway-messages", "gateway-test-utils", @@ -5453,7 +5515,7 @@ dependencies = [ "derive-where", "derive_more", "dns-service-client", - "dropshot", + "dropshot 0.10.2-dev", "futures", "gateway-client", "http 0.2.12", @@ -5778,7 +5840,7 @@ dependencies = [ "chrono", "clap", "clickhouse-admin-api", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "http 0.2.12", "illumos-utils", @@ -5815,7 +5877,7 @@ dependencies = [ "cockroach-admin-api", "cockroach-admin-types", "csv", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "http 0.2.12", "illumos-utils", @@ -5857,7 +5919,7 @@ dependencies = [ "camino", "camino-tempfile", "chrono", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "futures", "hex", @@ -5925,7 +5987,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "futures", "libc", @@ -5965,7 +6027,7 @@ dependencies = [ "camino", "chrono", "clap", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "futures", "gateway-api", @@ -6029,7 +6091,7 @@ dependencies = [ "dns-server", "dns-service-client", "dpd-client", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "fatfs", "futures", @@ -6152,7 +6214,7 @@ dependencies = [ "crucible-agent-client", "csv", "diesel", - "dropshot", + "dropshot 0.10.2-dev", "dyn-clone", "expectorate", "futures", @@ -6315,7 +6377,7 @@ dependencies = [ "dns-server", "dns-service-client", "dpd-client", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "flate2", "flume", @@ -6400,7 +6462,7 @@ dependencies = [ "atomicwrites", "camino", "camino-tempfile", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "filetime", "gethostname", @@ -6636,7 +6698,7 @@ dependencies = [ "clickhouse-admin-api", "cockroach-admin-api", "dns-server-api", - "dropshot", + "dropshot 0.10.2-dev", "fs-err", "gateway-api", "indent_write", @@ -6860,7 +6922,7 @@ name = "oximeter-api" version = "0.1.0" dependencies = [ "chrono", - "dropshot", + "dropshot 0.10.2-dev", "omicron-common", "omicron-workspace-hack", "schemars", @@ -6891,7 +6953,7 @@ dependencies = [ "camino", "chrono", "clap", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "futures", "hyper 0.14.30", @@ -6939,7 +7001,7 @@ dependencies = [ "clap", "clickward", "crossterm 0.28.1", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "futures", "highway", @@ -6981,7 +7043,7 @@ version = "0.1.0" dependencies = [ "cfg-if", "chrono", - "dropshot", + "dropshot 0.10.2-dev", "futures", "http 0.2.12", "hyper 0.14.30", @@ 
-7017,7 +7079,7 @@ dependencies = [ "anyhow", "chrono", "clap", - "dropshot", + "dropshot 0.10.2-dev", "internal-dns", "nexus-client", "omicron-common", @@ -8012,7 +8074,7 @@ dependencies = [ "atty", "base64 0.21.7", "clap", - "dropshot", + "dropshot 0.10.2-dev", "futures", "hyper 0.14.30", "progenitor", @@ -8099,6 +8161,29 @@ dependencies = [ "psl-types", ] +[[package]] +name = "qorb" +version = "0.0.1" +source = "git+https://github.com/oxidecomputer/qorb?branch=master#163a77838a3cfe8f7741d32e443f76d995b89df3" +dependencies = [ + "anyhow", + "async-trait", + "debug-ignore", + "derive-where", + "dropshot 0.10.1", + "futures", + "hickory-resolver", + "rand", + "schemars", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", + "tokio-tungstenite 0.23.1", + "tracing", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -8272,7 +8357,7 @@ dependencies = [ "camino-tempfile", "clap", "dns-service-client", - "dropshot", + "dropshot 0.10.2-dev", "expectorate", "humantime", "indexmap 2.4.0", @@ -9528,7 +9613,7 @@ name = "sled-agent-api" version = "0.1.0" dependencies = [ "camino", - "dropshot", + "dropshot 0.10.2-dev", "nexus-sled-agent-shared", "omicron-common", "omicron-uuid-kinds", @@ -9898,7 +9983,7 @@ dependencies = [ "anyhow", "async-trait", "clap", - "dropshot", + "dropshot 0.10.2-dev", "futures", "gateway-messages", "gateway-types", @@ -10715,6 +10800,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -10741,6 +10827,18 @@ dependencies = [ "tungstenite 0.21.0", ] +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.23.0", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -11092,6 +11190,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.3" @@ -11293,7 +11409,7 @@ dependencies = [ "clap", "debug-ignore", "display-error-chain", - "dropshot", + "dropshot 0.10.2-dev", "futures", "hex", "hubtools", @@ -11760,7 +11876,7 @@ version = "0.1.0" dependencies = [ "anyhow", "dpd-client", - "dropshot", + "dropshot 0.10.2-dev", "gateway-client", "maplit", "omicron-common", @@ -11816,7 +11932,7 @@ dependencies = [ "debug-ignore", "display-error-chain", "dpd-client", - "dropshot", + "dropshot 0.10.2-dev", "either", "expectorate", "flate2", @@ -11883,7 +11999,7 @@ name = "wicketd-api" version = "0.1.0" dependencies = [ "bootstrap-agent-client", - "dropshot", + "dropshot 0.10.2-dev", "gateway-client", "omicron-common", "omicron-passwords", @@ -12339,7 +12455,7 @@ dependencies = [ "anyhow", "camino", "clap", - "dropshot", + "dropshot 0.10.2-dev", "illumos-utils", "omicron-common", "omicron-sled-agent", diff --git a/Cargo.toml b/Cargo.toml index 2c3902f7bc..6565265c1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -282,13 +282,12 @@ api_identity = { path = "api_identity" } approx = "0.5.1" assert_matches = "1.5.0" assert_cmd = "2.0.16" -async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = 
"ed7ab5ef0513ba303d33efd41d3e9e381169d59b" } +async-bb8-diesel = "0.2" async-trait = "0.1.81" atomicwrites = "0.4.3" authz-macros = { path = "nexus/authz-macros" } backoff = { version = "0.4.0", features = [ "tokio" ] } base64 = "0.22.1" -bb8 = "0.8.5" bcs = "0.1.6" bincode = "1.3.3" bootstore = { path = "bootstore" } @@ -497,6 +496,7 @@ bhyve_api = { git = "https://github.com/oxidecomputer/propolis", rev = "24a74d0c propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "24a74d0c76b6a63961ecef76acb1516b6e66c5c9" } propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "24a74d0c76b6a63961ecef76acb1516b6e66c5c9" } proptest = "1.5.0" +qorb = { git = "https://github.com/oxidecomputer/qorb", branch = "master" } quote = "1.0" rand = "0.8.5" rand_core = "0.6.4" diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 9ce4c66a80..48f5137698 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -246,7 +246,8 @@ impl DbUrlOptions { eprintln!("note: using database URL {}", &db_url); let db_config = db::Config { url: db_url.clone() }; - let pool = Arc::new(db::Pool::new(&log.clone(), &db_config)); + let pool = + Arc::new(db::Pool::new_single_host(&log.clone(), &db_config)); // Being a dev tool, we want to try this operation even if the schema // doesn't match what we expect. So we use `DataStore::new_unchecked()` @@ -4224,7 +4225,7 @@ async fn cmd_db_inventory( } async fn cmd_db_inventory_baseboard_ids( - conn: &DataStoreConnection<'_>, + conn: &DataStoreConnection, limit: NonZeroU32, ) -> Result<(), anyhow::Error> { #[derive(Tabled)] @@ -4261,7 +4262,7 @@ async fn cmd_db_inventory_baseboard_ids( } async fn cmd_db_inventory_cabooses( - conn: &DataStoreConnection<'_>, + conn: &DataStoreConnection, limit: NonZeroU32, ) -> Result<(), anyhow::Error> { #[derive(Tabled)] @@ -4302,7 +4303,7 @@ async fn cmd_db_inventory_cabooses( } async fn cmd_db_inventory_physical_disks( - conn: &DataStoreConnection<'_>, + conn: &DataStoreConnection, limit: NonZeroU32, args: InvPhysicalDisksArgs, ) -> Result<(), anyhow::Error> { @@ -4359,7 +4360,7 @@ async fn cmd_db_inventory_physical_disks( } async fn cmd_db_inventory_rot_pages( - conn: &DataStoreConnection<'_>, + conn: &DataStoreConnection, limit: NonZeroU32, ) -> Result<(), anyhow::Error> { #[derive(Tabled)] @@ -4394,7 +4395,7 @@ async fn cmd_db_inventory_rot_pages( } async fn cmd_db_inventory_collections_list( - conn: &DataStoreConnection<'_>, + conn: &DataStoreConnection, limit: NonZeroU32, ) -> Result<(), anyhow::Error> { #[derive(Tabled)] diff --git a/nexus-config/src/postgres_config.rs b/nexus-config/src/postgres_config.rs index 2509ae4fca..0c72d2ba9e 100644 --- a/nexus-config/src/postgres_config.rs +++ b/nexus-config/src/postgres_config.rs @@ -5,6 +5,7 @@ //! Common objects used for configuration use std::fmt; +use std::net::SocketAddr; use std::ops::Deref; use std::str::FromStr; @@ -32,6 +33,29 @@ impl PostgresConfigWithUrl { pub fn url(&self) -> String { self.url_raw.clone() } + + /// Accesses the first ip / port pair within the URL. + /// + /// # Panics + /// + /// This method makes the assumption that the hostname has at least one + /// "host IP / port" pair which can be extracted. If the supplied URL + /// does not have such a pair, this function will panic. + // Yes, panicking in the above scenario sucks. 
But this type is already + // pretty ubiquitous within Omicron, and integration with the qorb + // connection pooling library requires access to database by SocketAddr. + pub fn address(&self) -> SocketAddr { + let tokio_postgres::config::Host::Tcp(host) = + &self.config.get_hosts()[0] + else { + panic!("Non-TCP hostname"); + }; + let ip: std::net::IpAddr = + host.parse().expect("Failed to parse host as IP address"); + + let port = self.config.get_ports()[0]; + SocketAddr::new(ip, port) + } } impl FromStr for PostgresConfigWithUrl { diff --git a/nexus/db-queries/Cargo.toml b/nexus/db-queries/Cargo.toml index 5192528944..c6c5caab6a 100644 --- a/nexus/db-queries/Cargo.toml +++ b/nexus/db-queries/Cargo.toml @@ -14,7 +14,6 @@ omicron-rpaths.workspace = true anyhow.workspace = true async-bb8-diesel.workspace = true async-trait.workspace = true -bb8.workspace = true camino.workspace = true chrono.workspace = true const_format.workspace = true @@ -22,6 +21,7 @@ diesel.workspace = true diesel-dtrace.workspace = true dropshot.workspace = true futures.workspace = true +internal-dns.workspace = true ipnetwork.workspace = true macaddr.workspace = true once_cell.workspace = true @@ -29,6 +29,7 @@ oxnet.workspace = true paste.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. pq-sys = "*" +qorb = { workspace = true, features = [ "qtop" ] } rand.workspace = true ref-cast.workspace = true schemars.workspace = true @@ -45,8 +46,9 @@ strum.workspace = true swrite.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["full"] } -uuid.workspace = true +url.workspace = true usdt.workspace = true +uuid.workspace = true db-macros.workspace = true nexus-auth.workspace = true diff --git a/nexus/db-queries/src/db/collection_attach.rs b/nexus/db-queries/src/db/collection_attach.rs index 95e6afeb4b..c009d60483 100644 --- a/nexus/db-queries/src/db/collection_attach.rs +++ b/nexus/db-queries/src/db/collection_attach.rs @@ -578,9 +578,7 @@ where mod test { use super::*; use crate::db::{self, identity::Resource as IdentityResource}; - use async_bb8_diesel::{ - AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager, - }; + use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; use chrono::Utc; use db_macros::Resource; use diesel::expression_methods::ExpressionMethods; @@ -617,8 +615,8 @@ mod test { async fn setup_db( pool: &crate::db::Pool, - ) -> bb8::PooledConnection> { - let connection = pool.pool().get().await.unwrap(); + ) -> crate::db::datastore::DataStoreConnection { + let connection = pool.claim().await.unwrap(); (*connection) .batch_execute_async( "CREATE SCHEMA IF NOT EXISTS test_schema; \ @@ -873,7 +871,7 @@ mod test { dev::test_setup_log("test_attach_missing_collection_fails"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -902,7 +900,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_missing_resource_fails"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -939,7 +937,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_once"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: 
db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -987,7 +985,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_once_synchronous"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -1036,7 +1034,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_multiple_times"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -1092,7 +1090,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_beyond_capacity_fails"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -1156,7 +1154,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_while_already_attached"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -1263,7 +1261,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_once"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -1318,7 +1316,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_deleted_resource_fails"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -1363,7 +1361,7 @@ mod test { let logctx = dev::test_setup_log("test_attach_without_update_filter"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; diff --git a/nexus/db-queries/src/db/collection_detach.rs b/nexus/db-queries/src/db/collection_detach.rs index 03e09d41ca..bc547d5127 100644 --- a/nexus/db-queries/src/db/collection_detach.rs +++ b/nexus/db-queries/src/db/collection_detach.rs @@ -482,9 +482,7 @@ mod test { use super::*; use crate::db::collection_attach::DatastoreAttachTarget; use crate::db::{self, identity::Resource as IdentityResource}; - use async_bb8_diesel::{ - AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager, - }; + use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; use chrono::Utc; use db_macros::Resource; use diesel::expression_methods::ExpressionMethods; @@ -521,8 +519,8 @@ mod test { async fn setup_db( pool: &crate::db::Pool, - ) -> bb8::PooledConnection> { - let connection = pool.pool().get().await.unwrap(); + ) -> crate::db::datastore::DataStoreConnection { + let connection = pool.claim().await.unwrap(); (*connection) .batch_execute_async( 
"CREATE SCHEMA IF NOT EXISTS test_schema; \ @@ -786,7 +784,7 @@ mod test { dev::test_setup_log("test_detach_missing_collection_fails"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -814,7 +812,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_missing_resource_fails"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -850,7 +848,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_once"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -890,7 +888,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_while_already_detached"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -954,7 +952,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_deleted_resource_fails"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -998,7 +996,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_without_update_filter"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; diff --git a/nexus/db-queries/src/db/collection_detach_many.rs b/nexus/db-queries/src/db/collection_detach_many.rs index 986cfb70b7..36755599d4 100644 --- a/nexus/db-queries/src/db/collection_detach_many.rs +++ b/nexus/db-queries/src/db/collection_detach_many.rs @@ -480,9 +480,7 @@ mod test { use super::*; use crate::db::collection_attach::DatastoreAttachTarget; use crate::db::{self, identity::Resource as IdentityResource}; - use async_bb8_diesel::{ - AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager, - }; + use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; use chrono::Utc; use db_macros::Resource; use diesel::expression_methods::ExpressionMethods; @@ -519,8 +517,8 @@ mod test { async fn setup_db( pool: &crate::db::Pool, - ) -> bb8::PooledConnection> { - let connection = pool.pool().get().await.unwrap(); + ) -> crate::db::datastore::DataStoreConnection { + let connection = pool.claim().await.unwrap(); (*connection) .batch_execute_async( "CREATE SCHEMA IF NOT EXISTS test_schema; \ @@ -778,7 +776,7 @@ mod test { dev::test_setup_log("test_detach_missing_collection_fails"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -808,7 +806,7 @@ mod test { 
dev::test_setup_log("test_detach_missing_resource_succeeds"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -849,7 +847,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_once"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -892,7 +890,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_once_synchronous"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -937,7 +935,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_while_already_detached"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -993,7 +991,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_filter_collection"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -1044,7 +1042,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_deleted_resource"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -1102,7 +1100,7 @@ mod test { let logctx = dev::test_setup_log("test_detach_many"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; diff --git a/nexus/db-queries/src/db/collection_insert.rs b/nexus/db-queries/src/db/collection_insert.rs index 69906e6498..3aaea6aeb1 100644 --- a/nexus/db-queries/src/db/collection_insert.rs +++ b/nexus/db-queries/src/db/collection_insert.rs @@ -406,9 +406,7 @@ where mod test { use super::*; use crate::db::{self, identity::Resource as IdentityResource}; - use async_bb8_diesel::{ - AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager, - }; + use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; use chrono::{DateTime, Utc}; use db_macros::Resource; use diesel::expression_methods::ExpressionMethods; @@ -443,8 +441,8 @@ mod test { async fn setup_db( pool: &crate::db::Pool, - ) -> bb8::PooledConnection> { - let connection = pool.pool().get().await.unwrap(); + ) -> crate::db::datastore::DataStoreConnection { + let connection = pool.claim().await.unwrap(); (*connection) .batch_execute_async( "CREATE SCHEMA IF NOT EXISTS test_schema; \ @@ -560,7 +558,7 @@ mod test { let logctx = dev::test_setup_log("test_collection_not_present"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = 
db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; @@ -590,7 +588,7 @@ mod test { let logctx = dev::test_setup_log("test_collection_present"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let conn = setup_db(&pool).await; diff --git a/nexus/db-queries/src/db/datastore/db_metadata.rs b/nexus/db-queries/src/db/datastore/db_metadata.rs index 4169cc06bd..b997bf384f 100644 --- a/nexus/db-queries/src/db/datastore/db_metadata.rs +++ b/nexus/db-queries/src/db/datastore/db_metadata.rs @@ -511,7 +511,7 @@ mod test { let mut crdb = test_db::test_setup_database(&logctx.log).await; let cfg = db::Config { url: crdb.pg_config().clone() }; - let pool = Arc::new(db::Pool::new(&logctx.log, &cfg)); + let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg)); let datastore = Arc::new(DataStore::new(&logctx.log, pool, None).await.unwrap()); @@ -559,8 +559,8 @@ mod test { let mut crdb = test_db::test_setup_database(&logctx.log).await; let cfg = db::Config { url: crdb.pg_config().clone() }; - let pool = Arc::new(db::Pool::new(&logctx.log, &cfg)); - let conn = pool.pool().get().await.unwrap(); + let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg)); + let conn = pool.claim().await.unwrap(); // Mimic the layout of "schema/crdb". let config_dir = Utf8TempDir::new().unwrap(); @@ -671,8 +671,8 @@ mod test { let mut crdb = test_db::test_setup_database(&logctx.log).await; let cfg = db::Config { url: crdb.pg_config().clone() }; - let pool = Arc::new(db::Pool::new(&logctx.log, &cfg)); - let conn = pool.pool().get().await.unwrap(); + let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg)); + let conn = pool.claim().await.unwrap(); // Mimic the layout of "schema/crdb". 
let config_dir = Utf8TempDir::new().unwrap(); diff --git a/nexus/db-queries/src/db/datastore/inventory.rs b/nexus/db-queries/src/db/datastore/inventory.rs index 1774a25c48..8888f2caaa 100644 --- a/nexus/db-queries/src/db/datastore/inventory.rs +++ b/nexus/db-queries/src/db/datastore/inventory.rs @@ -2164,7 +2164,7 @@ mod test { } impl CollectionCounts { - async fn new(conn: &DataStoreConnection<'_>) -> anyhow::Result { + async fn new(conn: &DataStoreConnection) -> anyhow::Result { conn.transaction_async(|conn| async move { conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL) .await diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 2cd21754f8..d424e08b61 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -27,7 +27,7 @@ use crate::db::{ error::{public_error_from_diesel, ErrorHandler}, }; use ::oximeter::types::ProducerRegistry; -use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager}; +use async_bb8_diesel::AsyncRunQueryDsl; use diesel::pg::Pg; use diesel::prelude::*; use diesel::query_builder::{QueryFragment, QueryId}; @@ -174,8 +174,8 @@ impl RunnableQuery for T where { } -pub type DataStoreConnection<'a> = - bb8::PooledConnection<'a, ConnectionManager>; +pub type DataStoreConnection = + qorb::claim::Handle>; pub struct DataStore { log: Logger, @@ -279,8 +279,7 @@ impl DataStore { opctx: &OpContext, ) -> Result { opctx.authorize(authz::Action::Query, &authz::DATABASE).await?; - let pool = self.pool.pool(); - let connection = pool.get().await.map_err(|err| { + let connection = self.pool.claim().await.map_err(|err| { Error::unavail(&format!("Failed to access DB connection: {err}")) })?; Ok(connection) @@ -294,7 +293,7 @@ impl DataStore { pub(super) async fn pool_connection_unauthorized( &self, ) -> Result { - let connection = self.pool.pool().get().await.map_err(|err| { + let connection = self.pool.claim().await.map_err(|err| { Error::unavail(&format!("Failed to access DB connection: {err}")) })?; Ok(connection) @@ -1587,7 +1586,7 @@ mod test { dev::test_setup_log("test_queries_do_not_require_full_table_scan"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); let datastore = DataStore::new(&logctx.log, Arc::new(pool), None).await.unwrap(); let conn = datastore.pool_connection_for_tests().await.unwrap(); @@ -1632,7 +1631,7 @@ mod test { let logctx = dev::test_setup_log("test_sled_ipv6_address_allocation"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(db::Pool::new(&logctx.log, &cfg)); + let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg)); let datastore = Arc::new(DataStore::new(&logctx.log, pool, None).await.unwrap()); let opctx = OpContext::for_tests( diff --git a/nexus/db-queries/src/db/datastore/probe.rs b/nexus/db-queries/src/db/datastore/probe.rs index f3e0614552..434bf25760 100644 --- a/nexus/db-queries/src/db/datastore/probe.rs +++ b/nexus/db-queries/src/db/datastore/probe.rs @@ -62,7 +62,7 @@ impl super::DataStore { use db::schema::probe::dsl; use db::schema::vpc_subnet::dsl as vpc_subnet_dsl; - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; let probes = match pagparams { PaginatedBy::Id(pagparams) => { @@ -77,7 +77,7 @@ impl super::DataStore { 
.filter(dsl::project_id.eq(authz_project.id())) .filter(dsl::time_deleted.is_null()) .select(Probe::as_select()) - .load_async(&*pool) + .load_async(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; @@ -99,7 +99,7 @@ impl super::DataStore { let db_subnet = vpc_subnet_dsl::vpc_subnet .filter(vpc_subnet_dsl::id.eq(interface.subnet_id)) .select(VpcSubnet::as_select()) - .first_async(&*pool) + .first_async(&*conn) .await .map_err(|e| { public_error_from_diesel(e, ErrorHandler::Server) @@ -126,7 +126,7 @@ impl super::DataStore { &self, opctx: &OpContext, probe: &Probe, - pool: &DataStoreConnection<'_>, + conn: &DataStoreConnection, ) -> LookupResult { use db::schema::vpc_subnet::dsl as vpc_subnet_dsl; @@ -143,7 +143,7 @@ impl super::DataStore { let db_subnet = vpc_subnet_dsl::vpc_subnet .filter(vpc_subnet_dsl::id.eq(interface.subnet_id)) .select(VpcSubnet::as_select()) - .first_async(&**pool) + .first_async(&**conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; @@ -172,20 +172,20 @@ impl super::DataStore { ) -> ListResultVec { use db::schema::probe::dsl; - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; let probes = paginated(dsl::probe, dsl::id, pagparams) .filter(dsl::time_deleted.is_null()) .filter(dsl::sled.eq(sled)) .select(Probe::as_select()) - .load_async(&*pool) + .load_async(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; let mut result = Vec::with_capacity(probes.len()); for probe in probes.into_iter() { - result.push(self.resolve_probe_info(opctx, &probe, &pool).await?); + result.push(self.resolve_probe_info(opctx, &probe, &conn).await?); } Ok(result) @@ -200,7 +200,7 @@ impl super::DataStore { ) -> LookupResult { use db::schema::probe; use db::schema::probe::dsl; - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; let name_or_id = name_or_id.clone(); @@ -211,7 +211,7 @@ impl super::DataStore { .filter(probe::project_id.eq(authz_project.id())) .select(Probe::as_select()) .limit(1) - .first_async::(&*pool) + .first_async::(&*conn) .await .map_err(|e| { public_error_from_diesel( @@ -227,7 +227,7 @@ impl super::DataStore { .filter(probe::project_id.eq(authz_project.id())) .select(Probe::as_select()) .limit(1) - .first_async::(&*pool) + .first_async::(&*conn) .await .map_err(|e| { public_error_from_diesel( @@ -240,7 +240,7 @@ impl super::DataStore { }), }?; - self.resolve_probe_info(opctx, &probe, &pool).await + self.resolve_probe_info(opctx, &probe, &conn).await } /// Add a probe to the data store. 
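The `pool` to `conn` renames in this file track a real type change: `DataStoreConnection` is now an owned `qorb::claim::Handle` (see the type alias in datastore/mod.rs above) rather than a bb8 `PooledConnection<'a, _>` that borrowed the pool, which is why the `<'_>` lifetime parameters disappear throughout this patch. A minimal sketch of the resulting pattern, using a hypothetical `ping` helper that is not part of this patch; the double deref matches the `&*conn` and `&**conn` calls in the surrounding code:

    use async_bb8_diesel::AsyncSimpleConnection;
    use nexus_db_queries::db::datastore::DataStoreConnection;

    // The claim is owned, so it can be held across awaits or passed to
    // helpers without tying the caller to the pool's lifetime.
    async fn ping(conn: &DataStoreConnection) -> anyhow::Result<()> {
        (**conn).batch_execute_async("SELECT 1").await?;
        Ok(())
    }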
@@ -253,7 +253,7 @@ impl super::DataStore { ) -> CreateResult { //TODO in transaction use db::schema::probe::dsl; - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; let _eip = self .allocate_probe_ephemeral_ip( @@ -306,7 +306,7 @@ impl super::DataStore { let result = diesel::insert_into(dsl::probe) .values(probe.clone()) .returning(Probe::as_returning()) - .get_result_async(&*pool) + .get_result_async(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; @@ -322,7 +322,7 @@ impl super::DataStore { ) -> DeleteResult { use db::schema::probe; use db::schema::probe::dsl; - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; let name_or_id = name_or_id.clone(); @@ -334,7 +334,7 @@ impl super::DataStore { .filter(probe::project_id.eq(authz_project.id())) .select(probe::id) .limit(1) - .first_async::(&*pool) + .first_async::(&*conn) .await .map_err(|e| { public_error_from_diesel(e, ErrorHandler::Server) @@ -350,7 +350,7 @@ impl super::DataStore { .filter(dsl::id.eq(id)) .filter(dsl::project_id.eq(authz_project.id())) .set(dsl::time_deleted.eq(Utc::now())) - .execute_async(&*pool) + .execute_async(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; diff --git a/nexus/db-queries/src/db/datastore/pub_test_utils.rs b/nexus/db-queries/src/db/datastore/pub_test_utils.rs index 93a172bd15..bcf6a6c80f 100644 --- a/nexus/db-queries/src/db/datastore/pub_test_utils.rs +++ b/nexus/db-queries/src/db/datastore/pub_test_utils.rs @@ -29,7 +29,7 @@ pub async fn datastore_test( use crate::authn; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(db::Pool::new(&logctx.log, &cfg)); + let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg)); let datastore = Arc::new(DataStore::new(&logctx.log, pool, None).await.unwrap()); diff --git a/nexus/db-queries/src/db/explain.rs b/nexus/db-queries/src/db/explain.rs index 24fd993040..52844c204f 100644 --- a/nexus/db-queries/src/db/explain.rs +++ b/nexus/db-queries/src/db/explain.rs @@ -124,8 +124,7 @@ mod test { } async fn create_schema(pool: &db::Pool) { - pool.pool() - .get() + pool.claim() .await .unwrap() .batch_execute_async( @@ -145,8 +144,8 @@ mod test { let logctx = dev::test_setup_log("test_explain_async"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); - let conn = pool.pool().get().await.unwrap(); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); + let conn = pool.claim().await.unwrap(); create_schema(&pool).await; @@ -170,8 +169,8 @@ mod test { let logctx = dev::test_setup_log("test_explain_full_table_scan"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); - let conn = pool.pool().get().await.unwrap(); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); + let conn = pool.claim().await.unwrap(); create_schema(&pool).await; diff --git a/nexus/db-queries/src/db/pagination.rs b/nexus/db-queries/src/db/pagination.rs index 4fc1cf5966..9920440ade 100644 --- a/nexus/db-queries/src/db/pagination.rs +++ b/nexus/db-queries/src/db/pagination.rs @@ -354,7 +354,7 @@ mod test { async fn populate_users(pool: &db::Pool, values: &Vec<(i64, i64)>) { use schema::test_users::dsl; - let conn = pool.pool().get().await.unwrap(); + let conn = 
pool.claim().await.unwrap(); // The indexes here work around the check that prevents full table // scans. @@ -392,7 +392,7 @@ mod test { pool: &db::Pool, query: BoxedQuery, ) -> Vec { - let conn = pool.pool().get().await.unwrap(); + let conn = pool.claim().await.unwrap(); query.select(User::as_select()).load_async(&*conn).await.unwrap() } @@ -402,7 +402,7 @@ mod test { dev::test_setup_log("test_paginated_single_column_ascending"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); use schema::test_users::dsl; @@ -437,7 +437,7 @@ mod test { dev::test_setup_log("test_paginated_single_column_descending"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); use schema::test_users::dsl; @@ -472,7 +472,7 @@ mod test { dev::test_setup_log("test_paginated_multicolumn_ascending"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); use schema::test_users::dsl; @@ -526,7 +526,7 @@ mod test { dev::test_setup_log("test_paginated_multicolumn_descending"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&logctx.log, &cfg); + let pool = db::Pool::new_single_host(&logctx.log, &cfg); use schema::test_users::dsl; diff --git a/nexus/db-queries/src/db/pool.rs b/nexus/db-queries/src/db/pool.rs index 497c8d97c5..dccee6fa3f 100644 --- a/nexus/db-queries/src/db/pool.rs +++ b/nexus/db-queries/src/db/pool.rs @@ -3,108 +3,155 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. //! Database connection pooling -// This whole thing is a placeholder for prototyping. -// -// TODO-robustness TODO-resilience We will want to carefully think about the -// connection pool that we use and its parameters. It's not clear from the -// survey so far whether an existing module is suitable for our purposes. See -// the Cueball Internals document for details on the sorts of behaviors we'd -// like here. Even if by luck we stick with bb8, we definitely want to think -// through the various parameters. -// -// Notes about bb8's behavior: -// * When the database is completely offline, and somebody wants a connection, -// it still waits for the connection timeout before giving up. That seems -// like not what we want. (To be clear, this is a failure mode where we know -// the database is offline, not one where it's partitioned and we can't tell.) -// * Although the `build_unchecked()` builder allows the pool to start up with -// no connections established (good), it also _seems_ to not establish any -// connections even when it could, resulting in a latency bubble for the first -// operation after startup. That's not what we're looking for. -// // TODO-design Need TLS support (the types below hardcode NoTls). 
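Before the implementation: the rewritten pool pairs a qorb resolver (DNS-based in production, single-host for tests) with a Diesel-over-postgres connector, and callers take short-lived claims instead of bb8 connections. A hedged sketch of the intended call pattern, using only signatures introduced in this patch (`Pool::new`, `Pool::claim`); the logger and bootstrap DNS addresses are illustrative placeholders:

    use nexus_db_queries::db;
    use std::net::SocketAddr;

    async fn pool_example(
        log: &slog::Logger,
        bootstrap_dns: Vec<SocketAddr>,
    ) -> anyhow::Result<()> {
        // Production path: discover CockroachDB backends via internal DNS
        // SRV records, starting from the bootstrap DNS servers.
        let pool = db::Pool::new(log, bootstrap_dns);

        // A claim is a qorb::claim::Handle; it derefs to the underlying
        // async connection and is returned to the pool when dropped.
        let conn = pool.claim().await?;
        // ... run diesel-async queries against `&*conn` ...
        Ok(())
    }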
use super::Config as DbConfig; -use async_bb8_diesel::ConnectionError; -use async_bb8_diesel::ConnectionManager; +use crate::db::pool_connection::{DieselPgConnector, DieselPgConnectorArgs}; + +use qorb::backend; +use qorb::policy::Policy; +use qorb::resolver::{AllBackends, Resolver}; +use qorb::resolvers::dns::{DnsResolver, DnsResolverConfig}; +use qorb::service; +use slog::Logger; +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::watch; pub use super::pool_connection::DbConnection; +type QorbConnection = async_bb8_diesel::Connection; +type QorbPool = qorb::pool::Pool; + /// Wrapper around a database connection pool. /// /// Expected to be used as the primary interface to the database. pub struct Pool { - pool: bb8::Pool>, + inner: QorbPool, } -impl Pool { - pub fn new(log: &slog::Logger, db_config: &DbConfig) -> Self { - // Make sure diesel-dtrace's USDT probes are enabled. - usdt::register_probes().expect("Failed to register USDT DTrace probes"); - Self::new_builder(log, db_config, bb8::Builder::new()) - } +// Provides an alternative to the DNS resolver for cases where we want to +// contact the database without performing resolution. +struct SingleHostResolver { + tx: watch::Sender, +} - pub fn new_failfast_for_tests( - log: &slog::Logger, - db_config: &DbConfig, - ) -> Self { - Self::new_builder( - log, - db_config, - bb8::Builder::new() - .connection_timeout(std::time::Duration::from_millis(1)), - ) +impl SingleHostResolver { + fn new(config: &DbConfig) -> Self { + let backends = Arc::new(BTreeMap::from([( + backend::Name::new("singleton"), + backend::Backend { address: config.url.address() }, + )])); + let (tx, _rx) = watch::channel(backends.clone()); + Self { tx } } +} - fn new_builder( - log: &slog::Logger, - db_config: &DbConfig, - builder: bb8::Builder>, - ) -> Self { - let url = db_config.url.url(); - let log = log.new(o!( - "database_url" => url.clone(), - "component" => "db::Pool" - )); - info!(&log, "database connection pool"); - let error_sink = LoggingErrorSink::new(log); - let manager = ConnectionManager::::new(url); - let pool = builder - .connection_customizer(Box::new( - super::pool_connection::ConnectionCustomizer::new(), - )) - .error_sink(Box::new(error_sink)) - .build_unchecked(manager); - Pool { pool } +impl Resolver for SingleHostResolver { + fn monitor(&mut self) -> watch::Receiver { + self.tx.subscribe() } +} - /// Returns a reference to the underlying pool. - pub fn pool(&self) -> &bb8::Pool> { - &self.pool - } +fn make_dns_resolver( + bootstrap_dns: Vec, +) -> qorb::resolver::BoxedResolver { + Box::new(DnsResolver::new( + service::Name(internal_dns::ServiceName::Cockroach.srv_name()), + bootstrap_dns, + DnsResolverConfig { + hardcoded_ttl: Some(tokio::time::Duration::MAX), + ..Default::default() + }, + )) } -#[derive(Clone, Debug)] -struct LoggingErrorSink { - log: slog::Logger, +fn make_single_host_resolver( + config: &DbConfig, +) -> qorb::resolver::BoxedResolver { + Box::new(SingleHostResolver::new(config)) } -impl LoggingErrorSink { - fn new(log: slog::Logger) -> LoggingErrorSink { - LoggingErrorSink { log } - } +fn make_postgres_connector( + log: &Logger, +) -> qorb::backend::SharedConnector { + // Create postgres connections. + // + // We're currently relying on the DieselPgConnector doing the following: + // - Disallowing full table scans in its implementation of "on_acquire" + // - Creating async_bb8_diesel connections that also wrap DTraceConnections. 
+ let user = "root"; + let db = "omicron"; + let args = vec![("sslmode", "disable")]; + Arc::new(DieselPgConnector::new( + log, + DieselPgConnectorArgs { user, db, args }, + )) } -impl bb8::ErrorSink for LoggingErrorSink { - fn sink(&self, error: ConnectionError) { - error!( - &self.log, - "database connection error"; - "error_message" => #%error - ); +impl Pool { + /// Creates a new qorb-backed connection pool to the database. + /// + /// Creating this pool does not necessarily wait for connections to become + /// available, as backends may shift over time. + pub fn new(log: &Logger, bootstrap_dns: Vec) -> Self { + // Make sure diesel-dtrace's USDT probes are enabled. + usdt::register_probes().expect("Failed to register USDT DTrace probes"); + + let resolver = make_dns_resolver(bootstrap_dns); + let connector = make_postgres_connector(log); + + let policy = Policy::default(); + Pool { inner: qorb::pool::Pool::new(resolver, connector, policy) } + } + + /// Creates a new qorb-backed connection pool to a single instance of the + /// database. + /// + /// This is intended for tests that want to skip DNS resolution, relying + /// on a single instance of the database. + /// + /// In production, [Self::new] should be preferred. + pub fn new_single_host(log: &Logger, db_config: &DbConfig) -> Self { + // Make sure diesel-dtrace's USDT probes are enabled. + usdt::register_probes().expect("Failed to register USDT DTrace probes"); + + let resolver = make_single_host_resolver(db_config); + let connector = make_postgres_connector(log); + + let policy = Policy::default(); + Pool { inner: qorb::pool::Pool::new(resolver, connector, policy) } + } + + /// Creates a new qorb-backed connection pool which returns an error + /// if claims are not available within one millisecond. + /// + /// This is intended for test-only usage, in particular for tests where + /// claim requests should rapidly return errors when a backend has been + /// intentionally disabled. + #[cfg(any(test, feature = "testing"))] + pub fn new_single_host_failfast( + log: &Logger, + db_config: &DbConfig, + ) -> Self { + // Make sure diesel-dtrace's USDT probes are enabled. + usdt::register_probes().expect("Failed to register USDT DTrace probes"); + + let resolver = make_single_host_resolver(db_config); + let connector = make_postgres_connector(log); + + let policy = Policy { + claim_timeout: tokio::time::Duration::from_millis(1), + ..Default::default() + }; + Pool { inner: qorb::pool::Pool::new(resolver, connector, policy) } } - fn boxed_clone(&self) -> Box> { - Box::new(self.clone()) + /// Returns a connection from the pool + pub async fn claim( + &self, + ) -> anyhow::Result> { + Ok(self.inner.claim().await?) } } diff --git a/nexus/db-queries/src/db/pool_connection.rs b/nexus/db-queries/src/db/pool_connection.rs index dae6a0ee51..9a33370a5a 100644 --- a/nexus/db-queries/src/db/pool_connection.rs +++ b/nexus/db-queries/src/db/pool_connection.rs @@ -4,46 +4,139 @@ //! Customization that happens on each connection as they're acquired. 
+use anyhow::anyhow; +use async_bb8_diesel::AsyncR2D2Connection; use async_bb8_diesel::AsyncSimpleConnection; -use async_bb8_diesel::Connection; -use async_bb8_diesel::ConnectionError; use async_trait::async_trait; -use bb8::CustomizeConnection; +use diesel::Connection; use diesel::PgConnection; use diesel_dtrace::DTraceConnection; +use qorb::backend::{self, Backend, Error}; +use slog::Logger; +use url::Url; pub type DbConnection = DTraceConnection; pub const DISALLOW_FULL_TABLE_SCAN_SQL: &str = "set disallow_full_table_scans = on; set large_full_scan_rows = 0;"; -/// A customizer for all new connections made to CockroachDB, from Diesel. -#[derive(Debug)] -pub(crate) struct ConnectionCustomizer {} +/// A [backend::Connector] which provides access to [PgConnection]. +pub(crate) struct DieselPgConnector { + log: Logger, + user: String, + db: String, + args: Vec<(String, String)>, +} + +pub(crate) struct DieselPgConnectorArgs<'a> { + pub(crate) user: &'a str, + pub(crate) db: &'a str, + pub(crate) args: Vec<(&'a str, &'a str)>, +} -impl ConnectionCustomizer { - pub(crate) fn new() -> Self { - Self {} +impl DieselPgConnector { + /// Creates a new "connector" to a database, which + /// swaps out the IP address at runtime depending on the selected backend. + /// + /// Format of the url is: + /// + /// - postgresql://{user}@{address}/{db} + /// + /// Or, if arguments are supplied: + /// + /// - postgresql://{user}@{address}/{db}?{args} + pub(crate) fn new(log: &Logger, args: DieselPgConnectorArgs<'_>) -> Self { + let DieselPgConnectorArgs { user, db, args } = args; + Self { + log: log.clone(), + user: user.to_string(), + db: db.to_string(), + args: args + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + } } - async fn disallow_full_table_scans( + fn to_url( &self, - conn: &mut Connection, - ) -> Result<(), ConnectionError> { - conn.batch_execute_async(DISALLOW_FULL_TABLE_SCAN_SQL).await?; - Ok(()) + address: std::net::SocketAddr, + ) -> Result { + let user = &self.user; + let db = &self.db; + let mut url = + Url::parse(&format!("postgresql://{user}@{address}/{db}"))?; + + for (k, v) in &self.args { + url.query_pairs_mut().append_pair(k, v); + } + + Ok(url.as_str().to_string()) } } #[async_trait] -impl CustomizeConnection, ConnectionError> - for ConnectionCustomizer -{ +impl backend::Connector for DieselPgConnector { + type Connection = async_bb8_diesel::Connection; + + async fn connect( + &self, + backend: &Backend, + ) -> Result { + let url = self.to_url(backend.address).map_err(Error::Other)?; + + let conn = tokio::task::spawn_blocking(move || { + let pg_conn = DbConnection::establish(&url) + .map_err(|e| Error::Other(anyhow!(e)))?; + Ok::<_, Error>(async_bb8_diesel::Connection::new(pg_conn)) + }) + .await + .expect("Task panicked establishing connection") + .map_err(|e| { + warn!( + self.log, + "Failed to make connection"; + "error" => e.to_string(), + "backend" => backend.address, + ); + e + })?; + Ok(conn) + } + async fn on_acquire( &self, - conn: &mut Connection, - ) -> Result<(), ConnectionError> { - self.disallow_full_table_scans(conn).await?; + conn: &mut Self::Connection, + ) -> Result<(), Error> { + conn.batch_execute_async(DISALLOW_FULL_TABLE_SCAN_SQL).await.map_err( + |e| { + warn!( + self.log, + "Failed on_acquire execution"; + "error" => e.to_string() + ); + Error::Other(anyhow!(e)) + }, + )?; Ok(()) } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Error> { + let is_broken = conn.is_broken_async().await; + if is_broken { + warn!( 
+ self.log, + "Failed is_valid check; connection known to be broken" + ); + return Err(Error::Other(anyhow!("Connection broken"))); + } + conn.ping_async().await.map_err(|e| { + warn!( + self.log, + "Failed is_valid check; connection failed ping"; + "error" => e.to_string() + ); + Error::Other(anyhow!(e)) + }) + } } diff --git a/nexus/db-queries/src/db/queries/external_ip.rs b/nexus/db-queries/src/db/queries/external_ip.rs index 7ea44b33fb..4d752d451b 100644 --- a/nexus/db-queries/src/db/queries/external_ip.rs +++ b/nexus/db-queries/src/db/queries/external_ip.rs @@ -918,7 +918,8 @@ mod tests { crate::db::datastore::test_utils::datastore_test(&logctx, &db) .await; let cfg = crate::db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(crate::db::Pool::new(&logctx.log, &cfg)); + let pool = + Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg)); let db_datastore = Arc::new( crate::db::DataStore::new(&logctx.log, Arc::clone(&pool), None) .await diff --git a/nexus/db-queries/src/db/queries/next_item.rs b/nexus/db-queries/src/db/queries/next_item.rs index 769c891349..658d151a5b 100644 --- a/nexus/db-queries/src/db/queries/next_item.rs +++ b/nexus/db-queries/src/db/queries/next_item.rs @@ -616,7 +616,7 @@ mod tests { } async fn setup_test_schema(pool: &db::Pool) { - let connection = pool.pool().get().await.unwrap(); + let connection = pool.claim().await.unwrap(); (*connection) .batch_execute_async( "CREATE SCHEMA IF NOT EXISTS test_schema; \ @@ -708,8 +708,9 @@ mod tests { let log = logctx.log.new(o!()); let mut db = test_setup_database(&log).await; let cfg = crate::db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(crate::db::Pool::new(&logctx.log, &cfg)); - let conn = pool.pool().get().await.unwrap(); + let pool = + Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg)); + let conn = pool.claim().await.unwrap(); // We're going to operate on a separate table, for simplicity. setup_test_schema(&pool).await; @@ -770,8 +771,9 @@ mod tests { let log = logctx.log.new(o!()); let mut db = test_setup_database(&log).await; let cfg = crate::db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(crate::db::Pool::new(&logctx.log, &cfg)); - let conn = pool.pool().get().await.unwrap(); + let pool = + Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg)); + let conn = pool.claim().await.unwrap(); // We're going to operate on a separate table, for simplicity. 
diff --git a/nexus/db-queries/src/db/queries/region_allocation.rs b/nexus/db-queries/src/db/queries/region_allocation.rs
index 7cf378d53b..dbf37fda2e 100644
--- a/nexus/db-queries/src/db/queries/region_allocation.rs
+++ b/nexus/db-queries/src/db/queries/region_allocation.rs
@@ -507,8 +507,8 @@ mod test {
         let log = logctx.log.new(o!());
         let mut db = test_setup_database(&log).await;
         let cfg = crate::db::Config { url: db.pg_config().clone() };
-        let pool = crate::db::Pool::new(&logctx.log, &cfg);
-        let conn = pool.pool().get().await.unwrap();
+        let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg);
+        let conn = pool.claim().await.unwrap();
 
         let volume_id = Uuid::new_v4();
         let params = RegionParameters {
diff --git a/nexus/db-queries/src/db/queries/virtual_provisioning_collection_update.rs b/nexus/db-queries/src/db/queries/virtual_provisioning_collection_update.rs
index 902d955a79..9d2ed04c85 100644
--- a/nexus/db-queries/src/db/queries/virtual_provisioning_collection_update.rs
+++ b/nexus/db-queries/src/db/queries/virtual_provisioning_collection_update.rs
@@ -568,8 +568,8 @@ mod test {
         let log = logctx.log.new(o!());
         let mut db = test_setup_database(&log).await;
         let cfg = crate::db::Config { url: db.pg_config().clone() };
-        let pool = crate::db::Pool::new(&logctx.log, &cfg);
-        let conn = pool.pool().get().await.unwrap();
+        let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg);
+        let conn = pool.claim().await.unwrap();
 
         let id = Uuid::nil();
         let project_id = Uuid::nil();
@@ -597,8 +597,8 @@ mod test {
         let log = logctx.log.new(o!());
         let mut db = test_setup_database(&log).await;
         let cfg = crate::db::Config { url: db.pg_config().clone() };
-        let pool = crate::db::Pool::new(&logctx.log, &cfg);
-        let conn = pool.pool().get().await.unwrap();
+        let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg);
+        let conn = pool.claim().await.unwrap();
 
         let id = Uuid::nil();
         let project_id = Uuid::nil();
@@ -624,8 +624,8 @@ mod test {
         let log = logctx.log.new(o!());
         let mut db = test_setup_database(&log).await;
         let cfg = crate::db::Config { url: db.pg_config().clone() };
-        let pool = crate::db::Pool::new(&logctx.log, &cfg);
-        let conn = pool.pool().get().await.unwrap();
+        let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg);
+        let conn = pool.claim().await.unwrap();
 
         let id = InstanceUuid::nil();
         let project_id = Uuid::nil();
@@ -650,8 +650,8 @@ mod test {
         let log = logctx.log.new(o!());
         let mut db = test_setup_database(&log).await;
         let cfg = crate::db::Config { url: db.pg_config().clone() };
-        let pool = crate::db::Pool::new(&logctx.log, &cfg);
-        let conn = pool.pool().get().await.unwrap();
+        let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg);
+        let conn = pool.claim().await.unwrap();
 
         let id = InstanceUuid::nil();
         let project_id = Uuid::nil();
println!("{explain}"); db.cleanup().await.unwrap(); @@ -352,7 +353,8 @@ mod test { let log = logctx.log.new(o!()); let mut db = test_setup_database(&log).await; let cfg = crate::db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(crate::db::Pool::new(&logctx.log, &cfg)); + let pool = + Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg)); let db_datastore = Arc::new( crate::db::DataStore::new(&log, Arc::clone(&pool), None) .await @@ -544,7 +546,8 @@ mod test { let log = logctx.log.new(o!()); let mut db = test_setup_database(&log).await; let cfg = crate::db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(crate::db::Pool::new(&logctx.log, &cfg)); + let pool = + Arc::new(crate::db::Pool::new_single_host(&logctx.log, &cfg)); let db_datastore = Arc::new( crate::db::DataStore::new(&log, Arc::clone(&pool), None) .await diff --git a/nexus/src/app/background/tasks/saga_recovery.rs b/nexus/src/app/background/tasks/saga_recovery.rs index 7b0fe1b331..42069ac4ed 100644 --- a/nexus/src/app/background/tasks/saga_recovery.rs +++ b/nexus/src/app/background/tasks/saga_recovery.rs @@ -517,7 +517,7 @@ mod test { ) -> (dev::db::CockroachInstance, Arc) { let db = test_setup_database(&log).await; let cfg = nexus_db_queries::db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(db::Pool::new(log, &cfg)); + let pool = Arc::new(db::Pool::new_single_host(log, &cfg)); let db_datastore = Arc::new( db::DataStore::new(&log, Arc::clone(&pool), None).await.unwrap(), ); diff --git a/nexus/src/bin/schema-updater.rs b/nexus/src/bin/schema-updater.rs index 7fe1ed84a4..4a43698f00 100644 --- a/nexus/src/bin/schema-updater.rs +++ b/nexus/src/bin/schema-updater.rs @@ -71,7 +71,7 @@ async fn main() -> anyhow::Result<()> { let log = Logger::root(drain, slog::o!("unit" => "schema_updater")); let crdb_cfg = db::Config { url: args.url }; - let pool = Arc::new(db::Pool::new(&log, &crdb_cfg)); + let pool = Arc::new(db::Pool::new_single_host(&log, &crdb_cfg)); let schema_config = SchemaConfig { schema_dir: args.schema_directory }; let all_versions = AllSchemaVersions::load(&schema_config.schema_dir)?; diff --git a/nexus/src/context.rs b/nexus/src/context.rs index 95d69e0c88..8cb696c62f 100644 --- a/nexus/src/context.rs +++ b/nexus/src/context.rs @@ -11,9 +11,7 @@ use authn::external::token::HttpAuthnToken; use authn::external::HttpAuthnScheme; use camino::Utf8PathBuf; use chrono::Duration; -use internal_dns::ServiceName; use nexus_config::NexusConfig; -use nexus_config::PostgresConfigWithUrl; use nexus_config::SchemeName; use nexus_db_queries::authn::external::session_cookie::SessionStore; use nexus_db_queries::authn::ConsoleSessionWithSiloId; @@ -25,7 +23,6 @@ use oximeter::types::ProducerRegistry; use oximeter_instruments::http::{HttpService, LatencyTracker}; use slog::Logger; use std::env; -use std::str::FromStr; use std::sync::Arc; use uuid::Uuid; @@ -210,7 +207,7 @@ impl ServerContext { // nexus in dev for everyone // Set up DNS Client - let resolver = match config.deployment.internal_dns { + let (resolver, dns_addrs) = match config.deployment.internal_dns { nexus_config::InternalDns::FromSubnet { subnet } => { let az_subnet = Ipv6Subnet::::new(subnet.net().addr()); @@ -219,11 +216,21 @@ impl ServerContext { "Setting up resolver using DNS servers for subnet: {:?}", az_subnet ); - internal_dns::resolver::Resolver::new_from_subnet( - log.new(o!("component" => "DnsResolver")), - az_subnet, + let resolver = + internal_dns::resolver::Resolver::new_from_subnet( + log.new(o!("component" => 
"DnsResolver")), + az_subnet, + ) + .map_err(|e| { + format!("Failed to create DNS resolver: {}", e) + })?; + + ( + resolver, + internal_dns::resolver::Resolver::servers_from_subnet( + az_subnet, + ), ) - .map_err(|e| format!("Failed to create DNS resolver: {}", e))? } nexus_config::InternalDns::FromAddress { address } => { info!( @@ -231,56 +238,33 @@ impl ServerContext { "Setting up resolver using DNS address: {:?}", address ); - internal_dns::resolver::Resolver::new_from_addrs( - log.new(o!("component" => "DnsResolver")), - &[address], - ) - .map_err(|e| format!("Failed to create DNS resolver: {}", e))? + let resolver = + internal_dns::resolver::Resolver::new_from_addrs( + log.new(o!("component" => "DnsResolver")), + &[address], + ) + .map_err(|e| { + format!("Failed to create DNS resolver: {}", e) + })?; + + (resolver, vec![address]) } }; - // Set up DB pool - let url = match &config.deployment.database { - nexus_config::Database::FromUrl { url } => url.clone(), + let pool = match &config.deployment.database { + nexus_config::Database::FromUrl { url } => { + info!(log, "Setting up qorb pool from a single host"; "url" => #?url); + db::Pool::new_single_host( + &log, + &db::Config { url: url.clone() }, + ) + } nexus_config::Database::FromDns => { - info!(log, "Accessing DB url from DNS"); - // It's been requested but unfortunately not supported to - // directly connect using SRV based lookup. - // TODO-robustness: the set of cockroachdb hosts we'll use will - // be fixed to whatever we got back from DNS at Nexus start. - // This means a new cockroachdb instance won't picked up until - // Nexus restarts. - let addrs = loop { - match resolver - .lookup_all_socket_v6(ServiceName::Cockroach) - .await - { - Ok(addrs) => break addrs, - Err(e) => { - warn!( - log, - "Failed to lookup cockroach addresses: {e}" - ); - tokio::time::sleep(std::time::Duration::from_secs( - 1, - )) - .await; - } - } - }; - let addrs_str = addrs - .iter() - .map(ToString::to_string) - .collect::>() - .join(","); - info!(log, "DB addresses: {}", addrs_str); - PostgresConfigWithUrl::from_str(&format!( - "postgresql://root@{addrs_str}/omicron?sslmode=disable", - )) - .map_err(|e| format!("Cannot parse Postgres URL: {}", e))? + info!(log, "Setting up qorb pool from DNS"; "dns_addrs" => #?dns_addrs); + db::Pool::new(&log, dns_addrs) } }; - let pool = db::Pool::new(&log, &db::Config { url }); + let nexus = Nexus::new_with_id( rack_id, log.new(o!("component" => "nexus")), diff --git a/nexus/src/populate.rs b/nexus/src/populate.rs index 4fcb126356..f026b1b504 100644 --- a/nexus/src/populate.rs +++ b/nexus/src/populate.rs @@ -380,7 +380,7 @@ mod test { let logctx = dev::test_setup_log("test_populator"); let mut db = test_setup_database(&logctx.log).await; let cfg = db::Config { url: db.pg_config().clone() }; - let pool = Arc::new(db::Pool::new(&logctx.log, &cfg)); + let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg)); let datastore = Arc::new( db::DataStore::new(&logctx.log, pool, None).await.unwrap(), ); @@ -422,19 +422,13 @@ mod test { }) .unwrap(); - // Test again with the database offline. In principle we could do this - // immediately without creating a new pool and datastore. However, the - // pool's default behavior is to wait 30 seconds for a connection, which - // makes this test take a long time. (See the note in - // nexus/src/db/pool.rs about this.) So let's create a pool with an - // arbitrarily short timeout now. 
diff --git a/nexus/src/populate.rs b/nexus/src/populate.rs
index 4fcb126356..f026b1b504 100644
--- a/nexus/src/populate.rs
+++ b/nexus/src/populate.rs
@@ -380,7 +380,7 @@ mod test {
         let logctx = dev::test_setup_log("test_populator");
         let mut db = test_setup_database(&logctx.log).await;
         let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = Arc::new(db::Pool::new(&logctx.log, &cfg));
+        let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg));
         let datastore = Arc::new(
             db::DataStore::new(&logctx.log, pool, None).await.unwrap(),
         );
@@ -422,19 +422,13 @@ mod test {
         })
         .unwrap();
 
-        // Test again with the database offline. In principle we could do this
-        // immediately without creating a new pool and datastore. However, the
-        // pool's default behavior is to wait 30 seconds for a connection, which
-        // makes this test take a long time. (See the note in
-        // nexus/src/db/pool.rs about this.) So let's create a pool with an
-        // arbitrarily short timeout now. (We wouldn't want to do this above
-        // because we do want to wait a bit when we expect things to work, in
-        // case the test system is busy.)
+        // Test again with the database offline. In principle we could do this
+        // immediately without creating a new pool and datastore.
+        //
-        //
-        // Anyway, if we try again with a broken database, we should get a
+        // If we try again with a broken database, we should get a
         // ServiceUnavailable error, which indicates a transient failure.
         let pool =
-            Arc::new(db::Pool::new_failfast_for_tests(&logctx.log, &cfg));
+            Arc::new(db::Pool::new_single_host_failfast(&logctx.log, &cfg));
         // We need to create the datastore before tearing down the database, as
         // it verifies the schema version of the DB while booting.
         let datastore = Arc::new(
diff --git a/nexus/tests/integration_tests/schema.rs b/nexus/tests/integration_tests/schema.rs
index bf73855ea7..5201b5c971 100644
--- a/nexus/tests/integration_tests/schema.rs
+++ b/nexus/tests/integration_tests/schema.rs
@@ -954,12 +954,12 @@ async fn dbinit_equals_sum_of_all_up() {
     // Create a connection pool after we apply the first schema version but
     // before applying the rest, and grab a connection from that pool. We'll use
     // it for an extra check later.
-    let pool = nexus_db_queries::db::Pool::new(
+    let pool = nexus_db_queries::db::Pool::new_single_host(
         log,
         &nexus_db_queries::db::Config { url: crdb.pg_config().clone() },
     );
     let conn_from_pool =
-        pool.pool().get().await.expect("failed to get pooled connection");
+        pool.claim().await.expect("failed to get pooled connection");
 
     // Go from the second version to the latest version.
     for version in all_versions.iter_versions().skip(1) {
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index edb92c8c77..746a0bd3ab 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -109,7 +109,7 @@ syn-f595c2ba2a3f28df = { package = "syn", version = "2.0.74", features = ["extra
 time = { version = "0.3.36", features = ["formatting", "local-offset", "macros", "parsing"] }
 tokio = { version = "1.39.3", features = ["full", "test-util"] }
 tokio-postgres = { version = "0.7.11", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
-tokio-stream = { version = "0.1.15", features = ["net"] }
+tokio-stream = { version = "0.1.15", features = ["net", "sync"] }
 tokio-util = { version = "0.7.11", features = ["codec", "io-util"] }
 toml = { version = "0.7.8" }
 toml_datetime = { version = "0.6.8", default-features = false, features = ["serde"] }
@@ -219,7 +219,7 @@ time = { version = "0.3.36", features = ["formatting", "local-offset", "macros",
 time-macros = { version = "0.2.18", default-features = false, features = ["formatting", "parsing"] }
 tokio = { version = "1.39.3", features = ["full", "test-util"] }
 tokio-postgres = { version = "0.7.11", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
-tokio-stream = { version = "0.1.15", features = ["net"] }
+tokio-stream = { version = "0.1.15", features = ["net", "sync"] }
 tokio-util = { version = "0.7.11", features = ["codec", "io-util"] }
 toml = { version = "0.7.8" }
 toml_datetime = { version = "0.6.8", default-features = false, features = ["serde"] }
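
Reviewer note: for completeness, the test-facing surface of the new pool in one
place. This is a sketch assuming the usual scaffolding from the tests above
(logctx, a dev CockroachDB handle named db, Arc, and AsyncSimpleConnection in
scope), not a verbatim excerpt:

    // Single-host pool against a dev CockroachDB instance.
    let cfg = db::Config { url: db.pg_config().clone() };
    let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg));

    // Claim a connection (replaces pool.pool().get()) and use it.
    let conn = pool.claim().await.unwrap();
    (*conn).batch_execute_async("SELECT 1;").await.unwrap();

    // Tests that exercise an offline database use the fail-fast variant,
    // which gives up on claims quickly instead of waiting out the default
    // timeout:
    let failfast_pool =
        Arc::new(db::Pool::new_single_host_failfast(&logctx.log, &cfg));
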