Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Total queue size in the disk #35

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod iter;
mod receiver;
mod sender;
mod size;

pub use iter::QueueIter;
pub use receiver::{Receiver, ReceiverBuilder, RecvGuard};
Expand Down
10 changes: 10 additions & 0 deletions src/queue/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ impl Receiver {
ReceiverBuilder::default().open(base)
}

/// Gets the total size of the queue in bytes in disk.
///
/// # Errors
///
/// This function will return an IO error if any errors are encountered while going
/// throught the listed files in disk.
pub fn get_size(&self) -> io::Result<u64> {
super::size::total_directory_size(&self.base)
}

/// Starts a transaction in the queue.
fn begin(&mut self) {
log::debug!("begin transaction in {:?} at {:?}", self.base, self.state);
Expand Down
10 changes: 10 additions & 0 deletions src/queue/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,16 @@ impl Sender {
SenderBuilder::default().open(base)
}

/// Gets the total size of the queue in bytes in disk.
///
/// # Errors
///
/// This function will return an IO error if any errors are encountered while going
/// throught the listed files in disk.
pub fn get_size(&self) -> io::Result<u64> {
super::size::total_directory_size(&self.base)
}

/// Saves the sender queue state. You do not need to use method in most
/// circumstances, since it is automatically done on drop (yes, it will be
/// called eve if your thread panics). However, you can use this function to
Expand Down
25 changes: 25 additions & 0 deletions src/queue/size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//! Utilities for calculating the size of a queue.

use std::path::Path;
use std::{fs, io};

/// Goes through each file in the given directory _non-recursively_ and adds up all their
/// sizes.
pub(crate) fn total_directory_size(dir: &Path) -> Result<u64, io::Error> {
let mut size = 0;

for maybe_entry in fs::read_dir(dir)? {
let metadata = maybe_entry?.metadata()?;
if !metadata.is_file() {
continue;
}

// Here, we will count the size of _all_ files, not only of the queue segment
// files `*.q`. This might be useful for wusers with many small queues. So, no
// filtering based on extension.

size += metadata.len();
}

Ok(size)
}
2 changes: 2 additions & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ impl QueueStatePersistence {
QueueStatePersistence::default()
}

/// Opens a new file persistence stored inside the _folder_ `base` (i.e., it will
/// look for the file `{base}/recv-metadata`).
pub fn open<P: AsRef<Path>>(&mut self, base: P) -> io::Result<QueueState> {
let path = recv_persistence_filename(base);
self.path = Some(path.clone());
Expand Down