diff --git a/components/metrics/metrics_scheduler.go b/components/metrics/metrics_scheduler.go index 4bf22e2af..0d81472b6 100644 --- a/components/metrics/metrics_scheduler.go +++ b/components/metrics/metrics_scheduler.go @@ -13,18 +13,21 @@ import ( const ( schedulerNamespace = "scheduler" - queueSizePerNodeWork = "queue_size_per_node_work" //nolint:gosec - queueSizePerNodeCount = "queue_size_per_node_count" - schedulerProcessedBlocks = "processed_blocks" - manaAmountPerNode = "mana_per_node" - scheduledBlockLabel = "scheduled" - skippedBlockLabel = "skipped" - droppedBlockLabel = "dropped" - enqueuedBlockLabel = "enqueued" - bufferReadyBlockCount = "buffer_ready_block_total" //nolint:gosec - bufferTotalSize = "buffer_size_block_total" - bufferMaxSize = "buffer_max_size" - rate = "rate" + queueSizePerNodeWork = "queue_size_per_node_work" //nolint:gosec + queueSizePerNodeCount = "queue_size_per_node_count" + validatorQueueSizePerNodeCount = "validator_queue_size_per_node_count" + schedulerProcessedBlocks = "processed_blocks" + manaAmountPerNode = "mana_per_node" + scheduledBlockLabel = "scheduled" + skippedBlockLabel = "skipped" + droppedBlockLabel = "dropped" + enqueuedBlockLabel = "enqueued" + basicBufferReadyBlockCount = "buffer_ready_block_total" //nolint:gosec + basicBufferTotalSize = "buffer_size_block_total" + basicBufferMaxSize = "buffer_max_size" + rate = "rate" + validatorBufferTotalSize = "validator_buffer_size_block_total" + validatorQueueMaxSize = "validator_buffer_max_size" ) var SchedulerMetrics = collector.NewCollection(schedulerNamespace, @@ -55,7 +58,6 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace, }, event.WithWorkerPool(Component.WorkerPool)) }), )), - collector.WithMetric(collector.NewMetric(queueSizePerNodeCount, collector.WithType(collector.Gauge), collector.WithLabels("issuer_id"), @@ -63,23 +65,58 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace, collector.WithHelp("Current size of each node's queue (as block count)."), collector.WithInitFunc(func() { deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) { - deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) - + if _, isBasic := block.BasicBlock(); isBasic { + deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + } }, event.WithWorkerPool(Component.WorkerPool)) deps.Protocol.Events.Engine.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { - deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) - + if _, isBasic := block.BasicBlock(); isBasic { + deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + } }, event.WithWorkerPool(Component.WorkerPool)) deps.Protocol.Events.Engine.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) { - deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, 
float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) - + if _, isBasic := block.BasicBlock(); isBasic { + deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + } }, event.WithWorkerPool(Component.WorkerPool)) deps.Protocol.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { - deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + if _, isBasic := block.BasicBlock(); isBasic { + deps.Collector.Update(schedulerNamespace, queueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.IssuerQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + } + }, event.WithWorkerPool(Component.WorkerPool)) + }), + )), + collector.WithMetric(collector.NewMetric(validatorQueueSizePerNodeCount, + collector.WithType(collector.Gauge), + collector.WithLabels("issuer_id"), + collector.WithPruningDelay(10*time.Minute), + collector.WithHelp("Current number of validation blocks in each validator's queue."), + collector.WithInitFunc(func() { + deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) { + if _, isValidation := block.ValidationBlock(); isValidation { + deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + } + }, event.WithWorkerPool(Component.WorkerPool)) + deps.Protocol.Events.Engine.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { + if _, isValidation := block.ValidationBlock(); isValidation { + deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + } + }, event.WithWorkerPool(Component.WorkerPool)) + + deps.Protocol.Events.Engine.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) { + if _, isValidation := block.ValidationBlock(); isValidation { + deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + } + }, event.WithWorkerPool(Component.WorkerPool)) + + deps.Protocol.Events.Engine.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { + if _, isValidation := block.ValidationBlock(); isValidation { + deps.Collector.Update(schedulerNamespace, validatorQueueSizePerNodeCount, float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorQueueBlockCount(block.ProtocolBlock().IssuerID)), block.ProtocolBlock().IssuerID.String()) + } }, event.WithWorkerPool(Component.WorkerPool)) }), )), @@ -127,32 +164,46 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace, }, event.WithWorkerPool(Component.WorkerPool)) }), )), - collector.WithMetric(collector.NewMetric(bufferMaxSize, + collector.WithMetric(collector.NewMetric(basicBufferMaxSize, collector.WithType(collector.Gauge), - collector.WithHelp("Maximum number of blocks that can be stored in the 
buffer."), + collector.WithHelp("Maximum number of basic blocks that can be stored in the buffer."), collector.WithCollectFunc(func() (float64, []string) { - return float64(deps.Protocol.MainEngineInstance().Scheduler.MaxBufferSize()), []string{} + return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxBufferSize), []string{} }), )), - collector.WithMetric(collector.NewMetric(bufferReadyBlockCount, + collector.WithMetric(collector.NewMetric(basicBufferReadyBlockCount, collector.WithType(collector.Gauge), collector.WithHelp("Number of ready blocks in the scheduler buffer."), collector.WithCollectFunc(func() (float64, []string) { return float64(deps.Protocol.MainEngineInstance().Scheduler.ReadyBlocksCount()), []string{} }), )), - collector.WithMetric(collector.NewMetric(bufferTotalSize, + collector.WithMetric(collector.NewMetric(basicBufferTotalSize, collector.WithType(collector.Gauge), - collector.WithHelp("Current size of the scheduler buffer (in bytes)."), + collector.WithHelp("Current number of basic blocks in the scheduler buffer."), collector.WithCollectFunc(func() (float64, []string) { - return float64(deps.Protocol.MainEngineInstance().Scheduler.BufferSize()), []string{} + return float64(deps.Protocol.MainEngineInstance().Scheduler.BasicBufferSize()), []string{} }), )), collector.WithMetric(collector.NewMetric(rate, collector.WithType(collector.Gauge), - collector.WithHelp("Current rate of the scheduler."), + collector.WithHelp("Current scheduling rate of basic blocks."), + collector.WithCollectFunc(func() (float64, []string) { + return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().SchedulerRate), []string{} + }), + )), + collector.WithMetric(collector.NewMetric(validatorBufferTotalSize, + collector.WithType(collector.Gauge), + collector.WithHelp("Current number of validation blocks in the scheduling buffer."), + collector.WithCollectFunc(func() (float64, []string) { + return float64(deps.Protocol.MainEngineInstance().Scheduler.ValidatorBufferSize()), []string{} + }), + )), + collector.WithMetric(collector.NewMetric(validatorQueueMaxSize, + collector.WithType(collector.Gauge), + collector.WithHelp("Maximum number of validation blocks that can be stored in each validator queue."), collector.WithCollectFunc(func() (float64, []string) { - return float64(deps.Protocol.MainEngineInstance().Scheduler.Rate()), []string{} + return float64(deps.Protocol.MainEngineInstance().CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxValidationBufferSize), []string{} }), )), ) diff --git a/components/validator/issuer.go b/components/validator/issuer.go index 9b913159c..83555a134 100644 --- a/components/validator/issuer.go +++ b/components/validator/issuer.go @@ -62,6 +62,6 @@ func issueValidatorBlock(ctx context.Context) { return } - Component.LogDebug("Issued validator block: %s - commitment %s %d - latest finalized slot %d", modelBlock.ID(), modelBlock.ProtocolBlock().SlotCommitmentID, modelBlock.ProtocolBlock().SlotCommitmentID.Index(), modelBlock.ProtocolBlock().LatestFinalizedSlot) + Component.LogDebugf("Issued validator block: %s - commitment %s %d - latest finalized slot %d", modelBlock.ID(), modelBlock.ProtocolBlock().SlotCommitmentID, modelBlock.ProtocolBlock().SlotCommitmentID.Index(), modelBlock.ProtocolBlock().LatestFinalizedSlot) } diff --git a/go.mod b/go.mod index 07912fa41..7b6135a3d 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( 
github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2 github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14 github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b - github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 + github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d github.com/labstack/echo/v4 v4.11.1 github.com/labstack/gommon v0.4.0 github.com/libp2p/go-libp2p v0.30.0 diff --git a/go.sum b/go.sum index f30191d02..d2bb86a59 100644 --- a/go.sum +++ b/go.sum @@ -307,8 +307,8 @@ github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14 h1:BkDuQ github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14/go.mod h1:ADBXzdHXTldP0NB2Vf+KbhDxkYciGRjzQVXT6Rdne1g= github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b h1:EPB/+iWeSx/WgJlzaXl8yjinxuD8CCOdi2ZPMLeeMVY= github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b/go.mod h1:B7gyJP6GshCSlEmY3CxEk5TZdsMs3UNz5U92hkFDdMs= -github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 h1:0j8ljlBmo/f5Gxva83mLWqZLB/xSO9PgJFMPfJ7tyRY= -github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo= +github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d h1:p9IchKq6kft758XDlnN/tAEXJMXGlmQPmbdxolba1gs= +github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo= github.com/ipfs/boxo v0.10.0 h1:tdDAxq8jrsbRkYoF+5Rcqyeb91hgWe2hp7iLu7ORZLY= github.com/ipfs/boxo v0.10.0/go.mod h1:Fg+BnfxZ0RPzR0nOodzdIq3A7KgoWAOWsEIImrIQdBM= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= diff --git a/pkg/protocol/engine/accounts/accountsledger/manager.go b/pkg/protocol/engine/accounts/accountsledger/manager.go index 35114d498..5bf108a72 100644 --- a/pkg/protocol/engine/accounts/accountsledger/manager.go +++ b/pkg/protocol/engine/accounts/accountsledger/manager.go @@ -200,6 +200,11 @@ func (m *Manager) Account(accountID iotago.AccountID, targetIndex iotago.SlotInd m.mutex.RLock() defer m.mutex.RUnlock() + return m.account(accountID, targetIndex) + +} + +func (m *Manager) account(accountID iotago.AccountID, targetIndex iotago.SlotIndex) (accountData *accounts.AccountData, exists bool, err error) { // if m.latestCommittedSlot < maxCommittableAge we should have all history maxCommittableAge := m.apiProvider.APIForSlot(targetIndex).ProtocolParameters().MaxCommittableAge() if m.latestCommittedSlot >= maxCommittableAge && targetIndex+maxCommittableAge < m.latestCommittedSlot { @@ -412,8 +417,9 @@ func (m *Manager) preserveDestroyedAccountData(accountID iotago.AccountID) (acco func (m *Manager) computeBlockBurnsForSlot(slotIndex iotago.SlotIndex, rmc iotago.Mana) (burns map[iotago.AccountID]iotago.Mana, err error) { burns = make(map[iotago.AccountID]iotago.Mana) + validationBlockCount := make(map[iotago.AccountID]int) + apiForSlot := m.apiProvider.APIForSlot(slotIndex) if set, exists := m.blockBurns.Get(slotIndex); exists { - // Get RMC for this slot for it := set.Iterator(); it.HasNext(); { blockID := it.Next() block, blockLoaded := m.block(blockID) @@ -422,6 +428,24 @@ func (m *Manager) computeBlockBurnsForSlot(slotIndex iotago.SlotIndex, rmc iotag } if _, isBasicBlock := block.BasicBlock(); isBasicBlock { burns[block.ProtocolBlock().IssuerID] += iotago.Mana(block.WorkScore()) * rmc + } else if _, isValidationBlock := block.ValidationBlock(); isValidationBlock { + 
validationBlockCount[block.ProtocolBlock().IssuerID]++ + } + } + validationBlocksPerSlot := int(apiForSlot.ProtocolParameters().ValidationBlocksPerSlot()) + for accountID, count := range validationBlockCount { + if count > validationBlocksPerSlot { + // penalize over-issuance + accountData, exists, err := m.account(accountID, m.latestCommittedSlot) + if !exists { + return nil, ierrors.Wrapf(err, "cannot compute penalty for over-issuing validator, account %s could not be retrieved", accountID) + } + punishmentEpochs := apiForSlot.ProtocolParameters().PunishmentEpochs() + manaPunishment, err := apiForSlot.ManaDecayProvider().ManaGenerationWithDecay(accountData.ValidatorStake, slotIndex, slotIndex+apiForSlot.TimeProvider().EpochDurationSlots()*iotago.SlotIndex(punishmentEpochs)) + if err != nil { + return nil, ierrors.Wrapf(err, "cannot compute penalty for over-issuing validator with account ID %s due to problem with mana generation", accountID) + } + burns[accountID] += iotago.Mana(count-validationBlocksPerSlot) * manaPunishment } } } diff --git a/pkg/protocol/engine/attestation/slotattestation/testframework_test.go b/pkg/protocol/engine/attestation/slotattestation/testframework_test.go index 4362ed8d3..8e756c266 100644 --- a/pkg/protocol/engine/attestation/slotattestation/testframework_test.go +++ b/pkg/protocol/engine/attestation/slotattestation/testframework_test.go @@ -70,7 +70,7 @@ func NewTestFramework(test *testing.T) *TestFramework { testAPI := iotago.V3API( iotago.NewV3ProtocolParameters( iotago.WithNetworkOptions("TestJungle", "tgl"), - iotago.WithSupplyOptions(10000, 0, 0, 0, 0, 0), + iotago.WithSupplyOptions(10000, 0, 0, 0, 0, 0, 0), iotago.WithLivenessOptions(1, 1, 2, 8), ), ) diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go index 572743971..0fa45bc4d 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go @@ -3,9 +3,11 @@ package drr import ( "container/ring" "math" + "time" "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/ierrors" + "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" iotago "github.com/iotaledger/iota.go/v4" @@ -18,21 +20,24 @@ var ErrInsufficientMana = ierrors.New("insufficient issuer's mana to schedule th // BufferQueue represents a buffer of IssuerQueue. type BufferQueue struct { - // maxBuffer is the maximum buffer size in number of blocks. - maxBuffer int - activeIssuers *shrinkingmap.ShrinkingMap[iotago.AccountID, *ring.Ring] ring *ring.Ring // size is the number of blocks in the buffer. size int + + tokenBucket float64 + lastScheduleTime time.Time + + blockChan chan *blocks.Block } // NewBufferQueue returns a new BufferQueue. -func NewBufferQueue(maxBuffer int) *BufferQueue { +func NewBufferQueue() *BufferQueue { return &BufferQueue{ - maxBuffer: maxBuffer, - activeIssuers: shrinkingmap.New[iotago.AccountID, *ring.Ring](), - ring: nil, + activeIssuers: shrinkingmap.New[iotago.AccountID, *ring.Ring](), + ring: nil, + lastScheduleTime: time.Now(), + blockChan: make(chan *blocks.Block, 1), } } @@ -41,11 +46,6 @@ func (b *BufferQueue) NumActiveIssuers() int { return b.activeIssuers.Size() } -// MaxSize returns the max number of blocks in BufferQueue. -func (b *BufferQueue) MaxSize() int { - return b.maxBuffer -} - // Size returns the total number of blocks in BufferQueue. 
func (b *BufferQueue) Size() int { return b.size @@ -88,28 +88,28 @@ func (b *BufferQueue) GetIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, e // Submit submits a block. Return blocks dropped from the scheduler to make room for the submitted block. // The submitted block can also be returned as dropped if the issuer does not have enough mana. -func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit) (elements []*blocks.Block, err error) { +func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) ([]*blocks.Block, bool) { // first we submit the block, and if it turns out that the issuer doesn't have enough bandwidth to submit, it will be removed by dropTail if !issuerQueue.Submit(blk) { - return nil, ierrors.Errorf("block already submitted %s", blk) + return nil, false } b.size++ // if max buffer size exceeded, drop from tail of the longest mana-scaled queue - if b.Size() > b.maxBuffer { - return b.dropTail(quantumFunc), nil + if b.Size() > maxBuffer { + return b.dropTail(quantumFunc, maxBuffer), true } - return nil, nil + return nil, true } -func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit) (droppedBlocks []*blocks.Block) { +func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) { start := b.Current() ringStart := b.ring // remove as many blocks as necessary to stay within max buffer size - for b.Size() > b.maxBuffer { + for b.Size() > maxBuffer { // TODO: extract to util func // find longest mana-scaled queue maxScale := math.Inf(-1) @@ -320,4 +320,22 @@ func (b *BufferQueue) ringInsert(v interface{}) *ring.Ring { return p.Link(b.ring) } +func (b *BufferQueue) waitTime(rate float64, block *blocks.Block) time.Duration { + tokensRequired := float64(block.WorkScore()) - (b.tokenBucket + rate*time.Since(b.lastScheduleTime).Seconds()) + // tokensRequired/rate is a wait in seconds, so it must be scaled to a time.Duration. + return lo.Max(0, time.Duration((tokensRequired/rate)*float64(time.Second))) +} + +func (b *BufferQueue) updateTokenBucket(rate float64, tokenBucketSize float64) { + b.tokenBucket = lo.Min( + tokenBucketSize, + b.tokenBucket+rate*time.Since(b.lastScheduleTime).Seconds(), + ) + b.lastScheduleTime = time.Now() +} + +func (b *BufferQueue) deductTokens(tokens float64) { + b.tokenBucket -= tokens +} + // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index df7bd39e7..bc698b39e 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -11,15 +11,19 @@ import ( "github.com/iotaledger/hive.go/runtime/module" "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/hive.go/runtime/syncutils" + "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/protocol/engine" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" "github.com/iotaledger/iota-core/pkg/protocol/engine/congestioncontrol/scheduler" + "github.com/iotaledger/iota-core/pkg/protocol/sybilprotection/seatmanager" iotago "github.com/iotaledger/iota.go/v4" "github.com/iotaledger/iota.go/v4/api" ) type Deficit int64 +type SubSlotIndex int + type Scheduler struct { events *scheduler.Events @@ -29,18 +33,16 @@ type Scheduler struct { apiProvider
api.Provider - buffer *BufferQueue - bufferMutex syncutils.RWMutex + seatManager seatmanager.SeatManager - deficits *shrinkingmap.ShrinkingMap[iotago.AccountID, Deficit] + basicBuffer *BufferQueue + validatorBuffer *ValidatorBuffer + bufferMutex syncutils.RWMutex - tokenBucket float64 - lastScheduleTime time.Time + deficits *shrinkingmap.ShrinkingMap[iotago.AccountID, Deficit] shutdownSignal chan struct{} - blockChan chan *blocks.Block - blockCache *blocks.Blocks errorHandler func(error) @@ -52,11 +54,33 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi return module.Provide(func(e *engine.Engine) scheduler.Scheduler { s := New(e, opts...) s.errorHandler = e.ErrorHandler("scheduler") - s.buffer = NewBufferQueue(int(s.apiProvider.CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxBufferSize)) + s.basicBuffer = NewBufferQueue() e.HookConstructed(func() { + s.latestCommittedSlot = func() iotago.SlotIndex { + return e.Storage.Settings().LatestCommitment().Index() + } s.blockCache = e.BlockCache e.Events.Scheduler.LinkTo(s.events) + e.SybilProtection.HookInitialized(func() { + s.seatManager = e.SybilProtection.SeatManager() + }) + e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { + // when the last slot of an epoch is committed, remove the queues of validators that are no longer in the committee. + if e.CurrentAPI().TimeProvider().SlotsBeforeNextEpoch(commitment.Index()) == 0 { + s.bufferMutex.Lock() + defer s.bufferMutex.Unlock() + + s.validatorBuffer.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool { + if !s.seatManager.Committee(commitment.Index() + 1).HasAccount(accountID) { + s.shutdownValidatorQueue(validatorQueue) + s.validatorBuffer.Delete(accountID) + } + + return true + }) + } + }) e.Ledger.HookInitialized(func() { // quantum retrieve function gets the account's Mana and returns the quantum for that account s.quantumFunc = func(accountID iotago.AccountID, manaSlot iotago.SlotIndex) (Deficit, error) { @@ -72,24 +96,12 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi return Deficit(mana / minMana), nil } - s.latestCommittedSlot = func() iotago.SlotIndex { - return e.Storage.Settings().LatestCommitment().Index() - } }) s.TriggerConstructed() e.Events.Booker.BlockBooked.Hook(func(block *blocks.Block) { - if _, isBasic := block.BasicBlock(); isBasic { - s.AddBlock(block) - s.selectBlockToScheduleWithLocking() - } else { // immediately schedule validator blocks for now. 
TODO: implement scheduling for validator blocks issue #236 - block.SetEnqueued() - block.SetScheduled() - // check for another block ready to schedule - s.updateChildrenWithLocking(block) - s.selectBlockToScheduleWithLocking() - - s.events.BlockScheduled.Trigger(block) - } + s.AddBlock(block) + s.selectBlockToScheduleWithLocking() + + }) e.Events.Ledger.AccountCreated.Hook(func(accountID iotago.AccountID) { s.bufferMutex.Lock() @@ -114,15 +126,20 @@ func New(apiProvider api.Provider, opts ...options.Option[Scheduler]) *Scheduler { return options.Apply( &Scheduler{ - events: scheduler.NewEvents(), - lastScheduleTime: time.Now(), - deficits: shrinkingmap.New[iotago.AccountID, Deficit](), - apiProvider: apiProvider, + events: scheduler.NewEvents(), + deficits: shrinkingmap.New[iotago.AccountID, Deficit](), + apiProvider: apiProvider, + validatorBuffer: NewValidatorBuffer(), }, opts, ) } func (s *Scheduler) Shutdown() { + s.validatorBuffer.buffer.ForEach(func(_ iotago.AccountID, validatorQueue *ValidatorQueue) bool { + s.shutdownValidatorQueue(validatorQueue) + + return true + }) close(s.shutdownSignal) s.TriggerStopped() } @@ -130,23 +147,17 @@ func (s *Scheduler) Shutdown() { // Start starts the scheduler. func (s *Scheduler) Start() { s.shutdownSignal = make(chan struct{}, 1) - s.blockChan = make(chan *blocks.Block, 1) - go s.mainLoop() + go s.basicBlockLoop() s.TriggerInitialized() } -// Rate gets the rate of the scheduler in units of work per second. -func (s *Scheduler) Rate() iotago.WorkScore { - return s.apiProvider.CurrentAPI().ProtocolParameters().CongestionControlParameters().SchedulerRate -} - -// IssuerQueueSizeCount returns the queue size of the given issuer as block count. +// IssuerQueueBlockCount returns the number of blocks in the queue of the given issuer. func (s *Scheduler) IssuerQueueBlockCount(issuerID iotago.AccountID) int { s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() - return s.buffer.IssuerQueue(issuerID).Size() + return s.basicBuffer.IssuerQueue(issuerID).Size() } // IssuerQueueWork returns the queue size of the given issuer in work units. @@ -154,15 +165,36 @@ func (s *Scheduler) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() - return s.buffer.IssuerQueue(issuerID).Work() + return s.basicBuffer.IssuerQueue(issuerID).Work() +} + +// ValidatorQueueBlockCount returns the number of validation blocks in the validator queue of the given issuer. +func (s *Scheduler) ValidatorQueueBlockCount(issuerID iotago.AccountID) int { + s.bufferMutex.RLock() + defer s.bufferMutex.RUnlock() + + validatorQueue, exists := s.validatorBuffer.Get(issuerID) + if !exists { + return 0 + } + + return validatorQueue.Size() } -// BufferSize returns the current buffer size of the Scheduler as block count. -func (s *Scheduler) BufferSize() int { +// BasicBufferSize returns the current number of basic blocks in the scheduler buffer. +func (s *Scheduler) BasicBufferSize() int { s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() - return s.buffer.Size() + return s.basicBuffer.Size() +} + +// ValidatorBufferSize returns the current number of validation blocks across all validator queues. +func (s *Scheduler) ValidatorBufferSize() int { + s.bufferMutex.RLock() + defer s.bufferMutex.RUnlock() + + return s.validatorBuffer.Size() } // MaxBufferSize returns the max buffer size of the Scheduler as block count.
@@ -175,7 +206,7 @@ func (s *Scheduler) ReadyBlocksCount() int { s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() - return s.buffer.ReadyBlocksCount() + return s.basicBuffer.ReadyBlocksCount() } func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, blocks ...*blocks.Block) bool { @@ -183,7 +214,7 @@ func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, blocks ...*bl defer s.bufferMutex.RUnlock() // if the buffer is completely empty, any issuer can issue a block. - if s.buffer.Size() == 0 { + if s.basicBuffer.Size() == 0 { return true } work := iotago.WorkScore(0) @@ -200,17 +231,25 @@ func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, blocks ...*bl return false } - return deficit >= s.deficitFromWork(work+s.buffer.IssuerQueue(accountID).Work()) + return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueue(accountID).Work()) } func (s *Scheduler) AddBlock(block *blocks.Block) { + if _, isValidation := block.ValidationBlock(); isValidation { + s.enqueueValidationBlock(block) + } else if _, isBasic := block.BasicBlock(); isBasic { + s.enqueueBasicBlock(block) + } +} + +func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) { s.bufferMutex.Lock() defer s.bufferMutex.Unlock() slotIndex := s.latestCommittedSlot() issuerID := block.ProtocolBlock().IssuerID - issuerQueue, err := s.buffer.GetIssuerQueue(issuerID) + issuerQueue, err := s.basicBuffer.GetIssuerQueue(issuerID) if err != nil { // this should only ever happen if the issuer has been removed due to insufficient Mana. // if Mana is now sufficient again, we can add the issuer again. @@ -222,23 +261,28 @@ func (s *Scheduler) AddBlock(block *blocks.Block) { issuerQueue = s.createIssuer(issuerID) } - droppedBlocks, err := s.buffer.Submit(block, issuerQueue, func(issuerID iotago.AccountID) Deficit { - quantum, quantumErr := s.quantumFunc(issuerID, slotIndex) - if quantumErr != nil { - s.errorHandler(ierrors.Wrapf(quantumErr, "failed to retrieve deficit for issuerID %d in slot %d when submitting a block", issuerID, slotIndex)) + droppedBlocks, submitted := s.basicBuffer.Submit( + block, + issuerQueue, + func(issuerID iotago.AccountID) Deficit { + quantum, quantumErr := s.quantumFunc(issuerID, slotIndex) + if quantumErr != nil { + s.errorHandler(ierrors.Wrapf(quantumErr, "failed to retrieve deficit for issuerID %d in slot %d when submitting a block", issuerID, slotIndex)) - return 0 - } + return 0 + } - return quantum - }) + return quantum + }, + int(s.apiProvider.CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxBufferSize), + ) // error submitting indicates that the block was already submitted so we do nothing else. 
- if err != nil { + if !submitted { return } for _, b := range droppedBlocks { b.SetDropped() - s.events.BlockDropped.Trigger(b, ierrors.New("block dropped from buffer")) + s.events.BlockDropped.Trigger(b, ierrors.New("basic block dropped from buffer")) } if block.SetEnqueued() { s.events.BlockEnqueued.Trigger(block) @@ -246,7 +290,30 @@ func (s *Scheduler) AddBlock(block *blocks.Block) { } } -func (s *Scheduler) mainLoop() { +func (s *Scheduler) enqueueValidationBlock(block *blocks.Block) { + s.bufferMutex.Lock() + defer s.bufferMutex.Unlock() + + _, exists := s.validatorBuffer.Get(block.ProtocolBlock().IssuerID) + if !exists { + s.addValidator(block.ProtocolBlock().IssuerID) + } + droppedBlock, submitted := s.validatorBuffer.Submit(block, int(s.apiProvider.CurrentAPI().ProtocolParameters().CongestionControlParameters().MaxValidationBufferSize)) + if !submitted { + return + } + if droppedBlock != nil { + droppedBlock.SetDropped() + s.events.BlockDropped.Trigger(droppedBlock, ierrors.New("validation block dropped from buffer")) + } + + if block.SetEnqueued() { + s.events.BlockEnqueued.Trigger(block) + s.tryReadyValidationBlock(block) + } +} + +func (s *Scheduler) basicBlockLoop() { var blockToSchedule *blocks.Block loop: for { @@ -255,29 +322,62 @@ loop: case <-s.shutdownSignal: break loop // when a block is pushed by the buffer - case blockToSchedule = <-s.blockChan: + case blockToSchedule = <-s.basicBuffer.blockChan: currentAPI := s.apiProvider.CurrentAPI() rate := currentAPI.ProtocolParameters().CongestionControlParameters().SchedulerRate - tokensRequired := float64(blockToSchedule.WorkScore()) - (s.tokenBucket + float64(rate)*time.Since(s.lastScheduleTime).Seconds()) - if tokensRequired > 0 { - // wait until sufficient tokens in token bucket - timer := time.NewTimer(time.Duration(tokensRequired/float64(rate)) * time.Second) + if waitTime := s.basicBuffer.waitTime(float64(rate), blockToSchedule); waitTime > 0 { + timer := time.NewTimer(waitTime) <-timer.C } - s.tokenBucket = lo.Min( - float64(currentAPI.MaxBlockWork()), - s.tokenBucket+float64(rate)*time.Since(s.lastScheduleTime).Seconds(), - ) - s.lastScheduleTime = time.Now() - s.scheduleBlock(blockToSchedule) + s.basicBuffer.updateTokenBucket(float64(rate), float64(currentAPI.MaxBlockWork())) + + s.scheduleBasicBlock(blockToSchedule) } } } -func (s *Scheduler) scheduleBlock(block *blocks.Block) { +func (s *Scheduler) validatorLoop(validatorQueue *ValidatorQueue) { + var blockToSchedule *blocks.Block +loop: + for { + select { + // on close, exit the loop + case <-validatorQueue.shutdownSignal: + break loop + // when a block is pushed by this validator queue. + case blockToSchedule = <-validatorQueue.blockChan: + currentAPI := s.apiProvider.CurrentAPI() + validationBlocksPerSlot := float64(currentAPI.ProtocolParameters().ValidationBlocksPerSlot()) + rate := validationBlocksPerSlot / float64(currentAPI.TimeProvider().SlotDurationSeconds()) + if waitTime := validatorQueue.waitTime(rate); waitTime > 0 { + timer := time.NewTimer(waitTime) + <-timer.C + } + // allow a maximum burst of validationBlocksPerSlot by setting this as max token bucket size. + validatorQueue.updateTokenBucket(rate, validationBlocksPerSlot) + + s.scheduleValidationBlock(blockToSchedule, validatorQueue) + } + } +} + +func (s *Scheduler) scheduleBasicBlock(block *blocks.Block) { if block.SetScheduled() { // deduct tokens from the token bucket according to the scheduled block's work. 
- s.tokenBucket -= float64(block.WorkScore()) + s.basicBuffer.deductTokens(float64(block.WorkScore())) + + // check for another block ready to schedule + s.updateChildrenWithLocking(block) + s.selectBlockToScheduleWithLocking() + + s.events.BlockScheduled.Trigger(block) + } +} + +func (s *Scheduler) scheduleValidationBlock(block *blocks.Block, validatorQueue *ValidatorQueue) { + if block.SetScheduled() { + // deduct 1 token from the token bucket of this validator's queue. + validatorQueue.deductTokens(1) // check for another block ready to schedule s.updateChildrenWithLocking(block) @@ -291,13 +391,40 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() { s.bufferMutex.Lock() defer s.bufferMutex.Unlock() + s.validatorBuffer.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool { + if s.selectValidationBlockWithoutLocking(validatorQueue) { + s.validatorBuffer.size-- + } + + return true + }) + s.selectBasicBlockWithoutLocking() + +} + +func (s *Scheduler) selectValidationBlockWithoutLocking(validatorQueue *ValidatorQueue) bool { + // already a block selected to be scheduled. + if len(validatorQueue.blockChan) > 0 { + return false + } + + if blockToSchedule := validatorQueue.PopFront(); blockToSchedule != nil { + validatorQueue.blockChan <- blockToSchedule + + return true + } + + return false +} + +func (s *Scheduler) selectBasicBlockWithoutLocking() { slotIndex := s.latestCommittedSlot() // already a block selected to be scheduled. - if len(s.blockChan) > 0 { + if len(s.basicBuffer.blockChan) > 0 { return } - start := s.buffer.Current() + start := s.basicBuffer.Current() // no blocks submitted if start == nil { return @@ -318,9 +445,9 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() { s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slotIndex)) s.removeIssuer(issuerID, err) - q = s.buffer.Current() + q = s.basicBuffer.Current() } else { - q = s.buffer.Next() + q = s.basicBuffer.Next() } if q == nil { return @@ -331,7 +458,7 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() { } } // increment the deficit for all issuers before schedulingIssuer one more time - for q := start; q != schedulingIssuer; q = s.buffer.Next() { + for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() { issuerID := q.IssuerID() if err := s.incrementDeficit(issuerID, 1, slotIndex); err != nil { s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slotIndex)) @@ -342,7 +469,7 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() { } // remove the block from the buffer and adjust issuer's deficit - block := s.buffer.PopFront() + block := s.basicBuffer.PopFront() issuerID := block.ProtocolBlock().IssuerID err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore())) @@ -353,7 +480,7 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() { return } - s.blockChan <- block + s.basicBuffer.blockChan <- block } func (s *Scheduler) selectIssuer(start *IssuerQueue, slotIndex iotago.SlotIndex) (Deficit, *IssuerQueue) { @@ -372,7 +499,7 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slotIndex iotago.SlotIndex) s.events.BlockSkipped.Trigger(block) } - s.buffer.PopFront() + s.basicBuffer.PopFront() block = q.Front() @@ -419,9 +546,9 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slotIndex iotago.SlotIndex) } if issuerRemoved { - q = s.buffer.Current() + q = s.basicBuffer.Current() } else { - q = s.buffer.Next() + q = s.basicBuffer.Next() 
} if q == start || q == nil { break @@ -432,7 +559,7 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slotIndex iotago.SlotIndex) } func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) { - q := s.buffer.IssuerQueue(issuerID) + q := s.basicBuffer.IssuerQueue(issuerID) q.submitted.ForEach(func(id iotago.BlockID, block *blocks.Block) bool { block.SetDropped() s.events.BlockDropped.Trigger(block, err) @@ -448,11 +575,11 @@ func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) { s.deficits.Delete(issuerID) - s.buffer.RemoveIssuer(issuerID) + s.basicBuffer.RemoveIssuer(issuerID) } func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue { - issuerQueue := s.buffer.CreateIssuerQueue(accountID) + issuerQueue := s.basicBuffer.CreateIssuerQueue(accountID) s.deficits.Set(accountID, 0) return issuerQueue @@ -531,8 +658,21 @@ func (s *Scheduler) tryReady(block *blocks.Block) { } } +// tryReadyValidationBlock tries to set the given validation block as ready. +func (s *Scheduler) tryReadyValidationBlock(block *blocks.Block) { + if s.isReady(block) { + s.readyValidationBlock(block) + } +} + func (s *Scheduler) ready(block *blocks.Block) { - s.buffer.Ready(block) + s.basicBuffer.Ready(block) +} + +func (s *Scheduler) readyValidationBlock(block *blocks.Block) { + if validatorQueue, exists := s.validatorBuffer.Get(block.ProtocolBlock().IssuerID); exists { + validatorQueue.Ready(block) + } } // updateChildrenWithLocking locks the buffer mutex and iterates over the direct children of the given blockID and @@ -549,7 +689,13 @@ func (s *Scheduler) updateChildrenWithLocking(block *blocks.Block) { func (s *Scheduler) updateChildrenWithoutLocking(block *blocks.Block) { for _, childBlock := range block.Children() { if _, childBlockExists := s.blockCache.Block(childBlock.ID()); childBlockExists && childBlock.IsEnqueued() { - s.tryReady(childBlock) + if _, isBasic := childBlock.BasicBlock(); isBasic { + s.tryReady(childBlock) + } else if _, isValidation := childBlock.ValidationBlock(); isValidation { + s.tryReadyValidationBlock(childBlock) + } else { + panic("invalid block type") + } } } } @@ -563,3 +709,15 @@ func (s *Scheduler) deficitFromWork(work iotago.WorkScore) Deficit { deficitScaleFactor := s.maxDeficit() / Deficit(s.apiProvider.CurrentAPI().MaxBlockWork()) return Deficit(work) * deficitScaleFactor } + +func (s *Scheduler) addValidator(accountID iotago.AccountID) *ValidatorQueue { + validatorQueue := NewValidatorQueue(accountID) + s.validatorBuffer.Set(accountID, validatorQueue) + go s.validatorLoop(validatorQueue) + + return validatorQueue +} + +func (s *Scheduler) shutdownValidatorQueue(validatorQueue *ValidatorQueue) { + validatorQueue.shutdownSignal <- struct{}{} +} diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go new file mode 100644 index 000000000..8d2fd0aaf --- /dev/null +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go @@ -0,0 +1,228 @@ +package drr + +import ( + "container/heap" + "fmt" + "time" + + "go.uber.org/atomic" + + "github.com/iotaledger/hive.go/ds/generalheap" + "github.com/iotaledger/hive.go/ds/shrinkingmap" + "github.com/iotaledger/hive.go/lo" + "github.com/iotaledger/hive.go/runtime/timed" + "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" + + iotago "github.com/iotaledger/iota.go/v4" +) + +type ValidatorQueue struct { + accountID iotago.AccountID + submitted *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block] + inbox generalheap.Heap[timed.HeapKey, *blocks.Block] + size atomic.Int64 + + tokenBucket float64 + lastScheduleTime time.Time + + blockChan chan *blocks.Block + shutdownSignal chan struct{} +} + +func NewValidatorQueue(accountID iotago.AccountID) *ValidatorQueue { + return &ValidatorQueue{ + accountID: accountID, + submitted: shrinkingmap.New[iotago.BlockID, *blocks.Block](), + blockChan: make(chan *blocks.Block, 1), + shutdownSignal: make(chan struct{}), + tokenBucket: 1, + lastScheduleTime: time.Now(), + } +} + +func (q *ValidatorQueue) Size() int { + if q == nil { + return 0 + } + + return int(q.size.Load()) +} + +func (q *ValidatorQueue) AccountID() iotago.AccountID { + return q.accountID +} + +func (q *ValidatorQueue) Submit(block *blocks.Block, maxBuffer int) (*blocks.Block, bool) { + if blkAccountID := block.ProtocolBlock().IssuerID; q.accountID != blkAccountID { + panic(fmt.Sprintf("validatorqueue: queue account ID(%x) and block issuer ID(%x) do not match.", q.accountID, blkAccountID)) + } + + if _, submitted := q.submitted.Get(block.ID()); submitted { + return nil, false + } + + q.submitted.Set(block.ID(), block) + q.size.Inc() + + if int(q.size.Load()) > maxBuffer { + return q.RemoveTail(), true + } + + return nil, true +} + +func (q *ValidatorQueue) Unsubmit(block *blocks.Block) bool { + if _, submitted := q.submitted.Get(block.ID()); !submitted { + return false + } + + q.submitted.Delete(block.ID()) + q.size.Dec() + + return true +} + +func (q *ValidatorQueue) Ready(block *blocks.Block) bool { + if _, submitted := q.submitted.Get(block.ID()); !submitted { + return false + } + + q.submitted.Delete(block.ID()) + heap.Push(&q.inbox, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())}) + + return true +} + +// PopFront removes the first ready block from the queue. +func (q *ValidatorQueue) PopFront() *blocks.Block { + if q.inbox.Len() == 0 { + return nil + } + + heapElement, isHeapElement := heap.Pop(&q.inbox).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block]) + if !isHeapElement { + return nil + } + blk := heapElement.Value + q.size.Dec() + + return blk +} + +func (q *ValidatorQueue) RemoveTail() *blocks.Block { + var oldestSubmittedBlock *blocks.Block + q.submitted.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool { + if oldestSubmittedBlock == nil || oldestSubmittedBlock.IssuingTime().After(block.IssuingTime()) { + oldestSubmittedBlock = block + } + + return true + }) + + tail := q.tail() + // if heap tail does not exist or tail is newer than oldest submitted block, unsubmit oldest block + if oldestSubmittedBlock != nil && (tail < 0 || q.inbox[tail].Key.CompareTo(timed.HeapKey(oldestSubmittedBlock.IssuingTime())) > 0) { + q.Unsubmit(oldestSubmittedBlock) + + return oldestSubmittedBlock + } else if tail < 0 { + // should never happen: the queue is non-empty, so either the oldest submitted block or the heap tail must exist. + return nil + } + + // if the tail exists and is older than the oldest submitted block, drop it + heapElement, isHeapElement := heap.Remove(&q.inbox, tail).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block]) + if !isHeapElement { + return nil + } + blk := heapElement.Value + q.size.Dec() + + return blk +} + +func (q *ValidatorQueue) tail() int { + h := q.inbox + if h.Len() <= 0 { + return -1 + } + tail := 0 + for i := range h { + if !h.Less(i, tail) { // less means older issue time + tail = i + } + } + + return tail +} + +func (q *ValidatorQueue) waitTime(rate float64) time.Duration { + tokensRequired := 1 - (q.tokenBucket + rate*time.Since(q.lastScheduleTime).Seconds()) + // tokensRequired/rate is a wait in seconds, so it must be scaled to a time.Duration. + return lo.Max(0, time.Duration((tokensRequired/rate)*float64(time.Second))) +} + +func (q *ValidatorQueue) updateTokenBucket(rate float64, tokenBucketSize float64) { + q.tokenBucket = lo.Min( + tokenBucketSize, + q.tokenBucket+rate*time.Since(q.lastScheduleTime).Seconds(), + ) + q.lastScheduleTime = time.Now() +} + +func (q *ValidatorQueue) deductTokens(tokens float64) { + q.tokenBucket -= tokens +} + +type ValidatorBuffer struct { + buffer *shrinkingmap.ShrinkingMap[iotago.AccountID, *ValidatorQueue] + size int +} + +func NewValidatorBuffer() *ValidatorBuffer { + return &ValidatorBuffer{ + buffer: shrinkingmap.New[iotago.AccountID, *ValidatorQueue](), + } +} + +func (b *ValidatorBuffer) Size() int { + if b == nil { + return 0 + } + + return b.size +} + +func (b *ValidatorBuffer) Get(accountID iotago.AccountID) (*ValidatorQueue, bool) { + return b.buffer.Get(accountID) +} + +func (b *ValidatorBuffer) Set(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool { + return b.buffer.Set(accountID, validatorQueue) +} + +func (b *ValidatorBuffer) Submit(block *blocks.Block, maxBuffer int) (*blocks.Block, bool) { + validatorQueue, exists := b.buffer.Get(block.ProtocolBlock().IssuerID) + if !exists { + return nil, false + } + droppedBlock, submitted := validatorQueue.Submit(block, maxBuffer) + if submitted { + b.size++ + } + if droppedBlock != nil { + b.size-- + } + + return droppedBlock, submitted +} + +func (b *ValidatorBuffer) Delete(accountID iotago.AccountID) { + validatorQueue, exists := b.buffer.Get(accountID) + if !exists { + return + } + b.size -= validatorQueue.Size() + + b.buffer.Delete(accountID) +} diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go index 716288b5f..1c034ac7b 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go @@ -42,23 +42,23 @@ func (s *Scheduler) IsBlockIssuerReady(_ iotago.AccountID, _ ...*blocks.Block) b return true } -func (s *Scheduler) Rate() iotago.WorkScore { +func (s *Scheduler) BasicBufferSize() int { return 0 } -func (s *Scheduler) BufferSize() int { +func (s *Scheduler) ValidatorBufferSize() int { return 0 } -func (s *Scheduler) MaxBufferSize() int { +func (s *Scheduler) ReadyBlocksCount() int { return 0 } -func (s *Scheduler) ReadyBlocksCount() int { +func (s *Scheduler) IssuerQueueBlockCount(_ iotago.AccountID) int { return 0 } -func (s *Scheduler) IssuerQueueBlockCount(_ iotago.AccountID) int { +func (s *Scheduler) ValidatorQueueBlockCount(_ iotago.AccountID) int { return 0 } diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/scheduler.go index 28e81f71d..997106554 --- a/pkg/protocol/engine/congestioncontrol/scheduler/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/scheduler.go @@ -11,18 +11,18 @@ type Scheduler interface { AddBlock(*blocks.Block) // IsBlockIssuerReady returns true if the block issuer is ready to issue a block, i.e., if the block issuer were to add a block to the scheduler, would it be scheduled. IsBlockIssuerReady(iotago.AccountID, ...*blocks.Block) bool - // Rate returns the specified rate of the Scheduler in work units. - Rate() iotago.WorkScore - // BufferSize returns the current buffer size of the Scheduler as block count. - BufferSize() int - // MaxBufferSize returns the max buffer size of the Scheduler as block count. - MaxBufferSize() int + // BasicBufferSize returns the current number of basic blocks in the scheduler buffer. + BasicBufferSize() int + // ValidatorBufferSize returns the current number of validation blocks across all validator queues. + ValidatorBufferSize() int // ReadyBlocksCount returns the number of ready blocks. ReadyBlocksCount() int // IssuerQueueBlockCount returns the queue size of the given issuer as block count. IssuerQueueBlockCount(issuerID iotago.AccountID) int // IssuerQueueWork returns the queue size of the given issuer in work units. IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore + // ValidatorQueueBlockCount returns the queue size of the given validator as block count. + ValidatorQueueBlockCount(validatorID iotago.AccountID) int module.Interface } diff --git a/pkg/protocol/engine/filter/blockfilter/filter.go b/pkg/protocol/engine/filter/blockfilter/filter.go index f3e01bf2e..1e99a7da1 100644 --- a/pkg/protocol/engine/filter/blockfilter/filter.go +++ b/pkg/protocol/engine/filter/blockfilter/filter.go @@ -8,13 +8,16 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/runtime/module" "github.com/iotaledger/hive.go/runtime/options" + "github.com/iotaledger/iota-core/pkg/core/account" "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/protocol/engine" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter" + iotago "github.com/iotaledger/iota.go/v4" "github.com/iotaledger/iota.go/v4/api" ) var ErrBlockTimeTooFarAheadInFuture = ierrors.New("a block cannot be too far ahead in the future") +var ErrValidatorNotInCommittee = ierrors.New("validation block issuer is not in the committee") // Filter filters blocks.
type Filter struct { @@ -24,6 +27,8 @@ type Filter struct { optsMaxAllowedWallClockDrift time.Duration + committeeFunc func(iotago.SlotIndex) *account.SeatedAccounts + module.Module } @@ -34,7 +39,9 @@ func NewProvider(opts ...options.Option[Filter]) module.Provider[*engine.Engine, e.HookConstructed(func() { e.Events.Filter.LinkTo(f.events) - + e.SybilProtection.HookInitialized(func() { + f.committeeFunc = e.SybilProtection.SeatManager().Committee + }) f.TriggerInitialized() }) @@ -69,6 +76,29 @@ func (f *Filter) ProcessReceivedBlock(block *model.Block, source peer.ID) { return } + if _, isValidation := block.ValidationBlock(); isValidation { + blockAPI, err := f.apiProvider.APIForVersion(block.ProtocolBlock().ProtocolVersion) + if err != nil { + f.events.BlockPreFiltered.Trigger(&filter.BlockPreFilteredEvent{ + Block: block, + Reason: ierrors.Wrapf(err, "could not get API for version %d", block.ProtocolBlock().ProtocolVersion), + Source: source, + }) + + return + } + blockSlot := blockAPI.TimeProvider().SlotFromTime(block.ProtocolBlock().IssuingTime) + if !f.committeeFunc(blockSlot).HasAccount(block.ProtocolBlock().IssuerID) { + f.events.BlockPreFiltered.Trigger(&filter.BlockPreFilteredEvent{ + Block: block, + Reason: ierrors.Wrapf(ErrValidatorNotInCommittee, "validation block issuer %s is not part of the committee for slot %d", block.ProtocolBlock().IssuerID, blockSlot), + Source: source, + }) + + return + } + } + f.events.BlockPreAllowed.Trigger(block) } diff --git a/pkg/protocol/engine/filter/blockfilter/filter_test.go b/pkg/protocol/engine/filter/blockfilter/filter_test.go index 772394143..18ba4e9b1 100644 --- a/pkg/protocol/engine/filter/blockfilter/filter_test.go +++ b/pkg/protocol/engine/filter/blockfilter/filter_test.go @@ -9,6 +9,7 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/hive.go/serializer/v2/serix" + "github.com/iotaledger/iota-core/pkg/core/account" "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter" iotago "github.com/iotaledger/iota.go/v4" @@ -68,6 +69,30 @@ func (t *TestFramework) IssueUnsignedBlockAtTime(alias string, issuingTime time. return t.processBlock(alias, block) } +func (t *TestFramework) IssueValidationBlockAtTime(alias string, issuingTime time.Time, validatorAccountID iotago.AccountID) error { + version := t.apiProvider.LatestAPI().ProtocolParameters().Version() + block, err := builder.NewValidationBlockBuilder(t.apiProvider.LatestAPI()). + StrongParents(iotago.BlockIDs{tpkg.RandBlockID()}). + HighestSupportedVersion(version). + Sign(validatorAccountID, tpkg.RandEd25519PrivateKey()). + IssuingTime(issuingTime). 
+ Build() + require.NoError(t.Test, err) + + return t.processBlock(alias, block) +} + +func mockedCommitteeFunc(validatorAccountID iotago.AccountID) func(iotago.SlotIndex) *account.SeatedAccounts { + mockedAccounts := account.NewAccounts() + mockedAccounts.Set(validatorAccountID, new(account.Pool)) + seatedAccounts := account.NewSeatedAccounts(mockedAccounts) + seatedAccounts.Set(account.SeatIndex(0), validatorAccountID) + + return func(slotIndex iotago.SlotIndex) *account.SeatedAccounts { + return seatedAccounts + } +} + func TestFilter_WithMaxAllowedWallClockDrift(t *testing.T) { allowedDrift := 3 * time.Second @@ -92,3 +117,30 @@ func TestFilter_WithMaxAllowedWallClockDrift(t *testing.T) { require.NoError(t, tf.IssueUnsignedBlockAtTime("acceptedFuture", time.Now().Add(allowedDrift))) require.NoError(t, tf.IssueUnsignedBlockAtTime("tooFarAheadFuture", time.Now().Add(allowedDrift).Add(1*time.Second))) } + +func TestFilter_ValidationBlocks(t *testing.T) { + testAPI := tpkg.TestAPI + + tf := NewTestFramework(t, + api.SingleVersionProvider(testAPI), + ) + + validatorAccountID := tpkg.RandAccountID() + nonValidatorAccountID := tpkg.RandAccountID() + + tf.Filter.committeeFunc = mockedCommitteeFunc(validatorAccountID) + + tf.Filter.events.BlockPreAllowed.Hook(func(block *model.Block) { + require.Equal(t, "validator", block.ID().Alias()) + require.NotEqual(t, "nonValidator", block.ID().Alias()) + }) + + tf.Filter.events.BlockPreFiltered.Hook(func(event *filter.BlockPreFilteredEvent) { + require.NotEqual(t, "validator", event.Block.ID().Alias()) + require.Equal(t, "nonValidator", event.Block.ID().Alias()) + require.True(t, ierrors.Is(event.Reason, ErrValidatorNotInCommittee)) + }) + + require.NoError(t, tf.IssueValidationBlockAtTime("validator", time.Now(), validatorAccountID)) + require.NoError(t, tf.IssueValidationBlockAtTime("nonValidator", time.Now(), nonValidatorAccountID)) +} diff --git a/pkg/testsuite/testsuite.go b/pkg/testsuite/testsuite.go index 3432c3352..9b5095114 100644 --- a/pkg/testsuite/testsuite.go +++ b/pkg/testsuite/testsuite.go @@ -114,6 +114,7 @@ func NewTestSuite(testingT *testing.T, opts ...options.Option[TestSuite]) *TestS 10, 100, 100, + 100, ), iotago.WithTimeProviderOptions( time.Now().Truncate(10*time.Second).Unix()-t.optsGenesisTimestampOffset, @@ -135,8 +136,9 @@ func NewTestSuite(testingT *testing.T, opts ...options.Option[TestSuite]) *TestS t.optsSchedulerRate, t.optsMinMana, t.optsMaxBufferSize, + t.optsMaxBufferSize, ), - iotago.WithStakingOptions(1), + iotago.WithStakingOptions(1, 100, 1), ), ) diff --git a/tools/evil-spammer/go.mod b/tools/evil-spammer/go.mod index 5d457c436..43f8810ea 100644 --- a/tools/evil-spammer/go.mod +++ b/tools/evil-spammer/go.mod @@ -17,7 +17,7 @@ require ( github.com/iotaledger/hive.go/runtime v0.0.0-20230906114834-b50190b9f9c2 github.com/iotaledger/iota-core v0.0.0-00010101000000-000000000000 github.com/iotaledger/iota-core/tools/genesis-snapshot v0.0.0-00010101000000-000000000000 - github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 + github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d github.com/mr-tron/base58 v1.2.0 go.uber.org/atomic v1.11.0 ) diff --git a/tools/evil-spammer/go.sum b/tools/evil-spammer/go.sum index cad6309ec..08959c5c6 100644 --- a/tools/evil-spammer/go.sum +++ b/tools/evil-spammer/go.sum @@ -197,8 +197,8 @@ github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20230912111751-d84fba0 github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20230912111751-d84fba02bb7c/go.mod 
h1:IJgaaxbgKCsNat18jlJJEAxCY2oVYR3F30B+M4vJ89I= github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2 h1:exATYMLT/d8fgMuVNO6kMDsFn9DUJEcyCuoBv9sP13g= github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs= -github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 h1:0j8ljlBmo/f5Gxva83mLWqZLB/xSO9PgJFMPfJ7tyRY= -github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo= +github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d h1:p9IchKq6kft758XDlnN/tAEXJMXGlmQPmbdxolba1gs= +github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod index 4ae986ec8..bf2f8d4ba 100644 --- a/tools/gendoc/go.mod +++ b/tools/gendoc/go.mod @@ -72,7 +72,7 @@ require ( github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2 // indirect github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14 // indirect github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b // indirect - github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 // indirect + github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d // indirect github.com/ipfs/boxo v0.10.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-datastore v0.6.0 // indirect diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum index 4ecbaaa4b..3a508dff0 100644 --- a/tools/gendoc/go.sum +++ b/tools/gendoc/go.sum @@ -311,8 +311,8 @@ github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14 h1:BkDuQ github.com/iotaledger/inx-app v1.0.0-rc.3.0.20230829161228-3f4eb50a4d14/go.mod h1:ADBXzdHXTldP0NB2Vf+KbhDxkYciGRjzQVXT6Rdne1g= github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b h1:EPB/+iWeSx/WgJlzaXl8yjinxuD8CCOdi2ZPMLeeMVY= github.com/iotaledger/inx/go v1.0.0-rc.2.0.20230829160617-69b96c7c9f9b/go.mod h1:B7gyJP6GshCSlEmY3CxEk5TZdsMs3UNz5U92hkFDdMs= -github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 h1:0j8ljlBmo/f5Gxva83mLWqZLB/xSO9PgJFMPfJ7tyRY= -github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo= +github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d h1:p9IchKq6kft758XDlnN/tAEXJMXGlmQPmbdxolba1gs= +github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo= github.com/ipfs/boxo v0.10.0 h1:tdDAxq8jrsbRkYoF+5Rcqyeb91hgWe2hp7iLu7ORZLY= github.com/ipfs/boxo v0.10.0/go.mod h1:Fg+BnfxZ0RPzR0nOodzdIq3A7KgoWAOWsEIImrIQdBM= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= diff --git a/tools/genesis-snapshot/go.mod b/tools/genesis-snapshot/go.mod index 9095c1edb..8927fb9b5 100644 --- a/tools/genesis-snapshot/go.mod +++ b/tools/genesis-snapshot/go.mod @@ -10,7 +10,7 @@ require ( github.com/iotaledger/hive.go/lo v0.0.0-20230906114834-b50190b9f9c2 github.com/iotaledger/hive.go/runtime v0.0.0-20230906114834-b50190b9f9c2 github.com/iotaledger/iota-core v0.0.0-00010101000000-000000000000 - github.com/iotaledger/iota.go/v4 
v4.0.0-20230912141328-810f7e83d265 + github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d github.com/mr-tron/base58 v1.2.0 github.com/spf13/pflag v1.0.5 golang.org/x/crypto v0.13.0 diff --git a/tools/genesis-snapshot/go.sum b/tools/genesis-snapshot/go.sum index 5c3c52d0e..eab46e060 100644 --- a/tools/genesis-snapshot/go.sum +++ b/tools/genesis-snapshot/go.sum @@ -50,8 +50,8 @@ github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20230912111751-d84fba0 github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20230912111751-d84fba02bb7c/go.mod h1:IJgaaxbgKCsNat18jlJJEAxCY2oVYR3F30B+M4vJ89I= github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2 h1:exATYMLT/d8fgMuVNO6kMDsFn9DUJEcyCuoBv9sP13g= github.com/iotaledger/hive.go/stringify v0.0.0-20230906114834-b50190b9f9c2/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs= -github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265 h1:0j8ljlBmo/f5Gxva83mLWqZLB/xSO9PgJFMPfJ7tyRY= -github.com/iotaledger/iota.go/v4 v4.0.0-20230912141328-810f7e83d265/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo= +github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d h1:p9IchKq6kft758XDlnN/tAEXJMXGlmQPmbdxolba1gs= +github.com/iotaledger/iota.go/v4 v4.0.0-20230913143616-917572c7752d/go.mod h1:DWCa+mXRTGWBV0EHVuvToUxAEcICe2Pab9hBlxBamKo= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= diff --git a/tools/genesis-snapshot/presets/presets.go b/tools/genesis-snapshot/presets/presets.go index 55b143742..6115b4203 100644 --- a/tools/genesis-snapshot/presets/presets.go +++ b/tools/genesis-snapshot/presets/presets.go @@ -22,11 +22,11 @@ var Base = []options.Option[snapshotcreator.Options]{ snapshotcreator.WithProtocolParameters( iotago.NewV3ProtocolParameters( iotago.WithNetworkOptions("default", "rms"), - iotago.WithSupplyOptions(10_000_000_000, 100, 1, 10, 100, 100), + iotago.WithSupplyOptions(10_000_000_000, 100, 1, 10, 100, 100, 100), iotago.WithTimeProviderOptions(time.Now().Unix(), 10, 13), iotago.WithLivenessOptions(5, 7, 14, 30), // increase/decrease threshold = fraction * slotDurationInSeconds * schedulerRate - iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1, 100*iotago.MaxBlockSize), + iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1, 1000, 100), iotago.WithWorkScoreOptions(25, 1, 10, 100, 50, 10, 10, 50, 1, 10, 250, 2), ), ), @@ -96,11 +96,11 @@ var Docker = []options.Option[snapshotcreator.Options]{ snapshotcreator.WithProtocolParameters( iotago.NewV3ProtocolParameters( iotago.WithNetworkOptions("docker", "rms"), - iotago.WithSupplyOptions(10_000_000_000, 1, 1, 10, 100, 100), + iotago.WithSupplyOptions(10_000_000_000, 1, 1, 10, 100, 100, 100), iotago.WithTimeProviderOptions(time.Now().Unix(), 10, 13), iotago.WithLivenessOptions(5, 7, 14, 30), // increase/decrease threshold = fraction * slotDurationInSeconds * schedulerRate - iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1, 100*iotago.MaxBlockSize), + iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1, 1000, 100), iotago.WithWorkScoreOptions(25, 1, 10, 100, 50, 10, 10, 50, 1, 10, 250, 2), ), ), @@ -150,11 +150,11 @@ var Feature = []options.Option[snapshotcreator.Options]{ snapshotcreator.WithProtocolParameters( 
iotago.NewV3ProtocolParameters( iotago.WithNetworkOptions("feature", "rms"), - iotago.WithSupplyOptions(10_000_000_000, 100, 1, 10, 100, 100), + iotago.WithSupplyOptions(10_000_000_000, 100, 1, 10, 100, 100, 100), iotago.WithTimeProviderOptions(1689848996, 10, 13), iotago.WithLivenessOptions(5, 10, 20, 30), // increase/decrease threshold = fraction * slotDurationInSeconds * schedulerRate - iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1, 100*iotago.MaxBlockSize), + iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1, 1000, 100), iotago.WithWorkScoreOptions(25, 1, 10, 100, 50, 10, 10, 50, 1, 10, 250, 2), ), ),
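Note on the pacing logic: both basicBlockLoop and validatorLoop above pace scheduling with the same token-bucket pattern — compute how long until enough tokens have accrued, sleep, refill the bucket capped at a maximum burst, then deduct tokens for the scheduled block (its work score for basic blocks, exactly 1 for validation blocks). A minimal self-contained sketch of that arithmetic (names and parameter values are illustrative, not the iota-core API; note that the tokens-missing/rate quotient is in seconds, so it has to be scaled by time.Second to obtain a time.Duration):

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket accrues `rate` tokens per second up to a burst cap.
type tokenBucket struct {
	tokens       float64
	lastSchedule time.Time
}

// waitTime returns how long to sleep until `required` tokens are available.
func (tb *tokenBucket) waitTime(rate, required float64) time.Duration {
	missing := required - (tb.tokens + rate*time.Since(tb.lastSchedule).Seconds())
	if missing <= 0 {
		return 0
	}
	// missing/rate is in seconds; scale to nanoseconds for time.Duration.
	return time.Duration(missing / rate * float64(time.Second))
}

// refill tops the bucket up, capped at maxBurst, and resets the clock.
func (tb *tokenBucket) refill(rate, maxBurst float64) {
	tb.tokens += rate * time.Since(tb.lastSchedule).Seconds()
	if tb.tokens > maxBurst {
		tb.tokens = maxBurst
	}
	tb.lastSchedule = time.Now()
}

func main() {
	// Illustrative: a validator limited to 10 validation blocks per
	// 10-second slot, i.e. rate = validationBlocksPerSlot/slotDuration
	// = 1 token/s, with a burst of at most validationBlocksPerSlot.
	tb := &tokenBucket{tokens: 1, lastSchedule: time.Now()}
	rate, burst := 1.0, 10.0

	fmt.Println("wait before next validation block:", tb.waitTime(rate, 1))
	tb.refill(rate, burst)
	tb.tokens-- // deduct one token for the scheduled validation block
}
```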
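Note on the over-issuance penalty in computeBlockBurnsForSlot: basic blocks keep burning workScore * RMC, while a validator that issues more than ValidationBlocksPerSlot validation blocks in a single slot burns, per excess block, the Mana its validator stake would generate over PunishmentEpochs epochs. A rough worked sketch of that arithmetic (parameter values are illustrative, and the linear manaGenerated stand-in ignores the decay that ManaGenerationWithDecay applies):

```go
package main

import "fmt"

// Illustrative protocol parameters; the real values come from the
// API's ProtocolParameters for the slot being committed.
const (
	validationBlocksPerSlot = 10
	punishmentEpochs        = 10
	epochDurationSlots      = 8192
)

// manaGenerated stands in for ManaDecayProvider().ManaGenerationWithDecay:
// Mana generated by `stake` over `slots` slots (decay ignored for brevity).
func manaGenerated(stake, slots uint64) uint64 {
	const generationPerStakePerSlot = 1 // illustrative generation rate
	return stake * slots * generationPerStakePerSlot
}

func main() {
	issued := uint64(13) // validation blocks issued by one validator in the slot
	stake := uint64(1_000_000)

	if issued > validationBlocksPerSlot {
		excess := issued - validationBlocksPerSlot
		perBlockPenalty := manaGenerated(stake, punishmentEpochs*epochDurationSlots)
		fmt.Println("Mana burned for over-issuance:", excess*perBlockPenalty)
	}
}
```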