Skip to content

Commit

Permalink
Merge pull request #440 from iotaledger/chore/scheduler-todos
Browse files Browse the repository at this point in the history
Scheduler todos
  • Loading branch information
cyberphysic4l authored Oct 30, 2023
2 parents 71b37b9 + 37a8edf commit 426e751
Showing 2 changed files with 131 additions and 107 deletions.
178 changes: 101 additions & 77 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go
Original file line number Diff line number Diff line change
@@ -6,16 +6,12 @@ import (
"time"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"

iotago "github.com/iotaledger/iota.go/v4"
)

// ErrInsufficientMana is returned when the mana is insufficient.
var ErrInsufficientMana = ierrors.New("insufficient issuer's mana to schedule the block")

// region BufferQueue /////////////////////////////////////////////////////////////////////////////////////////////

// BufferQueue represents a buffer of IssuerQueue.
@@ -65,31 +61,83 @@ func (b *BufferQueue) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
return issuerQueue
}

// IssuerQueueWork returns the total WorkScore of block in the queue for the corresponding issuer.
func (b *BufferQueue) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore {
issuerQueue := b.IssuerQueue(issuerID)
if issuerQueue == nil {
return 0
}

return issuerQueue.Work()
}

// IssuerQueueSize returns the number of blocks in the queue for the corresponding issuer.
func (b *BufferQueue) IssuerQueueBlockCount(issuerID iotago.AccountID) int {
issuerQueue := b.IssuerQueue(issuerID)
if issuerQueue == nil {
return 0
}

return issuerQueue.Size()
}

func (b *BufferQueue) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
issuerQueue := NewIssuerQueue(issuerID)
b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue))

return issuerQueue
}

func (b *BufferQueue) GetIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, error) {
func (b *BufferQueue) GetOrCreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
element, issuerActive := b.activeIssuers.Get(issuerID)
if !issuerActive {
return nil, ierrors.New("issuer queue does not exist")
// create new issuer queue
return b.CreateIssuerQueue(issuerID)
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
return nil, ierrors.New("buffer contains elements that are not issuer queues")
panic("buffer contains elements that are not issuer queues")
}

// issuer queue exists
return issuerQueue, nil
return issuerQueue
}

// RemoveIssuerQueue removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue.
func (b *BufferQueue) RemoveIssuerQueue(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
return
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}
b.size -= issuerQueue.Size()

b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
}

// RemoveIssuerQueueIfEmpty removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue if it is empty.
func (b *BufferQueue) RemoveIssuerQueueIfEmpty(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
return
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}

if issuerQueue.Size() == 0 {
b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
}
}

// Submit submits a block. Return blocks dropped from the scheduler to make room for the submitted block.
// The submitted block can also be returned as dropped if the issuer does not have enough mana.
func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) ([]*blocks.Block, bool) {

// first we submit the block, and if it turns out that the issuer doesn't have enough bandwidth to submit, it will be removed by dropTail
if !issuerQueue.Submit(blk) {
return nil, false
@@ -105,45 +153,6 @@ func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu
return nil, true
}

func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
start := b.Current()
ringStart := b.ring
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > maxBuffer {
// TODO: extract to util func
// find longest mana-scaled queue
maxScale := math.Inf(-1)
var maxIssuerID iotago.AccountID
for q := start; ; {
if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 {
if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale {
maxScale = scale
maxIssuerID = q.IssuerID()
}
} else if q.Size() > 0 {
maxIssuerID = q.IssuerID()
// return to the start of the issuer ring and break as this is the max value we can have.
b.ring = ringStart

break
}
q = b.Next()
if q == start {
break
}
}

if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil {
if tail := longestQueue.RemoveTail(); tail != nil {
b.size--
droppedBlocks = append(droppedBlocks, tail)
}
}
}

return droppedBlocks
}

// Unsubmit removes a block from the submitted blocks.
// If that block is already marked as ready, Unsubmit has no effect.
func (b *BufferQueue) Unsubmit(block *blocks.Block) bool {
@@ -209,34 +218,6 @@ func (b *BufferQueue) TotalBlocksCount() (blocksCount int) {
return
}

// InsertIssuer creates a queue for the given issuer and adds it to the list of active issuers.
func (b *BufferQueue) InsertIssuer(issuerID iotago.AccountID) {
_, issuerActive := b.activeIssuers.Get(issuerID)
if issuerActive {
return
}

issuerQueue := NewIssuerQueue(issuerID)
b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue))
}

// RemoveIssuer removes all blocks (submitted and ready) for the given issuer.
func (b *BufferQueue) RemoveIssuer(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
return
}

issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
return
}
b.size -= issuerQueue.Size()

b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
}

// Next returns the next IssuerQueue in round-robin order.
func (b *BufferQueue) Next() *IssuerQueue {
if b.ring != nil {
@@ -297,6 +278,49 @@ func (b *BufferQueue) IssuerIDs() []iotago.AccountID {
return issuerIDs
}

func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > maxBuffer {
// find the longest mana-scaled queue
maxIssuerID := b.longestQueueIssuerID(quantumFunc)
if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil {
if tail := longestQueue.RemoveTail(); tail != nil {
b.size--
droppedBlocks = append(droppedBlocks, tail)
}
}
}

return droppedBlocks
}

func (b *BufferQueue) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID {
start := b.Current()
ringStart := b.ring
maxScale := math.Inf(-1)
var maxIssuerID iotago.AccountID
for q := start; ; {
if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 {
if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale {
maxScale = scale
maxIssuerID = q.IssuerID()
}
} else if q.Size() > 0 {
// if the issuer has no quantum, then this is the max queue size
maxIssuerID = q.IssuerID()
b.ring = ringStart

break
}
q = b.Next()
if q == start {
break
}
}

return maxIssuerID
}

func (b *BufferQueue) ringRemove(r *ring.Ring) {
n := b.ring.Next()
if r == b.ring {
60 changes: 30 additions & 30 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go
Original file line number Diff line number Diff line change
@@ -154,20 +154,20 @@ func (s *Scheduler) Start() {
s.TriggerInitialized()
}

// IssuerQueueSizeCount returns the number of blocks in the queue of the given issuer.
// IssuerQueueBlockCount returns the number of blocks in the queue of the given issuer.
func (s *Scheduler) IssuerQueueBlockCount(issuerID iotago.AccountID) int {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.IssuerQueue(issuerID).Size()
return s.basicBuffer.IssuerQueueBlockCount(issuerID)
}

// IssuerQueueWork returns the queue size of the given issuer in work units.
func (s *Scheduler) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.IssuerQueue(issuerID).Work()
return s.basicBuffer.IssuerQueueWork(issuerID)
}

// ValidatorQueueBlockCount returns the number of validation blocks in the validator queue of the given issuer.
@@ -233,7 +233,7 @@ func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, blocks ...*bl
return false
}

return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueue(accountID).Work())
return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueueWork(accountID))
}

func (s *Scheduler) AddBlock(block *blocks.Block) {
@@ -251,17 +251,7 @@ func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) {
slot := s.latestCommittedSlot()

issuerID := block.ProtocolBlock().Header.IssuerID
issuerQueue, err := s.basicBuffer.GetIssuerQueue(issuerID)
if err != nil {
// this should only ever happen if the issuer has been removed due to insufficient Mana.
// if Mana is now sufficient again, we can add the issuer again.
_, quantumErr := s.quantumFunc(issuerID, slot)
if quantumErr != nil {
s.errorHandler(ierrors.Wrapf(quantumErr, "failed to retrieve quantum for issuerID %s in slot %d when adding a block", issuerID, slot))
}

issuerQueue = s.createIssuer(issuerID)
}
issuerQueue := s.getOrCreateIssuer(issuerID)

droppedBlocks, submitted := s.basicBuffer.Submit(
block,
@@ -443,7 +433,7 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {
// increment every issuer's deficit for the required number of rounds
for q := start; ; {
issuerID := q.IssuerID()
if err := s.incrementDeficit(issuerID, rounds, slot); err != nil {
if _, err := s.incrementDeficit(issuerID, rounds, slot); err != nil {
s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot))
s.removeIssuer(issuerID, err)

@@ -462,20 +452,24 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {
// increment the deficit for all issuers before schedulingIssuer one more time
for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() {
issuerID := q.IssuerID()
if err := s.incrementDeficit(issuerID, 1, slot); err != nil {
newDeficit, err := s.incrementDeficit(issuerID, 1, slot)
if err != nil {
s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot))
s.removeIssuer(issuerID, err)

return
}

// remove empty issuer queues of issuers with max deficit.
if newDeficit == s.maxDeficit() {
s.basicBuffer.RemoveIssuerQueueIfEmpty(issuerID)
}
}

// remove the block from the buffer and adjust issuer's deficit
block := s.basicBuffer.PopFront()
issuerID := block.ProtocolBlock().Header.IssuerID
err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore()))

if err != nil {
if _, err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore())); err != nil {
// if something goes wrong with deficit update, drop the block instead of scheduling it.
block.SetDropped()
s.events.BlockDropped.Trigger(block, err)
@@ -576,8 +570,14 @@ func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) {
}

s.deficits.Delete(issuerID)
s.basicBuffer.RemoveIssuerQueue(issuerID)
}

s.basicBuffer.RemoveIssuer(issuerID)
func (s *Scheduler) getOrCreateIssuer(accountID iotago.AccountID) *IssuerQueue {
issuerQueue := s.basicBuffer.GetOrCreateIssuerQueue(accountID)
s.deficits.GetOrCreate(accountID, func() Deficit { return 0 })

return issuerQueue
}

func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue {
@@ -587,16 +587,16 @@ func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue {
return issuerQueue
}

func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) error {
func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) (Deficit, error) {
var updateErr error
s.deficits.Compute(accountID, func(currentValue Deficit, exists bool) Deficit {
updatedDeficit := s.deficits.Compute(accountID, func(currentValue Deficit, exists bool) Deficit {
if !exists {
updateErr = ierrors.Errorf("could not get deficit for issuer %s", accountID)
return 0
}
newDeficit, err := safemath.SafeAdd(currentValue, delta)
if err != nil {
// It can only overflow. We never allow the value to go below 0, so underflow is impossible.
// It can only overflow. We never allow the value to go below 0, so underflow is impossible.
if err != nil || newDeficit >= s.maxDeficit() {
return s.maxDeficit()
}

@@ -606,22 +606,22 @@ func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) err
return 0
}

return lo.Min(newDeficit, s.maxDeficit())
return newDeficit
})

if updateErr != nil {
s.removeIssuer(accountID, updateErr)

return updateErr
return 0, updateErr
}

return nil
return updatedDeficit, nil
}

func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) error {
func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) (Deficit, error) {
quantum, err := s.quantumFunc(issuerID, slot)
if err != nil {
return ierrors.Wrap(err, "failed to retrieve quantum")
return 0, ierrors.Wrap(err, "failed to retrieve quantum")
}

delta, err := safemath.SafeMul(quantum, rounds)

0 comments on commit 426e751

Please sign in to comment.