Skip to content

Commit

Permalink
Merge pull request #319 from iotaledger/feat/validation-scheduler
Browse files Browse the repository at this point in the history
Validation block scheduler
  • Loading branch information
cyberphysic4l authored Sep 13, 2023
2 parents ca451e2 + 8649ccb commit 83057da
Show file tree
Hide file tree
Showing 21 changed files with 726 additions and 163 deletions.
109 changes: 80 additions & 29 deletions components/metrics/metrics_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ import (
const (
schedulerNamespace = "scheduler"

queueSizePerNodeWork = "queue_size_per_node_work" //nolint:gosec
queueSizePerNodeCount = "queue_size_per_node_count"
schedulerProcessedBlocks = "processed_blocks"
manaAmountPerNode = "mana_per_node"
scheduledBlockLabel = "scheduled"
skippedBlockLabel = "skipped"
droppedBlockLabel = "dropped"
enqueuedBlockLabel = "enqueued"
bufferReadyBlockCount = "buffer_ready_block_total" //nolint:gosec
bufferTotalSize = "buffer_size_block_total"
bufferMaxSize = "buffer_max_size"
rate = "rate"
queueSizePerNodeWork = "queue_size_per_node_work" //nolint:gosec
queueSizePerNodeCount = "queue_size_per_node_count"
validatorQueueSizePerNodeCount = "validator_queue_size_per_node_count"
schedulerProcessedBlocks = "processed_blocks"
manaAmountPerNode = "mana_per_node"
scheduledBlockLabel = "scheduled"
skippedBlockLabel = "skipped"
droppedBlockLabel = "dropped"
enqueuedBlockLabel = "enqueued"
basicBufferReadyBlockCount = "buffer_ready_block_total" //nolint:gosec
basicBufferTotalSize = "buffer_size_block_total"
basicBufferMaxSize = "buffer_max_size"
rate = "rate"
validatorBufferTotalSize = "validator_buffer_size_block_total"
validatorQueueMaxSize = "validator_buffer_max_size"
)

var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
Expand Down Expand Up @@ -55,31 +58,65 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),

collector.WithMetric(collector.NewMetric(queueSizePerNodeCount,
collector.WithType(collector.Gauge),
collector.WithLabels("issuer_id"),
collector.WithPruningDelay(10*time.Minute),
collector.WithHelp("Current size of each node's queue (as block count)."),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(validatorQueueSizePerNodeCount,
collector.WithType(collector.Gauge),
collector.WithLabels("issuer_id"),
collector.WithPruningDelay(10*time.Minute),
collector.WithHelp("Current number of validation blocks in each validator's queue."),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
Expand Down Expand Up @@ -127,32 +164,46 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(bufferMaxSize,
collector.WithMetric(collector.NewMetric(basicBufferMaxSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Maximum number of blocks that can be stored in the buffer."),
collector.WithHelp("Maximum number of basic blocks that can be stored in the buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.MaxBufferSize()), []string{}
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxBufferSize), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(bufferReadyBlockCount,
collector.WithMetric(collector.NewMetric(basicBufferReadyBlockCount,
collector.WithType(collector.Gauge),
collector.WithHelp("Number of ready blocks in the scheduler buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.ReadyBlocksCount()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(bufferTotalSize,
collector.WithMetric(collector.NewMetric(basicBufferTotalSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Current size of the scheduler buffer (in bytes)."),
collector.WithHelp("Current number of basic blocks in the scheduler buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.BufferSize()), []string{}
return float64(deps.Protocol.MainEngineInstance().Scheduler.BasicBufferSize()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(rate,
collector.WithType(collector.Gauge),
collector.WithHelp("Current rate of the scheduler."),
collector.WithHelp("Current scheduling rate of basic blocks."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().SchedulerRate), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(validatorBufferTotalSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Current number of validation blocks in the scheduling buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorBufferSize()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(validatorQueueMaxSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Maximum number of validation blocks that can be stored in each validator queue."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.Rate()), []string{}
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxValidationBufferSize), []string{}
}),
)),
)
2 changes: 1 addition & 1 deletion components/validator/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ func issueValidatorBlock(ctx context.Context) {
return
}

Component.LogDebug("Issued validator block: %s - commitment %s %d - latest finalized slot %d", modelBlock.ID(), modelBlock.ProtocolBlock().SlotCommitmentID, modelBlock.ProtocolBlock().SlotCommitmentID.Index(), modelBlock.ProtocolBlock().LatestFinalizedSlot)
Component.LogDebugf("Issued validator block: %s - commitment %s %d - latest finalized slot %d", modelBlock.ID(), modelBlock.ProtocolBlock().SlotCommitmentID, modelBlock.ProtocolBlock().SlotCommitmentID.Index(), modelBlock.ProtocolBlock().LatestFinalizedSlot)

}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b
github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265
github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d
github.com/labstack/echo/v4 v4.11.1
github.com/labstack/gommon v0.4.0
github.com/libp2p/go-libp2p v0.30.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14 h1:BkDuQ
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14/go.mod h1:ADBXzdHXTldP0NB2Vf+KbhDxkYciGRjzQVXT6Rdne1g=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b h1:EPB/+iWeSx/WgJlzaXl8yjinxuD8CCOdi2ZPMLeeMVY=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b/go.mod h1:B7gyJP6GshCSlEmY3CxEk5TZdsMs3UNz5U92hkFDdMs=
github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 h1:0j8ljlBmo/f5Gxva83mLWqZLB/xSO9PgJFMPfJ7tyRY=
github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo=
github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d h1:p9IchKq6kft758XDlnN/tAEXJMXGlmQPmbdxolba1gs=
github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo=
github.com/ipfs/boxo v0.10.0 h1:tdDAxq8jrsbRkYoF+5Rcqyeb91hgWe2hp7iLu7ORZLY=
github.com/ipfs/boxo v0.10.0/go.mod h1:Fg+BnfxZ0RPzR0nOodzdIq3A7KgoWAOWsEIImrIQdBM=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
Expand Down
26 changes: 25 additions & 1 deletion pkg/protocol/engine/accounts/accountsledger/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (m *Manager) Account(accountID iotago.AccountID, targetIndex iotago.SlotInd
m.mutex.RLock()
defer m.mutex.RUnlock()

return m.account(accountID, targetIndex)

}

func (m *Manager) account(accountID iotago.AccountID, targetIndex iotago.SlotIndex) (accountData *accounts.AccountData, exists bool, err error) {
// if m.latestCommittedSlot < maxCommittableAge we should have all history
maxCommittableAge := m.apiProvider.APIForSlot(targetIndex).ProtocolParameters().MaxCommittableAge()
if m.latestCommittedSlot >= maxCommittableAge && targetIndex+maxCommittableAge < m.latestCommittedSlot {
Expand Down Expand Up @@ -412,8 +417,9 @@ func (m *Manager) preserveDestroyedAccountData(accountID iotago.AccountID) (acco

func (m *Manager) computeBlockBurnsForSlot(slotIndex iotago.SlotIndex, rmc iotago.Mana) (burns map[iotago.AccountID]iotago.Mana, err error) {
burns = make(map[iotago.AccountID]iotago.Mana)
validationBlockCount := make(map[iotago.AccountID]int)
apiForSlot := m.apiProvider.APIForSlot(slotIndex)
if set, exists := m.blockBurns.Get(slotIndex); exists {
// Get RMC for this slot
for it := set.Iterator(); it.HasNext(); {
blockID := it.Next()
block, blockLoaded := m.block(blockID)
Expand All @@ -422,6 +428,24 @@ func (m *Manager) computeBlockBurnsForSlot(slotIndex iotago.SlotIndex, rmc iotag
}
if _, isBasicBlock := block.BasicBlock(); isBasicBlock {
burns[block.ProtocolBlock().IssuerID] += iotago.Mana(block.WorkScore()) * rmc
} else if _, isValidationBlock := block.ValidationBlock(); isValidationBlock {
validationBlockCount[block.ProtocolBlock().IssuerID]++
}
}
validationBlocksPerSlot := int(apiForSlot.ProtocolParameters().ValidationBlocksPerSlot())
for accountID, count := range validationBlockCount {
if count > validationBlocksPerSlot {
// penalize over-issuance
accountData, exists, err := m.account(accountID, m.latestCommittedSlot)
if !exists {
return nil, ierrors.Wrapf(err, "cannot compute penalty for over-issuing validator, account %s could not be retrieved", accountID)
}
punishmentEpochs := apiForSlot.ProtocolParameters().PunishmentEpochs()
manaPunishment, err := apiForSlot.ManaDecayProvider().ManaGenerationWithDecay(accountData.ValidatorStake, slotIndex, slotIndex+apiForSlot.TimeProvider().EpochDurationSlots()*iotago.SlotIndex(punishmentEpochs))
if err != nil {
return nil, ierrors.Wrapf(err, "cannot compute penalty for over-issuing validator with account ID %s due to problem with mana generation", accountID)
}
burns[accountID] += iotago.Mana(count-validationBlocksPerSlot) * manaPunishment
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewTestFramework(test *testing.T) *TestFramework {
testAPI := iotago.V3API(
iotago.NewV3ProtocolParameters(
iotago.WithNetworkOptions("TestJungle", "tgl"),
iotago.WithSupplyOptions(10000, 0, 0, 0, 0, 0),
iotago.WithSupplyOptions(10000, 0, 0, 0, 0, 0, 0),
iotago.WithLivenessOptions(1, 1, 2, 8),
),
)
Expand Down
56 changes: 37 additions & 19 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package drr
import (
"container/ring"
"math"
"time"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"

iotago "github.com/iotaledger/iota.go/v4"
Expand All @@ -18,21 +20,24 @@ var ErrInsufficientMana = ierrors.New("insufficient issuer's mana to schedule th

// BufferQueue represents a buffer of IssuerQueue.
type BufferQueue struct {
// maxBuffer is the maximum buffer size in number of blocks.
maxBuffer int

activeIssuers *shrinkingmap.ShrinkingMap[iotago.AccountID, *ring.Ring]
ring *ring.Ring
// size is the number of blocks in the buffer.
size int

tokenBucket float64
lastScheduleTime time.Time

blockChan chan *blocks.Block
}

// NewBufferQueue returns a new BufferQueue.
func NewBufferQueue(maxBuffer int) *BufferQueue {
func NewBufferQueue() *BufferQueue {
return &BufferQueue{
maxBuffer: maxBuffer,
activeIssuers: shrinkingmap.New[iotago.AccountID, *ring.Ring](),
ring: nil,
activeIssuers: shrinkingmap.New[iotago.AccountID, *ring.Ring](),
ring: nil,
lastScheduleTime: time.Now(),
blockChan: make(chan *blocks.Block, 1),
}
}

Expand All @@ -41,11 +46,6 @@ func (b *BufferQueue) NumActiveIssuers() int {
return b.activeIssuers.Size()
}

// MaxSize returns the max number of blocks in BufferQueue.
func (b *BufferQueue) MaxSize() int {
return b.maxBuffer
}

// Size returns the total number of blocks in BufferQueue.
func (b *BufferQueue) Size() int {
return b.size
Expand Down Expand Up @@ -88,28 +88,28 @@ func (b *BufferQueue) GetIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, e

// Submit submits a block. Return blocks dropped from the scheduler to make room for the submitted block.
// The submitted block can also be returned as dropped if the issuer does not have enough mana.
func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit) (elements []*blocks.Block, err error) {
func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) ([]*blocks.Block, bool) {

// first we submit the block, and if it turns out that the issuer doesn't have enough bandwidth to submit, it will be removed by dropTail
if !issuerQueue.Submit(blk) {
return nil, ierrors.Errorf("block already submitted %s", blk)
return nil, false
}

b.size++

// if max buffer size exceeded, drop from tail of the longest mana-scaled queue
if b.Size() > b.maxBuffer {
return b.dropTail(quantumFunc), nil
if b.Size() > maxBuffer {
return b.dropTail(quantumFunc, maxBuffer), true
}

return nil, nil
return nil, true
}

func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit) (droppedBlocks []*blocks.Block) {
func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
start := b.Current()
ringStart := b.ring
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > b.maxBuffer {
for b.Size() > maxBuffer {
// TODO: extract to util func
// find longest mana-scaled queue
maxScale := math.Inf(-1)
Expand Down Expand Up @@ -320,4 +320,22 @@ func (b *BufferQueue) ringInsert(v interface{}) *ring.Ring {
return p.Link(b.ring)
}

func (b *BufferQueue) waitTime(rate float64, block *blocks.Block) time.Duration {
tokensRequired := float64(block.WorkScore()) - (b.tokenBucket + rate*time.Since(b.lastScheduleTime).Seconds())

return lo.Max(0, time.Duration(tokensRequired/rate))
}

func (b *BufferQueue) updateTokenBucket(rate float64, tokenBucketSize float64) {
b.tokenBucket = lo.Min(
tokenBucketSize,
b.tokenBucket+rate*time.Since(b.lastScheduleTime).Seconds(),
)
b.lastScheduleTime = time.Now()
}

func (b *BufferQueue) deductTokens(tokens float64) {
b.tokenBucket -= tokens
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
Loading

0 comments on commit 83057da

Please sign in to comment.