Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue Library #7

Merged
merged 4 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ Pure Golang errors library with stacktrace support (for wrapping and formatting

[See here](errors/README.md).

## queue

A Pure Golang low-level and simple queue library for thread-safe and unlimited-size generics in-memory message queue implementation (async enqueue and blocking dequeue supports).\
The alternative way to communicate between goroutines compared to `channel`

[See here](queue/README.md).

## address

Ethereum address utilities package. (a [go-ethereum](https://github.com/ethereum/go-ethereum) helper library)
Expand Down
1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ use (
./address
./errors
./fixedpoint
./queue
./utils
)
24 changes: 24 additions & 0 deletions queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[![GoDoc](https://godoc.org/github.com/Cleverse/go-utilities/queue?status.svg)](http://godoc.org/github.com/Cleverse/go-utilities/queue)
[![Report card](https://goreportcard.com/badge/github.com/Cleverse/go-utilities/queue)](https://goreportcard.com/report/github.com/Cleverse/go-utilities/queue)

# queue

A Pure Golang thread-safe and unlimited-size generics in-memory message queue implementation
that supports async enqueue and blocking dequeue. \
It's alternative way to communicate between goroutines compared to `channel`

> **Note:** \
> This package is not intended to be used as a distributed message queue. For advanced use-cases like distributed queue, persistent message please use a message broker like Kafka, RabbitMQ, NATES or NSQ instead.
>
> And if your use-case requires a limited-size queue and blocking enqueue, please use a channel instead.

This package is low-level and simple queue library, it's not a full-featured message queue. \
You can build any advanced message queue on top of this queue (use this queue for under the hood)
like an advance message queue like a single-producer with multiple-consumers queue,
broadcast system, multiple topics queue or any other use-cases.

## Installation

```shell
go get github.com/Cleverse/go-utilities/queue
```
3 changes: 3 additions & 0 deletions queue/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/Cleverse/go-utilities/queue

go 1.18
Empty file added queue/go.sum
Empty file.
159 changes: 159 additions & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Package queue provides a Pure Golang thread-safe and unlimited-size generics in-memory message queue implementation
// that supports async enqueue and blocking dequeue.
// It's alternative way to communicate between goroutines compared to `channel`.
// The implementation of this in-memory message queue uses sync.Cond instead of channel.
//
// This queue is low-level and simple library, it's not a full-featured message queue.
// If your use-case requires a limited-size queue and blocking enqueue, please use a channel instead.
// For advanced use-cases like distributed queue, persistent message please use a message broker like Kafka, RabbitMQ, NATES or NSQ instead.
//
// You can build any advanced message queue on top of this queue (use this queue for under the hood)
// like an advance message queue like a single-producer with multiple-consumers queue,
// broadcast system, multiple topics queue or any other use-cases.
package queue

import (
"sync"
)

// Queue a instance of thread-safe and unlimited-size generics in-memory message queue
// and alternative way to communicate between goroutines compared to channel.
type Queue[T comparable] struct {
cond *sync.Cond
items []T
mu sync.RWMutex
isClosed bool
}

// New creates a new message queue.
func New[T comparable]() *Queue[T] {
q := &Queue[T]{
items: make([]T, 0),
}
q.cond = sync.NewCond(&q.mu)
return q
}

// Close closes the queue.
// queue will be permanent unusable after this method is called.
func (q *Queue[T]) Close() {
q.mu.Lock()
defer q.mu.Unlock()

q.isClosed = true
q.items = nil
q.cond.Broadcast()
}

// IsClosed returns true if the queue is closed.
func (q *Queue[T]) IsClosed() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return q.isClosed
}

// Enqueue adds an item to the end of the queue. returns the index of the item.
func (q *Queue[T]) Enqueue(item T) (index int) {
q.mu.Lock()
defer q.mu.Unlock()

if q.isClosed {
return -1
}

q.items = append(q.items, item)
q.cond.Signal()

return len(q.items) - 1
}

// Dequeue removes an item from the front of the queue.
//
// If the queue is empty, this method blocks(wait) until an item is available.
//
// returns second value as false if the queue is closed.
func (q *Queue[T]) Dequeue() (val T, ok bool) {
q.mu.Lock()
defer q.mu.Unlock()

for len(q.items) == 0 {
if q.isClosed {
return val, false
}
q.cond.Wait()
}

// Recheck the length of the queue after waking up from wait.
// or another goroutine might have dequeued the last item.
if q.isClosed || len(q.items) == 0 {
return val, false
}

item := q.items[0]
q.items = q.items[1:]

return item, true
}

// IsEmpty returns true if the queue is empty.
func (q *Queue[T]) IsEmpty() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.items) == 0
}

// IndexOf returns the index of the first item that matches the target.
func (q *Queue[T]) IndexOf(target T) (index int) {
return q.IndexOfIter(func(item T) bool {
return item == target
})
}

// IndexOfIter returns the index of the first item that matches the callback.
// If no item matches the callback, -1 is returned.
func (q *Queue[T]) IndexOfIter(cb func(item T) bool) (index int) {
q.mu.RLock()
defer q.mu.RUnlock()
for i, item := range q.items {
if cb(item) {
return i
}
}
return -1
}

// Len returns the length of the queue.
func (q *Queue[T]) Len() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.items)
}

// Items returns a copy of the all items in the queue.
func (q *Queue[T]) Items() []T {
q.mu.RLock()
defer q.mu.RUnlock()
items := make([]T, len(q.items))
copy(items, q.items)
return items
}

// RemoveAt removes an item at the given index and returns the new size of the queue.
func (q *Queue[T]) RemoveAt(index int) (size int) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items[:index], q.items[index+1:]...)
return len(q.items)
}

// Clear removes all items from the queue and returns the size of the removed items.
func (q *Queue[T]) Clear() (size int) {
q.mu.Lock()
defer q.mu.Unlock()
size = len(q.items)

// clean up the slice without changing the capacity and allocation.
q.items = q.items[:0:cap(q.items)]

return size
}
143 changes: 143 additions & 0 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package queue

import (
"sync"
"testing"
"time"
)

func TestEnqueueDequeue(t *testing.T) {
expecteds := []string{"a", "b", "c", "d", "e"}

q := New[string]()
for expectedIndex, v := range expecteds {
actualIndex := q.Enqueue(v)
if expectedIndex != actualIndex {
t.Errorf("index should be equal to the expected value, expected: %d, actual: %d", expectedIndex, actualIndex)
}
}

for _, expected := range expecteds {
actual, ok := q.Dequeue()
if !ok {
t.Errorf("item should be dequeued")
}
if expected != actual {
t.Errorf("item should be equal to the expected value, expected: %s, actual: %s", expected, actual)
}
}
}

func TestAsyncEnqueueDequeue(t *testing.T) {
expected := "A"
q := New[string]()
go func() {
time.Sleep(3 * time.Second)
q.Enqueue(expected)
}()

// expected to block until enqueue
actual, ok := q.Dequeue()
if !ok {
t.Errorf("item should be dequeued")
}
if expected != actual {
t.Errorf("item should be equal to the expected value, expected: %s, actual: %s", expected, actual)
}
}

func TestCloseConcurrency(t *testing.T) {
var (
q = New[int]()
wg sync.WaitGroup
)

wg.Add(1)
go func() {
defer wg.Done()
if _, ok := q.Dequeue(); ok {
t.Error("should not dequeue from an empty queue")
}
}()

time.Sleep(1 * time.Second)
wg.Add(1)
go func() {
defer wg.Done()
q.Close()
}()
wg.Wait()

actualIndex := q.Enqueue(1)
actualItem, ok := q.Dequeue()

if actualIndex != -1 {
t.Errorf("should not enqueue to a closed queue")
}
if actualItem != 0 {
t.Errorf("should not dequeue from a closed queue")
}
if ok {
t.Errorf("should not dequeue from a closed queue")
}
}

func TestClear(t *testing.T) {
q := New[int]()

expectedSize := 10
for i := 0; i < expectedSize; i++ {
q.Enqueue(i)
}

itemsCap := cap(q.items)
actualSize := q.Clear()

if q.Len() != 0 {
t.Errorf("queue size should be zero")
}
if expectedSize != actualSize {
t.Errorf("flushed size should be equal to the enqueued size")
}
if itemsCap != cap(q.items) {
t.Errorf("capacity should not change after flush")
}
}

func TestRemoveAt(t *testing.T) {
q := New[int]()
size := 10
expected := make([]int, 0, size)

for i := 0; i < size; i++ {
value := i + 1
expected = append(expected, value)
q.Enqueue(value)
}

removeValue := 5
removeIndex := q.IndexOf(removeValue)
newSize := q.RemoveAt(removeIndex)

// assert.Equal(t, size-1, newSize, "new size should be equal to the old size minus 1")
// assert.Equal(t, -1, q.IndexOf(removedValue), "index of removed item should be -1")
if size-1 != newSize {
t.Errorf("new size should be equal to the old size minus 1")
}
if -1 != q.IndexOf(removeValue) {
t.Errorf("index of removed item should be -1")
}

for i := 0; i < q.Len(); i++ {
item, ok := q.Dequeue()
if !ok {
t.Errorf("item should be dequeued")
}
if i != removeIndex && expected[i] != item {
t.Errorf("item should be equal to the expected value, expected: %d, actual: %d", expected[i], item)
}
if removeValue == item {
t.Errorf("removed item should not be in the queue")
}
}
}
Loading