Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler todos #440

Merged
merged 6 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 101 additions & 77 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ import (
"time"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"

iotago "github.com/iotaledger/iota.go/v4"
)

// ErrInsufficientMana is returned when the mana is insufficient.
var ErrInsufficientMana = ierrors.New("insufficient issuer's mana to schedule the block")

// region BufferQueue /////////////////////////////////////////////////////////////////////////////////////////////

// BufferQueue represents a buffer of IssuerQueue.
Expand Down Expand Up @@ -65,31 +61,83 @@ func (b *BufferQueue) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
return issuerQueue
}

// IssuerQueueWork returns the total WorkScore of block in the queue for the corresponding issuer.
func (b *BufferQueue) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore {
issuerQueue := b.IssuerQueue(issuerID)
if issuerQueue == nil {
return 0
}

return issuerQueue.Work()
}

// IssuerQueueSize returns the number of blocks in the queue for the corresponding issuer.
func (b *BufferQueue) IssuerQueueBlockCount(issuerID iotago.AccountID) int {
issuerQueue := b.IssuerQueue(issuerID)
if issuerQueue == nil {
return 0
}

return issuerQueue.Size()
}

func (b *BufferQueue) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
issuerQueue := NewIssuerQueue(issuerID)
b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue))

return issuerQueue
}

func (b *BufferQueue) GetIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, error) {
func (b *BufferQueue) GetOrCreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
element, issuerActive := b.activeIssuers.Get(issuerID)
if !issuerActive {
return nil, ierrors.New("issuer queue does not exist")
// create new issuer queue
return b.CreateIssuerQueue(issuerID)
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
return nil, ierrors.New("buffer contains elements that are not issuer queues")
panic("buffer contains elements that are not issuer queues")
}

// issuer queue exists
return issuerQueue, nil
return issuerQueue
}

// RemoveIssuerQueue removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue.
func (b *BufferQueue) RemoveIssuerQueue(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
return
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}
b.size -= issuerQueue.Size()

b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
}

// RemoveIssuerQueueIfEmpty removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue if it is empty.
func (b *BufferQueue) RemoveIssuerQueueIfEmpty(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
return
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}

if issuerQueue.Size() == 0 {
b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
}
}

// Submit submits a block. Return blocks dropped from the scheduler to make room for the submitted block.
// The submitted block can also be returned as dropped if the issuer does not have enough mana.
func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) ([]*blocks.Block, bool) {

// first we submit the block, and if it turns out that the issuer doesn't have enough bandwidth to submit, it will be removed by dropTail
if !issuerQueue.Submit(blk) {
return nil, false
Expand All @@ -105,45 +153,6 @@ func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu
return nil, true
}

func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
start := b.Current()
ringStart := b.ring
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > maxBuffer {
// TODO: extract to util func
// find longest mana-scaled queue
maxScale := math.Inf(-1)
var maxIssuerID iotago.AccountID
for q := start; ; {
if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 {
if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale {
maxScale = scale
maxIssuerID = q.IssuerID()
}
} else if q.Size() > 0 {
maxIssuerID = q.IssuerID()
// return to the start of the issuer ring and break as this is the max value we can have.
b.ring = ringStart

break
}
q = b.Next()
if q == start {
break
}
}

if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil {
if tail := longestQueue.RemoveTail(); tail != nil {
b.size--
droppedBlocks = append(droppedBlocks, tail)
}
}
}

return droppedBlocks
}

// Unsubmit removes a block from the submitted blocks.
// If that block is already marked as ready, Unsubmit has no effect.
func (b *BufferQueue) Unsubmit(block *blocks.Block) bool {
Expand Down Expand Up @@ -209,34 +218,6 @@ func (b *BufferQueue) TotalBlocksCount() (blocksCount int) {
return
}

// InsertIssuer creates a queue for the given issuer and adds it to the list of active issuers.
func (b *BufferQueue) InsertIssuer(issuerID iotago.AccountID) {
_, issuerActive := b.activeIssuers.Get(issuerID)
if issuerActive {
return
}

issuerQueue := NewIssuerQueue(issuerID)
b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue))
}

// RemoveIssuer removes all blocks (submitted and ready) for the given issuer.
func (b *BufferQueue) RemoveIssuer(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
return
}

issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
return
}
b.size -= issuerQueue.Size()

b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
}

// Next returns the next IssuerQueue in round-robin order.
func (b *BufferQueue) Next() *IssuerQueue {
if b.ring != nil {
Expand Down Expand Up @@ -297,6 +278,49 @@ func (b *BufferQueue) IssuerIDs() []iotago.AccountID {
return issuerIDs
}

func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > maxBuffer {
// find the longest mana-scaled queue
maxIssuerID := b.longestQueueIssuerID(quantumFunc)
if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil {
if tail := longestQueue.RemoveTail(); tail != nil {
b.size--
droppedBlocks = append(droppedBlocks, tail)
}
}
}

return droppedBlocks
}

func (b *BufferQueue) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID {
start := b.Current()
ringStart := b.ring
maxScale := math.Inf(-1)
var maxIssuerID iotago.AccountID
for q := start; ; {
if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 {
if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale {
maxScale = scale
maxIssuerID = q.IssuerID()
}
} else if q.Size() > 0 {
// if the issuer has no quantum, then this is the max queue size
maxIssuerID = q.IssuerID()
b.ring = ringStart

break
}
q = b.Next()
if q == start {
break
}
}

return maxIssuerID
}

func (b *BufferQueue) ringRemove(r *ring.Ring) {
n := b.ring.Next()
if r == b.ring {
Expand Down
60 changes: 30 additions & 30 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,20 @@ func (s *Scheduler) Start() {
s.TriggerInitialized()
}

// IssuerQueueSizeCount returns the number of blocks in the queue of the given issuer.
// IssuerQueueBlockCount returns the number of blocks in the queue of the given issuer.
func (s *Scheduler) IssuerQueueBlockCount(issuerID iotago.AccountID) int {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.IssuerQueue(issuerID).Size()
return s.basicBuffer.IssuerQueueBlockCount(issuerID)
}

// IssuerQueueWork returns the queue size of the given issuer in work units.
func (s *Scheduler) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.IssuerQueue(issuerID).Work()
return s.basicBuffer.IssuerQueueWork(issuerID)
}

// ValidatorQueueBlockCount returns the number of validation blocks in the validator queue of the given issuer.
Expand Down Expand Up @@ -233,7 +233,7 @@ func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, blocks ...*bl
return false
}

return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueue(accountID).Work())
return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueueWork(accountID))
}

func (s *Scheduler) AddBlock(block *blocks.Block) {
Expand All @@ -251,17 +251,7 @@ func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) {
slot := s.latestCommittedSlot()

issuerID := block.ProtocolBlock().Header.IssuerID
issuerQueue, err := s.basicBuffer.GetIssuerQueue(issuerID)
if err != nil {
// this should only ever happen if the issuer has been removed due to insufficient Mana.
// if Mana is now sufficient again, we can add the issuer again.
_, quantumErr := s.quantumFunc(issuerID, slot)
if quantumErr != nil {
s.errorHandler(ierrors.Wrapf(quantumErr, "failed to retrieve quantum for issuerID %s in slot %d when adding a block", issuerID, slot))
}

issuerQueue = s.createIssuer(issuerID)
}
issuerQueue := s.getOrCreateIssuer(issuerID)

droppedBlocks, submitted := s.basicBuffer.Submit(
block,
Expand Down Expand Up @@ -443,7 +433,7 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {
// increment every issuer's deficit for the required number of rounds
for q := start; ; {
issuerID := q.IssuerID()
if err := s.incrementDeficit(issuerID, rounds, slot); err != nil {
if _, err := s.incrementDeficit(issuerID, rounds, slot); err != nil {
s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot))
s.removeIssuer(issuerID, err)

Expand All @@ -462,20 +452,24 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {
// increment the deficit for all issuers before schedulingIssuer one more time
for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() {
issuerID := q.IssuerID()
if err := s.incrementDeficit(issuerID, 1, slot); err != nil {
newDeficit, err := s.incrementDeficit(issuerID, 1, slot)
if err != nil {
s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot))
s.removeIssuer(issuerID, err)

return
}

// remove empty issuer queues of issuers with max deficit.
if newDeficit == s.maxDeficit() {
s.basicBuffer.RemoveIssuerQueueIfEmpty(issuerID)
}
}

// remove the block from the buffer and adjust issuer's deficit
block := s.basicBuffer.PopFront()
issuerID := block.ProtocolBlock().Header.IssuerID
err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore()))

if err != nil {
if _, err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore())); err != nil {
// if something goes wrong with deficit update, drop the block instead of scheduling it.
block.SetDropped()
s.events.BlockDropped.Trigger(block, err)
Expand Down Expand Up @@ -576,8 +570,14 @@ func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) {
}

s.deficits.Delete(issuerID)
s.basicBuffer.RemoveIssuerQueue(issuerID)
}

s.basicBuffer.RemoveIssuer(issuerID)
func (s *Scheduler) getOrCreateIssuer(accountID iotago.AccountID) *IssuerQueue {
issuerQueue := s.basicBuffer.GetOrCreateIssuerQueue(accountID)
s.deficits.GetOrCreate(accountID, func() Deficit { return 0 })

return issuerQueue
}

func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue {
Expand All @@ -587,16 +587,16 @@ func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue {
return issuerQueue
}

func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) error {
func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) (Deficit, error) {
var updateErr error
s.deficits.Compute(accountID, func(currentValue Deficit, exists bool) Deficit {
updatedDeficit := s.deficits.Compute(accountID, func(currentValue Deficit, exists bool) Deficit {
if !exists {
updateErr = ierrors.Errorf("could not get deficit for issuer %s", accountID)
return 0
}
newDeficit, err := safemath.SafeAdd(currentValue, delta)
if err != nil {
// It can only overflow. We never allow the value to go below 0, so underflow is impossible.
// It can only overflow. We never allow the value to go below 0, so underflow is impossible.
if err != nil || newDeficit >= s.maxDeficit() {
return s.maxDeficit()
}

Expand All @@ -606,22 +606,22 @@ func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) err
return 0
}

return lo.Min(newDeficit, s.maxDeficit())
return newDeficit
})

if updateErr != nil {
s.removeIssuer(accountID, updateErr)

return updateErr
return 0, updateErr
}

return nil
return updatedDeficit, nil
}

func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) error {
func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) (Deficit, error) {
quantum, err := s.quantumFunc(issuerID, slot)
if err != nil {
return ierrors.Wrap(err, "failed to retrieve quantum")
return 0, ierrors.Wrap(err, "failed to retrieve quantum")
}

delta, err := safemath.SafeMul(quantum, rounds)
Expand Down
Loading