Skip to content

Commit

Permalink
add a helper for querying the database in batches (#4632)
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco authored Dec 6, 2023
1 parent 1a3443c commit c581163
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 20 deletions.
29 changes: 10 additions & 19 deletions nexus/db-queries/src/db/datastore/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::db::model::DnsZone;
use crate::db::model::Generation;
use crate::db::model::InitialDnsGroup;
use crate::db::pagination::paginated;
use crate::db::pagination::Paginator;
use crate::db::pool::DbConnection;
use crate::db::TransactionError;
use async_bb8_diesel::AsyncConnection;
Expand Down Expand Up @@ -242,35 +243,25 @@ impl DataStore {
let mut zones = Vec::with_capacity(dns_zones.len());
for zone in dns_zones {
let mut zone_records = Vec::new();
let mut marker = None;

loop {
let mut paginator = Paginator::new(batch_size);
while let Some(p) = paginator.next() {
debug!(log, "listing DNS names for zone";
"dns_zone_id" => zone.id.to_string(),
"dns_zone_name" => &zone.zone_name,
"version" => i64::from(&version.version.0),
"found_so_far" => zone_records.len(),
"batch_size" => batch_size.get(),
);
let pagparams = DataPageParams {
marker: marker.as_ref(),
direction: dropshot::PaginationOrder::Ascending,
limit: batch_size,
};
let names_batch = self
.dns_names_list(opctx, zone.id, version.version, &pagparams)
.dns_names_list(
opctx,
zone.id,
version.version,
&p.current_pagparams(),
)
.await?;
let done = names_batch.len()
< usize::try_from(batch_size.get()).unwrap();
if let Some((last_name, _)) = names_batch.last() {
marker = Some(last_name.clone());
} else {
assert!(done);
}
paginator = p.found_batch(&names_batch, &|(n, _)| n.clone());
zone_records.extend(names_batch.into_iter());
if done {
break;
}
}

debug!(log, "found all DNS names for zone";
Expand Down
3 changes: 2 additions & 1 deletion nexus/db-queries/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ pub(crate) mod error;
mod explain;
pub mod fixed_data;
pub mod lookup;
mod pagination;
// Public for doctests.
pub mod pagination;
mod pool;
// This is marked public because the error types are used elsewhere, e.g., in
// sagas.
Expand Down
184 changes: 184 additions & 0 deletions nexus/db-queries/src/db/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use diesel::AppearsOnTable;
use diesel::Column;
use diesel::{ExpressionMethods, QueryDsl};
use omicron_common::api::external::DataPageParams;
use std::num::NonZeroU32;

// Shorthand alias for "the SQL type of the whole table".
type TableSqlType<T> = <T as AsQuery>::SqlType;
Expand Down Expand Up @@ -169,6 +170,145 @@ where
}
}

/// Helper for querying a large number of records from the database in batches
///
/// Without this helper: a typical way to perform paginated queries would be to
/// invoke some existing "list" function in the datastore that itself is
/// paginated. Such functions accept a `pagparams: &DataPageParams` argument
/// that uses a marker to identify where the next page of results starts. For
/// the first call, the marker inside `pagparams` is `None`. For subsequent
/// calls, it's typically some field from the last item returned in the previous
/// page. You're finished when you get a result set smaller than the batch
/// size.
///
/// This helper takes care of most of the logic for you. To use this, you first
/// create a `Paginator` with a specific batch_size. Then you call `next()` in
/// a loop. Each iteration will provide you with a `DataPageParams` to use to
/// call your list function. When you've fetched the next page, you have to
/// let the helper look at it to determine if there's another page to fetch and
/// what marker to use.
///
/// ## Example
///
/// ```
/// use nexus_db_queries::db::pagination::Paginator;
/// use omicron_common::api::external::DataPageParams;
///
/// let batch_size = std::num::NonZeroU32::new(3).unwrap();
///
/// // Assume you've got an existing paginated "list items" function.
/// // This simple implementation returns a few full batches, then a partial
/// // batch.
/// type Marker = u32;
/// type Item = u32;
/// let do_query = |pagparams: &DataPageParams<'_, Marker> | {
/// match pagparams.marker {
/// None => (0..batch_size.get()).collect(),
/// Some(x) if *x < 2 * batch_size.get() => (x+1..x+1+batch_size.get()).collect(),
/// Some(x) => vec![*x + 1],
/// }
/// };
///
/// // This closure translates from one of the returned item to the field in
/// // that item that servers as the marker. This example is contrived.
/// let item2marker: &dyn Fn(&Item) -> Marker = &|u: &u32| *u;
///
/// let mut all_records = Vec::new();
/// let mut paginator = Paginator::new(batch_size);
/// while let Some(p) = paginator.next() {
/// let records_batch = do_query(&p.current_pagparams());
/// paginator = p.found_batch(&records_batch, item2marker);
/// all_records.extend(records_batch.into_iter());
/// }
///
/// // Results are in `all_records`.
/// assert_eq!(all_records, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
/// ```
///
/// ## Design notes
///
/// The separation of `Paginator` and `PaginatorHelper` is aimed at making it
/// harder to misuse this interface. We could skip the helper altogether and
/// just have `Paginator::next()` return the DatePageParams directly. But you'd
/// still need a `Paginator::found_batch()`. And it would be easy to forget to
/// call this, leading to an infinite loop at runtime. To avoid this mistake,
/// `Paginator::next()` consumes `self`. You can't get another `Paginator` back
/// until you use `PaginatorHelper::found_batch()`. That also consumes `self`
/// so that you can't keep using the old `DataPageParams`.
pub struct Paginator<N> {
batch_size: NonZeroU32,
state: PaginatorState<N>,
}

impl<N> Paginator<N> {
pub fn new(batch_size: NonZeroU32) -> Paginator<N> {
Paginator { batch_size, state: PaginatorState::Initial }
}

pub fn next(self) -> Option<PaginatorHelper<N>> {
match self.state {
PaginatorState::Initial => Some(PaginatorHelper {
batch_size: self.batch_size,
marker: None,
}),
PaginatorState::Middle { marker } => Some(PaginatorHelper {
batch_size: self.batch_size,
marker: Some(marker),
}),
PaginatorState::Done => None,
}
}
}

enum PaginatorState<N> {
Initial,
Middle { marker: N },
Done,
}

pub struct PaginatorHelper<N> {
batch_size: NonZeroU32,
marker: Option<N>,
}

impl<N> PaginatorHelper<N> {
/// Returns the `DatePageParams` to use to fetch the next page of results
pub fn current_pagparams(&self) -> DataPageParams<'_, N> {
DataPageParams {
marker: self.marker.as_ref(),
direction: dropshot::PaginationOrder::Ascending,
limit: self.batch_size,
}
}

/// Report a page of results
///
/// This function looks at the returned results to determine whether we've
/// finished iteration or whether we need to fetch another page (and if so,
/// this determines the marker for the next fetch operation).
///
/// This function returns a `Paginator` used to make the next request. See
/// the example on `Paginator` for usage.
pub fn found_batch<T>(
self,
batch: &[T],
item2marker: &dyn Fn(&T) -> N,
) -> Paginator<N> {
let state =
if batch.len() < usize::try_from(self.batch_size.get()).unwrap() {
PaginatorState::Done
} else {
// self.batch_size is non-zero, so if we got at least that many
// items, then there's at least one.
let last = batch.iter().last().unwrap();
let marker = item2marker(last);
PaginatorState::Middle { marker }
};

Paginator { batch_size: self.batch_size, state }
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -433,4 +573,48 @@ mod test {
let _ = db.cleanup().await;
logctx.cleanup_successful();
}

#[test]
fn test_paginator() {
// The doctest exercises a basic case for Paginator. Here we test some
// edge cases.
let batch_size = std::num::NonZeroU32::new(3).unwrap();

type Marker = u32;
#[derive(Debug, PartialEq, Eq)]
struct Item {
value: String,
marker: Marker,
}

let do_list =
|query: &dyn Fn(&DataPageParams<'_, Marker>) -> Vec<Item>| {
let mut all_records = Vec::new();
let mut paginator = Paginator::new(batch_size);
while let Some(p) = paginator.next() {
let records_batch = query(&p.current_pagparams());
paginator =
p.found_batch(&records_batch, &|i: &Item| i.marker);
all_records.extend(records_batch.into_iter());
}
all_records
};

fn mkitem(v: u32) -> Item {
Item { value: v.to_string(), marker: v }
}

// Trivial case: first page is empty
assert_eq!(Vec::<Item>::new(), do_list(&|_| Vec::new()));

// Exactly one batch-size worth of items
// (exercises the cases where the last non-empty batch is full, and
// where any batch is empty)
let my_query =
|pagparams: &DataPageParams<'_, Marker>| match &pagparams.marker {
None => (0..batch_size.get()).map(mkitem).collect(),
Some(_) => Vec::new(),
};
assert_eq!(vec![mkitem(0), mkitem(1), mkitem(2)], do_list(&my_query));
}
}

0 comments on commit c581163

Please sign in to comment.