I'm trying to walk a directory tree using the code below. It works fine when the directory is small, but on large directories I quickly get "Couldn't read the directory" because of "Too many open files". The basic problem is clear: I'm traversing the tree in a breadth-first manner, with a task for each directory, and if too many of these are spawned at once, they exhaust an OS limit. Is there a clean way to solve this? Perhaps by somehow limiting concurrency, so that only so many directories are open at once?

```rust
use std::path::PathBuf;
use tokio::{fs, sync::mpsc};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1024);
    scan(PathBuf::from("."), tx).await;
    while let Some(p) = rx.recv().await {
        println!("{:?}", p);
    }
}

async fn scan(path: PathBuf, tx: mpsc::Sender<PathBuf>) {
    let mut dir = fs::read_dir(path).await.expect("Couldn't read the directory");
    let mut children = Vec::new();
    while let Some(child) = dir.next_entry().await.expect("Couldn't read the next directory entry") {
        let path = child.path();
        let meta = fs::symlink_metadata(&path).await.expect("Couldn't get metadata");
        tx.send(path.clone()).await;
        if meta.is_dir() {
            let path = path.clone();
            children.push(path);
        }
    }
    scan_children(children, tx);
}

fn scan_children(children: Vec<PathBuf>, tx: mpsc::Sender<PathBuf>) {
    for path in children {
        let tx = tx.clone();
        tokio::spawn(async move {
            scan(path, tx).await
        });
    }
}
```
I have one more, only tangentially related question. In my example, if I make all the functions return a `Result` and change `let mut dir = fs::read_dir(path).await.expect("Couldn't read the directory");` to `let mut dir = fs::read_dir(path).await?;`, I no longer see the errors; instead the code fails silently and only a small fraction of the files get reported. I guess the errors aren't being propagated to the right place. Is there a standard, idiomatic way to fix this?
Put a `Semaphore` in an `Arc` to share it between all the tasks, and acquire a permit from the semaphore every time you open a file (but before opening it). If you make sure the permit is dropped only after the file descriptor is closed, no more files can be open at once than there are permits in the semaphore.