try_recv_batch panics on Fuzztest #21

Draft: wants to merge 2 commits into base: master
15 changes: 14 additions & 1 deletion .vscode/settings.json
@@ -1,6 +1,19 @@
{
"cSpell.words": [
"sysinfo"
"Asbt",
"Deque",
"inotify",
"libfuzzer",
"mullr",
"preppers",
"Seedable",
"SIGKILL",
"spinlock",
"SPSC",
"sysinfo",
"unmove",
"xorshift",
"yaque"
],
"cSpell.ignoreWords": [
"toctou"
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

7 changes: 3 additions & 4 deletions Cargo.toml
@@ -1,16 +1,15 @@
[package]
name = "yaque"
version = "0.6.4"
authors = ["Pedro Arruda <pedrobittencourt3@protonmail.ch>"]
edition = "2018"
version = "0.6.5"
authors = ["Pedro Arruda <pedrobittencourt3@gmail.com>"]
edition = "2021"
description = "Yaque is yet another disk-backed persistent queue for Rust"
license = "Apache-2.0"
homepage = "https://github.com/tokahuke/yaque"
repository = "https://github.com/tokahuke/yaque"
keywords = ["queue", "persistent", "disk", "data-structures"]
readme = "readme.md"
exclude = ["data"]
resolver = "2"

[features]
default = ["recovery", "log-trace"]
32 changes: 21 additions & 11 deletions changelog.md
@@ -51,7 +51,7 @@ state.
## Version 0.4.3:

* Small improvements to docs.
* Unittests will perform correctly after a previous run was interrupted by
* Unit tests will perform correctly after a previous run was interrupted by
CTRL+C.
* Created the `recovery::recover` function for a "full course" queue recover in
a single command.
@@ -60,7 +60,7 @@ a single command.

* `recv_timeout` and `recv_batch_timeout` to allow receiving with timeout.
* `recv_batch` is "atomic in an asynchronous context".
* Now, unlock works even if the process respawns with the same PID.
* Now, unlock works even if the process re-spawns with the same PID.
* Recovery of queue works with two modes: replay, which is the behavior of
`recovery::recover`, and "with loss", that discards the bottom segment entirely
(a bit extreme, but we will work on that). Use the
@@ -78,7 +78,7 @@ is only supported within the same _minor_ version.

## Version 0.5.1:

* Corrected a bug on the `send_metadata` inferrence thingy. Now, all tests are passing.
* Corrected a bug on the `send_metadata` inference thingy. Now, all tests are passing.

## Version 0.6.0:

@@ -89,7 +89,7 @@ used to ensure "legacy mode", where no parity checking took place. Now, it is a
bit all on itself. This leads to much more robust error detection (up to 2bits,
guaranteed, but you can get lucky with more!).
* Now you can control the sender more finely with `SenderBuilder`. This includes
chosing a segment size that fits your needs and chosing the "maximum size" for the
choosing a segment size that fits your needs and choosing the "maximum size" for the
queue.
* And yes, now you can control maximum queue size so that `yaque` doesn't blow up your
hard drive. This means that some major API changes took place:
@@ -113,14 +113,14 @@ transaction. I could not verify if this invariant always holds. Anyway, there is
an assertion in the code to avoid the worst. If you find such a situation, please file an
issue.
* Dropping the Receiver forced the `state` to be saved, not the `initial_state` (the
state at the begining of the current transaction). Now, `Drop` calls `Receiver::save`
state at the beginning of the current transaction). Now, `Drop` calls `Receiver::save`
so that the behavior will be always consistent.
* We have a backup strategy for saving the queue! It invlves no asyc stuff, so it will
only be triggered at the end of a transction. The current criterion is: save at every
250 items read or every 350ms, whichever comes first. This should dimiinish greatly
* We have a backup strategy for saving the queue! It involves no async stuff, so it will
only be triggered at the end of a transaction. The current criterion is: save at every
250 items read or every 350ms, whichever comes first. This should diminish greatly
the necessity for external control of the save mechanism.
* Created a `ReceiverBuilder` to allow people to costumize the way the queue is saved.
This includes altering the above defaults or disabling queue saving altogther.
* Created a `ReceiverBuilder` to allow people to customize the way the queue is saved.
This includes altering the above defaults or disabling queue saving altogether.


## Version 0.6.2:
@@ -143,8 +143,18 @@ leading to clearer code.

## Version 0.6.4:

* Update dependencies and fix vulerabilities (dependabot).
* Update dependencies and fix vulnerabilities (dependabot).

### Contributors:

* [@grant0417](https://github.com/grant0417)


## Version 0.6.5:

* Fix `try_recv_batch` misbehavior (panic) when running with [libfuzzer](https://llvm.org/docs/LibFuzzer.html).


### Contributors

* [@mullr](https://github.com/mullr)
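The class of bug this entry describes — a batched try-receive that panics under a fuzzer's arbitrary inputs — can be illustrated with a stdlib-only harness. This is a hedged sketch of what such a fuzz property checks, not yaque's actual fuzz target (the real `try_recv_batch` signature is not shown in this diff): drive a FIFO queue with an arbitrary byte script and assert it never panics, preserves order, and loses no elements.

```rust
use std::collections::VecDeque;

// Interpret each byte of `script` as an operation: even bytes push,
// odd bytes try to pop a batch of up to (op / 2) % 8 elements.
// A correct queue must tolerate batch sizes larger than its length.
fn exercise(script: &[u8]) {
    let mut q: VecDeque<u32> = VecDeque::new();
    let mut pushed = 0u32;
    let mut popped = 0u32;
    for &op in script {
        if op % 2 == 0 {
            q.push_back(pushed);
            pushed += 1;
        } else {
            let n = (op as usize / 2) % 8;
            for _ in 0..n {
                match q.pop_front() {
                    Some(v) => {
                        assert_eq!(v, popped); // FIFO order holds
                        popped += 1;
                    }
                    None => break, // an empty queue is not an error
                }
            }
        }
    }
    assert_eq!(pushed - popped, q.len() as u32); // no element lost
}

fn main() {
    exercise(&[0, 0, 0, 3, 1, 255, 254, 7]);
    exercise(&[]);
}
```

Under cargo-fuzz, the body of `exercise` would sit inside a `fuzz_target!` macro and libfuzzer would generate the scripts; any panic it surfaces is exactly the kind of misbehavior this release fixes.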
2 changes: 1 addition & 1 deletion src/error.rs
@@ -4,7 +4,7 @@ use std::fmt;
use std::io;
use std::path::PathBuf;

/// An error that occurrs when trying to send into a queue.
/// An error that occurs when trying to send into a queue.
#[derive(Debug)]
pub enum TrySendError<T> {
/// An underlying IO error occurred.
2 changes: 1 addition & 1 deletion src/header.rs
@@ -15,7 +15,7 @@
//! See [this section in Wikipedia](https://en.wikipedia.org/wiki/Hamming_code#General_algorithm).

/// Parity check mask.
const P0: u32 = 0b11_1111_1111_1111_1111_1111_1111; // just a regular parity chech (not Hamming!)
const P0: u32 = 0b11_1111_1111_1111_1111_1111_1111; // just a regular parity check (not Hamming!)
/// Hamming first parity bit mask.
const P1: u32 = 0b10_1010_1010_1010_1101_0101_1011;
/// Hamming second parity bit mask.
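How mask-based parity checks like `P0` and `P1` work can be sketched with the two masks visible in this hunk (the third mask is elided here, so it is left out): a parity bit over a mask is the XOR — equivalently, the population-count parity — of the data bits the mask selects.

```rust
// Masks copied from the diff above; the remaining Hamming masks are not shown.
const P0: u32 = 0b11_1111_1111_1111_1111_1111_1111; // plain parity over all 26 bits
const P1: u32 = 0b10_1010_1010_1010_1101_0101_1011; // first Hamming parity mask

/// Parity of the bits of `word` selected by `mask`: 1 if an odd number are set.
fn parity(word: u32, mask: u32) -> u32 {
    (word & mask).count_ones() & 1
}

fn main() {
    let word = 0b01; // only bit 0 set
    assert_eq!(parity(word, P0), 1); // one selected bit: odd parity
    assert_eq!(parity(word, P1), 1); // bit 0 is also covered by P1
    assert_eq!(parity(0b11, P0), 0); // two selected bits: even parity
}
```

A decoder XORs the stored parity bits with the recomputed ones; the resulting syndrome locates a single flipped bit, which is what gives the "up to 2 bits guaranteed" detection the changelog mentions.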
4 changes: 2 additions & 2 deletions src/lib.rs
@@ -150,7 +150,7 @@
//!
//! Unfortunately, there are also times when you get `aborted` or `killed`. These
//! signals cannot be handled by any library whatsoever. When this happens, not
//! everything is lost yet. We provied a whole module, [`recovery`],
//! everything is lost yet. We provide a whole module, [`recovery`],
//! to aid you in automatic queue recovery. Please check the module for the
//! specific function names. From an architectural perspective, we offer two
//! different approaches to queue recovery, which may be suitable to different
@@ -169,7 +169,7 @@
//! 2. Recover with loss: we can also reconstruct an _upper bound_ for the
//! actual state of the queue: the bottom of the second smallest segment in
//! the queue. In this case, the smallest segment is simply erased and the
//! receiver caries on as if nothing has happened. If replays are intollerable,
//! receiver carries on as if nothing has happened. If replays are intolerable,
//! but some data loss is, this might be the right alternative for you. You can
//! limit data loss by constraining the segment size, configuring this option on
//! [`SenderBuilder`].
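The "recover with loss" mode described in the doc comment above can be sketched as a pure function over the segment numbers found on disk. This is illustrative only — the segment-numbering and restart semantics here are assumptions, not yaque's actual recovery code: the smallest segment is discarded and the receiver restarts at the bottom of the second smallest.

```rust
/// Given the segment numbers present on disk, return the segment at which
/// the receiver should restart after a lossy recovery, or None if the
/// queue directory is empty. (Hypothetical helper, for illustration.)
fn recover_with_loss(mut segments: Vec<u64>) -> Option<u64> {
    segments.sort_unstable();
    match segments.as_slice() {
        [] => None,
        [only] => Some(only + 1), // the only segment is erased; start past it
        [_, second, ..] => Some(*second), // drop the smallest, restart at the next
    }
}

fn main() {
    assert_eq!(recover_with_loss(vec![3, 1, 2]), Some(2));
    assert_eq!(recover_with_loss(vec![5]), Some(6));
    assert_eq!(recover_with_loss(vec![]), None);
}
```

This also makes the doc's point about bounding data loss concrete: at most one segment is discarded, so a smaller segment size (set via `SenderBuilder`) directly caps how much can be lost.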
14 changes: 7 additions & 7 deletions src/mutex.rs
@@ -1,6 +1,6 @@
//! A persistent mutex implementation using the atomicity of [`OpenOptions::create_new`]
//!
//! Please note that this `Mutex` just really works if other processeses in your system
//! Please note that this `Mutex` just really works if other processes in your system
//! are willing to "play nice" with you. In most systems (Unix-like), locks are mostly
//! advisory.

@@ -18,11 +18,11 @@ pub struct Mutex {

impl Mutex {
/// Opens a new mutex, given the path for a folder in which the mutex will be mounted.
/// This will create a new floder if one does not exist yet.
/// This will create a new folder if one does not exist yet.
///
/// # Errors
///
/// This function fails if it cannot create the folder which is giong to contain the
/// This function fails if it cannot create the folder which is going to contain the
/// mutex.
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Mutex> {
fs::create_dir_all(&path)?;
@@ -31,7 +31,7 @@ impl Mutex {
})
}

/// Locks this mutex, awaitng for it to unlock if it is locked.
/// Locks this mutex, waiting for it to unlock if it is locked.
pub async fn lock(&self) -> io::Result<MutexGuard> {
let file_guard = FileGuard::lock(self.path.join("lock")).await?;
let file = OpenOptions::new()
@@ -46,7 +46,7 @@ impl Mutex {
})
}

/// Tries to lock this mutex, returnin `None` if it is locked.
/// Tries to lock this mutex, returning `None` if it is locked.
pub fn try_lock(&self) -> io::Result<Option<MutexGuard>> {
let file_guard = FileGuard::try_lock(self.path.join("lock"))?;

@@ -75,13 +75,13 @@ pub struct MutexGuard {
}

impl MutexGuard {
/// Reas all the contents of the content file into a vector.
/// Reads all the contents of the content file into a vector.
pub fn read(&self) -> io::Result<Vec<u8>> {
(&self.file).seek(io::SeekFrom::Start(0))?;
(&self.file).bytes().collect::<io::Result<Vec<_>>>()
}

/// Writes some data to the content file, ovewritting all the previous content.
/// Writes some data to the content file, overwriting all the previous content.
pub fn write<D: AsRef<[u8]>>(&self, data: D) -> io::Result<()> {
(&self.file).seek(io::SeekFrom::Start(0))?;
self.file.set_len(0)?;
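The atomicity trick the mutex module's doc comment names — `OpenOptions::create_new` — can be shown in isolation with a minimal stdlib sketch (the type and method names below are illustrative, not the crate's API): `create_new(true)` fails with `AlreadyExists` if the lock file is present, so file creation doubles as an atomic try-lock, and this only works if every other process plays by the same advisory rule.

```rust
use std::fs::{self, OpenOptions};
use std::io;
use std::path::{Path, PathBuf};

struct DirLock {
    lock_path: PathBuf,
}

impl DirLock {
    /// Try to take the lock; Ok(None) means another holder already has it.
    fn try_acquire(dir: &Path) -> io::Result<Option<DirLock>> {
        fs::create_dir_all(dir)?;
        let lock_path = dir.join("lock");
        // create_new is the atomic step: create the file only if it
        // does not exist, in a single filesystem operation.
        match OpenOptions::new().write(true).create_new(true).open(&lock_path) {
            Ok(_file) => Ok(Some(DirLock { lock_path })),
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => Ok(None),
            Err(e) => Err(e),
        }
    }
}

impl Drop for DirLock {
    fn drop(&mut self) {
        let _ = fs::remove_file(&self.lock_path); // best-effort unlock
    }
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join(format!("dirlock-demo-{}", std::process::id()));
    let first = DirLock::try_acquire(&dir)?;
    assert!(first.is_some());
    assert!(DirLock::try_acquire(&dir)?.is_none()); // second taker is refused
    drop(first);
    assert!(DirLock::try_acquire(&dir)?.is_some()); // free again after drop
    Ok(())
}
```

Note the sketch, like the module it mirrors, offers no protection against a holder dying without dropping the guard; that is what the crate's recovery machinery is for.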
2 changes: 1 addition & 1 deletion src/queue/iter.rs
@@ -76,7 +76,7 @@ impl QueueIter {
}

/// Puts the queue in another position in another segment. This forcibly
/// discards the old tail follower and fethces a fresh new one, so be
/// discards the old tail follower and fetches a fresh new one, so be
/// careful.
fn advance_segment(&mut self) -> io::Result<()> {
let current_segment = self.state.segment;
6 changes: 3 additions & 3 deletions src/queue/mod.rs
@@ -353,7 +353,7 @@ mod tests {
}

#[test]
fn test_recv_timeout_dealyed() {
fn test_recv_timeout_delayed() {
futures::executor::block_on(async move {
let (mut sender, mut receiver) = channel("data/recv-timeout-delayed").unwrap();

@@ -407,7 +407,7 @@ mod tests {
}

#[test]
fn test_recv_batch_timeout_dealyed_1() {
fn test_recv_batch_timeout_delayed_1() {
futures::executor::block_on(async move {
let (mut sender, mut receiver) = channel("data/recv-batch-timeout-delayed-1").unwrap();

@@ -431,7 +431,7 @@
}

#[test]
fn test_recv_batch_timeout_dealyed_2() {
fn test_recv_batch_timeout_delayed_2() {
futures::executor::block_on(async move {
let (mut sender, mut receiver) = channel("data/recv-batch-timeout-delayed-2").unwrap();

20 changes: 10 additions & 10 deletions src/queue/receiver.rs
@@ -79,7 +79,7 @@ impl ReceiverBuilder {
/// # Note:
///
/// This policy is enforced _synchronously_. This means that there is no
/// asynchronous behavior involved (i.e. timers). This condidition will only
/// asynchronous behavior involved (i.e. timers). This condition will only
/// be checked when a new element is pushed to the queue.
pub fn save_every(mut self, duration: Option<Duration>) -> ReceiverBuilder {
self.save_every = duration;
@@ -151,14 +151,14 @@ pub struct Receiver {
maybe_header: Option<[u8; 4]>,
/// The current queue state.
state: QueueState,
/// The queue state as it was in the begining of the current transaction.
/// The queue state as it was in the beginning of the current transaction.
initial_state: QueueState,
/// The queue state saver/loader.
persistence: QueueStatePersistence,
/// Use this queue to buffer elements and provide "atomicity in an
/// asynchronous context". We need to backup the state of the queue before
/// the read so as to restore it as the "initial state" (the _actual_ state
/// of the queue) at the end of a transaction. Otherwise, dataloss would
/// of the queue) at the end of a transaction. Otherwise, data loss would
/// occur.
read_and_unused: VecDeque<Vec<u8>>,
/// Save the queue every n operations
@@ -196,7 +196,7 @@ impl Receiver {
}

/// Puts the queue in another position in another segment. This forcibly
/// discards the old tail follower and fethces a fresh new one, so be
/// discards the old tail follower and fetches a fresh new one, so be
/// careful.
fn go_to(&mut self, state: QueueState) -> io::Result<()> {
let different_segment = self.state.segment != state.segment;
@@ -313,10 +313,10 @@ impl Receiver {
/// Reads one element from the queue, inevitably advancing the file reader.
/// Instead of returning the element, this function puts it in the "read and
/// unused" queue to be used later. This enables us to construct "atomic in
/// async context" guarantees for the higher level functions. The ideia is to
/// async context" guarantees for the higher level functions. The idea is to
/// _drain the queue_ only after the last `.await` in the block.
///
/// This operation is also itlsef atomic. If the returned future is not
/// This operation is also itself atomic. If the returned future is not
/// polled to completion, as, e.g., when calling `select`, the operation
/// will count as not done.
async fn read_one(&mut self) -> io::Result<()> {
@@ -369,7 +369,7 @@ impl Receiver {
fn drain(&mut self, n: usize) -> Vec<Vec<u8>> {
let mut data = Vec::with_capacity(n);

// (careful! need to check if read something to avoid an eroneous POP
// (careful! need to check if read something to avoid an erroneous POP
// from the queue)
if n > 0 {
while let Some(element) = self.read_and_unused.pop_front() {
@@ -386,7 +386,7 @@

/// Saves the receiver queue state. You do not need to use this method in most
/// circumstances, since it is automatically done on drop (yes, it will be
/// called eve if your thread panics). However, you cawn use this function to
/// called even if your thread panics). However, you can use this function to
///
/// 1. Make periodical backups. Use an external timer implementation for this.
///
@@ -396,7 +396,7 @@
/// implemented this way because no errors are allowed to propagate on drop
/// and panicking will abort the program if drop is called during a panic.
pub fn save(&mut self) -> io::Result<()> {
self.persistence.save(&self.initial_state) // this aviods saving an in-flight
self.persistence.save(&self.initial_state) // this avoids saving an in-flight
}

fn maybe_save(&mut self) -> io::Result<()> {
@@ -550,7 +550,7 @@ impl Receiver {

/// Tries to remove a number of elements from the queue until a given future
finishes. The values taken from the queue will be the values that were
/// available durng the whole execution of the future and thus less than `n`
/// available during the whole execution of the future and thus less than `n`
/// elements might be returned. The returned items are wrapped in a guard
/// that will only commit state changes to the queue when dropped.
///
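The "atomic in an async context" pattern that `read_and_unused` and `drain` implement in the hunks above can be reduced to a stdlib sketch (names mirror the diff but the struct is a stand-in, not the crate's `Receiver`): awaited reads park elements in a buffer, and the batch is drained out only synchronously, after the last `.await`, so a cancelled future never loses data.

```rust
use std::collections::VecDeque;

struct Buffered {
    // Elements already read from disk but not yet handed to the caller.
    read_and_unused: VecDeque<Vec<u8>>,
}

impl Buffered {
    /// Synchronously take up to `n` buffered elements. Because no await
    /// point can interrupt this, the take is all-or-nothing per call.
    fn drain(&mut self, n: usize) -> Vec<Vec<u8>> {
        let mut out = Vec::with_capacity(n);
        // Careful: check n > 0 first to avoid an erroneous pop (the same
        // guard the diff comments on).
        if n > 0 {
            while let Some(el) = self.read_and_unused.pop_front() {
                out.push(el);
                if out.len() == n {
                    break;
                }
            }
        }
        out
    }
}

fn main() {
    let mut b = Buffered {
        read_and_unused: VecDeque::from(vec![vec![1], vec![2], vec![3]]),
    };
    assert_eq!(b.drain(0).len(), 0); // n == 0 must not pop anything
    assert_eq!(b.drain(2), vec![vec![1], vec![2]]);
    assert_eq!(b.read_and_unused.len(), 1); // the rest stays buffered
}
```

In the real receiver the buffer is refilled by `read_one`, which is itself written so that an unpolled future counts as not done; the drain step is the only place elements become visible to the caller.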