Skip to content

Commit

Permalink
Move some queries into DataStore impls
Browse files Browse the repository at this point in the history
remove unused arguments

don't use a transaction when only one query is performed
  • Loading branch information
jmpesp committed May 30, 2024
1 parent 9970286 commit a155af9
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 179 deletions.
244 changes: 65 additions & 179 deletions dev-tools/omdb/src/bin/omdb/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ use nexus_db_model::SnapshotState;
use nexus_db_model::SwCaboose;
use nexus_db_model::SwRotPage;
use nexus_db_model::UpstairsRepairNotification;
use nexus_db_model::UpstairsRepairNotificationType;
use nexus_db_model::UpstairsRepairProgress;
use nexus_db_model::Vmm;
use nexus_db_model::Volume;
Expand Down Expand Up @@ -603,7 +602,6 @@ impl DbArgs {
command: RegionReplacementCommands::List(args),
}) => {
cmd_db_region_replacement_list(
&opctx,
&datastore,
&self.fetch_opts,
args,
Expand All @@ -623,24 +621,14 @@ impl DbArgs {
DbCommands::RegionReplacement(RegionReplacementArgs {
command: RegionReplacementCommands::Info(args),
}) => {
cmd_db_region_replacement_info(
&opctx,
&datastore,
&self.fetch_opts,
args,
)
.await
cmd_db_region_replacement_info(&opctx, &datastore, args).await
}
DbCommands::RegionReplacement(RegionReplacementArgs {
command: RegionReplacementCommands::Request(args),
}) => {
let token = omdb.check_allow_destructive()?;
cmd_db_region_replacement_request(
&opctx,
&datastore,
&self.fetch_opts,
args,
token,
&opctx, &datastore, args, token,
)
.await
}
Expand Down Expand Up @@ -1530,59 +1518,56 @@ async fn cmd_db_snapshot_info(

/// List all region replacement requests
async fn cmd_db_region_replacement_list(
_opctx: &OpContext,
datastore: &DataStore,
fetch_opts: &DbFetchOptions,
args: &RegionReplacementListArgs,
) -> Result<(), anyhow::Error> {
let ctx = || "listing region replacement requests".to_string();
let limit = fetch_opts.fetch_limit;

let requests: Vec<RegionReplacement> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::region_replacement::dsl;

match (args.state, args.after) {
(Some(state), Some(after)) => {
dsl::region_replacement
.filter(dsl::replacement_state.eq(state))
.filter(dsl::request_time.gt(after))
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&conn)
.await
}
let requests: Vec<RegionReplacement> = {
let conn = datastore.pool_connection_for_tests().await?;

(Some(state), None) => {
dsl::region_replacement
.filter(dsl::replacement_state.eq(state))
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&conn)
.await
}
use db::schema::region_replacement::dsl;

(None, Some(after)) => {
dsl::region_replacement
.filter(dsl::request_time.gt(after))
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&conn)
.await
}
match (args.state, args.after) {
(Some(state), Some(after)) => {
dsl::region_replacement
.filter(dsl::replacement_state.eq(state))
.filter(dsl::request_time.gt(after))
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&*conn)
.await?
}

(None, None) => {
dsl::region_replacement
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&conn)
.await
}
(Some(state), None) => {
dsl::region_replacement
.filter(dsl::replacement_state.eq(state))
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&*conn)
.await?
}

(None, Some(after)) => {
dsl::region_replacement
.filter(dsl::request_time.gt(after))
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&*conn)
.await?
}
})
.await?;

(None, None) => {
dsl::region_replacement
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&*conn)
.await?
}
}
};

check_limit(&requests, limit, ctx);

Expand Down Expand Up @@ -1617,29 +1602,25 @@ async fn cmd_db_region_replacement_list(

/// Display all non-complete region replacements
async fn cmd_db_region_replacement_status(
_opctx: &OpContext,
opctx: &OpContext,
datastore: &DataStore,
fetch_opts: &DbFetchOptions,
) -> Result<(), anyhow::Error> {
let ctx = || "listing region replacement requests".to_string();
let limit = fetch_opts.fetch_limit;

let requests: Vec<RegionReplacement> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::region_replacement::dsl;
let requests: Vec<RegionReplacement> = {
let conn = datastore.pool_connection_for_tests().await?;

dsl::region_replacement
.filter(
dsl::replacement_state.ne(RegionReplacementState::Complete),
)
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&conn)
.await
})
.await?;
use db::schema::region_replacement::dsl;

dsl::region_replacement
.filter(dsl::replacement_state.ne(RegionReplacementState::Complete))
.limit(i64::from(u32::from(limit)))
.select(RegionReplacement::as_select())
.get_results_async(&*conn)
.await?
};

check_limit(&requests, limit, ctx);

Expand All @@ -1658,41 +1639,16 @@ async fn cmd_db_region_replacement_status(
// downstairs being repaired is a "new" region id. This will give us
// the most recent repair id.
let maybe_repair: Option<UpstairsRepairNotification> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::upstairs_repair_notification::dsl;

dsl::upstairs_repair_notification
.filter(dsl::region_id.eq(new_region_id))
.filter(
dsl::notification_type
.eq(UpstairsRepairNotificationType::Started),
)
.order_by(dsl::time.desc())
.limit(1)
.first_async(&conn)
.await
.optional()
})
.most_recent_started_repair_notification(opctx, new_region_id)
.await?;

if let Some(repair) = maybe_repair {
let maybe_repair_progress: Option<UpstairsRepairProgress> =
datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::upstairs_repair_progress::dsl;

dsl::upstairs_repair_progress
.filter(dsl::repair_id.eq(repair.repair_id))
.order_by(dsl::time.desc())
.limit(1)
.first_async(&conn)
.await
.optional()
})
.most_recent_repair_progress(
opctx,
repair.repair_id.into(),
)
.await?;

if let Some(repair_progress) = maybe_repair_progress {
Expand Down Expand Up @@ -1720,34 +1676,14 @@ async fn cmd_db_region_replacement_status(

/// Show details for a single region replacement
async fn cmd_db_region_replacement_info(
_opctx: &OpContext,
opctx: &OpContext,
datastore: &DataStore,
_fetch_opts: &DbFetchOptions,
args: &RegionReplacementInfoArgs,
) -> Result<(), anyhow::Error> {
let request: Option<RegionReplacement> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::region_replacement::dsl;

dsl::region_replacement
.filter(dsl::id.eq(args.replacement_id))
.limit(i64::from(1))
.select(RegionReplacement::as_select())
.first_async(&conn)
.await
.optional()
})
let request = datastore
.get_region_replacement_request_by_id(opctx, args.replacement_id)
.await?;

let Some(request) = request else {
bail!(
"no region replacement request with id {} found",
args.replacement_id,
);
};

// Show details
println!(" started: {}", request.request_time);
println!(" state: {:?}", request.replacement_state);
Expand All @@ -1758,18 +1694,7 @@ async fn cmd_db_region_replacement_info(
if let Some(new_region_id) = request.new_region_id {
// Find all related notifications
let notifications: Vec<UpstairsRepairNotification> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::upstairs_repair_notification::dsl;

dsl::upstairs_repair_notification
.filter(dsl::region_id.eq(new_region_id))
.order_by(dsl::time.asc())
.select(UpstairsRepairNotification::as_select())
.get_results_async(&conn)
.await
})
.repair_notifications_for_region(opctx, new_region_id)
.await?;

#[derive(Tabled)]
Expand Down Expand Up @@ -1816,41 +1741,13 @@ async fn cmd_db_region_replacement_info(
// and use that to search for progress.

let maybe_repair: Option<UpstairsRepairNotification> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::upstairs_repair_notification::dsl;

dsl::upstairs_repair_notification
.filter(dsl::region_id.eq(new_region_id))
.filter(
dsl::notification_type
.eq(UpstairsRepairNotificationType::Started),
)
.order_by(dsl::time.desc())
.limit(1)
.first_async(&conn)
.await
.optional()
})
.most_recent_started_repair_notification(opctx, new_region_id)
.await?;

if let Some(repair) = maybe_repair {
let maybe_repair_progress: Option<UpstairsRepairProgress> =
datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::upstairs_repair_progress::dsl;

dsl::upstairs_repair_progress
.filter(dsl::repair_id.eq(repair.repair_id))
.order_by(dsl::time.desc())
.limit(1)
.first_async(&conn)
.await
.optional()
})
.most_recent_repair_progress(opctx, repair.repair_id.into())
.await?;

if let Some(repair_progress) = maybe_repair_progress {
Expand All @@ -1872,17 +1769,7 @@ async fn cmd_db_region_replacement_info(
// Find the steps that the driver saga has committed to the DB.

let steps: Vec<RegionReplacementStep> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
use db::schema::region_replacement_step::dsl;

dsl::region_replacement_step
.filter(dsl::replacement_id.eq(args.replacement_id))
.order_by(dsl::step_time.asc())
.load_async(&conn)
.await
})
.region_replacement_request_steps(opctx, args.replacement_id)
.await?;

#[derive(Tabled)]
Expand Down Expand Up @@ -1938,7 +1825,6 @@ async fn cmd_db_region_replacement_info(
async fn cmd_db_region_replacement_request(
opctx: &OpContext,
datastore: &DataStore,
_fetch_opts: &DbFetchOptions,
args: &RegionReplacementRequestArgs,
_destruction_token: DestructiveOperationToken,
) -> Result<(), anyhow::Error> {
Expand Down
18 changes: 18 additions & 0 deletions nexus/db-queries/src/db/datastore/region_replacement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,24 @@ impl DataStore {
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Return all steps for a region replacement request
pub async fn region_replacement_request_steps(
&self,
opctx: &OpContext,
id: Uuid,
) -> Result<Vec<RegionReplacementStep>, Error> {
use db::schema::region_replacement_step::dsl;

dsl::region_replacement_step
.filter(dsl::replacement_id.eq(id))
.order_by(dsl::step_time.desc())
.get_results_async::<RegionReplacementStep>(
&*self.pool_connection_authorized(opctx).await?,
)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// Record a step taken to drive a region replacement forward
pub async fn add_region_replacement_request_step(
&self,
Expand Down
Loading

0 comments on commit a155af9

Please sign in to comment.