From d39b175dfb0b3ce325dcd068979cd7b591fb1c91 Mon Sep 17 00:00:00 2001 From: David Pacheco Date: Wed, 6 Dec 2023 10:18:18 -0800 Subject: [PATCH] WIP: paginated_read --- nexus/db-queries/src/db/datastore/dns.rs | 94 ++++++++++++++++-------- 1 file changed, 63 insertions(+), 31 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/dns.rs b/nexus/db-queries/src/db/datastore/dns.rs index cfd25d6a4f..908c43674f 100644 --- a/nexus/db-queries/src/db/datastore/dns.rs +++ b/nexus/db-queries/src/db/datastore/dns.rs @@ -208,6 +208,45 @@ impl DataStore { .await } + async fn paginated_read( + batch_size: NonZeroU32, + item2marker: MarkerFn, + do_query: QueryFn, + ) -> Result, Error> + where + QueryFn: + for<'a> Fn(DataPageParams<'a, NameType>, usize) -> QueryFnOutput, + QueryFnOutput: futures::Future, Error>>, + MarkerFn: Fn(&T) -> &NameType, + NameType: Clone, + { + let mut results = Vec::new(); + let mut marker = None; + + loop { + let pagparams = DataPageParams { + marker: marker.as_ref(), + direction: dropshot::PaginationOrder::Ascending, + limit: batch_size, + }; + let results_batch = do_query(pagparams, results.len()).await?; + let done = results_batch.len() + < usize::try_from(batch_size.get()).unwrap(); + if let Some(last_record) = results_batch.last() { + marker = Some(item2marker(last_record).clone()) + } else { + assert!(done); + } + + results.extend(results_batch.into_iter()); + if done { + break; + } + } + + Ok(results) + } + /// Private helper for reading a specific version of a group's DNS config async fn dns_config_read_version( &self, @@ -241,37 +280,30 @@ impl DataStore { let mut zones = Vec::with_capacity(dns_zones.len()); for zone in dns_zones { - let mut zone_records = Vec::new(); - let mut marker = None; - - loop { - debug!(log, "listing DNS names for zone"; - "dns_zone_id" => zone.id.to_string(), - "dns_zone_name" => &zone.zone_name, - "version" => i64::from(&version.version.0), - "found_so_far" => zone_records.len(), - "batch_size" => batch_size.get(), - ); - let pagparams = DataPageParams { - marker: marker.as_ref(), - direction: dropshot::PaginationOrder::Ascending, - limit: batch_size, - }; - let names_batch = self - .dns_names_list(opctx, zone.id, version.version, &pagparams) - .await?; - let done = names_batch.len() - < usize::try_from(batch_size.get()).unwrap(); - if let Some((last_name, _)) = names_batch.last() { - marker = Some(last_name.clone()); - } else { - assert!(done); - } - zone_records.extend(names_batch.into_iter()); - if done { - break; - } - } + let zone_name = &zone.zone_name; + let zone_records = Self::paginated_read( + batch_size, + |(name, _)| name, + |pagparams, nsofar| async move { + let zone_name = zone_name.clone(); + debug!(log, "listing DNS names for zone"; + "dns_zone_id" => zone.id.to_string(), + "dns_zone_name" => zone_name, + "version" => i64::from(&version.version.0), + "found_so_far" => nsofar, + "batch_size" => batch_size.get(), + ); + + self.dns_names_list( + opctx, + zone.id, + version.version, + &pagparams, + ) + .await + }, + ) + .await?; debug!(log, "found all DNS names for zone"; "dns_zone_id" => zone.id.to_string(),