diff --git a/.vscode/settings.json b/.vscode/settings.json index 3d827a7..b9a8b21 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,19 @@ { "cSpell.words": [ - "sysinfo" + "Asbt", + "Deque", + "inotify", + "libfuzzer", + "mullr", + "preppers", + "Seedable", + "SIGKILL", + "spinlock", + "SPSC", + "sysinfo", + "unmove", + "xorshift", + "yaque" ], "cSpell.ignoreWords": [ "toctou" diff --git a/Cargo.lock b/Cargo.lock index b269ba5..354c553 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -634,7 +634,7 @@ checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" [[package]] name = "yaque" -version = "0.6.4" +version = "0.6.5" dependencies = [ "ctor", "futures", diff --git a/Cargo.toml b/Cargo.toml index 65987f6..8c39894 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "yaque" -version = "0.6.4" -authors = ["Pedro Arruda "] -edition = "2018" +version = "0.6.5" +authors = ["Pedro Arruda "] +edition = "2021" description = "Yaque is yet another disk-backed persistent queue for Rust" license = "Apache-2.0" homepage = "https://github.com/tokahuke/yaque" @@ -10,7 +10,6 @@ repository = "https://github.com/tokahuke/yaque" keywords = ["queue", "persistent", "disk", "data-structures"] readme = "readme.md" exclude = ["data"] -resolver = "2" [features] default = ["recovery", "log-trace"] diff --git a/changelog.md b/changelog.md index 568c6e5..6e44258 100644 --- a/changelog.md +++ b/changelog.md @@ -51,7 +51,7 @@ state. ## Version 0.4.3: * Small improvements to docs. -* Unittests will perform correctly after a previous run was interrupted by +* Unit tests will perform correctly after a previous run was interrupted by CTRL+C. * Created the `recovery::recover` function for a "full course" queue recover in a single command. @@ -60,7 +60,7 @@ a single command. * `recv_timeout` and `recv_batch_timeout` to allow receiving with timeout. * `recv_batch` is "atomic in an asynchronous context". -* Now, unlock works even if the process respawns with the same PID. +* Now, unlock works even if the process re-spawns with the same PID. * Recovery of queue works with two modes: replay, which is the behavior of `recovery::recover`, and "with loss", that discards the bottom segment entirely (a bit extreme, but we will work on that). Use the @@ -78,7 +78,7 @@ is only supported within the same _minor_ version. ## Version 0.5.1: -* Corrected a bug on the `send_metadata` inferrence thingy. Now, all tests are passing. +* Corrected a bug on the `send_metadata` inference thingy. Now, all tests are passing. ## Version 0.6.0: @@ -89,7 +89,7 @@ used to ensure "legacy mode", where no parity checking took place. Now, it is a bit all on itself. This leads to much more robust error detection (up to 2bits, guaranteed, but you can get lucky with more!). * Now you can control the sender more finely with `SenderBuilder`. This includes -chosing a segment size that fits your needs and chosing the "maximum size" for the +choosing a segment size that fits your needs and choosing the "maximum size" for the queue. * And yes, now you can control maximum queue size so that `yaque` doesn't blow up your hard drive. This means that some major API changes took place: @@ -113,14 +113,14 @@ transaction. I could not verify if this invariant always holds. Anyway, there is assertion in the code to avoid the worse. If you find such a situation, please fill an issue. * Dropping the Receiver forced the `state` to be saved, not the `initial_state` (the -state at the begining of the current transaction).
Now, `Drop` calls `Receiver::save` +state at the beginning of the current transaction). Now, `Drop` calls `Receiver::save` so that the behavior will be always consistent. -* We have a backup strategy for saving the queue! It invlves no asyc stuff, so it will -only be triggered at the end of a transction. The current criterion is: save at every -250 items read or every 350ms, whichever comes first. This should dimiinish greatly +* We have a backup strategy for saving the queue! It involves no async stuff, so it will +only be triggered at the end of a transaction. The current criterion is: save at every +250 items read or every 350ms, whichever comes first. This should diminish greatly the necessity for external control of the save mechanism. -* Created a `ReceiverBuilder` to allow people to costumize the way the queue is saved. -This includes altering the above defaults or disabling queue saving altogther. +* Created a `ReceiverBuilder` to allow people to customize the way the queue is saved. +This includes altering the above defaults or disabling queue saving altogether. ## Version 0.6.2: @@ -143,8 +143,18 @@ leading to clearer code. ## Version 0.6.4: -* Update dependencies and fix vulerabilities (dependabot). +* Update dependencies and fix vulnerabilities (dependabot). ### Contributors: * [@grant0417](https://github.com/grant0417) + + +## Version 0.6.5: + +* Fix `try_recv_batch` misbehavior (panic) when running with [libfuzzer](https://llvm.org/docs/LibFuzzer.html). + + +### Contributors: + +* [@mullr](https://github.com/mullr) diff --git a/src/error.rs b/src/error.rs index c98e843..4242d54 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,7 +4,7 @@ use std::fmt; use std::io; use std::path::PathBuf; -/// An error that occurrs when trying to send into a queue. +/// An error that occurs when trying to send into a queue. #[derive(Debug)] pub enum TrySendError { /// An underlying IO error occurred. diff --git a/src/header.rs b/src/header.rs index 148c569..2b4061e 100644 --- a/src/header.rs +++ b/src/header.rs @@ -15,7 +15,7 @@ //! See [this section in Wikipedia](https://en.wikipedia.org/wiki/Hamming_code#General_algorithm). /// Parity check mask. -const P0: u32 = 0b11_1111_1111_1111_1111_1111_1111; // just a regular parity chech (not Hamming!) +const P0: u32 = 0b11_1111_1111_1111_1111_1111_1111; // just a regular parity check (not Hamming!) /// Hamming first parity bit mask. const P1: u32 = 0b10_1010_1010_1010_1101_0101_1011; /// Hamming second parity bit mask. diff --git a/src/lib.rs b/src/lib.rs index 4dbca52..9ea9566 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -150,7 +150,7 @@ //! //! Unfortunately, there are also times when you get `aborted` or `killed`. These //! signals cannot be handled by any library whatsoever. When this happens, not -//! everything is lost yet. We provied a whole module, [`recovery`], +//! everything is lost yet. We provide a whole module, [`recovery`], //! to aid you in automatic queue recovery. Please check the module for the //! specific function names. From an architectural perspective, we offer two //! different approaches to queue recovery, which may be suitable to different @@ -169,7 +169,7 @@ //! 2. Recover with loss: we can also reconstruct an _upper bound_ for the //! actual state of the queue: the bottom of the second smallest segment in //! the queue. In this case, the smallest segment is simply erased and the -//! receiver caries on as if nothing has happened. If replays are intollerable, +//! receiver carries on as if nothing has happened.
If replays are intolerable, //! but some data loss is, this might be the right alternative for you. You can //! limit data loss by constraining the segment size, configuring this option on //! [`SenderBuilder`]. diff --git a/src/mutex.rs b/src/mutex.rs index cd0a469..7b7582c 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -1,6 +1,6 @@ //! A persistent mutex implementation using the atomicity of [`OpenOptions::create_new`] //! -//! Please note that this `Mutex` just really works if other processeses in your system +//! Please note that this `Mutex` only really works if other processes in your system //! are willing to "play nice" with you. In most systems (Unix-like), locks are mostly //! advisory. @@ -18,11 +18,11 @@ pub struct Mutex { impl Mutex { /// Opens a new mutex, given the path for a folder in which the mutex will be mounted. - /// This will create a new floder if one does not exist yet. + /// This will create a new folder if one does not exist yet. /// /// # Errors /// - /// This function fails if it cannot create the folder which is giong to contain the + /// This function fails if it cannot create the folder which is going to contain the /// mutex. pub fn open>(path: P) -> io::Result { fs::create_dir_all(&path)?; }) } - /// Locks this mutex, awaitng for it to unlock if it is locked. + /// Locks this mutex, waiting for it to unlock if it is locked. pub async fn lock(&self) -> io::Result { let file_guard = FileGuard::lock(self.path.join("lock")).await?; let file = OpenOptions::new() }) } - /// Tries to lock this mutex, returnin `None` if it is locked. + /// Tries to lock this mutex, returning `None` if it is locked. pub fn try_lock(&self) -> io::Result> { let file_guard = FileGuard::try_lock(self.path.join("lock"))?; @@ -75,13 +75,13 @@ pub struct MutexGuard { } impl MutexGuard { - /// Reas all the contents of the content file into a vector. + /// Reads all the contents of the content file into a vector. pub fn read(&self) -> io::Result> { (&self.file).seek(io::SeekFrom::Start(0))?; (&self.file).bytes().collect::>>() } - /// Writes some data to the content file, ovewritting all the previous content. + /// Writes some data to the content file, overwriting all the previous content. pub fn write>(&self, data: D) -> io::Result<()> { (&self.file).seek(io::SeekFrom::Start(0))?; self.file.set_len(0)?; diff --git a/src/queue/iter.rs b/src/queue/iter.rs index ddd02c6..3d1d8d7 100644 --- a/src/queue/iter.rs +++ b/src/queue/iter.rs @@ -76,7 +76,7 @@ impl QueueIter { } /// Puts the queue in another position in another segment. This forcibly - /// discards the old tail follower and fethces a fresh new one, so be + /// discards the old tail follower and fetches a fresh new one, so be /// careful.
fn advance_segment(&mut self) -> io::Result<()> { let current_segment = self.state.segment; diff --git a/src/queue/mod.rs b/src/queue/mod.rs index ac5e383..e2e4a56 100644 --- a/src/queue/mod.rs +++ b/src/queue/mod.rs @@ -353,7 +353,7 @@ mod tests { } #[test] - fn test_recv_timeout_dealyed() { + fn test_recv_timeout_delayed() { futures::executor::block_on(async move { let (mut sender, mut receiver) = channel("data/recv-timeout-delayed").unwrap(); @@ -407,7 +407,7 @@ mod tests { } #[test] - fn test_recv_batch_timeout_dealyed_1() { + fn test_recv_batch_timeout_delayed_1() { futures::executor::block_on(async move { let (mut sender, mut receiver) = channel("data/recv-batch-timeout-delayed-1").unwrap(); @@ -431,7 +431,7 @@ mod tests { } #[test] - fn test_recv_batch_timeout_dealyed_2() { + fn test_recv_batch_timeout_delayed_2() { futures::executor::block_on(async move { let (mut sender, mut receiver) = channel("data/recv-batch-timeout-delayed-2").unwrap(); diff --git a/src/queue/receiver.rs b/src/queue/receiver.rs index a29a5d3..0c0cfff 100644 --- a/src/queue/receiver.rs +++ b/src/queue/receiver.rs @@ -79,7 +79,7 @@ impl ReceiverBuilder { /// # Note: /// /// This policy is enforced _synchronously_. This means that there is no - /// asynchronous behavior involved (i.e. timers). This condidition will only + /// asynchronous behavior involved (i.e. timers). This condition will only /// be checked when a new element is pushed to the queue. pub fn save_every(mut self, duration: Option) -> ReceiverBuilder { self.save_every = duration; @@ -151,14 +151,14 @@ pub struct Receiver { maybe_header: Option<[u8; 4]>, /// The current queue state. state: QueueState, - /// The queue state as it was in the begining of the current transaction. + /// The queue state as it was in the beginning of the current transaction. initial_state: QueueState, /// The queue state saver/loader. persistence: QueueStatePersistence, /// Use this queue to buffer elements and provide "atomicity in an /// asynchronous context". We need to backup the state of the queue before /// the read so as to restore it as the "initial state" (the _actual_ state - /// of the queue) at the end of a transaction. Otherwise, dataloss would + /// of the queue) at the end of a transaction. Otherwise, data loss would /// occur. read_and_unused: VecDeque>, /// Save the queue every n operations @@ -196,7 +196,7 @@ impl Receiver { } /// Puts the queue in another position in another segment. This forcibly - /// discards the old tail follower and fethces a fresh new one, so be + /// discards the old tail follower and fetches a fresh new one, so be /// careful. fn go_to(&mut self, state: QueueState) -> io::Result<()> { let different_segment = self.state.segment != state.segment; @@ -313,10 +313,10 @@ impl Receiver { /// Reads one element from the queue, inevitably advancing the file reader. /// Instead of returning the element, this function puts it in the "read and /// unused" queue to be used later. This enables us to construct "atomic in - /// async context" guarantees for the higher level functions. The ideia is to + /// async context" guarantees for the higher level functions. The idea is to /// _drain the queue_ only after the last `.await` in the block. /// - /// This operation is also itlsef atomic. If the returned future is not + /// This operation is also itself atomic. If the returned future is not /// polled to completion, as, e.g., when calling `select`, the operation /// will count as not done.
async fn read_one(&mut self) -> io::Result<()> { @@ -369,7 +369,7 @@ impl Receiver { fn drain(&mut self, n: usize) -> Vec> { let mut data = Vec::with_capacity(n); - // (careful! need to check if read something to avoid an eroneous POP + // (careful! need to check if read something to avoid an erroneous POP // from the queue) if n > 0 { while let Some(element) = self.read_and_unused.pop_front() { @@ -386,7 +386,7 @@ impl Receiver { /// Saves the receiver queue state. You do not need to use method in most /// circumstances, since it is automatically done on drop (yes, it will be - /// called eve if your thread panics). However, you cawn use this function to + /// called even if your thread panics). However, you can use this function to /// /// 1. Make periodical backups. Use an external timer implementation for this. /// @@ -396,7 +396,7 @@ /// implemented this way because no errors are allowed to propagate on drop /// and panicking will abort the program if drop is called during a panic. pub fn save(&mut self) -> io::Result<()> { - self.persistence.save(&self.initial_state) // this aviods saving an in-flight + self.persistence.save(&self.initial_state) // this avoids saving an in-flight } fn maybe_save(&mut self) -> io::Result<()> { @@ -550,7 +550,7 @@ impl Receiver { /// Tries to remove a number of elements from the queue until a given future /// finished. The values taken from the queue will be the values that were - /// available durng the whole execution of the future and thus less than `n` + /// available during the whole execution of the future and thus less than `n` /// elements might be returned. The returned items are wrapped in a guard /// that will only commit state changes to the queue when dropped. /// diff --git a/src/queue/sender.rs b/src/queue/sender.rs index 30ee8d4..e283bd7 100644 --- a/src/queue/sender.rs +++ b/src/queue/sender.rs @@ -62,10 +62,10 @@ pub(crate) fn get_queue_size>(base: P) -> io::Result { } /// A builder for the sender side of the queue. Use this if you want to have fine-grained control -/// over the configuration of the queue. Most defaults sould be ok of most applications. +/// over the configuration of the queue. Most defaults should be ok for most applications. pub struct SenderBuilder { /// The segment size in bytes that will trigger a new segment to be created. Segments an be - /// bigger than this to accomodate the last element, but nothing beyond that (each segment + /// bigger than this to accommodate the last element, but nothing beyond that (each segment /// must store at least one element). /// /// Default value: 4MB segment_size: u64, /// The queue size that will block the sender from creating a new segment (until the receiver /// catches up, deleting old segments). The queue can get bigger than that, but only to - /// accomodate the last segment (the queue must have at least one segment). Set this to `None` + /// accommodate the last segment (the queue must have at least one segment). Set this to `None` /// to create an unbounded queue. /// /// This value will be ignored if the queue has only one segment, since the queue would - /// deadlock otherwise. It is recomended that you set `max_queue_size >> segment_size`. + /// deadlock otherwise. It is recommended that you set `max_queue_size >> segment_size`. /// /// Default value: None max_queue_size: Option, @@ -99,7 +99,7 @@ impl SenderBuilder { } /// The segment size in bytes that will trigger a new segment to be created.
Segments an be - /// bigger than this to accomodate the last element, but nothing beyond that (each segment + /// bigger than this to accommodate the last element, but nothing beyond that (each segment /// must store at least one element). /// /// Default value: `4 * 1024 * 1024`, or 4MB. /// The queue size that will block the sender from creating a new segment (until the receiver /// catches up, deleting old segments). The queue can get bigger than that, but only to - /// accomodate the last segment (the queue must have at least one segment). Set this to `None` + /// accommodate the last segment (the queue must have at least one segment). Set this to `None` /// to create an unbounded queue. /// /// This value will be ignored if the queue has only one segment, since the queue would - /// deadlock otherwise. It is recomended that you set `max_queue_size >> segment_size`. + /// deadlock otherwise. It is recommended that you set `max_queue_size >> segment_size`. /// /// Default value: `None` /// @@ -149,7 +149,7 @@ impl SenderBuilder { // Versioning stuff (this should be lightning-fast. Therefore, shameless block): check_queue_version(base.as_ref())?; - // Acquire lock and guess statestate: + // Acquire lock and guess state: let file_guard = try_acquire_send_lock(base.as_ref())?; let state = QueueState::for_send_metadata(base.as_ref())?; @@ -185,7 +185,7 @@ pub struct Sender { _file_guard: FileGuard, file: io::BufWriter, state: QueueState, - deletion_stream: Option, // lazy inited! + deletion_stream: Option, // lazily initialized! base: PathBuf, } @@ -210,7 +210,7 @@ impl Sender { /// /// 2. Handle possible IO errors in sending. The `drop` implementation will /// ignore (but log) any io errors, which may lead to data loss in an - /// unreliable filesystem. It was implmemented this way because no errors + /// unreliable filesystem. It was implemented this way because no errors /// are allowed to propagate on drop and panicking will abort the program if /// drop is called during a panic. #[deprecated( @@ -283,7 +283,7 @@ impl Sender { // If so, create a new file, if you are able to: if !self.try_cap_off_and_move()? { log::trace!( - "could not capp off and move. The queue `{:?}` is full", + "could not cap off and move. The queue `{:?}` is full", self.base ); diff --git a/src/recovery.rs b/src/recovery.rs index 86493e8..8501862 100644 --- a/src/recovery.rs +++ b/src/recovery.rs @@ -18,7 +18,7 @@ //! 2. Recover with loss: we can also reconstruct an _upper bound_ for the //! actual state of the queue: the bottom of the second smallest segment in //! the queue. In this case, the smallest segment is simply erased and the -//! receiver caries on as if nothing has happened. If replays are intollerable, +//! receiver carries on as if nothing has happened. If replays are intolerable, //! but some data loss is, this might be the right alternative for you. You can //! limit data loss by constraining the segment size, configuring this option on //! [`crate::SenderBuilder`]. @@ -277,7 +277,7 @@ pub fn guess_recv_metadata_with_loss>(base: P) -> io::Result<()> /// # Panics /// /// This function panics if there is a file in the queue folder with extension -/// `.q` whose name is not an integer, such as `foo.q` or if the lockfiles for +/// `.q` whose name is not an integer, such as `foo.q` or if the lock files for /// either sending or receiving cannot be parsed.
pub fn recover>(base: P) -> io::Result<()> { unlock_queue(base.as_ref())?; @@ -301,7 +301,7 @@ pub fn recover>(base: P) -> io::Result<()> { /// # Panics /// /// This function panics if there is a file in the queue folder with extension -/// `.q` whose name is not an integer, such as `foo.q` or if the lockfiles for +/// `.q` whose name is not an integer, such as `foo.q` or if the lock files for /// either sending or receiving cannot be parsed. pub fn recover_with_loss>(base: P) -> io::Result<()> { unlock_queue(base.as_ref())?; diff --git a/src/sync.rs b/src/sync.rs index 0331ee9..20412b8 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -14,8 +14,8 @@ use std::task::{Context, Poll, Waker}; use crate::watcher::{file_removal_watcher, file_watcher, removal_watcher}; lazy_static! { - /// A unique token to differentiate between processes wich might have the - /// same PID, but are otherwise differente instances. + /// A unique token to differentiate between processes which might have the + /// same PID, but are otherwise different instances. pub(crate) static ref UNIQUE_PROCESS_TOKEN: u64 = rand::thread_rng().gen(); } @@ -171,7 +171,7 @@ impl TailFollower { /// /// This function will panic if unable to seek while rewinding to recover /// from an incomplete operation. This may change in the future. - #[must_use = "futures do nothing untill polled"] + #[must_use = "futures do nothing until polled"] pub fn read_exact<'a>(&'a mut self, buffer: &'a mut [u8]) -> ReadExact<'a> { // Rewind if last invocation was not polled to conclusion: if self.read_and_unused != 0 { diff --git a/src/version.rs b/src/version.rs index 406b7ef..e2a3546 100644 --- a/src/version.rs +++ b/src/version.rs @@ -63,7 +63,8 @@ use crate::mutex::Mutex; /// Gets the version of the queue, or sets it if there is not one and then checks if the version is /// compatible with the current loaded version of `yaque`. It uses a mutex to implement atomicity /// (yes, we have had some race conditions during testing), but, for the sake of API compatibility -/// in [`crate::Sender::open`] and [`crate::Receiver::open`], it performs a spinlock, instead of `.await`ing. +/// in [`crate::Sender::open`] and [`crate::Receiver::open`], it performs a spinlock, instead of +/// `.await`ing. pub fn check_queue_version>(base: P) -> io::Result<()> { let mutex = Mutex::open(base.as_ref().join("version"))?;
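A minimal smoke test for the 0.6.5 build is sketched below for reviewers; it is not part of the patch. It opens a queue with `yaque::channel` exactly as the tests in `src/queue/mod.rs` do, then round-trips one payload through `Sender::send` and `Receiver::recv`, whose exact signatures (both async) are assumed rather than shown in this diff. The queue path `data/smoke-test` and the payload are illustrative, and the receive guard is assumed to deref to the received bytes; its commit-on-drop behavior is as described in the receiver docs touched above.

```rust
use std::io;

fn main() -> io::Result<()> {
    futures::executor::block_on(async {
        // Open (or create) a disk-backed queue; same call the tests in this diff use.
        let (mut sender, mut receiver) = yaque::channel("data/smoke-test")?;

        // Enqueue one payload; it is written to the queue's current on-disk segment.
        sender.send(b"hello, yaque").await?;

        // Dequeue it again. Per the receiver docs in this diff, the returned guard
        // only commits the state change to the queue when it is dropped.
        // (Deref from the guard to the underlying bytes is assumed here.)
        let guard = receiver.recv().await?;
        assert_eq!(&*guard, b"hello, yaque");

        io::Result::Ok(())
    })
}
```

If the process is killed between the send and the receive, `recovery::recover` from `src/recovery.rs` above is the documented way to bring the queue back to a consistent state before reopening it.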