diff --git a/openvidu/queue/queue.go b/openvidu/queue/queue.go index 26719c17684..8aca69b4a2e 100644 --- a/openvidu/queue/queue.go +++ b/openvidu/queue/queue.go @@ -16,6 +16,7 @@ package queue import ( "errors" + "sync" ) type Queue[T any] interface { @@ -30,7 +31,10 @@ var ( ErrQueueFull = errors.New("queue full") ) -type SliceQueue[T any] []T +type SliceQueue[T any] struct { + data []T + mutex sync.RWMutex +} func NewSliceQueue[T any]() Queue[T] { return &SliceQueue[T]{} @@ -38,34 +42,44 @@ func NewSliceQueue[T any]() Queue[T] { // Len returns the number of elements in the queue. func (q *SliceQueue[T]) Len() int { - return len(*q) + q.mutex.RLock() + defer q.mutex.RUnlock() + return len(q.data) } // Enqueue adds an element to the end of the queue. func (q *SliceQueue[T]) Enqueue(value T) error { - *q = append(*q, value) + q.mutex.Lock() + defer q.mutex.Unlock() + q.data = append(q.data, value) return nil } // Dequeue removes and returns the first element from the queue. func (q *SliceQueue[T]) Dequeue() (T, error) { - queue := *q - if len(*q) > 0 { - element := queue[0] - *q = queue[1:] - return element, nil + q.mutex.Lock() + defer q.mutex.Unlock() + + if len(q.data) == 0 { + var empty T + return empty, ErrQueueEmpty } - var empty T - return empty, ErrQueueEmpty + element := q.data[0] + q.data = q.data[1:] + return element, nil } // Contains checks if an element exists in the queue based on the given equality function. func (q *SliceQueue[T]) Contains(element T, equals func(T, T) bool) bool { - for _, e := range *q { + q.mutex.RLock() + defer q.mutex.RUnlock() + + for _, e := range q.data { if equals(e, element) { return true } } + return false }