Skip to content

Commit

Permalink
Update to qorb 0.2.0, now with USDT probes! (#6970)
Browse files Browse the repository at this point in the history
  • Loading branch information
bnaecker authored Oct 31, 2024
1 parent d1ff1bb commit e143846
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 42 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev =
propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "86101eaf80b55e7f405b5cafe9b0de0e9f331656" }
propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "86101eaf80b55e7f405b5cafe9b0de0e9f331656" }
proptest = "1.5.0"
qorb = "0.1.2"
qorb = "0.2.0"
quote = "1.0"
rand = "0.8.5"
rand_core = "0.6.4"
Expand Down
48 changes: 33 additions & 15 deletions nexus/db-queries/src/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,18 @@ impl Pool {

let resolver = resolver.for_service(ServiceName::Cockroach);
let connector = make_postgres_connector(log);

let policy = Policy::default();
Pool {
inner: qorb::pool::Pool::new(resolver, connector, policy),
terminated: std::sync::atomic::AtomicBool::new(false),
}
let inner = match qorb::pool::Pool::new(resolver, connector, policy) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
}
};
Pool { inner, terminated: std::sync::atomic::AtomicBool::new(false) }
}

/// Creates a new qorb-backed connection pool to a single instance of the
Expand All @@ -110,12 +116,18 @@ impl Pool {

let resolver = make_single_host_resolver(db_config);
let connector = make_postgres_connector(log);

let policy = Policy::default();
Pool {
inner: qorb::pool::Pool::new(resolver, connector, policy),
terminated: std::sync::atomic::AtomicBool::new(false),
}
let inner = match qorb::pool::Pool::new(resolver, connector, policy) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
}
};
Pool { inner, terminated: std::sync::atomic::AtomicBool::new(false) }
}

/// Creates a new qorb-backed connection pool which returns an error
Expand All @@ -134,15 +146,21 @@ impl Pool {

let resolver = make_single_host_resolver(db_config);
let connector = make_postgres_connector(log);

let policy = Policy {
claim_timeout: tokio::time::Duration::from_millis(1),
..Default::default()
};
Pool {
inner: qorb::pool::Pool::new(resolver, connector, policy),
terminated: std::sync::atomic::AtomicBool::new(false),
}
let inner = match qorb::pool::Pool::new(resolver, connector, policy) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
}
};
Pool { inner, terminated: std::sync::atomic::AtomicBool::new(false) }
}

/// Returns a connection from the pool
Expand Down
7 changes: 5 additions & 2 deletions nexus/src/app/sagas/common_storage/pantry_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ impl backend::Connector for PantryConnector {
pub(crate) fn make_pantry_connection_pool(
qorb_resolver: &QorbResolver,
) -> pool::Pool<PooledPantryClient> {
pool::Pool::new(
match pool::Pool::new(
qorb_resolver.for_service(ServiceName::CruciblePantry),
Arc::new(PantryConnector),
qorb::policy::Policy::default(),
)
) {
Ok(pool) => pool,
Err(e) => e.into_inner(),
}
}
13 changes: 11 additions & 2 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,20 @@ impl Oximeter {
))
};

qorb::pool::Pool::new(
match qorb::pool::Pool::new(
nexus_resolver,
Arc::new(NexusConnector { log: log.clone() }),
qorb::policy::Policy::default(),
)
) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
}
}
};

let notify_nexus = || async {
Expand Down
64 changes: 46 additions & 18 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,21 +193,39 @@ impl Client {
));
let schema = Mutex::new(BTreeMap::new());
let request_timeout = DEFAULT_REQUEST_TIMEOUT;
let pool = match Pool::new(
http_resolver,
Arc::new(ReqwestConnector {}),
qorb::policy::Policy::default(),
) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
}
};
let native_pool = match Pool::new(
native_resolver,
Arc::new(native::connection::Connector),
qorb::policy::Policy::default(),
) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
}
};
Self {
_id: id,
log,
source: ClientSource::Pool {
pool: DebugIgnore(Pool::new(
http_resolver,
Arc::new(ReqwestConnector {}),
qorb::policy::Policy::default(),
)),
},
native_pool: DebugIgnore(Pool::new(
native_resolver,
Arc::new(native::connection::Connector),
Default::default(),
)),
source: ClientSource::Pool { pool: DebugIgnore(pool) },
native_pool: DebugIgnore(native_pool),
schema,
request_timeout,
}
Expand Down Expand Up @@ -243,15 +261,25 @@ impl Client {
let client = reqwest::Client::new();
let url = format!("http://{}", http_address);
let schema = Mutex::new(BTreeMap::new());
let native_pool = match Pool::new(
Box::new(SingleHostResolver::new(native_address)),
Arc::new(native::connection::Connector),
Default::default(),
) {
Ok(pool) => {
debug!(log, "registered USDT probes");
pool
}
Err(err) => {
error!(log, "failed to register USDT probes");
err.into_inner()
}
};
Self {
_id: id,
log,
source: ClientSource::Static(ReqwestClient { url, client }),
native_pool: DebugIgnore(Pool::new(
Box::new(SingleHostResolver::new(native_address)),
Arc::new(native::connection::Connector),
Default::default(),
)),
native_pool: DebugIgnore(native_pool),
schema,
request_timeout,
}
Expand Down Expand Up @@ -1787,7 +1815,7 @@ mod tests {
.ping()
.await
.expect_err("Should fail to ping non-existent server");
let Error::Connection(qorb::pool::Error::TimedOut) = &e else {
let Error::Connection(_) = &e else {
panic!("Expected connection error, found {e:?}");
};
logctx.cleanup_successful();
Expand Down
4 changes: 2 additions & 2 deletions workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption"
postgres-types = { version = "0.2.8", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
predicates = { version = "3.1.2" }
proc-macro2 = { version = "1.0.87" }
qorb = { version = "0.1.2", features = ["qtop"] }
qorb = { version = "0.2.0", features = ["qtop"] }
quote = { version = "1.0.37" }
rand = { version = "0.8.5", features = ["small_rng"] }
regex = { version = "1.11.0" }
Expand Down Expand Up @@ -207,7 +207,7 @@ pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption"
postgres-types = { version = "0.2.8", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
predicates = { version = "3.1.2" }
proc-macro2 = { version = "1.0.87" }
qorb = { version = "0.1.2", features = ["qtop"] }
qorb = { version = "0.2.0", features = ["qtop"] }
quote = { version = "1.0.37" }
rand = { version = "0.8.5", features = ["small_rng"] }
regex = { version = "1.11.0" }
Expand Down

0 comments on commit e143846

Please sign in to comment.