Skip to content

Commit

Permalink
Merge pull request #7 from Cleverse/feature/queue
Browse files Browse the repository at this point in the history
Queue Library
  • Loading branch information
Planxnx authored Oct 23, 2023
2 parents 41abe6c + fedfe6d commit 80a98de
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 0 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ Pure Golang errors library with stacktrace support (for wrapping and formatting

[See here](errors/README.md).

## queue

A Pure Golang low-level and simple queue library for thread-safe and unlimited-size generics in-memory message queue implementation (async enqueue and blocking dequeue supports).\
The alternative way to communicate between goroutines compared to `channel`

[See here](queue/README.md).

## address

Ethereum address utilities package. (a [go-ethereum](https://github.com/ethereum/go-ethereum) helper library)
Expand Down
1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ use (
./address
./errors
./fixedpoint
./queue
./utils
)
24 changes: 24 additions & 0 deletions queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[![GoDoc](https://godoc.org/github.com/Cleverse/go-utilities/queue?status.svg)](http://godoc.org/github.com/Cleverse/go-utilities/queue)
[![Report card](https://goreportcard.com/badge/github.com/Cleverse/go-utilities/queue)](https://goreportcard.com/report/github.com/Cleverse/go-utilities/queue)

# queue

A Pure Golang thread-safe and unlimited-size generics in-memory message queue implementation
that supports async enqueue and blocking dequeue. \
It's alternative way to communicate between goroutines compared to `channel`

> **Note:** \
> This package is not intended to be used as a distributed message queue. For advanced use-cases like distributed queue, persistent message please use a message broker like Kafka, RabbitMQ, NATES or NSQ instead.
>
> And if your use-case requires a limited-size queue and blocking enqueue, please use a channel instead.
This package is low-level and simple queue library, it's not a full-featured message queue. \
You can build any advanced message queue on top of this queue (use this queue for under the hood)
like an advance message queue like a single-producer with multiple-consumers queue,
broadcast system, multiple topics queue or any other use-cases.

## Installation

```shell
go get github.com/Cleverse/go-utilities/queue
```
3 changes: 3 additions & 0 deletions queue/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/Cleverse/go-utilities/queue

go 1.18
Empty file added queue/go.sum
Empty file.
159 changes: 159 additions & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Package queue provides a Pure Golang thread-safe and unlimited-size generics in-memory message queue implementation
// that supports async enqueue and blocking dequeue.
// It's alternative way to communicate between goroutines compared to `channel`.
// The implementation of this in-memory message queue uses sync.Cond instead of channel.
//
// This queue is low-level and simple library, it's not a full-featured message queue.
// If your use-case requires a limited-size queue and blocking enqueue, please use a channel instead.
// For advanced use-cases like distributed queue, persistent message please use a message broker like Kafka, RabbitMQ, NATES or NSQ instead.
//
// You can build any advanced message queue on top of this queue (use this queue for under the hood)
// like an advance message queue like a single-producer with multiple-consumers queue,
// broadcast system, multiple topics queue or any other use-cases.
package queue

import (
"sync"
)

// Queue a instance of thread-safe and unlimited-size generics in-memory message queue
// and alternative way to communicate between goroutines compared to channel.
type Queue[T comparable] struct {
cond *sync.Cond
items []T
mu sync.RWMutex
isClosed bool
}

// New creates a new message queue.
func New[T comparable]() *Queue[T] {
q := &Queue[T]{
items: make([]T, 0),
}
q.cond = sync.NewCond(&q.mu)
return q
}

// Close closes the queue.
// queue will be permanent unusable after this method is called.
func (q *Queue[T]) Close() {
q.mu.Lock()
defer q.mu.Unlock()

q.isClosed = true
q.items = nil
q.cond.Broadcast()
}

// IsClosed returns true if the queue is closed.
func (q *Queue[T]) IsClosed() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return q.isClosed
}

// Enqueue adds an item to the end of the queue. returns the index of the item.
func (q *Queue[T]) Enqueue(item T) (index int) {
q.mu.Lock()
defer q.mu.Unlock()

if q.isClosed {
return -1
}

q.items = append(q.items, item)
q.cond.Signal()

return len(q.items) - 1
}

// Dequeue removes an item from the front of the queue.
//
// If the queue is empty, this method blocks(wait) until an item is available.
//
// returns second value as false if the queue is closed.
func (q *Queue[T]) Dequeue() (val T, ok bool) {
q.mu.Lock()
defer q.mu.Unlock()

for len(q.items) == 0 {
if q.isClosed {
return val, false
}
q.cond.Wait()
}

// Recheck the length of the queue after waking up from wait.
// or another goroutine might have dequeued the last item.
if q.isClosed || len(q.items) == 0 {
return val, false
}

item := q.items[0]
q.items = q.items[1:]

return item, true
}

// IsEmpty returns true if the queue is empty.
func (q *Queue[T]) IsEmpty() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.items) == 0
}

// IndexOf returns the index of the first item that matches the target.
func (q *Queue[T]) IndexOf(target T) (index int) {
return q.IndexOfIter(func(item T) bool {
return item == target
})
}

// IndexOfIter returns the index of the first item that matches the callback.
// If no item matches the callback, -1 is returned.
func (q *Queue[T]) IndexOfIter(cb func(item T) bool) (index int) {
q.mu.RLock()
defer q.mu.RUnlock()
for i, item := range q.items {
if cb(item) {
return i
}
}
return -1
}

// Len returns the length of the queue.
func (q *Queue[T]) Len() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.items)
}

// Items returns a copy of the all items in the queue.
func (q *Queue[T]) Items() []T {
q.mu.RLock()
defer q.mu.RUnlock()
items := make([]T, len(q.items))
copy(items, q.items)
return items
}

// RemoveAt removes an item at the given index and returns the new size of the queue.
func (q *Queue[T]) RemoveAt(index int) (size int) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items[:index], q.items[index+1:]...)
return len(q.items)
}

// Clear removes all items from the queue and returns the size of the removed items.
func (q *Queue[T]) Clear() (size int) {
q.mu.Lock()
defer q.mu.Unlock()
size = len(q.items)

// clean up the slice without changing the capacity and allocation.
q.items = q.items[:0:cap(q.items)]

return size
}
143 changes: 143 additions & 0 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package queue

import (
"sync"
"testing"
"time"
)

func TestEnqueueDequeue(t *testing.T) {
expecteds := []string{"a", "b", "c", "d", "e"}

q := New[string]()
for expectedIndex, v := range expecteds {
actualIndex := q.Enqueue(v)
if expectedIndex != actualIndex {
t.Errorf("index should be equal to the expected value, expected: %d, actual: %d", expectedIndex, actualIndex)
}
}

for _, expected := range expecteds {
actual, ok := q.Dequeue()
if !ok {
t.Errorf("item should be dequeued")
}
if expected != actual {
t.Errorf("item should be equal to the expected value, expected: %s, actual: %s", expected, actual)
}
}
}

func TestAsyncEnqueueDequeue(t *testing.T) {
expected := "A"
q := New[string]()
go func() {
time.Sleep(3 * time.Second)
q.Enqueue(expected)
}()

// expected to block until enqueue
actual, ok := q.Dequeue()
if !ok {
t.Errorf("item should be dequeued")
}
if expected != actual {
t.Errorf("item should be equal to the expected value, expected: %s, actual: %s", expected, actual)
}
}

func TestCloseConcurrency(t *testing.T) {
var (
q = New[int]()
wg sync.WaitGroup
)

wg.Add(1)
go func() {
defer wg.Done()
if _, ok := q.Dequeue(); ok {
t.Error("should not dequeue from an empty queue")
}
}()

time.Sleep(1 * time.Second)
wg.Add(1)
go func() {
defer wg.Done()
q.Close()
}()
wg.Wait()

actualIndex := q.Enqueue(1)
actualItem, ok := q.Dequeue()

if actualIndex != -1 {
t.Errorf("should not enqueue to a closed queue")
}
if actualItem != 0 {
t.Errorf("should not dequeue from a closed queue")
}
if ok {
t.Errorf("should not dequeue from a closed queue")
}
}

func TestClear(t *testing.T) {
q := New[int]()

expectedSize := 10
for i := 0; i < expectedSize; i++ {
q.Enqueue(i)
}

itemsCap := cap(q.items)
actualSize := q.Clear()

if q.Len() != 0 {
t.Errorf("queue size should be zero")
}
if expectedSize != actualSize {
t.Errorf("flushed size should be equal to the enqueued size")
}
if itemsCap != cap(q.items) {
t.Errorf("capacity should not change after flush")
}
}

func TestRemoveAt(t *testing.T) {
q := New[int]()
size := 10
expected := make([]int, 0, size)

for i := 0; i < size; i++ {
value := i + 1
expected = append(expected, value)
q.Enqueue(value)
}

removeValue := 5
removeIndex := q.IndexOf(removeValue)
newSize := q.RemoveAt(removeIndex)

// assert.Equal(t, size-1, newSize, "new size should be equal to the old size minus 1")
// assert.Equal(t, -1, q.IndexOf(removedValue), "index of removed item should be -1")
if size-1 != newSize {
t.Errorf("new size should be equal to the old size minus 1")
}
if -1 != q.IndexOf(removeValue) {
t.Errorf("index of removed item should be -1")
}

for i := 0; i < q.Len(); i++ {
item, ok := q.Dequeue()
if !ok {
t.Errorf("item should be dequeued")
}
if i != removeIndex && expected[i] != item {
t.Errorf("item should be equal to the expected value, expected: %d, actual: %d", expected[i], item)
}
if removeValue == item {
t.Errorf("removed item should not be in the queue")
}
}
}

0 comments on commit 80a98de

Please sign in to comment.