Skip to content

Commit

Permalink
receiver builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Pedro Arruda committed Mar 13, 2021
1 parent 1cd5f6d commit 99630be
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 28 deletions.
8 changes: 7 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ This is _quite_ significant in terms of throughput.

## Version 0.3.3:

* Solved a bug in `recovery::unlock`: the file was not being parse correctly.
* Solved a bug in `recovery::unlock`: the file was not being parsed correctly.
* `recovery::unlock` now ignores missing files, as it should.
* Exposed `FileGuard`.

Expand Down Expand Up @@ -115,3 +115,9 @@ issue.
* Dropping the Receiver forced the `state` to be saved, not the `initial_state` (the
state at the begining of the current transaction). Now, `Drop` calls `Receiver::save`
so that the behavior will be always consistent.
* We have a backup strategy for saving the queue! It invlves no asyc stuff, so it will
only be triggered at the end of a transction. The current criterion is: save at every
250 items read or every 350ms, whichever comes first. This should dimiinish greatly
the necessity for external control of the save mechanism.
* Created a `ReceiverBuilder` to allow people to costumize the way the queue is saved.
This includes altering the above defaults or disabling queue saving altogther.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,4 @@ pub mod queue;
pub mod recovery;

pub use error::{TryRecvError, TrySendError};
pub use queue::{channel, Receiver, Sender, SenderBuilder};
pub use queue::{channel, Receiver, Sender, SenderBuilder, ReceiverBuilder};
2 changes: 1 addition & 1 deletion src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod receiver;
mod sender;

pub use receiver::{Receiver, RecvGuard};
pub use receiver::{Receiver, ReceiverBuilder, RecvGuard};
pub use sender::{Sender, SenderBuilder};

#[cfg(feature = "recovery")]
Expand Down
131 changes: 106 additions & 25 deletions src/queue/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::future::Future;
use std::io::{self};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use crate::error::TryRecvError;
use crate::header::Header;
Expand Down Expand Up @@ -39,32 +40,35 @@ pub(crate) async fn acquire_recv_lock<P: AsRef<Path>>(base: P) -> io::Result<Fil
FileGuard::lock(recv_lock_filename(base.as_ref())).await
}

/// The receiver part of the queue. This part is asynchronous and therefore
/// needs an executor that will the poll the futures to completion.
pub struct Receiver {
/// The path to the folder holding the queue.
base: PathBuf,
/// The acquired receiver lock file for this queue.
_file_guard: FileGuard,
/// The current segment being tailed.
tail_follower: TailFollower,
/// The last header read from the queue.
maybe_header: Option<[u8; 4]>,
/// The current queue state.
state: QueueState,
/// The queue state as it was in the begining of the current transaction.
initial_state: QueueState,
/// The queue state saver/loader.
persistence: QueueStatePersistence,
/// Use this queue to buffer elements and provide "atomicity in an
/// asynchronous context". We need to backup the state of the queue before
/// the read so as to restore it as the "initial state" (the _actual_ state
/// of the queue) at the end of a transaction. Otherwise, dataloss would
/// occur.
read_and_unused: VecDeque<Vec<u8>>,
pub struct ReceiverBuilder {
save_every_nth: Option<usize>,
save_every: Option<Duration>,
}

impl Receiver {
impl Default for ReceiverBuilder {
fn default() -> ReceiverBuilder {
ReceiverBuilder {
save_every_nth: Some(250),
save_every: Some(Duration::from_millis(350)),
}
}
}

impl ReceiverBuilder {
pub fn new() -> ReceiverBuilder {
ReceiverBuilder::default()
}

pub fn save_every_nth(mut self, nth: Option<usize>) -> ReceiverBuilder {
self.save_every_nth = nth;
self
}

pub fn save_every(mut self, duration: Option<Duration>) -> ReceiverBuilder {
self.save_every = duration;
self
}

/// Opens a queue for reading. The access will be exclusive, based on the
/// existence of the temporary file `recv.lock` inside the queue folder.
///
Expand All @@ -78,7 +82,7 @@ impl Receiver {
///
/// This function will panic if it is not able to set up the notification
/// handler to watch for file changes.
pub fn open<P: AsRef<Path>>(base: P) -> io::Result<Receiver> {
pub fn open<P: AsRef<Path>>(self, base: P) -> io::Result<Receiver> {
// Guarantee that the queue exists:
create_dir_all(base.as_ref())?;

Expand Down Expand Up @@ -109,7 +113,62 @@ impl Receiver {
base: PathBuf::from(base.as_ref()),
persistence,
read_and_unused: VecDeque::new(),
save_every: self.save_every,
save_every_nth: self.save_every_nth,
n_reads: 0,
last_saved_at: Instant::now(),
})
}}

/// The receiver part of the queue. This part is asynchronous and therefore
/// needs an executor that will the poll the futures to completion.
pub struct Receiver {
/// The path to the folder holding the queue.
base: PathBuf,
/// The acquired receiver lock file for this queue.
_file_guard: FileGuard,
/// The current segment being tailed.
tail_follower: TailFollower,
/// The last header read from the queue.
maybe_header: Option<[u8; 4]>,
/// The current queue state.
state: QueueState,
/// The queue state as it was in the begining of the current transaction.
initial_state: QueueState,
/// The queue state saver/loader.
persistence: QueueStatePersistence,
/// Use this queue to buffer elements and provide "atomicity in an
/// asynchronous context". We need to backup the state of the queue before
/// the read so as to restore it as the "initial state" (the _actual_ state
/// of the queue) at the end of a transaction. Otherwise, dataloss would
/// occur.
read_and_unused: VecDeque<Vec<u8>>,
/// Save the queue every n operations
save_every_nth: Option<usize>,
/// Save the queue every interval of time. This will be enforced _synchronously_; no timers involved.
save_every: Option<Duration>,
/// Number of operations done in this `Receiver`
n_reads: usize,
/// Last time the queue was saved:
last_saved_at: Instant,
}

impl Receiver {
/// Opens a queue for reading. The access will be exclusive, based on the
/// existence of the temporary file `recv.lock` inside the queue folder.
///
/// # Errors
///
/// This function will return an IO error if the queue is already in use for
/// receiving, which is indicated by a lock file. Also, any other IO error
/// encountered while opening will be sent.
///
/// # Panics
///
/// This function will panic if it is not able to set up the notification
/// handler to watch for file changes.
pub fn open<P: AsRef<Path>>(base: P) -> io::Result<Receiver> {
ReceiverBuilder::default().open(base)
}

/// Starts a transaction in the queue.
Expand Down Expand Up @@ -193,6 +252,9 @@ impl Receiver {
// self.state
// };

// Finally save if it is time to save:
self.maybe_save()?;

Ok(())
}

Expand Down Expand Up @@ -257,6 +319,9 @@ impl Receiver {
// Ready to be used:
self.read_and_unused.push_back(data);

// Bookkeeping:
self.n_reads += 1;

Ok(())
}

Expand Down Expand Up @@ -315,6 +380,22 @@ impl Receiver {
self.persistence.save(&self.initial_state) // this aviods saving an in-flight
}

fn maybe_save(&mut self) -> io::Result<()> {
if let Some(save_every_nth) = self.save_every_nth {
if self.n_reads % save_every_nth == 0 {
self.save()?;
}
}

else if let Some(save_every) = self.save_every {
if self.last_saved_at.elapsed() >= save_every {
self.save()?;
}
}

Ok(())
}

/// Retrieves an element from the queue. The returned value is a
/// guard that will only commit state changes to the queue when dropped.
///
Expand Down

0 comments on commit 99630be

Please sign in to comment.