Validation block scheduler #319

Merged — 28 commits, merged Sep 13, 2023

Commits
e5aeb47
start implementing validation block scheduler
cyberphysic4l Aug 31, 2023
cb2c531
switch to simplified validation scheduler
cyberphysic4l Sep 6, 2023
084c422
complete simplified scheduler
cyberphysic4l Sep 6, 2023
9f43ad2
good doggy
cyberphysic4l Sep 6, 2023
87d6cbf
shutdown correctly
cyberphysic4l Sep 6, 2023
991b933
Merge branch 'develop' into feat/validation-scheduler
cyberphysic4l Sep 6, 2023
9a7c072
race debugging
cyberphysic4l Sep 6, 2023
e31a4f5
race debugging
cyberphysic4l Sep 6, 2023
8729aed
charge maxBlockWork for over-issued validation blocks
cyberphysic4l Sep 7, 2023
e128af6
Merge branch 'develop' into feat/validation-scheduler
cyberphysic4l Sep 7, 2023
c7453b6
go mod tidy
cyberphysic4l Sep 7, 2023
e50578d
go mod tidy
cyberphysic4l Sep 7, 2023
9ae3447
defer penalizing over-issuer validators to issue #338
cyberphysic4l Sep 7, 2023
a9ff82f
Update to new protocol parameters
jkrvivian Sep 6, 2023
4bf1417
Update iota.go version
jkrvivian Sep 8, 2023
64dec2d
move committee check for validator to the filter
cyberphysic4l Sep 9, 2023
b21e9f2
add buffer drop policy to validator queues
cyberphysic4l Sep 9, 2023
91f7132
update presets with buffer params
cyberphysic4l Sep 9, 2023
f53f638
Add validator buffer metrics
cyberphysic4l Sep 11, 2023
2b5fa12
go mod tidy
cyberphysic4l Sep 11, 2023
fa0e406
Merge branch 'develop' into feat/validation-scheduler
cyberphysic4l Sep 11, 2023
1a89898
go mod tidy
cyberphysic4l Sep 11, 2023
a4ba938
go mod tidy
cyberphysic4l Sep 11, 2023
6d303f3
add burning of Mana proportional to stake for over issuance
cyberphysic4l Sep 12, 2023
fee1bc8
increase validation blocks per slot in tests
cyberphysic4l Sep 12, 2023
2e539d5
fix bugs in account manager
cyberphysic4l Sep 13, 2023
81dade8
go mod tidy
cyberphysic4l Sep 13, 2023
8649ccb
Merge branch 'develop' into feat/validation-scheduler
cyberphysic4l Sep 13, 2023
109 changes: 80 additions & 29 deletions components/metrics/metrics_scheduler.go
@@ -13,18 +13,21 @@ import (
const (
schedulerNamespace = "scheduler"

queueSizePerNodeWork = "queue_size_per_node_work" //nolint:gosec
queueSizePerNodeCount = "queue_size_per_node_count"
schedulerProcessedBlocks = "processed_blocks"
manaAmountPerNode = "mana_per_node"
scheduledBlockLabel = "scheduled"
skippedBlockLabel = "skipped"
droppedBlockLabel = "dropped"
enqueuedBlockLabel = "enqueued"
bufferReadyBlockCount = "buffer_ready_block_total" //nolint:gosec
bufferTotalSize = "buffer_size_block_total"
bufferMaxSize = "buffer_max_size"
rate = "rate"
queueSizePerNodeWork = "queue_size_per_node_work" //nolint:gosec
queueSizePerNodeCount = "queue_size_per_node_count"
validatorQueueSizePerNodeCount = "validator_queue_size_per_node_count"
schedulerProcessedBlocks = "processed_blocks"
manaAmountPerNode = "mana_per_node"
scheduledBlockLabel = "scheduled"
skippedBlockLabel = "skipped"
droppedBlockLabel = "dropped"
enqueuedBlockLabel = "enqueued"
basicBufferReadyBlockCount = "buffer_ready_block_total" //nolint:gosec
basicBufferTotalSize = "buffer_size_block_total"
basicBufferMaxSize = "buffer_max_size"
rate = "rate"
validatorBufferTotalSize = "validator_buffer_size_block_total"
validatorQueueMaxSize = "validator_buffer_max_size"
)

var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
@@ -55,31 +58,65 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),

collector.WithMetric(collector.NewMetric(queueSizePerNodeCount,
collector.WithType(collector.Gauge),
collector.WithLabels("issuer_id"),
collector.WithPruningDelay(10*time.Minute),
collector.WithHelp("Current size of each node's queue (as block count)."),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(validatorQueueSizePerNodeCount,
collector.WithType(collector.Gauge),
collector.WithLabels("issuer_id"),
collector.WithPruningDelay(10*time.Minute),
collector.WithHelp("Current number of validation blocks in each validator's queue."),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
@@ -127,32 +164,46 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(bufferMaxSize,
collector.WithMetric(collector.NewMetric(basicBufferMaxSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Maximum number of blocks that can be stored in the buffer."),
collector.WithHelp("Maximum number of basic blocks that can be stored in the buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.MaxBufferSize()), []string{}
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxBufferSize), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(bufferReadyBlockCount,
collector.WithMetric(collector.NewMetric(basicBufferReadyBlockCount,
collector.WithType(collector.Gauge),
collector.WithHelp("Number of ready blocks in the scheduler buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.ReadyBlocksCount()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(bufferTotalSize,
collector.WithMetric(collector.NewMetric(basicBufferTotalSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Current size of the scheduler buffer (in bytes)."),
collector.WithHelp("Current number of basic blocks in the scheduler buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.BufferSize()), []string{}
return float64(deps.Protocol.MainEngineInstance().Scheduler.BasicBufferSize()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(rate,
collector.WithType(collector.Gauge),
collector.WithHelp("Current rate of the scheduler."),
collector.WithHelp("Current scheduling rate of basic blocks."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().SchedulerRate), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(validatorBufferTotalSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Current number of validation blocks in the scheduling buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorBufferSize()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(validatorQueueMaxSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Maximum number of validation blocks that can be stored in each validator queue."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.Rate()), []string{}
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxValidationBufferSize), []string{}
}),
)),
)
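For context on what these collector metrics amount to once exported: below is a minimal, standalone Prometheus-style sketch of the per-validator gauge. The metric name, namespace, label, and help text are copied from the constants above, but the wiring (the collector package's internals are not shown in this diff) and the issuer ID are illustrative assumptions only.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Per-validator queue size, labeled by issuer ID; mirrors the
	// scheduler_validator_queue_size_per_node_count metric above.
	validatorQueueSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "scheduler",
		Name:      "validator_queue_size_per_node_count",
		Help:      "Current number of validation blocks in each validator's queue.",
	}, []string{"issuer_id"})
	prometheus.MustRegister(validatorQueueSize)

	// In the component above, updates happen inside the scheduler event
	// hooks (BlockEnqueued, BlockSkipped, BlockDropped, BlockScheduled).
	validatorQueueSize.WithLabelValues("0x0badc0de").Set(3) // hypothetical issuer ID

	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
```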
2 changes: 1 addition & 1 deletion components/validator/issuer.go
@@ -62,6 +62,6 @@ func issueValidatorBlock(ctx context.Context) {
return
}

Component.LogDebug("Issued validator block: %s - commitment %s %d - latest finalized slot %d", modelBlock.ID(), modelBlock.ProtocolBlock().SlotCommitmentID, modelBlock.ProtocolBlock().SlotCommitmentID.Index(), modelBlock.ProtocolBlock().LatestFinalizedSlot)
Component.LogDebugf("Issued validator block: %s - commitment %s %d - latest finalized slot %d", modelBlock.ID(), modelBlock.ProtocolBlock().SlotCommitmentID, modelBlock.ProtocolBlock().SlotCommitmentID.Index(), modelBlock.ProtocolBlock().LatestFinalizedSlot)

}
2 changes: 1 addition & 1 deletion go.mod
@@ -24,7 +24,7 @@ require (
github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b
github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265
github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d
github.com/labstack/echo/v4 v4.11.1
github.com/labstack/gommon v0.4.0
github.com/libp2p/go-libp2p v0.30.0
4 changes: 2 additions & 2 deletions go.sum
@@ -307,8 +307,8 @@ github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14 h1:BkDuQ
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14/go.mod h1:ADBXzdHXTldP0NB2Vf+KbhDxkYciGRjzQVXT6Rdne1g=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b h1:EPB/+iWeSx/WgJlzaXl8yjinxuD8CCOdi2ZPMLeeMVY=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b/go.mod h1:B7gyJP6GshCSlEmY3CxEk5TZdsMs3UNz5U92hkFDdMs=
github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 h1:0j8ljlBmo/f5Gxva83mLWqZLB/xSO9PgJFMPfJ7tyRY=
github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo=
github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d h1:p9IchKq6kft758XDlnN/tAEXJMXGlmQPmbdxolba1gs=
github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo=
github.com/ipfs/boxo v0.10.0 h1:tdDAxq8jrsbRkYoF+5Rcqyeb91hgWe2hp7iLu7ORZLY=
github.com/ipfs/boxo v0.10.0/go.mod h1:Fg+BnfxZ0RPzR0nOodzdIq3A7KgoWAOWsEIImrIQdBM=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
26 changes: 25 additions & 1 deletion pkg/protocol/engine/accounts/accountsledger/manager.go
@@ -200,6 +200,11 @@ func (m *Manager) Account(accountID iotago.AccountID, targetIndex iotago.SlotIndex
m.mutex.RLock()
defer m.mutex.RUnlock()

return m.account(accountID, targetIndex)

}

func (m *Manager) account(accountID iotago.AccountID, targetIndex iotago.SlotIndex) (accountData *accounts.AccountData, exists bool, err error) {
// if m.latestCommittedSlot < maxCommittableAge we should have all history
maxCommittableAge := m.apiProvider.APIForSlot(targetIndex).ProtocolParameters().MaxCommittableAge()
if m.latestCommittedSlot >= maxCommittableAge && targetIndex+maxCommittableAge < m.latestCommittedSlot {
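The hunk above splits `Account` into a locked exported wrapper and an unexported `account` helper. A generic sketch of that idiom follows — the type and fields here are hypothetical stand-ins, not the manager's real state:

```go
package main

import (
	"fmt"
	"sync"
)

// Manager sketches the exported-wrapper-takes-the-lock idiom behind
// the Account/account split above; data is a hypothetical field.
type Manager struct {
	mu   sync.RWMutex
	data map[string]int
}

// Value is the exported entry point: it takes the read lock, then delegates.
func (m *Manager) Value(key string) (int, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	return m.value(key)
}

// value does the actual lookup and assumes the caller already holds the
// lock, so internal code paths (such as computeBlockBurnsForSlot calling
// m.account below) can reuse it without re-acquiring the mutex.
func (m *Manager) value(key string) (int, bool) {
	v, ok := m.data[key]

	return v, ok
}

func main() {
	m := &Manager{data: map[string]int{"a": 1}}
	fmt.Println(m.Value("a"))
}
```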
@@ -412,8 +417,9 @@ func (m *Manager) preserveDestroyedAccountData(accountID iotago.AccountID) (acco

func (m *Manager) computeBlockBurnsForSlot(slotIndex iotago.SlotIndex, rmc iotago.Mana) (burns map[iotago.AccountID]iotago.Mana, err error) {
burns = make(map[iotago.AccountID]iotago.Mana)
validationBlockCount := make(map[iotago.AccountID]int)
apiForSlot := m.apiProvider.APIForSlot(slotIndex)
if set, exists := m.blockBurns.Get(slotIndex); exists {
// Get RMC for this slot
for it := set.Iterator(); it.HasNext(); {
blockID := it.Next()
block, blockLoaded := m.block(blockID)
@@ -422,6 +428,24 @@
}
if _, isBasicBlock := block.BasicBlock(); isBasicBlock {
burns[block.ProtocolBlock().IssuerID] += iotago.Mana(block.WorkScore()) * rmc
} else if _, isValidationBlock := block.ValidationBlock(); isValidationBlock {
validationBlockCount[block.ProtocolBlock().IssuerID]++
}
}
validationBlocksPerSlot := int(apiForSlot.ProtocolParameters().ValidationBlocksPerSlot())
for accountID, count := range validationBlockCount {
if count > validationBlocksPerSlot {
// penalize over-issuance
accountData, exists, err := m.account(accountID, m.latestCommittedSlot)
if !exists {
return nil, ierrors.Wrapf(err, "cannot compute penalty for over-issuing validator, account %s could not be retrieved", accountID)
}
punishmentEpochs := apiForSlot.ProtocolParameters().PunishmentEpochs()
manaPunishment, err := apiForSlot.ManaDecayProvider().ManaGenerationWithDecay(accountData.ValidatorStake, slotIndex, slotIndex+apiForSlot.TimeProvider().EpochDurationSlots()*iotago.SlotIndex(punishmentEpochs))
if err != nil {
return nil, ierrors.Wrapf(err, "cannot compute penalty for over-issuing validator with account ID %s due to problem with mana generation", accountID)
}
burns[accountID] += iotago.Mana(count-validationBlocksPerSlot) * manaPunishment
}
}
}
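To make the new over-issuance penalty concrete: each validation block beyond the per-slot allowance burns Mana equal to what the validator's stake would generate over the punishment epochs. A self-contained sketch of that arithmetic with made-up numbers — the real value comes from ManaGenerationWithDecay and the protocol parameters, for which a fixed figure stands in here:

```go
package main

import "fmt"

// overIssuanceBurn mirrors the shape of the penalty in
// computeBlockBurnsForSlot: every validation block beyond the per-slot
// allowance burns manaPunishment, the Mana the validator's stake would
// generate over the punishment epochs.
func overIssuanceBurn(issued, allowedPerSlot int, manaPunishment uint64) uint64 {
	if issued <= allowedPerSlot {
		return 0 // within the allowance: validation blocks burn nothing
	}

	return uint64(issued-allowedPerSlot) * manaPunishment
}

func main() {
	// Hypothetical numbers: 5 validation blocks allowed per slot, 8 issued,
	// and a stake generating 1000 Mana over the punishment window (a
	// stand-in for ManaDecayProvider().ManaGenerationWithDecay above).
	fmt.Println(overIssuanceBurn(8, 5, 1000)) // prints 3000
}
```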
@@ -70,7 +70,7 @@ func NewTestFramework(test *testing.T) *TestFramework {
testAPI := iotago.V3API(
iotago.NewV3ProtocolParameters(
iotago.WithNetworkOptions("TestJungle", "tgl"),
iotago.WithSupplyOptions(10000, 0, 0, 0, 0, 0),
iotago.WithSupplyOptions(10000, 0, 0, 0, 0, 0, 0),
iotago.WithLivenessOptions(1, 1, 2, 8),
),
)
56 changes: 37 additions & 19 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go
@@ -3,9 +3,11 @@ package drr
import (
"container/ring"
"math"
"time"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"

iotago "github.com/iotaledger/iota.go/v4"
@@ -18,21 +20,24 @@ var ErrInsufficientMana = ierrors.New("insufficient issuer's mana to schedule th

// BufferQueue represents a buffer of IssuerQueue.
type BufferQueue struct {
// maxBuffer is the maximum buffer size in number of blocks.
maxBuffer int

activeIssuers *shrinkingmap.ShrinkingMap[iotago.AccountID, *ring.Ring]
ring *ring.Ring
// size is the number of blocks in the buffer.
size int

tokenBucket float64
lastScheduleTime time.Time

blockChan chan *blocks.Block
}

// NewBufferQueue returns a new BufferQueue.
func NewBufferQueue(maxBuffer int) *BufferQueue {
func NewBufferQueue() *BufferQueue {
return &BufferQueue{
maxBuffer: maxBuffer,
activeIssuers: shrinkingmap.New[iotago.AccountID, *ring.Ring](),
ring: nil,
activeIssuers: shrinkingmap.New[iotago.AccountID, *ring.Ring](),
ring: nil,
lastScheduleTime: time.Now(),
blockChan: make(chan *blocks.Block, 1),
}
}

@@ -41,11 +46,6 @@ func (b *BufferQueue) NumActiveIssuers() int {
return b.activeIssuers.Size()
}

// MaxSize returns the max number of blocks in BufferQueue.
func (b *BufferQueue) MaxSize() int {
return b.maxBuffer
}

// Size returns the total number of blocks in BufferQueue.
func (b *BufferQueue) Size() int {
return b.size
@@ -88,28 +88,28 @@ func (b *BufferQueue) GetIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, e

// Submit submits a block. Return blocks dropped from the scheduler to make room for the submitted block.
// The submitted block can also be returned as dropped if the issuer does not have enough mana.
func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit) (elements []*blocks.Block, err error) {
func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) ([]*blocks.Block, bool) {

// first we submit the block, and if it turns out that the issuer doesn't have enough bandwidth to submit, it will be removed by dropTail
if !issuerQueue.Submit(blk) {
return nil, ierrors.Errorf("block already submitted %s", blk)
return nil, false
}

b.size++

// if max buffer size exceeded, drop from tail of the longest mana-scaled queue
if b.Size() > b.maxBuffer {
return b.dropTail(quantumFunc), nil
if b.Size() > maxBuffer {
return b.dropTail(quantumFunc, maxBuffer), true
}

return nil, nil
return nil, true
}

func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit) (droppedBlocks []*blocks.Block) {
func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
start := b.Current()
ringStart := b.ring
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > b.maxBuffer {
for b.Size() > maxBuffer {
// TODO: extract to util func
// find longest mana-scaled queue
maxScale := math.Inf(-1)
@@ -320,4 +320,22 @@ func (b *BufferQueue) ringInsert(v interface{}) *ring.Ring {
return p.Link(b.ring)
}

func (b *BufferQueue) waitTime(rate float64, block *blocks.Block) time.Duration {
tokensRequired := float64(block.WorkScore()) - (b.tokenBucket + rate*time.Since(b.lastScheduleTime).Seconds())

return lo.Max(0, time.Duration(tokensRequired/rate))
}

func (b *BufferQueue) updateTokenBucket(rate float64, tokenBucketSize float64) {
b.tokenBucket = lo.Min(
tokenBucketSize,
b.tokenBucket+rate*time.Since(b.lastScheduleTime).Seconds(),
)
b.lastScheduleTime = time.Now()
}

func (b *BufferQueue) deductTokens(tokens float64) {
b.tokenBucket -= tokens
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
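The token-bucket helpers added above (waitTime, updateTokenBucket, deductTokens) rate-limit how fast blocks leave the buffer: tokens accrue at the scheduler rate up to the bucket size, and scheduling a block spends tokens equal to its work score. A self-contained sketch of that mechanism — simplified to a plain float work score, with an explicit seconds-to-Duration conversion, so it is illustrative rather than a copy of the scheduler's code:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// bucket is a stripped-down stand-in for the scheduler's token bucket:
// tokens accrue at rate per second up to size, and scheduling a block
// deducts tokens equal to its work score.
type bucket struct {
	tokens   float64
	lastFill time.Time
}

// fill tops up the bucket based on elapsed time, capped at size.
func (b *bucket) fill(rate, size float64) {
	b.tokens = math.Min(size, b.tokens+rate*time.Since(b.lastFill).Seconds())
	b.lastFill = time.Now()
}

// waitTime reports how long until enough tokens have accrued for a
// block of the given work score (zero if it can be scheduled now).
func (b *bucket) waitTime(rate, workScore float64) time.Duration {
	required := workScore - (b.tokens + rate*time.Since(b.lastFill).Seconds())
	if required <= 0 {
		return 0
	}

	return time.Duration(required / rate * float64(time.Second))
}

func main() {
	// Hypothetical parameters: 100 work units per second, bucket capped at 500.
	b := &bucket{lastFill: time.Now()}
	rate, size := 100.0, 500.0

	work := 50.0
	time.Sleep(b.waitTime(rate, work)) // wait until the block is affordable
	b.fill(rate, size)
	b.tokens -= work // deduct the scheduled block's work score
	fmt.Printf("tokens left: %.1f\n", b.tokens)
}
```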