From 1a6aa954b115721c521330e1d376772fd480aac8 Mon Sep 17 00:00:00 2001 From: Pedro Arruda Date: Thu, 11 Mar 2021 22:50:05 -0300 Subject: [PATCH] final touches --- changelog.md | 3 +++ src/queue/receiver.rs | 11 ++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index b12f20a..921d609 100644 --- a/changelog.md +++ b/changelog.md @@ -101,3 +101,6 @@ hard drive. This means that some major API changes took place: * You can also just _try_ to receive items, without the need to `.await` anything. For each fo the receiving methods `recv`, `recv_batch` and `recv_until` you now have the try versions: `try_recv`, `try_recv_batch`, `try_recv_until`. +* Solved a bug regarding the rollback of batch transactions when crossing over a segment. +Older versions will do a complete mess out of this. The side effect: `commit` now returns +a `Result`, which has to be treated. diff --git a/src/queue/receiver.rs b/src/queue/receiver.rs index f360bcc..cd7366e 100644 --- a/src/queue/receiver.rs +++ b/src/queue/receiver.rs @@ -42,12 +42,19 @@ pub(crate) async fn acquire_recv_lock>(base: P) -> io::Result, + /// The current queue state. state: QueueState, + /// The queue state as it was in the begining of the current transaction. initial_state: QueueState, - base: PathBuf, + /// The queue state saver/loader. persistence: QueueStatePersistence, /// Use this queue to buffer elements and provide "atomicity in an /// asynchronous context". @@ -501,6 +508,7 @@ impl Receiver { predicate(None).await; // Poor man's do-while (aka. until) + // Strategy: fill `read_and_unused` to the brim and then drain at the end. loop { // Need to fetch from disk? if n_read == self.read_and_unused.len() { @@ -518,6 +526,7 @@ impl Receiver { // And now, drain! let data = self.drain(n_read); + Ok(RecvGuard { receiver: self, item: Some(data),