Skip to content
This repository has been archived by the owner on Jan 24, 2025. It is now read-only.

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Apr 30, 2024
1 parent 541fd4c commit 4a847d5
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
// IssuerQueue keeps the submitted blocks of an issuer.
type IssuerQueue struct {
issuerID iotago.AccountID
nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block]
sizeChangedFunc func(totalSizeDelta int64, readySizeDelta int64, workDelta int64)

readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block]
size atomic.Int64
work atomic.Int64
nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block]
readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block]

size atomic.Int64
work atomic.Int64
}

// NewIssuerQueue returns a new IssuerQueue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,10 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() {
s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

s.validatorBuffer.ScheduleNext()
s.validatorBuffer.ForEachValidatorQueue(func(_ iotago.AccountID, validatorQueue *ValidatorQueue) bool {
validatorQueue.ScheduleNext()
return true
})

s.selectBasicBlockWithoutLocking()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package drr

import (
"go.uber.org/atomic"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
iotago "github.com/iotaledger/iota.go/v4"
)

type ValidatorBuffer struct {
buffer *shrinkingmap.ShrinkingMap[iotago.AccountID, *ValidatorQueue]
size atomic.Int64
}

func NewValidatorBuffer() *ValidatorBuffer {
return &ValidatorBuffer{
buffer: shrinkingmap.New[iotago.AccountID, *ValidatorQueue](),
}
}

func (b *ValidatorBuffer) Size() int {
if b == nil {
return 0
}

return int(b.size.Load())
}

func (b *ValidatorBuffer) Get(accountID iotago.AccountID) (*ValidatorQueue, bool) {
return b.buffer.Get(accountID)
}

func (b *ValidatorBuffer) GetOrCreate(accountID iotago.AccountID, onCreateCallback func(*ValidatorQueue)) *ValidatorQueue {
return b.buffer.Compute(accountID, func(currentValue *ValidatorQueue, exists bool) *ValidatorQueue {
if exists {
return currentValue
}

queue := NewValidatorQueue(accountID, func(totalSizeDelta int64) {
b.size.Add(totalSizeDelta)
})
if onCreateCallback != nil {
onCreateCallback(queue)
}

return queue
})
}

// Ready marks a previously submitted block as ready to be scheduled.
func (b *ValidatorBuffer) Ready(block *blocks.Block) {
if validatorQueue, exists := b.Get(block.IssuerID()); exists {
validatorQueue.Ready(block)
}
}

// ForEachValidatorQueue iterates over all validator queues.
func (b *ValidatorBuffer) ForEachValidatorQueue(consumer func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool) {
b.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool {
return consumer(accountID, validatorQueue)
})
}

// Delete removes all validator queues that match the predicate.
func (b *ValidatorBuffer) Delete(predicate func(element *ValidatorQueue) bool) {
b.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool {
if predicate(validatorQueue) {
// validator workers need to be shut down first, otherwise they will hang on the shutdown channel.
validatorQueue.Shutdown()
b.buffer.Delete(accountID)
}

return true
})
}

func (b *ValidatorBuffer) Clear() {
b.Delete(func(_ *ValidatorQueue) bool {
// remove all
return true
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,79 +217,3 @@ func (q ValidatorQueue) Shutdown() {

q.Clear()
}

type ValidatorBuffer struct {
buffer *shrinkingmap.ShrinkingMap[iotago.AccountID, *ValidatorQueue]
size atomic.Int64
}

func NewValidatorBuffer() *ValidatorBuffer {
return &ValidatorBuffer{
buffer: shrinkingmap.New[iotago.AccountID, *ValidatorQueue](),
}
}

func (b *ValidatorBuffer) Size() int {
if b == nil {
return 0
}

return int(b.size.Load())
}

func (b *ValidatorBuffer) Get(accountID iotago.AccountID) (*ValidatorQueue, bool) {
return b.buffer.Get(accountID)
}

func (b *ValidatorBuffer) GetOrCreate(accountID iotago.AccountID, onCreateCallback func(*ValidatorQueue)) *ValidatorQueue {
return b.buffer.Compute(accountID, func(currentValue *ValidatorQueue, exists bool) *ValidatorQueue {
if exists {
return currentValue
}

queue := NewValidatorQueue(accountID, func(totalSizeDelta int64) {
b.size.Add(totalSizeDelta)
})
if onCreateCallback != nil {
onCreateCallback(queue)
}

return queue
})
}

// Ready marks a previously submitted block as ready to be scheduled.
func (b *ValidatorBuffer) Ready(block *blocks.Block) {
if validatorQueue, exists := b.Get(block.IssuerID()); exists {
validatorQueue.Ready(block)
}
}

// ScheduleNext schedules the next blocks of all validator queues.
func (b *ValidatorBuffer) ScheduleNext() {
b.buffer.ForEach(func(_ iotago.AccountID, validatorQueue *ValidatorQueue) bool {
validatorQueue.ScheduleNext()

return true
})
}

// Delete removes all validator queues that match the predicate.
func (b *ValidatorBuffer) Delete(predicate func(element *ValidatorQueue) bool) {
b.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool {
if predicate(validatorQueue) {
// validator workers need to be shut down first, otherwise they will hang on the shutdown channel.
validatorQueue.Shutdown()
b.buffer.Delete(accountID)
}

return true
})
}

func (b *ValidatorBuffer) Clear() {
b.Delete(func(_ *ValidatorQueue) bool {
// remove all
return true
})
}

0 comments on commit 4a847d5

Please sign in to comment.