Validation block scheduler #319

Merged: 28 commits from feat/validation-scheduler into develop, Sep 13, 2023.
Changes shown are from 10 of the 28 commits.

Commits (28)
e5aeb47 start implementing validation block scheduler (cyberphysic4l, Aug 31, 2023)
cb2c531 switch to simplified validation scheduler (cyberphysic4l, Sep 6, 2023)
084c422 complete simplified scheduler (cyberphysic4l, Sep 6, 2023)
9f43ad2 good doggy (cyberphysic4l, Sep 6, 2023)
87d6cbf shutdown correctly (cyberphysic4l, Sep 6, 2023)
991b933 Merge branch 'develop' into feat/validation-scheduler (cyberphysic4l, Sep 6, 2023)
9a7c072 race debugging (cyberphysic4l, Sep 6, 2023)
e31a4f5 race debugging (cyberphysic4l, Sep 6, 2023)
8729aed charge maxBlockWork for over-issued validation blocks (cyberphysic4l, Sep 7, 2023)
e128af6 Merge branch 'develop' into feat/validation-scheduler (cyberphysic4l, Sep 7, 2023)
c7453b6 go mod tidy (cyberphysic4l, Sep 7, 2023)
e50578d go mod tidy (cyberphysic4l, Sep 7, 2023)
9ae3447 defer penalizing over-issuer validators to issue #338 (cyberphysic4l, Sep 7, 2023)
a9ff82f Update to new protocol parameters (jkrvivian, Sep 6, 2023)
4bf1417 Update iota.go version (jkrvivian, Sep 8, 2023)
64dec2d move committee check for validator to the filter (cyberphysic4l, Sep 9, 2023)
b21e9f2 add buffer drop policy to validator queues (cyberphysic4l, Sep 9, 2023)
91f7132 update presets with buffer params (cyberphysic4l, Sep 9, 2023)
f53f638 Add validator buffer metrics (cyberphysic4l, Sep 11, 2023)
2b5fa12 go mod tidy (cyberphysic4l, Sep 11, 2023)
fa0e406 Merge branch 'develop' into feat/validation-scheduler (cyberphysic4l, Sep 11, 2023)
1a89898 go mod tidy (cyberphysic4l, Sep 11, 2023)
a4ba938 go mod tidy (cyberphysic4l, Sep 11, 2023)
6d303f3 add burning of Mana proportional to stake for over issuance (cyberphysic4l, Sep 12, 2023)
fee1bc8 increase validation blocks per slot in tests (cyberphysic4l, Sep 12, 2023)
2e539d5 fix bugs in account manager (cyberphysic4l, Sep 13, 2023)
81dade8 go mod tidy (cyberphysic4l, Sep 13, 2023)
8649ccb Merge branch 'develop' into feat/validation-scheduler (cyberphysic4l, Sep 13, 2023)
109 changes: 80 additions & 29 deletions components/metrics/metrics_scheduler.go
@@ -13,18 +13,21 @@ import (
const (
schedulerNamespace = "scheduler"

queueSizePerNodeWork = "queue_size_per_node_work" //nolint:gosec
queueSizePerNodeCount = "queue_size_per_node_count"
schedulerProcessedBlocks = "processed_blocks"
manaAmountPerNode = "mana_per_node"
scheduledBlockLabel = "scheduled"
skippedBlockLabel = "skipped"
droppedBlockLabel = "dropped"
enqueuedBlockLabel = "enqueued"
bufferReadyBlockCount = "buffer_ready_block_total" //nolint:gosec
bufferTotalSize = "buffer_size_block_total"
bufferMaxSize = "buffer_max_size"
rate = "rate"
queueSizePerNodeWork = "queue_size_per_node_work" //nolint:gosec
queueSizePerNodeCount = "queue_size_per_node_count"
validatorQueueSizePerNodeCount = "validator_queue_size_per_node_count"
schedulerProcessedBlocks = "processed_blocks"
manaAmountPerNode = "mana_per_node"
scheduledBlockLabel = "scheduled"
skippedBlockLabel = "skipped"
droppedBlockLabel = "dropped"
enqueuedBlockLabel = "enqueued"
basicBufferReadyBlockCount = "buffer_ready_block_total" //nolint:gosec
basicBufferTotalSize = "buffer_size_block_total"
basicBufferMaxSize = "buffer_max_size"
rate = "rate"
validatorBufferTotalSize = "validator_buffer_size_block_total"
validatorQueueMaxSize = "validator_buffer_max_size"
)

var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
@@ -55,31 +58,65 @@
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),

collector.WithMetric(collector.NewMetric(queueSizePerNodeCount,
collector.WithType(collector.Gauge),
collector.WithLabels("issuer_id"),
collector.WithPruningDelay(10*time.Minute),
collector.WithHelp("Current size of each node's queue (as block count)."),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())

if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
if _, isBasic := block.BasicBlock(); isBasic {
deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(validatorQueueSizePerNodeCount,
collector.WithType(collector.Gauge),
collector.WithLabels("issuer_id"),
collector.WithPruningDelay(10*time.Minute),
collector.WithHelp("Current number of validation blocks in each validator's queue."),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))

deps.Protocol.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
if _, isValidation := block.ValidationBlock(); isValidation {
deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String())
}
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
@@ -127,32 +164,46 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(bufferMaxSize,
collector.WithMetric(collector.NewMetric(basicBufferMaxSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Maximum number of blocks that can be stored in the buffer."),
collector.WithHelp("Maximum number of basic blocks that can be stored in the buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.MaxBufferSize()), []string{}
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxBufferSize), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(bufferReadyBlockCount,
collector.WithMetric(collector.NewMetric(basicBufferReadyBlockCount,
collector.WithType(collector.Gauge),
collector.WithHelp("Number of ready blocks in the scheduler buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.ReadyBlocksCount()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(bufferTotalSize,
collector.WithMetric(collector.NewMetric(basicBufferTotalSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Current size of the scheduler buffer (in bytes)."),
collector.WithHelp("Current number of basic blocks in the scheduler buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.BufferSize()), []string{}
return float64(deps.Protocol.MainEngineInstance().Scheduler.BasicBufferSize()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(rate,
collector.WithType(collector.Gauge),
collector.WithHelp("Current rate of the scheduler."),
collector.WithHelp("Current scheduling rate of basic blocks."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().SchedulerRate), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(validatorBufferTotalSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Current number of validation blocks in the scheduling buffer."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorBufferSize()), []string{}
}),
)),
collector.WithMetric(collector.NewMetric(validatorQueueMaxSize,
collector.WithType(collector.Gauge),
collector.WithHelp("Maximum number of validation blocks that can be stored in each validator queue."),
collector.WithCollectFunc(func() (float64, []string) {
return float64(deps.Protocol.MainEngineInstance().Scheduler.Rate()), []string{}
return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxValidationBufferSize), []string{}
}),
)),
)
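
The hunks above split the single set of buffer gauges into basic-block and validation-block variants, with each event hook updating only the gauge that matches the block's type. The following is a minimal, self-contained sketch of that routing pattern; the block struct and counters are illustrative stand-ins, not the real iota-core blocks.Block API:

package main

import "fmt"

// block is a simplified stand-in for iota-core's block type, whose
// BasicBlock()/ValidationBlock() accessors discriminate the payload kind.
type block struct {
	issuerID     string
	isBasic      bool
	isValidation bool
}

// Per-issuer queue sizes backing the two gauges; illustrative only.
var (
	basicQueueCount     = map[string]int{}
	validatorQueueCount = map[string]int{}
)

// onBlockEnqueued mirrors the hook logic above: each scheduler event
// updates only the gauge matching the block's type.
func onBlockEnqueued(b block) {
	switch {
	case b.isBasic:
		basicQueueCount[b.issuerID]++ // feeds queue_size_per_node_count
	case b.isValidation:
		validatorQueueCount[b.issuerID]++ // feeds validator_queue_size_per_node_count
	}
}

func main() {
	onBlockEnqueued(block{issuerID: "node-1", isBasic: true})
	onBlockEnqueued(block{issuerID: "validator-1", isValidation: true})
	fmt.Println(basicQueueCount, validatorQueueCount)
}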
2 changes: 1 addition & 1 deletion go.mod
@@ -24,7 +24,7 @@ require (
github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b
github.com/iotaledger/iota.go/v4 v4.0.0-20230908070236-ae553965e1a3
github.com/iotaledger/iota.go/v4 v4.0.0-20230911163626-ec5833cb0094
github.com/labstack/echo/v4 v4.11.1
github.com/labstack/gommon v0.4.0
github.com/libp2p/go-libp2p v0.30.0
4 changes: 2 additions & 2 deletions go.sum
@@ -307,8 +307,8 @@ github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14 h1:BkDuQ
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14/go.mod h1:ADBXzdHXTldP0NB2Vf+KbhDxkYciGRjzQVXT6Rdne1g=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b h1:EPB/+iWeSx/WgJlzaXl8yjinxuD8CCOdi2ZPMLeeMVY=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b/go.mod h1:B7gyJP6GshCSlEmY3CxEk5TZdsMs3UNz5U92hkFDdMs=
github.com/iotaledger/iota.go/v4 v4.0.0-20230908070236-ae553965e1a3 h1:VnS5NOlgtmL9bEXHRKD9oYOzMSKYg8c9LiOUbd/CHN0=
github.com/iotaledger/iota.go/v4 v4.0.0-20230908070236-ae553965e1a3/go.mod h1:MM3RLtTEsfT6Wh0EhpgmzVO/HM0/NOw+E7+mnGTnyA0=
github.com/iotaledger/iota.go/v4 v4.0.0-20230911163626-ec5833cb0094 h1:zd3ar9F3j/LJF1fedOdSUc4X876gJCFy1zMFa9yg1X8=
github.com/iotaledger/iota.go/v4 v4.0.0-20230911163626-ec5833cb0094/go.mod h1:MM3RLtTEsfT6Wh0EhpgmzVO/HM0/NOw+E7+mnGTnyA0=
github.com/ipfs/boxo v0.10.0 h1:tdDAxq8jrsbRkYoF+5Rcqyeb91hgWe2hp7iLu7ORZLY=
github.com/ipfs/boxo v0.10.0/go.mod h1:Fg+BnfxZ0RPzR0nOodzdIq3A7KgoWAOWsEIImrIQdBM=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
34 changes: 20 additions & 14 deletions pkg/protocol/engine/accounts/accountsledger/manager.go
@@ -412,6 +412,8 @@ func (m *Manager) preserveDestroyedAccountData(accountID iotago.AccountID) (acco

func (m *Manager) computeBlockBurnsForSlot(slotIndex iotago.SlotIndex, rmc iotago.Mana) (burns map[iotago.AccountID]iotago.Mana, err error) {
burns = make(map[iotago.AccountID]iotago.Mana)
validationBlockCount := make(map[iotago.AccountID]int)
apiForSlot := m.apiProvider.APIForSlot(slotIndex)
if set, exists := m.blockBurns.Get(slotIndex); exists {
for it := set.Iterator(); it.HasNext(); {
blockID := it.Next()
@@ -421,22 +423,26 @@
}
if _, isBasicBlock := block.BasicBlock(); isBasicBlock {
burns[block.ProtocolBlock().IssuerID] += iotago.Mana(block.WorkScore()) * rmc
} else if _, isValidationBlock := block.ValidationBlock(); isValidationBlock {
validationBlockCount[block.ProtocolBlock().IssuerID]++
}
}
validationBlocksPerSlot := int(apiForSlot.ProtocolParameters().ValidationBlocksPerSlot())
for accountID, count := range validationBlockCount {
if count > validationBlocksPerSlot {
// penalize over-issuance
accountData, exists, err := m.Account(accountID, slotIndex)
if !exists {
return nil, ierrors.Wrapf(err, "cannot compute penalty for over-issuing validator, account %s could not be retrieved", accountID)
}
punishmentEpochs := apiForSlot.ProtocolParameters().PunishmentEpochs()
manaPunishment, err := apiForSlot.ManaDecayProvider().ManaGenerationWithDecay(accountData.ValidatorStake, slotIndex, slotIndex+apiForSlot.TimeProvider().EpochDurationSlots()*iotago.SlotIndex(punishmentEpochs))
if err != nil {
return nil, ierrors.Wrapf(err, "cannot compute penalty for over-issuing validator with account ID %s due to problem with mana generation", accountID)
}
burns[accountID] += iotago.Mana(count-validationBlocksPerSlot) * manaPunishment
}
}

// TODO: issue #338 enable this block of code and fix the tests to issue correct rate of validation blocks.
// validationBlockCount := make(map[iotago.AccountID]int)
// else if _, isValidationBlock := block.ValidationBlock(); isValidationBlock {
// validationBlockCount[block.ProtocolBlock().IssuerID]++
// }
// }
// validationBlocksPerSlot := int(m.apiProvider.APIForSlot(slotIndex).ProtocolParameters().ValidationBlocksPerSlot())
// for accountID, count := range validationBlockCount {
// if count > validationBlocksPerSlot {
// // penalize over-issuance by charging for a maximum work score block for each validation block over the quota
// burns[accountID] += iotago.Mana(count-validationBlocksPerSlot) * iotago.Mana(m.apiProvider.CurrentAPI().MaxBlockWork()) * rmc
// }
// }
}

return burns, nil
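The penalty logic added above burns, for each validation block issued beyond ValidationBlocksPerSlot, the Mana that the validator's stake would generate over PunishmentEpochs epochs (via ManaGenerationWithDecay). A self-contained sketch of that arithmetic, assuming a simplified linear stand-in for the decay provider and illustrative parameter values:

package main

import "fmt"

type Mana uint64

// manaGenerationWithDecay stands in for the protocol's decay provider
// (apiForSlot.ManaDecayProvider().ManaGenerationWithDecay); the linear
// form here is an assumption for illustration only.
func manaGenerationWithDecay(stake Mana, slots uint64) Mana {
	const generationRatePerSlot = 1 // illustrative constant
	return stake * Mana(slots) * generationRatePerSlot
}

func main() {
	const (
		validationBlocksPerSlot = 10   // per-slot quota for validators
		punishmentEpochs        = 2    // penalty window in epochs
		epochDurationSlots      = 8192 // slots per epoch (illustrative)
	)

	issued := 13 // validation blocks counted for this account in the slot
	validatorStake := Mana(1_000_000)

	if issued > validationBlocksPerSlot {
		// Penalty per over-issued block: stake-proportional Mana generation
		// over the punishment window, as in the diff above.
		perBlock := manaGenerationWithDecay(validatorStake, punishmentEpochs*epochDurationSlots)
		burn := Mana(issued-validationBlocksPerSlot) * perBlock
		fmt.Printf("burning %d Mana for %d over-issued validation blocks\n",
			burn, issued-validationBlocksPerSlot)
	}
}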
25 changes: 8 additions & 17 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go
@@ -20,9 +20,6 @@ var ErrInsufficientMana = ierrors.New("insufficient issuer's mana to schedule th

// BufferQueue represents a buffer of IssuerQueue.
type BufferQueue struct {
// maxBuffer is the maximum buffer size in number of blocks.
maxBuffer int

activeIssuers *shrinkingmap.ShrinkingMap[iotago.AccountID, *ring.Ring]
ring *ring.Ring
// size is the number of blocks in the buffer.
@@ -35,9 +32,8 @@
}

// NewBufferQueue returns a new BufferQueue.
func NewBufferQueue(maxBuffer int) *BufferQueue {
func NewBufferQueue() *BufferQueue {
return &BufferQueue{
maxBuffer: maxBuffer,
activeIssuers: shrinkingmap.New[iotago.AccountID, *ring.Ring](),
ring: nil,
lastScheduleTime: time.Now(),
Expand All @@ -50,11 +46,6 @@ func (b *BufferQueue) NumActiveIssuers() int {
return b.activeIssuers.Size()
}

// MaxSize returns the max number of blocks in BufferQueue.
func (b *BufferQueue) MaxSize() int {
return b.maxBuffer
}

// Size returns the total number of blocks in BufferQueue.
func (b *BufferQueue) Size() int {
return b.size
@@ -97,28 +88,28 @@ func (b *BufferQueue) GetIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, e

// Submit submits a block. Return blocks dropped from the scheduler to make room for the submitted block.
// The submitted block can also be returned as dropped if the issuer does not have enough mana.
func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit) (elements []*blocks.Block, err error) {
func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) ([]*blocks.Block, bool) {

// first we submit the block, and if it turns out that the issuer doesn't have enough bandwidth to submit, it will be removed by dropTail
if !issuerQueue.Submit(blk) {
return nil, ierrors.Errorf("block already submitted %s", blk)
return nil, false
}

b.size++

// if max buffer size exceeded, drop from tail of the longest mana-scaled queue
if b.Size() > b.maxBuffer {
return b.dropTail(quantumFunc), nil
if b.Size() > maxBuffer {
return b.dropTail(quantumFunc, maxBuffer), true
}

return nil, nil
return nil, true
}

func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit) (droppedBlocks []*blocks.Block) {
func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
start := b.Current()
ringStart := b.ring
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > b.maxBuffer {
for b.Size() > maxBuffer {
// TODO: extract to util func
// find longest mana-scaled queue
maxScale := math.Inf(-1)
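Submit now signals success with a bool and receives maxBuffer from the caller (read from the current protocol parameters) rather than from a field fixed at construction, and dropTail evicts from the tail of the longest quantum-scaled queue until the buffer fits. A self-contained sketch of that eviction loop; the queue type and quantum function are simplified stand-ins for the DRR scheduler's IssuerQueue and deficit quantum, and the real implementation walks a ring of active issuers:

package main

import (
	"fmt"
	"math"
)

// issuerQueue is a simplified stand-in for the DRR IssuerQueue.
type issuerQueue struct {
	issuer string
	blocks []string
}

// dropTail evicts from the tail of the longest quantum-scaled queue until
// the total buffer size fits within maxBuffer, mirroring the policy above:
// issuers with long queues relative to their quantum lose blocks first.
func dropTail(queues []*issuerQueue, quantum func(string) float64, size, maxBuffer int) (dropped []string) {
	for size > maxBuffer {
		// find the longest mana-scaled queue
		maxScale, longest := math.Inf(-1), -1
		for i, q := range queues {
			if len(q.blocks) == 0 {
				continue
			}
			if scale := float64(len(q.blocks)) / quantum(q.issuer); scale > maxScale {
				maxScale, longest = scale, i
			}
		}
		if longest < 0 {
			break // nothing left to drop
		}
		q := queues[longest]
		dropped = append(dropped, q.blocks[len(q.blocks)-1]) // drop from the tail
		q.blocks = q.blocks[:len(q.blocks)-1]
		size--
	}

	return dropped
}

func main() {
	queues := []*issuerQueue{
		{issuer: "A", blocks: []string{"a1", "a2", "a3"}},
		{issuer: "B", blocks: []string{"b1"}},
	}
	equalQuantum := func(string) float64 { return 1.0 } // illustrative: equal quanta
	fmt.Println("dropped:", dropTail(queues, equalQuantum, 4, 2))
}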