Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduler bug and add panics #946

Merged
merged 5 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 31 additions & 37 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ func (b *BasicBuffer) Size() int {

// IssuerQueue returns the queue for the corresponding issuer.
func (b *BasicBuffer) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
element, exists := b.activeIssuers.Get(issuerID)
if !exists {
return nil
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
return nil
panic("buffer contains elements that are not issuer queues")
}

return issuerQueue
Expand Down Expand Up @@ -119,8 +119,8 @@ func (b *BasicBuffer) GetOrCreateIssuerQueue(issuerID iotago.AccountID) *IssuerQ

// RemoveIssuerQueue removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue.
func (b *BasicBuffer) RemoveIssuerQueue(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
element, exists := b.activeIssuers.Get(issuerID)
if !exists {
return
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
Expand All @@ -135,8 +135,8 @@ func (b *BasicBuffer) RemoveIssuerQueue(issuerID iotago.AccountID) {

// RemoveIssuerQueueIfEmpty removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue if it is empty.
func (b *BasicBuffer) RemoveIssuerQueueIfEmpty(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
element, exists := b.activeIssuers.Get(issuerID)
if !exists {
return
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
Expand Down Expand Up @@ -168,25 +168,6 @@ func (b *BasicBuffer) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu
return nil, true
}

// Unsubmit removes a block from the submitted blocks.
// If that block is already marked as ready, Unsubmit has no effect.
func (b *BasicBuffer) Unsubmit(block *blocks.Block) bool {
issuerID := block.IssuerID()

issuerQueue := b.IssuerQueue(issuerID)
if issuerQueue == nil {
return false
}

if !issuerQueue.Unsubmit(block) {
return false
}

b.size.Dec()

return true
}

// Ready marks a previously submitted block as ready to be scheduled.
func (b *BasicBuffer) Ready(block *blocks.Block) bool {
issuerQueue := b.IssuerQueue(block.IssuerID())
Expand All @@ -205,7 +186,7 @@ func (b *BasicBuffer) ReadyBlocksCount() (readyBlocksCount int) {
}

for q := start; ; {
readyBlocksCount += q.inbox.Len()
readyBlocksCount += q.readyHeap.Len()
q = b.Next()
if q == start {
break
Expand All @@ -222,8 +203,8 @@ func (b *BasicBuffer) TotalBlocksCount() (blocksCount int) {
return
}
for q := start; ; {
blocksCount += q.inbox.Len()
blocksCount += q.submitted.Size()
blocksCount += q.readyHeap.Len()
blocksCount += q.nonReadyMap.Size()
q = b.Next()
if q == start {
break
Expand Down Expand Up @@ -296,23 +277,32 @@ func (b *BasicBuffer) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBu
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > maxBuffer {
// find the longest mana-scaled queue
maxIssuerID := b.longestQueueIssuerID(quantumFunc)
if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil {
if tail := longestQueue.RemoveTail(); tail != nil {
b.size.Dec()
droppedBlocks = append(droppedBlocks, tail)
}
maxIssuerID := b.mustLongestQueueIssuerID(quantumFunc)
longestQueue := b.IssuerQueue(maxIssuerID)
if longestQueue == nil {
panic("buffer is full, but longest queue does not exist")
}

tail := longestQueue.RemoveTail()
if tail == nil {
panic("buffer is full, but tail of longest queue does not exist")
}

b.size.Dec()
droppedBlocks = append(droppedBlocks, tail)

}

return droppedBlocks
}

func (b *BasicBuffer) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID {
// mustLongestQueueIssuerID returns the issuerID of the longest queue in the buffer.
// This function panics if no longest queue is found.
func (b *BasicBuffer) mustLongestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID {
start := b.Current()
ringStart := b.ring
maxScale := math.Inf(-1)
var maxIssuerID iotago.AccountID
maxIssuerID := iotago.EmptyAccountID
for q := start; ; {
if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 {
if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale {
Expand All @@ -332,6 +322,10 @@ func (b *BasicBuffer) longestQueueIssuerID(quantumFunc func(iotago.AccountID) De
}
}

if maxIssuerID == iotago.EmptyAccountID {
panic("no longest queue determined")
}

return maxIssuerID
}

Expand Down
83 changes: 42 additions & 41 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (

// IssuerQueue keeps the submitted blocks of an issuer.
type IssuerQueue struct {
issuerID iotago.AccountID
submitted *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block]
inbox generalheap.Heap[timed.HeapKey, *blocks.Block]
size atomic.Int64
work atomic.Int64
issuerID iotago.AccountID
nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block]
readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block]
size atomic.Int64
work atomic.Int64
}

// NewIssuerQueue returns a new IssuerQueue.
func NewIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
return &IssuerQueue{
issuerID: issuerID,
submitted: shrinkingmap.New[iotago.BlockID, *blocks.Block](),
issuerID: issuerID,
nonReadyMap: shrinkingmap.New[iotago.BlockID, *blocks.Block](),
}
}

Expand Down Expand Up @@ -58,18 +58,18 @@ func (q *IssuerQueue) IssuerID() iotago.AccountID {
return q.issuerID
}

// Submit submits a block for the queue.
// Submit submits a block for the queue. Return true if submitted and false if it was already submitted.
func (q *IssuerQueue) Submit(element *blocks.Block) bool {
// this is just a debugging check, it will never happen in practice
// this is just a debugging check, it should never happen in practice and we should panic if it does.
if blkIssuerID := element.IssuerID(); q.issuerID != blkIssuerID {
panic(fmt.Sprintf("issuerqueue: queue issuer ID(%x) and issuer ID(%x) does not match.", q.issuerID, blkIssuerID))
}

if _, submitted := q.submitted.Get(element.ID()); submitted {
if _, submitted := q.nonReadyMap.Get(element.ID()); submitted {
return false
}

q.submitted.Set(element.ID(), element)
q.nonReadyMap.Set(element.ID(), element)
q.size.Inc()
q.work.Add(int64(element.WorkScore()))

Expand All @@ -78,11 +78,11 @@ func (q *IssuerQueue) Submit(element *blocks.Block) bool {

// Unsubmit removes a previously submitted block from the queue.
func (q *IssuerQueue) Unsubmit(block *blocks.Block) bool {
if _, submitted := q.submitted.Get(block.ID()); !submitted {
if _, submitted := q.nonReadyMap.Get(block.ID()); !submitted {
return false
}

q.submitted.Delete(block.ID())
q.nonReadyMap.Delete(block.ID())
q.size.Dec()
q.work.Sub(int64(block.WorkScore()))

Expand All @@ -91,21 +91,21 @@ func (q *IssuerQueue) Unsubmit(block *blocks.Block) bool {

// Ready marks a previously submitted block as ready to be scheduled.
func (q *IssuerQueue) Ready(block *blocks.Block) bool {
if _, submitted := q.submitted.Get(block.ID()); !submitted {
if _, submitted := q.nonReadyMap.Get(block.ID()); !submitted {
return false
}

q.submitted.Delete(block.ID())
heap.Push(&q.inbox, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())})
q.nonReadyMap.Delete(block.ID())
heap.Push(&q.readyHeap, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())})

return true
}

// IDs returns the IDs of all submitted blocks (ready or not).
func (q *IssuerQueue) IDs() (ids []iotago.BlockID) {
ids = q.submitted.Keys()
ids = q.nonReadyMap.Keys()

for _, block := range q.inbox {
for _, block := range q.readyHeap {
ids = append(ids, block.Value.ID())
}

Expand All @@ -114,22 +114,22 @@ func (q *IssuerQueue) IDs() (ids []iotago.BlockID) {

// Front returns the first ready block in the queue.
func (q *IssuerQueue) Front() *blocks.Block {
if q == nil || q.inbox.Len() == 0 {
if q == nil || q.readyHeap.Len() == 0 {
return nil
}

return q.inbox[0].Value
return q.readyHeap[0].Value
}

// PopFront removes the first ready block from the queue.
func (q *IssuerQueue) PopFront() *blocks.Block {
if q.inbox.Len() == 0 {
if q.readyHeap.Len() == 0 {
return nil
}

heapElement, isHeapElement := heap.Pop(&q.inbox).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block])
heapElement, isHeapElement := heap.Pop(&q.readyHeap).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block])
if !isHeapElement {
return nil
panic("unable to pop from a non-empty heap.")
}
blk := heapElement.Value
q.size.Dec()
Expand All @@ -138,31 +138,32 @@ func (q *IssuerQueue) PopFront() *blocks.Block {
return blk
}

// RemoveTail removes the oldest block from the queue.
func (q *IssuerQueue) RemoveTail() *blocks.Block {
var oldestSubmittedBlock *blocks.Block
q.submitted.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool {
if oldestSubmittedBlock == nil || oldestSubmittedBlock.IssuingTime().After(block.IssuingTime()) {
oldestSubmittedBlock = block
var oldestNonReadyBlock *blocks.Block
q.nonReadyMap.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool {
if oldestNonReadyBlock == nil || oldestNonReadyBlock.IssuingTime().After(block.IssuingTime()) {
oldestNonReadyBlock = block
}

return true
})

tail := q.tail()
// if heap tail does not exist or tail is newer than oldest submitted block, unsubmit oldest block
if oldestSubmittedBlock != nil && (tail < 0 || q.inbox[tail].Key.CompareTo(timed.HeapKey(oldestSubmittedBlock.IssuingTime())) > 0) {
q.Unsubmit(oldestSubmittedBlock)

return oldestSubmittedBlock
} else if tail < 0 {
// should never happen that the oldest submitted block does not exist and the tail does not exist.
return nil
heapTailIndex := q.heapTail()
// if heap tail (oldest ready block) does not exist or is newer than oldest non-ready block, unsubmit the oldest non-ready block
if oldestNonReadyBlock != nil && (heapTailIndex < 0 || q.readyHeap[heapTailIndex].Key.CompareTo(timed.HeapKey(oldestNonReadyBlock.IssuingTime())) > 0) {
if q.Unsubmit(oldestNonReadyBlock) {
return oldestNonReadyBlock
}
} else if heapTailIndex < 0 { // the heap is empty
// should never happen that the oldest submitted block does not exist and the heap is empty.
panic("heap tail and oldest submitted block do not exist. Trying to remove tail of an empty queue?")
}

// if the tail exists and is older than the oldest submitted block, drop it
heapElement, isHeapElement := heap.Remove(&q.inbox, tail).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block])
// if the oldest ready block is older than the oldest non-ready block, drop it
heapElement, isHeapElement := heap.Remove(&q.readyHeap, heapTailIndex).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block])
if !isHeapElement {
return nil
panic("trying to remove a heap element that does not exist.")
}
blk := heapElement.Value
q.size.Dec()
Expand All @@ -171,8 +172,8 @@ func (q *IssuerQueue) RemoveTail() *blocks.Block {
return blk
}

func (q *IssuerQueue) tail() int {
h := q.inbox
func (q *IssuerQueue) heapTail() int {
h := q.readyHeap
if h.Len() <= 0 {
return -1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,15 +580,15 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def

func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) {
q := s.basicBuffer.IssuerQueue(issuerID)
q.submitted.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool {
q.nonReadyMap.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool {
block.SetDropped()
s.events.BlockDropped.Trigger(block, err)

return true
})

for q.inbox.Len() > 0 {
block := q.PopFront()
for i := 0; i < q.readyHeap.Len(); i++ {
block := q.readyHeap[i].Value
block.SetDropped()
s.events.BlockDropped.Trigger(block, err)
}
Expand Down
Loading