Skip to content

Commit

Permalink
WIP: paginated_read
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco committed Dec 6, 2023
1 parent 001143c commit d39b175
Showing 1 changed file with 63 additions and 31 deletions.
94 changes: 63 additions & 31 deletions nexus/db-queries/src/db/datastore/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,45 @@ impl DataStore {
.await
}

async fn paginated_read<T, QueryFn, QueryFnOutput, MarkerFn, NameType>(
batch_size: NonZeroU32,
item2marker: MarkerFn,
do_query: QueryFn,
) -> Result<Vec<T>, Error>
where
QueryFn:
for<'a> Fn(DataPageParams<'a, NameType>, usize) -> QueryFnOutput,
QueryFnOutput: futures::Future<Output = Result<Vec<T>, Error>>,
MarkerFn: Fn(&T) -> &NameType,
NameType: Clone,
{
let mut results = Vec::new();
let mut marker = None;

loop {
let pagparams = DataPageParams {
marker: marker.as_ref(),
direction: dropshot::PaginationOrder::Ascending,
limit: batch_size,
};
let results_batch = do_query(pagparams, results.len()).await?;
let done = results_batch.len()
< usize::try_from(batch_size.get()).unwrap();
if let Some(last_record) = results_batch.last() {
marker = Some(item2marker(last_record).clone())
} else {
assert!(done);
}

results.extend(results_batch.into_iter());
if done {
break;
}
}

Ok(results)
}

/// Private helper for reading a specific version of a group's DNS config
async fn dns_config_read_version(
&self,
Expand Down Expand Up @@ -241,37 +280,30 @@ impl DataStore {

let mut zones = Vec::with_capacity(dns_zones.len());
for zone in dns_zones {
let mut zone_records = Vec::new();
let mut marker = None;

loop {
debug!(log, "listing DNS names for zone";
"dns_zone_id" => zone.id.to_string(),
"dns_zone_name" => &zone.zone_name,
"version" => i64::from(&version.version.0),
"found_so_far" => zone_records.len(),
"batch_size" => batch_size.get(),
);
let pagparams = DataPageParams {
marker: marker.as_ref(),
direction: dropshot::PaginationOrder::Ascending,
limit: batch_size,
};
let names_batch = self
.dns_names_list(opctx, zone.id, version.version, &pagparams)
.await?;
let done = names_batch.len()
< usize::try_from(batch_size.get()).unwrap();
if let Some((last_name, _)) = names_batch.last() {
marker = Some(last_name.clone());
} else {
assert!(done);
}
zone_records.extend(names_batch.into_iter());
if done {
break;
}
}
let zone_name = &zone.zone_name;
let zone_records = Self::paginated_read(
batch_size,
|(name, _)| name,
|pagparams, nsofar| async move {
let zone_name = zone_name.clone();
debug!(log, "listing DNS names for zone";
"dns_zone_id" => zone.id.to_string(),
"dns_zone_name" => zone_name,
"version" => i64::from(&version.version.0),
"found_so_far" => nsofar,
"batch_size" => batch_size.get(),
);

self.dns_names_list(
opctx,
zone.id,
version.version,
&pagparams,
)
.await
},
)
.await?;

debug!(log, "found all DNS names for zone";
"dns_zone_id" => zone.id.to_string(),
Expand Down

0 comments on commit d39b175

Please sign in to comment.