diff --git a/nexus/db-queries/src/db/datastore/address_lot.rs b/nexus/db-queries/src/db/datastore/address_lot.rs index acef27e363..5c2ffbf1d0 100644 --- a/nexus/db-queries/src/db/datastore/address_lot.rs +++ b/nexus/db-queries/src/db/datastore/address_lot.rs @@ -13,10 +13,9 @@ use crate::db::error::TransactionError; use crate::db::model::Name; use crate::db::model::{AddressLot, AddressLotBlock, AddressLotReservedBlock}; use crate::db::pagination::paginated; -use crate::transaction_retry::RetryHelper; -use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl, Connection}; +use crate::transaction_retry::OptionalError; +use async_bb8_diesel::{AsyncRunQueryDsl, Connection}; use chrono::Utc; -use diesel::result::Error as DieselError; use diesel::{ExpressionMethods, QueryDsl, SelectableHelper}; use diesel_dtrace::DTraceConnection; use ipnetwork::IpNetwork; @@ -29,7 +28,6 @@ use omicron_common::api::external::{ }; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; -use std::sync::{Arc, OnceLock}; use uuid::Uuid; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -115,16 +113,12 @@ impl DataStore { LotInUse, } - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "address_lot_delete", - ); + let err = OptionalError::new(); // TODO https://github.com/oxidecomputer/omicron/issues/2811 // Audit external networking database transaction usage - conn.transaction_async_with_retry( - |conn| { + self.transaction_retry_wrapper("address_lot_delete") + .transaction(&conn, |conn| { let err = err.clone(); async move { let rsvd: Vec = @@ -136,8 +130,7 @@ impl DataStore { .await?; if !rsvd.is_empty() { - err.set(AddressLotDeleteError::LotInUse).unwrap(); - return Err(DieselError::RollbackTransaction); + return Err(err.bail(AddressLotDeleteError::LotInUse)); } let now = Utc::now(); @@ -155,21 +148,19 @@ impl DataStore { Ok(()) } - }, - retry_helper.as_callback(), - ) - .await - .map_err(|e| { - if let Some(err) = err.get() { - match err { - AddressLotDeleteError::LotInUse => { - Error::invalid_request("lot is in use") + }) + .await + .map_err(|e| { + if let Some(err) = err.take() { + match err { + AddressLotDeleteError::LotInUse => { + Error::invalid_request("lot is in use") + } } + } else { + public_error_from_diesel(e, ErrorHandler::Server) } - } else { - public_error_from_diesel(e, ErrorHandler::Server) - } - }) + }) } pub async fn address_lot_list( diff --git a/nexus/db-queries/src/db/datastore/bgp.rs b/nexus/db-queries/src/db/datastore/bgp.rs index 3d8ff1a68f..28075b0ded 100644 --- a/nexus/db-queries/src/db/datastore/bgp.rs +++ b/nexus/db-queries/src/db/datastore/bgp.rs @@ -2,15 +2,13 @@ use super::DataStore; use crate::context::OpContext; use crate::db; use crate::db::error::public_error_from_diesel; -use crate::db::error::retryable; use crate::db::error::ErrorHandler; use crate::db::model::Name; use crate::db::model::{BgpAnnounceSet, BgpAnnouncement, BgpConfig}; use crate::db::pagination::paginated; -use crate::transaction_retry::RetryHelper; -use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; +use crate::transaction_retry::OptionalError; +use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; -use diesel::result::Error as DieselError; use diesel::{ExpressionMethods, QueryDsl, SelectableHelper}; use nexus_types::external_api::params; use nexus_types::identity::Resource; @@ -20,7 +18,6 @@ use omicron_common::api::external::{ ResourceType, }; use ref_cast::RefCast; -use std::sync::{Arc, OnceLock}; use uuid::Uuid; impl DataStore { 
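The `address_lot_delete` conversion above is representative of the pattern applied throughout this diff: build an `OptionalError`, clone it into the retryable closure, `bail()` with a domain error to roll the transaction back, and use `take()` after the `.await` to decide whether the failure was a domain error or a genuine Diesel error. Below is a minimal sketch of that shape using only the helpers visible in these call sites (`OptionalError`, `bail`, `take`, `transaction_retry_wrapper`); the `widget`/`widget_reservation` tables and the `WidgetDeleteError` enum are illustrative stand-ins, not part of the change.

```rust
// Sketch only: hypothetical tables and error enum, real helper calls.
pub async fn widget_delete(
    &self,
    opctx: &OpContext,
    id: Uuid,
) -> DeleteResult {
    #[derive(Debug)]
    enum WidgetDeleteError {
        InUse,
    }

    let err = OptionalError::new();
    let conn = self.pool_connection_authorized(opctx).await?;

    self.transaction_retry_wrapper("widget_delete")
        .transaction(&conn, |conn| {
            // The closure may run more than once on retry, so it captures
            // a per-attempt clone of the error handle.
            let err = err.clone();
            async move {
                use db::schema::widget::dsl;
                use db::schema::widget_reservation::dsl as rsvd_dsl;

                // Refuse to delete while anything still references the
                // widget (mirrors the LotInUse check above).
                let in_use: i64 = rsvd_dsl::widget_reservation
                    .filter(rsvd_dsl::widget_id.eq(id))
                    .count()
                    .get_result_async(&conn)
                    .await?;
                if in_use > 0 {
                    // Records the domain error and returns
                    // DieselError::RollbackTransaction to abort.
                    return Err(err.bail(WidgetDeleteError::InUse));
                }

                diesel::update(dsl::widget)
                    .filter(dsl::id.eq(id))
                    .set(dsl::time_deleted.eq(Utc::now()))
                    .execute_async(&conn)
                    .await?;
                Ok(())
            }
        })
        .await
        .map_err(|e| {
            // After the transaction, `take()` distinguishes a bailed
            // domain error from a genuine Diesel error.
            if let Some(err) = err.take() {
                match err {
                    WidgetDeleteError::InUse => {
                        Error::invalid_request("widget is in use")
                    }
                }
            } else {
                public_error_from_diesel(e, ErrorHandler::Server)
            }
        })
}
```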
@@ -33,14 +30,9 @@ impl DataStore { use db::schema::{ bgp_announce_set, bgp_announce_set::dsl as announce_set_dsl, }; - let pool = self.pool_connection_authorized(opctx).await?; - - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "bgp_config_set", - ); - pool.transaction_async_with_retry( - |conn| async move { + let conn = self.pool_connection_authorized(opctx).await?; + self.transaction_retry_wrapper("bgp_config_set") + .transaction(&conn, |conn| async move { let id: Uuid = match &config.bgp_announce_set_id { NameOrId::Name(name) => { announce_set_dsl::bgp_announce_set @@ -62,11 +54,9 @@ impl DataStore { .get_result_async(&conn) .await?; Ok(result) - }, - retry_helper.as_callback(), - ) - .await - .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + }) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } pub async fn bgp_config_delete( @@ -85,62 +75,58 @@ impl DataStore { ConfigInUse, } - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "bgp_config_delete", - ); - let pool = self.pool_connection_authorized(opctx).await?; - pool.transaction_async_with_retry(|conn| { - let err = err.clone(); - async move { - let name_or_id = sel.name_or_id.clone(); - - let id: Uuid = match name_or_id { - NameOrId::Id(id) => id, - NameOrId::Name(name) => { - bgp_config_dsl::bgp_config - .filter(bgp_config::name.eq(name.to_string())) - .select(bgp_config::id) - .limit(1) - .first_async::(&conn) - .await? + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + self.transaction_retry_wrapper("bgp_config_delete") + .transaction(&conn, |conn| { + let err = err.clone(); + async move { + let name_or_id = sel.name_or_id.clone(); + + let id: Uuid = match name_or_id { + NameOrId::Id(id) => id, + NameOrId::Name(name) => { + bgp_config_dsl::bgp_config + .filter(bgp_config::name.eq(name.to_string())) + .select(bgp_config::id) + .limit(1) + .first_async::(&conn) + .await? 
+ } + }; + + let count = + sps_bgp_peer_config_dsl::switch_port_settings_bgp_peer_config + .filter(sps_bgp_peer_config::bgp_config_id.eq(id)) + .count() + .execute_async(&conn) + .await?; + + if count > 0 { + return Err(err.bail(BgpConfigDeleteError::ConfigInUse)); } - }; - let count = - sps_bgp_peer_config_dsl::switch_port_settings_bgp_peer_config - .filter(sps_bgp_peer_config::bgp_config_id.eq(id)) - .count() + diesel::update(bgp_config_dsl::bgp_config) + .filter(bgp_config_dsl::id.eq(id)) + .set(bgp_config_dsl::time_deleted.eq(Utc::now())) .execute_async(&conn) .await?; - if count > 0 { - err.set(BgpConfigDeleteError::ConfigInUse).unwrap(); - return Err(DieselError::RollbackTransaction); + Ok(()) } - - diesel::update(bgp_config_dsl::bgp_config) - .filter(bgp_config_dsl::id.eq(id)) - .set(bgp_config_dsl::time_deleted.eq(Utc::now())) - .execute_async(&conn) - .await?; - - Ok(()) - } - }, retry_helper.as_callback()) - .await - .map_err(|e| { - if let Some(err) = err.get() { - match err { - BgpConfigDeleteError::ConfigInUse => { - Error::invalid_request("BGP config in use") + }) + .await + .map_err(|e| { + if let Some(err) = err.take() { + match err { + BgpConfigDeleteError::ConfigInUse => { + Error::invalid_request("BGP config in use") + } } + } else { + public_error_from_diesel(e, ErrorHandler::Server) } - } else { - public_error_from_diesel(e, ErrorHandler::Server) - } - }) + }) } pub async fn bgp_config_get( @@ -150,7 +136,7 @@ impl DataStore { ) -> LookupResult { use db::schema::bgp_config; use db::schema::bgp_config::dsl; - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; let name_or_id = name_or_id.clone(); @@ -159,14 +145,14 @@ impl DataStore { .filter(bgp_config::name.eq(name.to_string())) .select(BgpConfig::as_select()) .limit(1) - .first_async::(&*pool) + .first_async::(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)), NameOrId::Id(id) => dsl::bgp_config .filter(bgp_config::id.eq(id)) .select(BgpConfig::as_select()) .limit(1) - .first_async::(&*pool) + .first_async::(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)), }?; @@ -181,7 +167,7 @@ impl DataStore { ) -> ListResultVec { use db::schema::bgp_config::dsl; - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; match pagparams { PaginatedBy::Id(pagparams) => { @@ -195,7 +181,7 @@ impl DataStore { } .filter(dsl::time_deleted.is_null()) .select(BgpConfig::as_select()) - .load_async(&*pool) + .load_async(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } @@ -215,37 +201,38 @@ impl DataStore { AnnounceSetNotFound(Name), } - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "bgp_announce_list", - ); - let pool = self.pool_connection_authorized(opctx).await?; - pool.transaction_async_with_retry( - |conn| { + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + self.transaction_retry_wrapper("bgp_announce_list") + .transaction(&conn, |conn| { let err = err.clone(); async move { let name_or_id = sel.name_or_id.clone(); let announce_id: Uuid = match name_or_id { - NameOrId::Id(id) => id, - NameOrId::Name(name) => announce_set_dsl::bgp_announce_set - .filter(bgp_announce_set::time_deleted.is_null()) - .filter(bgp_announce_set::name.eq(name.to_string())) - .select(bgp_announce_set::id) - .limit(1) - 
.first_async::(&conn) - .await - .map_err(|e| { - if retryable(&e) { - return e; - } - err.set(BgpAnnounceListError::AnnounceSetNotFound( - Name::from(name.clone()), - )).unwrap(); - DieselError::RollbackTransaction - })?, - }; + NameOrId::Id(id) => id, + NameOrId::Name(name) => { + announce_set_dsl::bgp_announce_set + .filter( + bgp_announce_set::time_deleted.is_null(), + ) + .filter( + bgp_announce_set::name.eq(name.to_string()), + ) + .select(bgp_announce_set::id) + .limit(1) + .first_async::(&conn) + .await + .map_err(|e| { + err.bail_retryable_or( + e, + BgpAnnounceListError::AnnounceSetNotFound( + Name::from(name.clone()), + ) + ) + })? + } + }; let result = announce_dsl::bgp_announcement .filter(announce_dsl::announce_set_id.eq(announce_id)) @@ -255,24 +242,22 @@ impl DataStore { Ok(result) } - }, - retry_helper.as_callback(), - ) - .await - .map_err(|e| { - if let Some(err) = err.get() { - match err { - BgpAnnounceListError::AnnounceSetNotFound(name) => { - Error::not_found_by_name( - ResourceType::BgpAnnounceSet, - &name, - ) + }) + .await + .map_err(|e| { + if let Some(err) = err.take() { + match err { + BgpAnnounceListError::AnnounceSetNotFound(name) => { + Error::not_found_by_name( + ResourceType::BgpAnnounceSet, + &name, + ) + } } + } else { + public_error_from_diesel(e, ErrorHandler::Server) } - } else { - public_error_from_diesel(e, ErrorHandler::Server) - } - }) + }) } pub async fn bgp_create_announce_set( @@ -283,13 +268,9 @@ impl DataStore { use db::schema::bgp_announce_set::dsl as announce_set_dsl; use db::schema::bgp_announcement::dsl as bgp_announcement_dsl; - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "bgp_create_announce_set", - ); - let pool = self.pool_connection_authorized(opctx).await?; - pool.transaction_async_with_retry( - |conn| async move { + let conn = self.pool_connection_authorized(opctx).await?; + self.transaction_retry_wrapper("bgp_create_announce_set") + .transaction(&conn, |conn| async move { let bas: BgpAnnounceSet = announce.clone().into(); let db_as: BgpAnnounceSet = @@ -317,11 +298,9 @@ impl DataStore { } Ok((db_as, db_annoucements)) - }, - retry_helper.as_callback(), - ) - .await - .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + }) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } pub async fn bgp_delete_announce_set( @@ -341,16 +320,12 @@ impl DataStore { AnnounceSetInUse, } - let pool = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_authorized(opctx).await?; let name_or_id = sel.name_or_id.clone(); - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "bgp_delete_announce_set", - ); - pool.transaction_async_with_retry( - |conn| { + let err = OptionalError::new(); + self.transaction_retry_wrapper("bgp_delete_announce_set") + .transaction(&conn, |conn| { let err = err.clone(); let name_or_id = name_or_id.clone(); async move { @@ -375,9 +350,9 @@ impl DataStore { .await?; if count > 0 { - err.set(BgpAnnounceSetDeleteError::AnnounceSetInUse) - .unwrap(); - return Err(DieselError::RollbackTransaction); + return Err(err.bail( + BgpAnnounceSetDeleteError::AnnounceSetInUse, + )); } diesel::update(announce_set_dsl::bgp_announce_set) @@ -393,20 +368,18 @@ impl DataStore { Ok(()) } - }, - retry_helper.as_callback(), - ) - .await - .map_err(|e| { - if let Some(err) = err.get() { - match err { - BgpAnnounceSetDeleteError::AnnounceSetInUse => { - Error::invalid_request("BGP announce set in 
use") + }) + .await + .map_err(|e| { + if let Some(err) = err.take() { + match err { + BgpAnnounceSetDeleteError::AnnounceSetInUse => { + Error::invalid_request("BGP announce set in use") + } } + } else { + public_error_from_diesel(e, ErrorHandler::Server) } - } else { - public_error_from_diesel(e, ErrorHandler::Server) - } - }) + }) } } diff --git a/nexus/db-queries/src/db/datastore/identity_provider.rs b/nexus/db-queries/src/db/datastore/identity_provider.rs index cb15e20549..cee577acd6 100644 --- a/nexus/db-queries/src/db/datastore/identity_provider.rs +++ b/nexus/db-queries/src/db/datastore/identity_provider.rs @@ -14,8 +14,6 @@ use crate::db::identity::Resource; use crate::db::model::IdentityProvider; use crate::db::model::Name; use crate::db::pagination::paginated; -use crate::transaction_retry::RetryHelper; -use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; use omicron_common::api::external::http_pagination::PaginatedBy; @@ -63,62 +61,49 @@ impl DataStore { opctx.authorize(authz::Action::CreateChild, authz_idp_list).await?; assert_eq!(provider.silo_id, authz_idp_list.silo().id()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "saml_identity_provider_create", - ); let name = provider.identity().name.to_string(); - self.pool_connection_authorized(opctx) - .await? - .transaction_async_with_retry( - |conn| { - let provider = provider.clone(); - async move { - // insert silo identity provider record with type Saml - use db::schema::identity_provider::dsl as idp_dsl; - diesel::insert_into(idp_dsl::identity_provider) - .values(db::model::IdentityProvider { - identity: db::model::IdentityProviderIdentity { - id: provider.identity.id, - name: provider.identity.name.clone(), - description: provider - .identity - .description - .clone(), - time_created: provider - .identity - .time_created, - time_modified: provider - .identity - .time_modified, - time_deleted: provider - .identity - .time_deleted, - }, - silo_id: provider.silo_id, - provider_type: - db::model::IdentityProviderType::Saml, - }) - .execute_async(&conn) - .await?; + let conn = self.pool_connection_authorized(opctx).await?; - // insert silo saml identity provider record - use db::schema::saml_identity_provider::dsl; - let result = diesel::insert_into( - dsl::saml_identity_provider, - ) - .values(provider) - .returning( - db::model::SamlIdentityProvider::as_returning(), - ) - .get_result_async(&conn) + self.transaction_retry_wrapper("saml_identity_provider_create") + .transaction(&conn, |conn| { + let provider = provider.clone(); + async move { + // insert silo identity provider record with type Saml + use db::schema::identity_provider::dsl as idp_dsl; + diesel::insert_into(idp_dsl::identity_provider) + .values(db::model::IdentityProvider { + identity: db::model::IdentityProviderIdentity { + id: provider.identity.id, + name: provider.identity.name.clone(), + description: provider + .identity + .description + .clone(), + time_created: provider.identity.time_created, + time_modified: provider.identity.time_modified, + time_deleted: provider.identity.time_deleted, + }, + silo_id: provider.silo_id, + provider_type: + db::model::IdentityProviderType::Saml, + }) + .execute_async(&conn) .await?; - Ok(result) - } - }, - retry_helper.as_callback(), - ) + // insert silo saml identity provider record + use db::schema::saml_identity_provider::dsl; + let result = + diesel::insert_into(dsl::saml_identity_provider) + .values(provider) + .returning( + 
db::model::SamlIdentityProvider::as_returning(), + ) + .get_result_async(&conn) + .await?; + + Ok(result) + } + }) .await .map_err(|e| { public_error_from_diesel( diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index c63cfeec34..e5baed0a17 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -231,7 +231,6 @@ impl DataStore { ) } - // TODO: Do we need this anymore... #[cfg(test)] pub(crate) fn transaction_retry_producer( &self, diff --git a/nexus/db-queries/src/db/datastore/network_interface.rs b/nexus/db-queries/src/db/datastore/network_interface.rs index b01dc16ced..4d4e43c9a7 100644 --- a/nexus/db-queries/src/db/datastore/network_interface.rs +++ b/nexus/db-queries/src/db/datastore/network_interface.rs @@ -12,7 +12,6 @@ use crate::db::collection_insert::AsyncInsertError; use crate::db::collection_insert::DatastoreCollection; use crate::db::cte_utils::BoxedQuery; use crate::db::error::public_error_from_diesel; -use crate::db::error::retryable; use crate::db::error::ErrorHandler; use crate::db::model::IncompleteNetworkInterface; use crate::db::model::Instance; @@ -25,8 +24,7 @@ use crate::db::model::VpcSubnet; use crate::db::pagination::paginated; use crate::db::pool::DbConnection; use crate::db::queries::network_interface; -use crate::transaction_retry::RetryHelper; -use async_bb8_diesel::AsyncConnection; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; @@ -41,7 +39,6 @@ use omicron_common::api::external::ResourceType; use omicron_common::api::external::UpdateResult; use ref_cast::RefCast; use sled_agent_client::types as sled_client_types; -use std::sync::{Arc, OnceLock}; use uuid::Uuid; /// OPTE requires information that's currently split across the network @@ -469,88 +466,78 @@ impl DataStore { FailedToUnsetPrimary(DieselError), } - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "instance_update_network_interface", - ); + let err = OptionalError::new(); let conn = self.pool_connection_authorized(opctx).await?; if primary { - conn.transaction_async_with_retry(|conn| { - let err = err.clone(); - let stopped = stopped.clone(); - let update_target_query = update_target_query.clone(); - async move { - let instance_runtime = - instance_query.get_result_async(&conn).await?.runtime_state; - if instance_runtime.propolis_id.is_some() - || instance_runtime.nexus_state != stopped - { - err.set(NetworkInterfaceUpdateError::InstanceNotStopped).unwrap(); - return Err(diesel::result::Error::RollbackTransaction); - } - - // First, get the primary interface - let primary_interface = - find_primary_query.get_result_async(&conn).await?; - // If the target and primary are different, we need to toggle - // the primary into a secondary. 
- if primary_interface.identity.id != interface_id { - use crate::db::schema::network_interface::dsl; - if let Err(e) = diesel::update(dsl::network_interface) - .filter(dsl::id.eq(primary_interface.identity.id)) - .filter(dsl::kind.eq(NetworkInterfaceKind::Instance)) - .filter(dsl::time_deleted.is_null()) - .set(dsl::is_primary.eq(false)) - .execute_async(&conn) - .await + self.transaction_retry_wrapper("instance_update_network_interface") + .transaction(&conn, |conn| { + let err = err.clone(); + let stopped = stopped.clone(); + let update_target_query = update_target_query.clone(); + async move { + let instance_runtime = + instance_query.get_result_async(&conn).await?.runtime_state; + if instance_runtime.propolis_id.is_some() + || instance_runtime.nexus_state != stopped { - if retryable(&e) { - return Err(e); + return Err(err.bail(NetworkInterfaceUpdateError::InstanceNotStopped)); + } + + // First, get the primary interface + let primary_interface = + find_primary_query.get_result_async(&conn).await?; + // If the target and primary are different, we need to toggle + // the primary into a secondary. + if primary_interface.identity.id != interface_id { + use crate::db::schema::network_interface::dsl; + if let Err(e) = diesel::update(dsl::network_interface) + .filter(dsl::id.eq(primary_interface.identity.id)) + .filter(dsl::kind.eq(NetworkInterfaceKind::Instance)) + .filter(dsl::time_deleted.is_null()) + .set(dsl::is_primary.eq(false)) + .execute_async(&conn) + .await + { + return Err(err.bail_retryable_or_else( + e, + |e| NetworkInterfaceUpdateError::FailedToUnsetPrimary(e) + )); } - err.set(NetworkInterfaceUpdateError::FailedToUnsetPrimary(e)).unwrap(); - return Err(diesel::result::Error::RollbackTransaction); } - } - // In any case, update the actual target - update_target_query.get_result_async(&conn).await - } - }, - retry_helper.as_callback() - ) + // In any case, update the actual target + update_target_query.get_result_async(&conn).await + } + }).await } else { // In this case, we can just directly apply the updates. By // construction, `updates.primary` is `None`, so nothing will // be done there. The other columns always need to be updated, and // we're only hitting a single row. Note that we still need to // verify the instance is stopped. - conn.transaction_async_with_retry(|conn| { - let err = err.clone(); - let stopped = stopped.clone(); - let update_target_query = update_target_query.clone(); - async move { - let instance_state = - instance_query.get_result_async(&conn).await?.runtime_state; - if instance_state.propolis_id.is_some() - || instance_state.nexus_state != stopped - { - err.set(NetworkInterfaceUpdateError::InstanceNotStopped).unwrap(); - return Err(diesel::result::Error::RollbackTransaction); + self.transaction_retry_wrapper("instance_update_network_interface") + .transaction(&conn, |conn| { + let err = err.clone(); + let stopped = stopped.clone(); + let update_target_query = update_target_query.clone(); + async move { + let instance_state = + instance_query.get_result_async(&conn).await?.runtime_state; + if instance_state.propolis_id.is_some() + || instance_state.nexus_state != stopped + { + return Err(err.bail(NetworkInterfaceUpdateError::InstanceNotStopped)); + } + update_target_query.get_result_async(&conn).await } - update_target_query.get_result_async(&conn).await - } - }, - retry_helper.as_callback() - ) + }).await } - .await // Convert to `InstanceNetworkInterface` before returning, we know // this is valid as we've filtered appropriately above. 
.map(NetworkInterface::as_instance) .map_err(|e| { - if let Some(err) = Arc::try_unwrap(err).unwrap().take() { + if let Some(err) = err.take() { match err { NetworkInterfaceUpdateError::InstanceNotStopped => { return Error::invalid_request( diff --git a/nexus/db-queries/src/db/datastore/project.rs b/nexus/db-queries/src/db/datastore/project.rs index 8c63f6e71b..ba0c64abfd 100644 --- a/nexus/db-queries/src/db/datastore/project.rs +++ b/nexus/db-queries/src/db/datastore/project.rs @@ -12,7 +12,6 @@ use crate::db; use crate::db::collection_insert::AsyncInsertError; use crate::db::collection_insert::DatastoreCollection; use crate::db::error::public_error_from_diesel; -use crate::db::error::retryable; use crate::db::error::ErrorHandler; use crate::db::fixed_data::project::SERVICES_PROJECT; use crate::db::fixed_data::silo::INTERNAL_SILO_ID; @@ -24,11 +23,10 @@ use crate::db::model::ProjectUpdate; use crate::db::model::Silo; use crate::db::model::VirtualProvisioningCollection; use crate::db::pagination::paginated; -use crate::transaction_retry::RetryHelper; -use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; +use crate::transaction_retry::OptionalError; +use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; -use diesel::result::Error as DieselError; use omicron_common::api::external::http_pagination::PaginatedBy; use omicron_common::api::external::CreateResult; use omicron_common::api::external::DeleteResult; @@ -39,7 +37,6 @@ use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; use omicron_common::api::external::UpdateResult; use ref_cast::RefCast; -use std::sync::{Arc, OnceLock}; // Generates internal functions used for validation during project deletion. // Used simply to reduce boilerplate. @@ -154,16 +151,13 @@ impl DataStore { use db::schema::project::dsl; - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "project_create_in_silo", - ); + let err = OptionalError::new(); let name = project.name().as_str().to_string(); + let conn = self.pool_connection_authorized(opctx).await?; + let db_project = self - .pool_connection_authorized(opctx) - .await? - .transaction_async_with_retry(|conn| { + .transaction_retry_wrapper("project_create_in_silo") + .transaction(&conn, |conn| { let err = err.clone(); let authz_silo_inner = authz_silo_inner.clone(); @@ -178,22 +172,21 @@ impl DataStore { .await .map_err(|e| match e { AsyncInsertError::CollectionNotFound => { - err.set(authz_silo_inner.not_found()).unwrap(); - return DieselError::RollbackTransaction; - } - AsyncInsertError::DatabaseError(e) => { - if retryable(&e) { - return e; - } - err.set(public_error_from_diesel( - e, - ErrorHandler::Conflict( - ResourceType::Project, - &name, - ), - )).unwrap(); - return DieselError::RollbackTransaction; + err.bail(authz_silo_inner.not_found()) } + AsyncInsertError::DatabaseError(diesel_error) => err + .bail_retryable_or_else( + diesel_error, + |diesel_error| { + public_error_from_diesel( + diesel_error, + ErrorHandler::Conflict( + ResourceType::Project, + &name, + ), + ) + }, + ), })?; // Create resource provisioning for the project. 
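The error-mapping idiom in `project_create_in_silo` above depends on retryable Diesel errors reaching the wrapper unchanged, while non-retryable failures are stashed for post-transaction mapping. The following is a reader's sketch of what the `OptionalError` helper in `crate::transaction_retry` presumably looks like, inferred from the call sites in this diff; the actual implementation may differ in details.

```rust
// Assumed shape of crate::transaction_retry::OptionalError, inferred from
// usage (err.bail, err.bail_retryable_or[_else], err.take) in this diff.
use std::sync::{Arc, Mutex};
use diesel::result::Error as DieselError;

pub struct OptionalError<E>(Arc<Mutex<Option<E>>>);

// Clone is implemented manually so `E` itself need not be Clone; only the
// Arc handle is duplicated into the transaction closure.
impl<E> Clone for OptionalError<E> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<E> OptionalError<E> {
    pub fn new() -> Self {
        Self(Arc::new(Mutex::new(None)))
    }

    /// Record a domain error and abort the transaction.
    pub fn bail(&self, e: E) -> DieselError {
        *self.0.lock().unwrap() = Some(e);
        DieselError::RollbackTransaction
    }

    /// Let retryable Diesel errors pass through untouched (so the wrapper
    /// can retry the transaction); otherwise record the mapped domain
    /// error and roll back.
    pub fn bail_retryable_or_else(
        &self,
        diesel_error: DieselError,
        map: impl FnOnce(DieselError) -> E,
    ) -> DieselError {
        if crate::db::error::retryable(&diesel_error) {
            diesel_error
        } else {
            self.bail(map(diesel_error))
        }
    }

    /// As above, but with a pre-built error value.
    pub fn bail_retryable_or(
        &self,
        diesel_error: DieselError,
        e: E,
    ) -> DieselError {
        self.bail_retryable_or_else(diesel_error, |_| e)
    }

    /// After the transaction completes, recover the domain error, if any.
    pub fn take(&self) -> Option<E> {
        self.0.lock().unwrap().take()
    }
}
```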
@@ -207,13 +200,11 @@ impl DataStore { .await?; Ok(project) } - }, - retry_helper.as_callback(), - ) + }) .await .map_err(|e| { - if let Some(err) = err.get() { - return err.clone(); + if let Some(err) = err.take() { + return err; } public_error_from_diesel(e, ErrorHandler::Server) })?; @@ -252,17 +243,15 @@ impl DataStore { use db::schema::project::dsl; - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "project_delete", - ); - self.pool_connection_authorized(opctx) - .await? - .transaction_async_with_retry(|conn| { + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + + self.transaction_retry_wrapper("project_delete") + .transaction(&conn, |conn| { let err = err.clone(); async move { - let now = Utc::now(); let updated_rows = diesel::update(dsl::project) + let now = Utc::now(); + let updated_rows = diesel::update(dsl::project) .filter(dsl::time_deleted.is_null()) .filter(dsl::id.eq(authz_project.id())) .filter(dsl::rcgen.eq(db_project.rcgen)) @@ -271,23 +260,22 @@ impl DataStore { .execute_async(&conn) .await .map_err(|e| { - if retryable(&e) { - return e; - } - err.set(public_error_from_diesel( - e, - ErrorHandler::NotFoundByResource(authz_project), - )).unwrap(); - DieselError::RollbackTransaction + err.bail_retryable_or_else(e, |e| { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByResource( + authz_project, + ), + ) + }) })?; if updated_rows == 0 { - err.set(Error::InvalidRequest { + return Err(err.bail(Error::InvalidRequest { message: "deletion failed due to concurrent modification" .to_string(), - }).unwrap(); - return Err(DieselError::RollbackTransaction); + })); } self.virtual_provisioning_collection_delete_on_connection( @@ -298,11 +286,11 @@ impl DataStore { .await?; Ok(()) } - }, retry_helper.as_callback()) + }) .await .map_err(|e| { - if let Some(err) = err.get() { - return err.clone(); + if let Some(err) = err.take() { + return err; } public_error_from_diesel(e, ErrorHandler::Server) })?; diff --git a/nexus/db-queries/src/db/datastore/rack.rs b/nexus/db-queries/src/db/datastore/rack.rs index a7406db568..b678a32e5a 100644 --- a/nexus/db-queries/src/db/datastore/rack.rs +++ b/nexus/db-queries/src/db/datastore/rack.rs @@ -27,7 +27,7 @@ use crate::db::model::Rack; use crate::db::model::Zpool; use crate::db::pagination::paginated; use crate::db::pool::DbConnection; -use crate::transaction_retry::RetryHelper; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; @@ -570,10 +570,6 @@ impl DataStore { // NOTE: This transaction cannot yet be made retryable, as it uses // nested transactions. - // let retry_helper = RetryHelper::new( - // &self.transaction_retry_producer, - // "rack_set_initialized", - // ); let rack = self .pool_connection_authorized(opctx) .await? @@ -800,61 +796,50 @@ impl DataStore { use crate::db::schema::external_ip::dsl as extip_dsl; use crate::db::schema::service::dsl as service_dsl; - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "nexus_external_addresses", - ); - self.pool_connection_authorized(opctx) - .await? 
- .transaction_async_with_retry( - |conn| { - let err = err.clone(); - async move { - let ips = - extip_dsl::external_ip - .inner_join(service_dsl::service.on( - service_dsl::id.eq( - extip_dsl::parent_id.assume_not_null(), - ), - )) - .filter(extip_dsl::parent_id.is_not_null()) - .filter(extip_dsl::time_deleted.is_null()) - .filter(extip_dsl::is_service) - .filter( - service_dsl::kind - .eq(db::model::ServiceKind::Nexus), - ) - .select(ExternalIp::as_select()) - .get_results_async(&conn) - .await? - .into_iter() - .map(|external_ip| external_ip.ip.ip()) - .collect(); - - let dns_zones = self - .dns_zones_list_all_on_connection( - opctx, - &conn, - DnsGroup::External, - ) - .await - .map_err(|e| match e.retryable() { - NotRetryable(not_retryable_err) => { - err.set(not_retryable_err).unwrap(); - DieselError::RollbackTransaction - } - Retryable(retryable_err) => retryable_err, - })?; - - Ok((ips, dns_zones)) - } - }, - retry_helper.as_callback(), - ) + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + self.transaction_retry_wrapper("nexus_external_addresses") + .transaction(&conn, |conn| { + let err = err.clone(); + async move { + let ips = extip_dsl::external_ip + .inner_join( + service_dsl::service.on(service_dsl::id + .eq(extip_dsl::parent_id.assume_not_null())), + ) + .filter(extip_dsl::parent_id.is_not_null()) + .filter(extip_dsl::time_deleted.is_null()) + .filter(extip_dsl::is_service) + .filter( + service_dsl::kind.eq(db::model::ServiceKind::Nexus), + ) + .select(ExternalIp::as_select()) + .get_results_async(&conn) + .await? + .into_iter() + .map(|external_ip| external_ip.ip.ip()) + .collect(); + + let dns_zones = self + .dns_zones_list_all_on_connection( + opctx, + &conn, + DnsGroup::External, + ) + .await + .map_err(|e| match e.retryable() { + NotRetryable(not_retryable_err) => { + err.bail(not_retryable_err) + } + Retryable(retryable_err) => retryable_err, + })?; + + Ok((ips, dns_zones)) + } + }) .await .map_err(|e| { - if let Some(err) = Arc::try_unwrap(err).unwrap().take() { + if let Some(err) = err.take() { return err.into(); } public_error_from_diesel(e, ErrorHandler::Server) diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs index cfb6dbeb66..b055a3e85c 100644 --- a/nexus/db-queries/src/db/datastore/region.rs +++ b/nexus/db-queries/src/db/datastore/region.rs @@ -13,8 +13,7 @@ use crate::db::error::ErrorHandler; use crate::db::lookup::LookupPath; use crate::db::model::Dataset; use crate::db::model::Region; -use crate::transaction_retry::RetryHelper; -use async_bb8_diesel::AsyncConnection; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; use nexus_types::external_api::params; @@ -23,7 +22,6 @@ use omicron_common::api::external::DeleteResult; use omicron_common::api::external::Error; use omicron_common::nexus_config::RegionAllocationStrategy; use slog::Logger; -use std::sync::{Arc, OnceLock}; use uuid::Uuid; impl DataStore { @@ -164,16 +162,10 @@ impl DataStore { #[error("Numeric error: {0}")] NumericError(String), } - let err = Arc::new(OnceLock::new()); - - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "regions_hard_delete", - ); - - self.pool_connection_unauthorized() - .await? 
- .transaction_async_with_retry(|conn| { + let err = OptionalError::new(); + let conn = self.pool_connection_unauthorized().await?; + self.transaction_retry_wrapper("regions_hard_delete") + .transaction(&conn, |conn| { let err = err.clone(); let region_ids = region_ids.clone(); async move { @@ -208,10 +200,9 @@ impl DataStore { let dataset_total_occupied_size: db::model::ByteCount = dataset_total_occupied_size.try_into().map_err( |e: anyhow::Error| { - err.set(RegionDeleteError::NumericError( + err.bail(RegionDeleteError::NumericError( e.to_string(), - )).unwrap(); - diesel::result::Error::RollbackTransaction + )) }, )?; @@ -230,12 +221,10 @@ impl DataStore { } Ok(()) } - }, - retry_helper.as_callback(), - ) + }) .await .map_err(|e| { - if let Some(err) = err.get() { + if let Some(err) = err.take() { match err { RegionDeleteError::NumericError(err) => { return Error::internal_error( diff --git a/nexus/db-queries/src/db/datastore/silo_group.rs b/nexus/db-queries/src/db/datastore/silo_group.rs index 56a62d6b3c..29fcb7490b 100644 --- a/nexus/db-queries/src/db/datastore/silo_group.rs +++ b/nexus/db-queries/src/db/datastore/silo_group.rs @@ -15,7 +15,6 @@ use crate::db::error::TransactionError; use crate::db::model::SiloGroup; use crate::db::model::SiloGroupMembership; use crate::db::pagination::paginated; -use crate::transaction_retry::RetryHelper; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; @@ -146,47 +145,40 @@ impl DataStore { ) -> UpdateResult<()> { opctx.authorize(authz::Action::Modify, authz_silo_user).await?; - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "silo_group_membership_replace_for_user", - ); + let conn = self.pool_connection_authorized(opctx).await?; - self.pool_connection_authorized(opctx) - .await? 
- .transaction_async_with_retry( - |conn| { - let silo_group_ids = silo_group_ids.clone(); - async move { - use db::schema::silo_group_membership::dsl; + self.transaction_retry_wrapper("silo_group_membership_replace_for_user") + .transaction(&conn, |conn| { + let silo_group_ids = silo_group_ids.clone(); + async move { + use db::schema::silo_group_membership::dsl; - // Delete existing memberships for user - let silo_user_id = authz_silo_user.id(); - diesel::delete(dsl::silo_group_membership) - .filter(dsl::silo_user_id.eq(silo_user_id)) - .execute_async(&conn) - .await?; + // Delete existing memberships for user + let silo_user_id = authz_silo_user.id(); + diesel::delete(dsl::silo_group_membership) + .filter(dsl::silo_user_id.eq(silo_user_id)) + .execute_async(&conn) + .await?; - // Create new memberships for user - let silo_group_memberships: Vec< - db::model::SiloGroupMembership, - > = silo_group_ids - .iter() - .map(|group_id| db::model::SiloGroupMembership { - silo_group_id: *group_id, - silo_user_id, - }) - .collect(); + // Create new memberships for user + let silo_group_memberships: Vec< + db::model::SiloGroupMembership, + > = silo_group_ids + .iter() + .map(|group_id| db::model::SiloGroupMembership { + silo_group_id: *group_id, + silo_user_id, + }) + .collect(); - diesel::insert_into(dsl::silo_group_membership) - .values(silo_group_memberships) - .execute_async(&conn) - .await?; + diesel::insert_into(dsl::silo_group_membership) + .values(silo_group_memberships) + .execute_async(&conn) + .await?; - Ok(()) - } - }, - retry_helper.as_callback(), - ) + Ok(()) + } + }) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } diff --git a/nexus/db-queries/src/db/datastore/sled.rs b/nexus/db-queries/src/db/datastore/sled.rs index 57c56ffcda..023384a9bf 100644 --- a/nexus/db-queries/src/db/datastore/sled.rs +++ b/nexus/db-queries/src/db/datastore/sled.rs @@ -15,8 +15,7 @@ use crate::db::model::SledResource; use crate::db::model::SledUpdate; use crate::db::pagination::paginated; use crate::db::update_and_check::UpdateAndCheck; -use crate::transaction_retry::RetryHelper; -use async_bb8_diesel::AsyncConnection; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; @@ -26,7 +25,6 @@ use omicron_common::api::external::DataPageParams; use omicron_common::api::external::DeleteResult; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::ResourceType; -use std::sync::{Arc, OnceLock}; use uuid::Uuid; impl DataStore { @@ -91,76 +89,74 @@ impl DataStore { enum SledReservationError { NotFound, } - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "sled_reservation_create", - ); - self.pool_connection_authorized(opctx) - .await? - .transaction_async_with_retry( - |conn| { - // Clone variables into retryable function - let err = err.clone(); - let constraints = constraints.clone(); - let resources = resources.clone(); - - async move { - use db::schema::sled_resource::dsl as resource_dsl; - // Check if resource ID already exists - if so, return it. 
- let old_resource = resource_dsl::sled_resource - .filter(resource_dsl::id.eq(resource_id)) - .select(SledResource::as_select()) - .limit(1) - .load_async(&conn) - .await?; - - if !old_resource.is_empty() { - return Ok(old_resource[0].clone()); - } + let err = OptionalError::new(); + + let conn = self.pool_connection_authorized(opctx).await?; + + self.transaction_retry_wrapper("sled_reservation_create") + .transaction(&conn, |conn| { + // Clone variables into retryable function + let err = err.clone(); + let constraints = constraints.clone(); + let resources = resources.clone(); + + async move { + use db::schema::sled_resource::dsl as resource_dsl; + // Check if resource ID already exists - if so, return it. + let old_resource = resource_dsl::sled_resource + .filter(resource_dsl::id.eq(resource_id)) + .select(SledResource::as_select()) + .limit(1) + .load_async(&conn) + .await?; + + if !old_resource.is_empty() { + return Ok(old_resource[0].clone()); + } - // If it doesn't already exist, find a sled with enough space - // for the resources we're requesting. - use db::schema::sled::dsl as sled_dsl; - // This answers the boolean question: - // "Does the SUM of all hardware thread usage, plus the one we're trying - // to allocate, consume less threads than exists on the sled?" - let sled_has_space_for_threads = - (diesel::dsl::sql::( - &format!( - "COALESCE(SUM(CAST({} as INT8)), 0)", - resource_dsl::hardware_threads::NAME - ), - ) + resources.hardware_threads) - .le(sled_dsl::usable_hardware_threads); - - // This answers the boolean question: - // "Does the SUM of all RAM usage, plus the one we're trying - // to allocate, consume less RAM than exists on the sled?" - let sled_has_space_for_rss = - (diesel::dsl::sql::( - &format!( - "COALESCE(SUM(CAST({} as INT8)), 0)", - resource_dsl::rss_ram::NAME - ), - ) + resources.rss_ram) - .le(sled_dsl::usable_physical_ram); - - // Determine whether adding this service's reservoir allocation - // to what's allocated on the sled would avoid going over quota. - let sled_has_space_in_reservoir = - (diesel::dsl::sql::( - &format!( - "COALESCE(SUM(CAST({} as INT8)), 0)", - resource_dsl::reservoir_ram::NAME - ), - ) + resources.reservoir_ram) - .le(sled_dsl::reservoir_size); - - // Generate a query describing all of the sleds that have space - // for this reservation. - let mut sled_targets = sled_dsl::sled + // If it doesn't already exist, find a sled with enough space + // for the resources we're requesting. + use db::schema::sled::dsl as sled_dsl; + // This answers the boolean question: + // "Does the SUM of all hardware thread usage, plus the one we're trying + // to allocate, consume less threads than exists on the sled?" + let sled_has_space_for_threads = + (diesel::dsl::sql::( + &format!( + "COALESCE(SUM(CAST({} as INT8)), 0)", + resource_dsl::hardware_threads::NAME + ), + ) + resources.hardware_threads) + .le(sled_dsl::usable_hardware_threads); + + // This answers the boolean question: + // "Does the SUM of all RAM usage, plus the one we're trying + // to allocate, consume less RAM than exists on the sled?" + let sled_has_space_for_rss = + (diesel::dsl::sql::( + &format!( + "COALESCE(SUM(CAST({} as INT8)), 0)", + resource_dsl::rss_ram::NAME + ), + ) + resources.rss_ram) + .le(sled_dsl::usable_physical_ram); + + // Determine whether adding this service's reservoir allocation + // to what's allocated on the sled would avoid going over quota. 
+ let sled_has_space_in_reservoir = + (diesel::dsl::sql::( + &format!( + "COALESCE(SUM(CAST({} as INT8)), 0)", + resource_dsl::reservoir_ram::NAME + ), + ) + resources.reservoir_ram) + .le(sled_dsl::reservoir_size); + + // Generate a query describing all of the sleds that have space + // for this reservation. + let mut sled_targets = + sled_dsl::sled .left_join( resource_dsl::sled_resource .on(resource_dsl::sled_id.eq(sled_dsl::id)), @@ -179,49 +175,46 @@ impl DataStore { .select(sled_dsl::id) .into_boxed(); - // Further constrain the sled IDs according to any caller- - // supplied constraints. - if let Some(must_select_from) = - constraints.must_select_from() - { - sled_targets = sled_targets.filter( - sled_dsl::id.eq_any(must_select_from.to_vec()), - ); - } - - sql_function!(fn random() -> diesel::sql_types::Float); - let sled_targets = sled_targets - .order(random()) - .limit(1) - .get_results_async::(&conn) - .await?; - - if sled_targets.is_empty() { - err.set(SledReservationError::NotFound).unwrap(); - return Err(diesel::result::Error::NotFound); - } - - // Create a SledResource record, associate it with the target - // sled. - let resource = SledResource::new( - resource_id, - sled_targets[0], - resource_kind, - resources, + // Further constrain the sled IDs according to any caller- + // supplied constraints. + if let Some(must_select_from) = + constraints.must_select_from() + { + sled_targets = sled_targets.filter( + sled_dsl::id.eq_any(must_select_from.to_vec()), ); + } - diesel::insert_into(resource_dsl::sled_resource) - .values(resource) - .returning(SledResource::as_returning()) - .get_result_async(&conn) - .await + sql_function!(fn random() -> diesel::sql_types::Float); + let sled_targets = sled_targets + .order(random()) + .limit(1) + .get_results_async::(&conn) + .await?; + + if sled_targets.is_empty() { + return Err(err.bail(SledReservationError::NotFound)); } - }, - retry_helper.as_callback(), - ) + + // Create a SledResource record, associate it with the target + // sled. 
+ let resource = SledResource::new( + resource_id, + sled_targets[0], + resource_kind, + resources, + ); + + diesel::insert_into(resource_dsl::sled_resource) + .values(resource) + .returning(SledResource::as_returning()) + .get_result_async(&conn) + .await + } + }) .await .map_err(|e| { - if let Some(err) = err.get() { + if let Some(err) = err.take() { match err { SledReservationError::NotFound => { return external::Error::unavail( diff --git a/nexus/db-queries/src/db/datastore/snapshot.rs b/nexus/db-queries/src/db/datastore/snapshot.rs index 5fd6326f56..03058eda61 100644 --- a/nexus/db-queries/src/db/datastore/snapshot.rs +++ b/nexus/db-queries/src/db/datastore/snapshot.rs @@ -20,12 +20,10 @@ use crate::db::model::SnapshotState; use crate::db::pagination::paginated; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; -use crate::transaction_retry::RetryHelper; -use async_bb8_diesel::AsyncConnection; +use crate::transaction_retry::OptionalError; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; use diesel::prelude::*; -use diesel::result::Error as DieselError; use diesel::OptionalExtension; use nexus_types::identity::Resource; use omicron_common::api::external::http_pagination::PaginatedBy; @@ -37,7 +35,6 @@ use omicron_common::api::external::ResourceType; use omicron_common::api::external::UpdateResult; use omicron_common::bail_unless; use ref_cast::RefCast; -use std::sync::{Arc, OnceLock}; use uuid::Uuid; impl DataStore { @@ -62,106 +59,93 @@ impl DataStore { let snapshot_name = snapshot.name().to_string(); let project_id = snapshot.project_id; - let err = Arc::new(OnceLock::new()); - let retry_helper = RetryHelper::new( - &self.transaction_retry_producer, - "project_ensure_snapshot", - ); + let err = OptionalError::new(); + let conn = self.pool_connection_authorized(opctx).await?; + let snapshot: Snapshot = self - .pool_connection_authorized(opctx) - .await? - .transaction_async_with_retry( - |conn| { - let err = err.clone(); - let snapshot = snapshot.clone(); - async move { - use db::schema::snapshot::dsl; + .transaction_retry_wrapper("project_ensure_snapshot") + .transaction(&conn, |conn| { + let err = err.clone(); + let snapshot = snapshot.clone(); + async move { + use db::schema::snapshot::dsl; - // If an undeleted snapshot exists in the database with the - // same name and project but a different id to the snapshot - // this function was passed as an argument, then return an - // error here. - // - // As written below, - // - // .on_conflict((dsl::project_id, dsl::name)) - // .filter_target(dsl::time_deleted.is_null()) - // .do_update() - // .set(dsl::time_modified.eq(dsl::time_modified)) - // - // will set any existing record's `time_modified` if the - // project id and name match, even if the snapshot ID does - // not match. diesel supports adding a filter below like so - // (marked with >>): - // - // .on_conflict((dsl::project_id, dsl::name)) - // .filter_target(dsl::time_deleted.is_null()) - // .do_update() - // .set(dsl::time_modified.eq(dsl::time_modified)) - // >> .filter(dsl::id.eq(snapshot.id())) - // - // which will restrict the `insert_into`'s set so that it - // only applies if the snapshot ID matches. But, - // AsyncInsertError does not have a ObjectAlreadyExists - // variant, so this will be returned as CollectionNotFound - // due to the `insert_into` failing. - // - // If this function is passed a snapshot with an ID that - // does not match, but a project and name that does, return - // ObjectAlreadyExists here. 
+ // If an undeleted snapshot exists in the database with the + // same name and project but a different id to the snapshot + // this function was passed as an argument, then return an + // error here. + // + // As written below, + // + // .on_conflict((dsl::project_id, dsl::name)) + // .filter_target(dsl::time_deleted.is_null()) + // .do_update() + // .set(dsl::time_modified.eq(dsl::time_modified)) + // + // will set any existing record's `time_modified` if the + // project id and name match, even if the snapshot ID does + // not match. diesel supports adding a filter below like so + // (marked with >>): + // + // .on_conflict((dsl::project_id, dsl::name)) + // .filter_target(dsl::time_deleted.is_null()) + // .do_update() + // .set(dsl::time_modified.eq(dsl::time_modified)) + // >> .filter(dsl::id.eq(snapshot.id())) + // + // which will restrict the `insert_into`'s set so that it + // only applies if the snapshot ID matches. But, + // AsyncInsertError does not have a ObjectAlreadyExists + // variant, so this will be returned as CollectionNotFound + // due to the `insert_into` failing. + // + // If this function is passed a snapshot with an ID that + // does not match, but a project and name that does, return + // ObjectAlreadyExists here. - let existing_snapshot_id: Option = dsl::snapshot - .filter(dsl::time_deleted.is_null()) - .filter(dsl::name.eq(snapshot.name().to_string())) - .filter(dsl::project_id.eq(snapshot.project_id)) - .select(dsl::id) - .limit(1) - .first_async(&conn) - .await - .optional()?; + let existing_snapshot_id: Option = dsl::snapshot + .filter(dsl::time_deleted.is_null()) + .filter(dsl::name.eq(snapshot.name().to_string())) + .filter(dsl::project_id.eq(snapshot.project_id)) + .select(dsl::id) + .limit(1) + .first_async(&conn) + .await + .optional()?; - if let Some(existing_snapshot_id) = existing_snapshot_id - { - if existing_snapshot_id != snapshot.id() { - err.set(CustomError::ResourceAlreadyExists) - .unwrap(); - return Err(DieselError::RollbackTransaction); - } + if let Some(existing_snapshot_id) = existing_snapshot_id { + if existing_snapshot_id != snapshot.id() { + return Err( + err.bail(CustomError::ResourceAlreadyExists) + ); } - - Project::insert_resource( - project_id, - diesel::insert_into(dsl::snapshot) - .values(snapshot) - .on_conflict((dsl::project_id, dsl::name)) - .filter_target(dsl::time_deleted.is_null()) - .do_update() - .set(dsl::time_modified.eq(dsl::time_modified)), - ) - .insert_and_get_result_async(&conn) - .await - .map_err(|e| match e { - AsyncInsertError::CollectionNotFound => { - err.set(CustomError::InsertError( - Error::ObjectNotFound { - type_name: ResourceType::Project, - lookup_type: LookupType::ById( - project_id, - ), - }, - )) - .unwrap(); - DieselError::RollbackTransaction - } - AsyncInsertError::DatabaseError(e) => e, - }) } - }, - retry_helper.as_callback(), - ) + + Project::insert_resource( + project_id, + diesel::insert_into(dsl::snapshot) + .values(snapshot) + .on_conflict((dsl::project_id, dsl::name)) + .filter_target(dsl::time_deleted.is_null()) + .do_update() + .set(dsl::time_modified.eq(dsl::time_modified)), + ) + .insert_and_get_result_async(&conn) + .await + .map_err(|e| match e { + AsyncInsertError::CollectionNotFound => err.bail( + CustomError::InsertError(Error::ObjectNotFound { + type_name: ResourceType::Project, + lookup_type: LookupType::ById(project_id), + }), + ), + AsyncInsertError::DatabaseError(e) => e, + }) + } + }) .await .map_err(|e| { - if let Some(err) = Arc::try_unwrap(err).unwrap().take() { + if let 
Some(err) = err.take() { match err { CustomError::ResourceAlreadyExists => { Error::ObjectAlreadyExists {
diff --git a/nexus/db-queries/src/db/datastore/switch_interface.rs b/nexus/db-queries/src/db/datastore/switch_interface.rs
index a671de2358..67f16fa08f 100644
--- a/nexus/db-queries/src/db/datastore/switch_interface.rs
+++ b/nexus/db-queries/src/db/datastore/switch_interface.rs
@@ -13,9 +13,8 @@ use crate::db::error::public_error_from_diesel;
 use crate::db::error::ErrorHandler;
 use crate::db::model::LoopbackAddress;
 use crate::db::pagination::paginated;
-use crate::transaction_retry::RetryHelper;
-use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
-use diesel::result::Error as DieselError;
+use crate::transaction_retry::OptionalError;
+use async_bb8_diesel::AsyncRunQueryDsl;
 use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
 use ipnetwork::IpNetwork;
 use nexus_types::external_api::params::LoopbackAddressCreate;
@@ -23,7 +22,6 @@ use omicron_common::api::external::{
     CreateResult, DataPageParams, DeleteResult, Error, ListResultVec,
     LookupResult, ResourceType,
 };
-use std::sync::{Arc, OnceLock};
 use uuid::Uuid;
 
 impl DataStore {
@@ -46,16 +44,12 @@ impl DataStore {
         let inet = IpNetwork::new(params.address, params.mask)
             .map_err(|_| Error::invalid_request("invalid address"))?;
 
-        let err = Arc::new(OnceLock::new());
-        let retry_helper = RetryHelper::new(
-            &self.transaction_retry_producer,
-            "loopback_address_create",
-        );
+        let err = OptionalError::new();
 
         // TODO https://github.com/oxidecomputer/omicron/issues/2811
         // Audit external networking database transaction usage
-        conn.transaction_async_with_retry(
-            |conn| {
+        self.transaction_retry_wrapper("loopback_address_create")
+            .transaction(&conn, |conn| {
                 let err = err.clone();
                 async move {
                     let lot_id = authz_address_lot.id();
@@ -68,13 +62,9 @@ impl DataStore {
                     )
                     .await
                     .map_err(|e| match e {
-                        ReserveBlockTxnError::CustomError(e) => {
-                            err.set(
-                                LoopbackAddressCreateError::ReserveBlock(e),
-                            )
-                            .unwrap();
-                            DieselError::RollbackTransaction
-                        }
+                        ReserveBlockTxnError::CustomError(e) => err.bail(
+                            LoopbackAddressCreateError::ReserveBlock(e),
+                        ),
                         ReserveBlockTxnError::Database(e) => e,
                     })?;
@@ -99,30 +89,28 @@ impl DataStore {
                     Ok(db_addr)
                 }
-            },
-            retry_helper.as_callback(),
-        )
-        .await
-        .map_err(|e| {
-            if let Some(err) = err.get() {
-                match err {
-                    LoopbackAddressCreateError::ReserveBlock(
-                        ReserveBlockError::AddressUnavailable,
-                    ) => Error::invalid_request("address unavailable"),
-                    LoopbackAddressCreateError::ReserveBlock(
-                        ReserveBlockError::AddressNotInLot,
-                    ) => Error::invalid_request("address not in lot"),
+            })
+            .await
+            .map_err(|e| {
+                if let Some(err) = err.take() {
+                    match err {
+                        LoopbackAddressCreateError::ReserveBlock(
+                            ReserveBlockError::AddressUnavailable,
+                        ) => Error::invalid_request("address unavailable"),
+                        LoopbackAddressCreateError::ReserveBlock(
+                            ReserveBlockError::AddressNotInLot,
+                        ) => Error::invalid_request("address not in lot"),
+                    }
+                } else {
+                    public_error_from_diesel(
+                        e,
+                        ErrorHandler::Conflict(
+                            ResourceType::LoopbackAddress,
+                            &format!("lo {}", inet),
+                        ),
+                    )
                 }
-            } else {
-                public_error_from_diesel(
-                    e,
-                    ErrorHandler::Conflict(
-                        ResourceType::LoopbackAddress,
-                        &format!("lo {}", inet),
-                    ),
-                )
-            }
-        })
+            })
     }
 
     pub async fn loopback_address_delete(
@@ -139,12 +127,8 @@ impl DataStore {
         // TODO https://github.com/oxidecomputer/omicron/issues/2811
         // Audit external networking database transaction usage
-        let retry_helper = RetryHelper::new(
-            &self.transaction_retry_producer,
-            "loopback_address_delete",
-        );
-        conn.transaction_async_with_retry(
-            |conn| async move {
+        self.transaction_retry_wrapper("loopback_address_delete")
+            .transaction(&conn, |conn| async move {
                 let la = diesel::delete(dsl::loopback_address)
                     .filter(dsl::id.eq(id))
                     .returning(LoopbackAddress::as_returning())
                     .get_result_async(&conn)
                     .await?;
@@ -157,11 +141,9 @@ impl DataStore {
                     .await?;
 
                 Ok(())
-            },
-            retry_helper.as_callback(),
-        )
-        .await
-        .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+            })
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
     }
 
     pub async fn loopback_address_get(
diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs
index b754df7a9d..4f31efd610 100644
--- a/nexus/db-queries/src/db/datastore/volume.rs
+++ b/nexus/db-queries/src/db/datastore/volume.rs
@@ -7,7 +7,6 @@
 use super::DataStore;
 use crate::db;
 use crate::db::error::public_error_from_diesel;
-use crate::db::error::retryable;
 use crate::db::error::ErrorHandler;
 use crate::db::identity::Asset;
 use crate::db::model::Dataset;
@@ -15,12 +14,10 @@ use crate::db::model::Region;
 use crate::db::model::RegionSnapshot;
 use crate::db::model::Volume;
 use crate::db::queries::volume::DecreaseCrucibleResourceCountAndSoftDeleteVolume;
-use crate::transaction_retry::RetryHelper;
+use crate::transaction_retry::OptionalError;
 use anyhow::bail;
-use async_bb8_diesel::AsyncConnection;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use diesel::prelude::*;
-use diesel::result::Error as DieselError;
 use diesel::OptionalExtension;
 use omicron_common::api::external::CreateResult;
 use omicron_common::api::external::DeleteResult;
@@ -32,7 +29,6 @@ use serde::Deserialize;
 use serde::Deserializer;
 use serde::Serialize;
 use sled_agent_client::types::VolumeConstructionRequest;
-use std::sync::{Arc, OnceLock};
 use uuid::Uuid;
 
 impl DataStore {
@@ -68,47 +64,42 @@ impl DataStore {
             crucible_targets
         };
 
-        let err = Arc::new(OnceLock::new());
-        let retry_helper =
-            RetryHelper::new(&self.transaction_retry_producer, "volume_create");
-        self.pool_connection_unauthorized()
-            .await?
-            .transaction_async_with_retry(
-                |conn| {
-                    let err = err.clone();
-                    let crucible_targets = crucible_targets.clone();
-                    let volume = volume.clone();
-                    async move {
-                        let maybe_volume: Option<Volume> = dsl::volume
-                            .filter(dsl::id.eq(volume.id()))
-                            .select(Volume::as_select())
-                            .first_async(&conn)
-                            .await
-                            .optional()?;
+        let err = OptionalError::new();
+        let conn = self.pool_connection_unauthorized().await?;
+        self.transaction_retry_wrapper("volume_create")
+            .transaction(&conn, |conn| {
+                let err = err.clone();
+                let crucible_targets = crucible_targets.clone();
+                let volume = volume.clone();
+                async move {
+                    let maybe_volume: Option<Volume> = dsl::volume
+                        .filter(dsl::id.eq(volume.id()))
+                        .select(Volume::as_select())
+                        .first_async(&conn)
+                        .await
+                        .optional()?;
 
-                        // If the volume existed already, return it and do not increase
-                        // usage counts.
-                        if let Some(volume) = maybe_volume {
-                            return Ok(volume);
-                        }
+                    // If the volume existed already, return it and do not increase
+                    // usage counts.
+                    if let Some(volume) = maybe_volume {
+                        return Ok(volume);
+                    }
 
-                        // TODO do we need on_conflict do_nothing here? if the transaction
-                        // model is read-committed, the SELECT above could return nothing,
-                        // and the INSERT here could still result in a conflict.
-                        //
-                        // See also https://github.com/oxidecomputer/omicron/issues/1168
-                        let volume: Volume = diesel::insert_into(dsl::volume)
-                            .values(volume.clone())
-                            .on_conflict(dsl::id)
-                            .do_nothing()
-                            .returning(Volume::as_returning())
-                            .get_result_async(&conn)
-                            .await
-                            .map_err(|e| {
-                                if retryable(&e) {
-                                    return e;
-                                }
-                                err.set(VolumeCreationError::Public(
+                    // TODO do we need on_conflict do_nothing here? if the transaction
+                    // model is read-committed, the SELECT above could return nothing,
+                    // and the INSERT here could still result in a conflict.
+                    //
+                    // See also https://github.com/oxidecomputer/omicron/issues/1168
+                    let volume: Volume = diesel::insert_into(dsl::volume)
+                        .values(volume.clone())
+                        .on_conflict(dsl::id)
+                        .do_nothing()
+                        .returning(Volume::as_returning())
+                        .get_result_async(&conn)
+                        .await
+                        .map_err(|e| {
+                            err.bail_retryable_or_else(e, |e| {
+                                VolumeCreationError::Public(
                                     public_error_from_diesel(
                                         e,
                                         ErrorHandler::Conflict(
@@ -116,41 +107,37 @@ impl DataStore {
                                             volume.id().to_string().as_str(),
                                         ),
                                     ),
-                                ))
-                                .unwrap();
-                                DieselError::RollbackTransaction
-                            })?;
-
-                        // Increase the usage count for Crucible resources according to the
-                        // contents of the volume.
-
-                        // Increase the number of uses for each referenced region snapshot.
-                        use db::schema::region_snapshot::dsl as rs_dsl;
-                        for read_only_target in
-                            &crucible_targets.read_only_targets
-                        {
-                            diesel::update(rs_dsl::region_snapshot)
-                                .filter(
-                                    rs_dsl::snapshot_addr
-                                        .eq(read_only_target.clone()),
                                 )
-                                .filter(rs_dsl::deleting.eq(false))
-                                .set(
-                                    rs_dsl::volume_references
-                                        .eq(rs_dsl::volume_references + 1),
-                                )
-                                .execute_async(&conn)
-                                .await?;
-                        }
+                            })
+                        })?;
 
-                        Ok(volume)
+                    // Increase the usage count for Crucible resources according to the
+                    // contents of the volume.
+
+                    // Increase the number of uses for each referenced region snapshot.
+                    use db::schema::region_snapshot::dsl as rs_dsl;
+                    for read_only_target in &crucible_targets.read_only_targets
+                    {
+                        diesel::update(rs_dsl::region_snapshot)
+                            .filter(
+                                rs_dsl::snapshot_addr
+                                    .eq(read_only_target.clone()),
+                            )
+                            .filter(rs_dsl::deleting.eq(false))
+                            .set(
+                                rs_dsl::volume_references
+                                    .eq(rs_dsl::volume_references + 1),
+                            )
+                            .execute_async(&conn)
+                            .await?;
                     }
-                },
-                retry_helper.as_callback(),
-            )
+
+                    Ok(volume)
+                }
+            })
             .await
             .map_err(|e| {
-                if let Some(err) = Arc::try_unwrap(err).unwrap().take() {
+                if let Some(err) = err.take() {
                     match err {
                         VolumeCreationError::Public(err) => err,
                         VolumeCreationError::SerdeError(err) => {
@@ -221,14 +208,11 @@ impl DataStore {
         // types that require it). The generation number (along with the
         // rest of the volume data) that was in the database is what is
        // returned to the caller.
-        let err = Arc::new(OnceLock::new());
-        let retry_helper = RetryHelper::new(
-            &self.transaction_retry_producer,
-            "volume_checkout",
-        );
-        self.pool_connection_unauthorized()
-            .await?
-            .transaction_async_with_retry(|conn| {
+        let err = OptionalError::new();
+        let conn = self.pool_connection_unauthorized().await?;
+
+        self.transaction_retry_wrapper("volume_checkout")
+            .transaction(&conn, |conn| {
                 let err = err.clone();
                 async move {
                     // Grab the volume in question.
@@ -241,8 +225,7 @@ impl DataStore {
                     // Turn the volume.data into the VolumeConstructionRequest
                     let vcr: VolumeConstructionRequest =
                         serde_json::from_str(volume.data()).map_err(|e| {
-                            err.set(VolumeGetError::SerdeError(e)).unwrap();
-                            DieselError::RollbackTransaction
+                            err.bail(VolumeGetError::SerdeError(e))
                         })?;
 
                     // Look to see if the VCR is a Volume type, and if so, look at
@@ -302,8 +285,7 @@ impl DataStore {
                                     &new_vcr,
                                 )
                                 .map_err(|e| {
-                                    err.set(VolumeGetError::SerdeError(e)).unwrap();
-                                    DieselError::RollbackTransaction
+                                    err.bail(VolumeGetError::SerdeError(e))
                                 })?;
 
                                 // Update the original volume_id with the new
@@ -320,13 +302,12 @@ impl DataStore {
                                 // not, then something is terribly wrong in the
                                 // database.
                                 if num_updated != 1 {
-                                    err.set(
+                                    return Err(err.bail(
                                         VolumeGetError::UnexpectedDatabaseUpdate(
                                             num_updated,
                                             1,
                                         ),
-                                    ).unwrap();
-                                    return Err(DieselError::RollbackTransaction);
+                                    ));
                                 }
                             }
                         }
@@ -355,10 +336,10 @@ impl DataStore {
                     }
                     Ok(volume)
                 }
-            }, retry_helper.as_callback())
+            })
             .await
             .map_err(|e| {
-                if let Some(err) = err.get() {
+                if let Some(err) = err.take() {
                     return Error::internal_error(&format!("Transaction error: {}", err));
                 }
                 public_error_from_diesel(e, ErrorHandler::Server)
@@ -677,14 +658,10 @@ impl DataStore {
        // data from original volume_id.
        // - Put the new temp VCR into the temp volume.data, update the
        // temp_volume in the database.
-        let err = Arc::new(OnceLock::new());
-        let retry_helper = RetryHelper::new(
-            &self.transaction_retry_producer,
-            "volume_remove_rop",
-        );
-        self.pool_connection_unauthorized()
-            .await?
-            .transaction_async_with_retry(|conn| {
+        let err = OptionalError::new();
+        let conn = self.pool_connection_unauthorized().await?;
+        self.transaction_retry_wrapper("volume_remove_rop")
+            .transaction(&conn, |conn| {
                 let err = err.clone();
                 async move {
                     // Grab the volume in question. If the volume record was already
@@ -724,12 +701,11 @@ impl DataStore {
                         volume.data()
                     )
                     .map_err(|e| {
-                        err.set(
+                        err.bail(
                             RemoveReadOnlyParentError::SerdeError(
                                 e,
                             )
-                        ).unwrap();
-                        DieselError::RollbackTransaction
+                        )
                     })?;
 
                     match vcr {
@@ -757,10 +733,9 @@ impl DataStore {
                                 &new_vcr
                             )
                             .map_err(|e| {
-                                err.set(RemoveReadOnlyParentError::SerdeError(
+                                err.bail(RemoveReadOnlyParentError::SerdeError(
                                     e,
-                                )).unwrap();
-                                DieselError::RollbackTransaction
+                                ))
                             })?;
 
                             // Update the original volume_id with the new
@@ -776,8 +751,7 @@ impl DataStore {
                            // not, then something is terribly wrong in the
                            // database.
                            if num_updated != 1 {
-                                err.set(RemoveReadOnlyParentError::UnexpectedDatabaseUpdate(num_updated, 1)).unwrap();
-                                return Err(DieselError::RollbackTransaction);
+                                return Err(err.bail(RemoveReadOnlyParentError::UnexpectedDatabaseUpdate(num_updated, 1)));
                            }
 
                            // Make a new VCR, with the information from
@@ -794,10 +768,9 @@ impl DataStore {
                                &rop_vcr
                            )
                            .map_err(|e| {
-                                err.set(RemoveReadOnlyParentError::SerdeError(
+                                err.bail(RemoveReadOnlyParentError::SerdeError(
                                    e,
-                                )).unwrap();
-                                DieselError::RollbackTransaction
+                                ))
                            })?;
                            // Update the temp_volume_id with the volume
                            // data that contains the read_only_parent.
@@ -809,8 +782,7 @@ impl DataStore {
                                .execute_async(&conn)
                                .await?;
                            if num_updated != 1 {
-                                err.set(RemoveReadOnlyParentError::UnexpectedDatabaseUpdate(num_updated, 1)).unwrap();
-                                return Err(DieselError::RollbackTransaction);
+                                return Err(err.bail(RemoveReadOnlyParentError::UnexpectedDatabaseUpdate(num_updated, 1)));
                            }
                            Ok(true)
                        }
@@ -828,10 +800,10 @@ impl DataStore {
                        }
                    }
                }
-            }, retry_helper.as_callback())
+            })
            .await
            .map_err(|e| {
-                if let Some(err) = err.get() {
+                if let Some(err) = err.take() {
                    return Error::internal_error(&format!("Transaction error: {}", err));
                }
                public_error_from_diesel(e, ErrorHandler::Server)
diff --git a/nexus/db-queries/src/db/datastore/vpc.rs b/nexus/db-queries/src/db/datastore/vpc.rs
index 67ee644942..069ce63028 100644
--- a/nexus/db-queries/src/db/datastore/vpc.rs
+++ b/nexus/db-queries/src/db/datastore/vpc.rs
@@ -36,8 +36,7 @@ use crate::db::queries::vpc::InsertVpcQuery;
 use crate::db::queries::vpc::VniSearchIter;
 use crate::db::queries::vpc_subnet::FilterConflictingVpcSubnetRangesQuery;
 use crate::db::queries::vpc_subnet::SubnetError;
-use crate::transaction_retry::RetryHelper;
-use async_bb8_diesel::AsyncConnection;
+use crate::transaction_retry::OptionalError;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use chrono::Utc;
 use diesel::prelude::*;
@@ -61,7 +60,6 @@ use omicron_common::api::external::UpdateResult;
 use omicron_common::api::external::Vni as ExternalVni;
 use ref_cast::RefCast;
 use std::collections::BTreeMap;
-use std::sync::{Arc, OnceLock};
 use uuid::Uuid;
 
 impl DataStore {
@@ -586,56 +584,46 @@ impl DataStore {
             CollectionNotFound,
         }
 
-        let err = Arc::new(OnceLock::new());
-        let retry_helper = RetryHelper::new(
-            &self.transaction_retry_producer,
-            "vpc_update_firewall_rules",
-        );
+        let err = OptionalError::new();
 
         // TODO-scalability: Ideally this would be a CTE so we don't need to
         // hold a transaction open across multiple roundtrips from the database,
        // but for now we're using a transaction due to the severely decreased
        // legibility of CTEs via diesel right now.
-        self.pool_connection_authorized(opctx)
-            .await?
-            .transaction_async_with_retry(
-                |conn| {
-                    let err = err.clone();
-                    let delete_old_query = delete_old_query.clone();
-                    let rules = rules.clone();
-                    async move {
-                        delete_old_query.execute_async(&conn).await?;
-
-                        // The generation count update on the vpc table row will take a
-                        // write lock on the row, ensuring that the vpc was not deleted
-                        // concurently.
-                        if rules_is_empty {
-                            return Ok(vec![]);
-                        }
-                        Vpc::insert_resource(
-                            authz_vpc.id(),
-                            diesel::insert_into(dsl::vpc_firewall_rule)
-                                .values(rules),
-                        )
-                        .insert_and_get_results_async(&conn)
-                        .await
-                        .map_err(|e| match e {
-                            AsyncInsertError::CollectionNotFound => {
-                                err.set(
-                                    FirewallUpdateError::CollectionNotFound,
-                                )
-                                .unwrap();
-                                return DieselError::RollbackTransaction;
-                            }
-                            AsyncInsertError::DatabaseError(e) => e,
-                        })
+        let conn = self.pool_connection_authorized(opctx).await?;
+
+        self.transaction_retry_wrapper("vpc_update_firewall_rules")
+            .transaction(&conn, |conn| {
+                let err = err.clone();
+                let delete_old_query = delete_old_query.clone();
+                let rules = rules.clone();
+                async move {
+                    delete_old_query.execute_async(&conn).await?;
+
+                    // The generation count update on the vpc table row will take a
+                    // write lock on the row, ensuring that the vpc was not deleted
+                    // concurently.
+                    if rules_is_empty {
+                        return Ok(vec![]);
                    }
-            },
-            retry_helper.as_callback(),
-        )
+                    Vpc::insert_resource(
+                        authz_vpc.id(),
+                        diesel::insert_into(dsl::vpc_firewall_rule)
+                            .values(rules),
+                    )
+                    .insert_and_get_results_async(&conn)
+                    .await
+                    .map_err(|e| match e {
+                        AsyncInsertError::CollectionNotFound => {
+                            err.bail(FirewallUpdateError::CollectionNotFound)
+                        }
+                        AsyncInsertError::DatabaseError(e) => e,
+                    })
+                }
+            })
            .await
            .map_err(|e| {
-                if let Some(err) = err.get() {
+                if let Some(err) = err.take() {
                    match err {
                        FirewallUpdateError::CollectionNotFound => {
                            Error::not_found_by_id(
diff --git a/nexus/db-queries/src/transaction_retry.rs b/nexus/db-queries/src/transaction_retry.rs
index 1cbf72c121..6675eb7c71 100644
--- a/nexus/db-queries/src/transaction_retry.rs
+++ b/nexus/db-queries/src/transaction_retry.rs
@@ -241,7 +241,6 @@ mod test {
 
     use super::*;
     use crate::db::datastore::datastore_test;
-    use async_bb8_diesel::AsyncConnection;
     use nexus_test_utils::db::test_setup_database;
     use omicron_test_utils::dev;
     use oximeter::types::FieldValue;
@@ -259,18 +258,15 @@ mod test {
 
         let conn = datastore.pool_connection_for_tests().await.unwrap();
 
-        let retry_helper = RetryHelper::new(
-            datastore.transaction_retry_producer(),
-            "test_transaction_rollback_produces_no_samples",
-        );
-        conn.transaction_async_with_retry(
-            |_conn| async move {
+        datastore
+            .transaction_retry_wrapper(
+                "test_transaction_rollback_produces_no_samples",
+            )
+            .transaction(&conn, |_conn| async move {
                 Err::<(), _>(diesel::result::Error::RollbackTransaction)
-            },
-            retry_helper.as_callback(),
-        )
-        .await
-        .expect_err("Should have failed");
+            })
+            .await
+            .expect_err("Should have failed");
 
         let samples = datastore
             .transaction_retry_producer()
@@ -294,22 +290,18 @@ mod test {
         let (_opctx, datastore) = datastore_test(&logctx, &db).await;
 
         let conn = datastore.pool_connection_for_tests().await.unwrap();
-
-        let retry_helper = RetryHelper::new(
-            datastore.transaction_retry_producer(),
-            "test_transaction_retry_produces_samples",
-        );
-        conn.transaction_async_with_retry(
-            |_conn| async move {
+        datastore
+            .transaction_retry_wrapper(
+                "test_transaction_retry_produces_samples",
+            )
+            .transaction(&conn, |_conn| async move {
                 Err::<(), _>(diesel::result::Error::DatabaseError(
                     diesel::result::DatabaseErrorKind::SerializationFailure,
                     Box::new("restart transaction: Retry forever!".to_string()),
                 ))
-            },
-            retry_helper.as_callback(),
-        )
-        .await
-        .expect_err("Should have failed");
+            })
+            .await
+            .expect_err("Should have failed");
 
         let samples = datastore
             .transaction_retry_producer()