Skip to content

Commit

Permalink
Rename crate, add error module, add Cargo.toml
Browse files Browse the repository at this point in the history
  • Loading branch information
mgeier committed Oct 28, 2020
1 parent be8f3a4 commit e92831f
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 38 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Cargo.lock
/target/
20 changes: 20 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "rtrb"
version = "0.0.0"
authors = [
"Stjepan Glavina <[email protected]>",
"Matthias Geier <[email protected]>",
]
repository = "https://github.com/mgeier/rtrb"
description = "A realtime-safe single-producer single-consumer ring buffer"
categories = ["concurrency", "data-structures"]
keywords = ["lock-free", "wait-free", "spsc", "queue"]
license = "MIT OR Apache-2.0"
edition = "2018"

[dependencies]
cache-padded = "1.1"

[dev-dependencies]
crossbeam-utils = "0.8"
rand = "0.7.3"
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// TODO: Display impls

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PopError {
Empty,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PushError<T> {
Full(T),
}
50 changes: 22 additions & 28 deletions src/spsc.rs → src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
//! # Examples
//!
//! ```
//! use crossbeam_queue::spsc;
//!
//! let (p, c) = spsc::new(2);
//! let (p, c) = rtrb::new(2);
//!
//! assert!(p.push(1).is_ok());
//! assert!(p.push(2).is_ok());
Expand All @@ -23,9 +21,11 @@ use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crossbeam_utils::CachePadded;
use cache_padded::CachePadded;

mod error;

use err::{PopError, PushError};
pub use error::{PopError, PushError};

/// The inner representation of a single-producer single-consumer queue.
struct Inner<T> {
Expand Down Expand Up @@ -118,9 +118,7 @@ impl<T> Drop for Inner<T> {
/// # Examples
///
/// ```
/// use crossbeam_queue::spsc;
///
/// let (p, c) = spsc::new::<i32>(100);
/// let (p, c) = rtrb::new::<i32>(100);
/// ```
pub fn new<T>(cap: usize) -> (Producer<T>, Consumer<T>) {
assert!(cap > 0, "capacity must be non-zero");
Expand Down Expand Up @@ -161,12 +159,12 @@ pub fn new<T>(cap: usize) -> (Producer<T>, Consumer<T>) {
/// # Examples
///
/// ```
/// use crossbeam_queue::{spsc, PushError};
/// use rtrb::PushError;
///
/// let (p, c) = spsc::new::<i32>(1);
/// let (p, c) = rtrb::new::<i32>(1);
///
/// assert_eq!(p.push(10), Ok(()));
/// assert_eq!(p.push(20), Err(PushError(20)));
/// assert_eq!(p.push(20), Err(PushError::Full(20)));
/// ```
pub struct Producer<T> {
/// The inner representation of the queue.
Expand All @@ -193,12 +191,12 @@ impl<T> Producer<T> {
/// # Examples
///
/// ```
/// use crossbeam_queue::{spsc, PushError};
/// use rtrb::PushError;
///
/// let (p, c) = spsc::new(1);
/// let (p, c) = rtrb::new(1);
///
/// assert_eq!(p.push(10), Ok(()));
/// assert_eq!(p.push(20), Err(PushError(20)));
/// assert_eq!(p.push(20), Err(PushError::Full(20)));
/// ```
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
let mut head = self.head.get();
Expand All @@ -212,7 +210,7 @@ impl<T> Producer<T> {

// Is the queue *really* full?
if self.inner.distance(head, tail) == self.inner.cap {
return Err(PushError(value));
return Err(PushError::Full(value));
}
}

Expand All @@ -234,9 +232,7 @@ impl<T> Producer<T> {
/// # Examples
///
/// ```
/// use crossbeam_queue::spsc;
///
/// let (p, c) = spsc::new::<i32>(100);
/// let (p, c) = rtrb::new::<i32>(100);
///
/// assert_eq!(p.capacity(), 100);
/// ```
Expand All @@ -256,13 +252,13 @@ impl<T> fmt::Debug for Producer<T> {
/// # Examples
///
/// ```
/// use crossbeam_queue::{spsc, PopError};
/// use rtrb::PopError;
///
/// let (p, c) = spsc::new(1);
/// let (p, c) = rtrb::new(1);
/// assert_eq!(p.push(10), Ok(()));
///
/// assert_eq!(c.pop(), Ok(10));
/// assert_eq!(c.pop(), Err(PopError));
/// assert_eq!(c.pop(), Err(PopError::Empty));
/// ```
pub struct Consumer<T> {
/// The inner representation of the queue.
Expand All @@ -289,13 +285,13 @@ impl<T> Consumer<T> {
/// # Examples
///
/// ```
/// use crossbeam_queue::{spsc, PopError};
/// use rtrb::PopError;
///
/// let (p, c) = spsc::new(1);
/// let (p, c) = rtrb::new(1);
/// assert_eq!(p.push(10), Ok(()));
///
/// assert_eq!(c.pop(), Ok(10));
/// assert_eq!(c.pop(), Err(PopError));
/// assert_eq!(c.pop(), Err(PopError::Empty));
/// ```
pub fn pop(&self) -> Result<T, PopError> {
let mut head = self.head.get();
Expand All @@ -309,7 +305,7 @@ impl<T> Consumer<T> {

// Is the queue *really* empty?
if head == tail {
return Err(PopError);
return Err(PopError::Empty);
}
}

Expand All @@ -329,9 +325,7 @@ impl<T> Consumer<T> {
/// # Examples
///
/// ```
/// use crossbeam_queue::spsc;
///
/// let (p, c) = spsc::new::<i32>(100);
/// let (p, c) = rtrb::new::<i32>(100);
///
/// assert_eq!(c.capacity(), 100);
/// ```
Expand Down
15 changes: 5 additions & 10 deletions tests/spsc.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
extern crate crossbeam_queue;
extern crate crossbeam_utils;
extern crate rand;

use std::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_queue::spsc;
use crossbeam_utils::thread::scope;
use rand::{thread_rng, Rng};

#[test]
fn smoke() {
let (p, c) = spsc::new(1);
let (p, c) = rtrb::new(1);

p.push(7).unwrap();
assert_eq!(c.pop(), Ok(7));
Expand All @@ -23,7 +18,7 @@ fn smoke() {
#[test]
fn capacity() {
for i in 1..10 {
let (p, c) = spsc::new::<i32>(i);
let (p, c) = rtrb::new::<i32>(i);
assert_eq!(p.capacity(), i);
assert_eq!(c.capacity(), i);
}
Expand All @@ -32,14 +27,14 @@ fn capacity() {
#[test]
#[should_panic(expected = "capacity must be non-zero")]
fn zero_capacity() {
let _ = spsc::new::<i32>(0);
let _ = rtrb::new::<i32>(0);
}

#[test]
fn parallel() {
const COUNT: usize = 100_000;

let (p, c) = spsc::new(3);
let (p, c) = rtrb::new(3);

scope(|s| {
s.spawn(move |_| {
Expand Down Expand Up @@ -85,7 +80,7 @@ fn drops() {
let additional = rng.gen_range(0, 50);

DROPS.store(0, Ordering::SeqCst);
let (p, c) = spsc::new(50);
let (p, c) = rtrb::new(50);

let p = scope(|s| {
s.spawn(move |_| {
Expand Down

0 comments on commit e92831f

Please sign in to comment.