From 4a847d56020274c809283f5832cfa64d0b33fc74 Mon Sep 17 00:00:00 2001 From: muXxer Date: Tue, 30 Apr 2024 10:25:00 +0200 Subject: [PATCH] Address review comments --- .../scheduler/drr/issuerqueue.go | 9 +- .../scheduler/drr/scheduler.go | 5 +- .../scheduler/drr/validatorbuffer.go | 83 +++++++++++++++++++ .../scheduler/drr/validatorqueue.go | 76 ----------------- 4 files changed, 92 insertions(+), 81 deletions(-) create mode 100644 pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorbuffer.go diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go index f612551ca..7cde0038c 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go @@ -19,12 +19,13 @@ import ( // IssuerQueue keeps the submitted blocks of an issuer. type IssuerQueue struct { issuerID iotago.AccountID - nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block] sizeChangedFunc func(totalSizeDelta int64, readySizeDelta int64, workDelta int64) - readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block] - size atomic.Int64 - work atomic.Int64 + nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block] + readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block] + + size atomic.Int64 + work atomic.Int64 } // NewIssuerQueue returns a new IssuerQueue. diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index 3414a334c..d9dcf8412 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -387,7 +387,10 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() { s.bufferMutex.Lock() defer s.bufferMutex.Unlock() - s.validatorBuffer.ScheduleNext() + s.validatorBuffer.ForEachValidatorQueue(func(_ iotago.AccountID, validatorQueue *ValidatorQueue) bool { + validatorQueue.ScheduleNext() + return true + }) s.selectBasicBlockWithoutLocking() } diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorbuffer.go new file mode 100644 index 000000000..8b68a2176 --- /dev/null +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorbuffer.go @@ -0,0 +1,83 @@ +package drr + +import ( + "go.uber.org/atomic" + + "github.com/iotaledger/hive.go/ds/shrinkingmap" + "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" + iotago "github.com/iotaledger/iota.go/v4" +) + +type ValidatorBuffer struct { + buffer *shrinkingmap.ShrinkingMap[iotago.AccountID, *ValidatorQueue] + size atomic.Int64 +} + +func NewValidatorBuffer() *ValidatorBuffer { + return &ValidatorBuffer{ + buffer: shrinkingmap.New[iotago.AccountID, *ValidatorQueue](), + } +} + +func (b *ValidatorBuffer) Size() int { + if b == nil { + return 0 + } + + return int(b.size.Load()) +} + +func (b *ValidatorBuffer) Get(accountID iotago.AccountID) (*ValidatorQueue, bool) { + return b.buffer.Get(accountID) +} + +func (b *ValidatorBuffer) GetOrCreate(accountID iotago.AccountID, onCreateCallback func(*ValidatorQueue)) *ValidatorQueue { + return b.buffer.Compute(accountID, func(currentValue *ValidatorQueue, exists bool) *ValidatorQueue { + if exists { + return currentValue + } + + queue := NewValidatorQueue(accountID, func(totalSizeDelta int64) { + b.size.Add(totalSizeDelta) + }) + if onCreateCallback != nil { + onCreateCallback(queue) + } + + return queue + }) +} + +// Ready marks a previously submitted block as ready to be scheduled. +func (b *ValidatorBuffer) Ready(block *blocks.Block) { + if validatorQueue, exists := b.Get(block.IssuerID()); exists { + validatorQueue.Ready(block) + } +} + +// ForEachValidatorQueue iterates over all validator queues. +func (b *ValidatorBuffer) ForEachValidatorQueue(consumer func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool) { + b.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool { + return consumer(accountID, validatorQueue) + }) +} + +// Delete removes all validator queues that match the predicate. +func (b *ValidatorBuffer) Delete(predicate func(element *ValidatorQueue) bool) { + b.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool { + if predicate(validatorQueue) { + // validator workers need to be shut down first, otherwise they will hang on the shutdown channel. + validatorQueue.Shutdown() + b.buffer.Delete(accountID) + } + + return true + }) +} + +func (b *ValidatorBuffer) Clear() { + b.Delete(func(_ *ValidatorQueue) bool { + // remove all + return true + }) +} diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go index acf6a6193..39aea226b 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go @@ -217,79 +217,3 @@ func (q ValidatorQueue) Shutdown() { q.Clear() } - -type ValidatorBuffer struct { - buffer *shrinkingmap.ShrinkingMap[iotago.AccountID, *ValidatorQueue] - size atomic.Int64 -} - -func NewValidatorBuffer() *ValidatorBuffer { - return &ValidatorBuffer{ - buffer: shrinkingmap.New[iotago.AccountID, *ValidatorQueue](), - } -} - -func (b *ValidatorBuffer) Size() int { - if b == nil { - return 0 - } - - return int(b.size.Load()) -} - -func (b *ValidatorBuffer) Get(accountID iotago.AccountID) (*ValidatorQueue, bool) { - return b.buffer.Get(accountID) -} - -func (b *ValidatorBuffer) GetOrCreate(accountID iotago.AccountID, onCreateCallback func(*ValidatorQueue)) *ValidatorQueue { - return b.buffer.Compute(accountID, func(currentValue *ValidatorQueue, exists bool) *ValidatorQueue { - if exists { - return currentValue - } - - queue := NewValidatorQueue(accountID, func(totalSizeDelta int64) { - b.size.Add(totalSizeDelta) - }) - if onCreateCallback != nil { - onCreateCallback(queue) - } - - return queue - }) -} - -// Ready marks a previously submitted block as ready to be scheduled. -func (b *ValidatorBuffer) Ready(block *blocks.Block) { - if validatorQueue, exists := b.Get(block.IssuerID()); exists { - validatorQueue.Ready(block) - } -} - -// ScheduleNext schedules the next blocks of all validator queues. -func (b *ValidatorBuffer) ScheduleNext() { - b.buffer.ForEach(func(_ iotago.AccountID, validatorQueue *ValidatorQueue) bool { - validatorQueue.ScheduleNext() - - return true - }) -} - -// Delete removes all validator queues that match the predicate. -func (b *ValidatorBuffer) Delete(predicate func(element *ValidatorQueue) bool) { - b.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool { - if predicate(validatorQueue) { - // validator workers need to be shut down first, otherwise they will hang on the shutdown channel. - validatorQueue.Shutdown() - b.buffer.Delete(accountID) - } - - return true - }) -} - -func (b *ValidatorBuffer) Clear() { - b.Delete(func(_ *ValidatorQueue) bool { - // remove all - return true - }) -}