Skip to content

Commit

Permalink
Prefer future stream over JoinSet in downloader (#469)
Browse files Browse the repository at this point in the history
This avoids introducing a static lifetime requirement and, in my
benchmarks, is even a little faster.
  • Loading branch information
charliermarsh authored Nov 20, 2023
1 parent 8decb29 commit 60f595b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/puffin-installer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pypi-types = { path = "../pypi-types" }

anyhow = { workspace = true }
fs-err = { workspace = true }
futures = { workspace = true }
fxhash = { workspace = true }
rayon = { workspace = true }
tempfile = { workspace = true }
Expand Down
63 changes: 30 additions & 33 deletions crates/puffin-installer/src/downloader.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::cmp::Reverse;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;

use anyhow::{bail, Result};
use tokio::task::JoinSet;
use futures::StreamExt;

use distribution_types::{Dist, RemoteSource};
use puffin_client::RegistryClient;
Expand Down Expand Up @@ -55,27 +55,12 @@ impl<'a> Downloader<'a> {
});

// Fetch the distributions in parallel.
let mut fetches = JoinSet::new();
let mut downloads = Vec::with_capacity(dists.len());
for dist in dists {
if self.no_build && matches!(dist, Dist::Source(_)) {
bail!(
"Building source distributions is disabled, not downloading {}",
dist
);
}

fetches.spawn(fetch(
dist.clone(),
self.client.clone(),
self.cache.to_path_buf(),
self.locks.clone(),
));
}

while let Some(result) = fetches.join_next().await.transpose()? {
let result = result?;
let mut fetches = futures::stream::iter(dists)
.map(|dist| self.fetch(dist))
.buffer_unordered(50);

while let Some(result) = fetches.next().await.transpose()? {
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_download_progress(&result);
}
Expand All @@ -89,19 +74,31 @@ impl<'a> Downloader<'a> {

Ok(downloads)
}
}

/// Download a built distribution (wheel) or source distribution (sdist).
async fn fetch(
dist: Dist,
client: RegistryClient,
cache: PathBuf,
locks: Arc<Locks>,
) -> Result<Download> {
let lock = locks.acquire(&dist).await;
let _guard = lock.lock().await;
let metadata = Fetcher::new(&cache).fetch_dist(&dist, &client).await?;
Ok(metadata)
/// Download a built distribution (wheel) or source distribution (sdist).
async fn fetch(&self, dist: Dist) -> Result<Download> {
match dist {
Dist::Source(_) => {
if self.no_build {
bail!("Building source distributions is disabled; skipping: {dist}");
}

let lock = self.locks.acquire(&dist).await;
let _guard = lock.lock().await;

let metadata = Fetcher::new(self.cache)
.fetch_dist(&dist, self.client)
.await?;
Ok(metadata)
}
Dist::Built(_) => {
let metadata = Fetcher::new(self.cache)
.fetch_dist(&dist, self.client)
.await?;
Ok(metadata)
}
}
}
}

pub trait Reporter: Send + Sync {
Expand Down

0 comments on commit 60f595b

Please sign in to comment.