diff --git a/components/debugapi/commitment.go b/components/debugapi/commitment.go index 4319459c4..815d5319a 100644 --- a/components/debugapi/commitment.go +++ b/components/debugapi/commitment.go @@ -95,9 +95,17 @@ func prepareCommitmentGraph(g *graphviz.Graphviz, rootCommitment *protocol.Commi } func createNode(graph *cgraph.Graph, commitment *protocol.Commitment) (*cgraph.Node, error) { - node, err := graph.Node(fmt.Sprintf("%d: %s", commitment.ID().Slot(), commitment.ID().String()[:8])) + node, err := graph.CreateNode(fmt.Sprintf("%d-%s", commitment.ID().Slot(), commitment.ID().Identifier().String()[:8])) if err != nil { - return nil, ierrors.Wrapf(err, "could not create node %s", commitment.ID().String()[:8]) + return nil, ierrors.Wrapf(err, "could not retrieve node %s", commitment.ID().Identifier().String()[:8]) + } + if node != nil { + return node, nil + } + + node, err = graph.CreateNode(fmt.Sprintf("%d-%s", commitment.ID().Slot(), commitment.ID().Identifier().String()[:8])) + if err != nil { + return nil, ierrors.Wrapf(err, "could not create node %s", commitment.ID().Identifier().String()[:8]) } return node, nil diff --git a/components/inx/server_commitments.go b/components/inx/server_commitments.go index 1e8fc3961..fafb7445e 100644 --- a/components/inx/server_commitments.go +++ b/components/inx/server_commitments.go @@ -8,6 +8,7 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" + "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/workerpool" inx "github.com/iotaledger/inx/go" "github.com/iotaledger/iota-core/pkg/model" @@ -142,7 +143,7 @@ func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_List case done: cancel() } - }).Unhook + }, event.WithWorkerPool(wp)).Unhook <-ctx.Done() unhook() diff --git a/components/inx/server_utxo.go b/components/inx/server_utxo.go index fd815e01a..8724f5f90 100644 --- a/components/inx/server_utxo.go +++ b/components/inx/server_utxo.go @@ -346,7 +346,7 @@ func (s *Server) ListenToLedgerUpdates(req *inx.SlotRangeRequest, srv inx.INX_Li case done: cancel() } - }).Unhook + }, event.WithWorkerPool(wp)).Unhook <-ctx.Done() unhook() diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go index 40207e998..9bdafe155 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go @@ -5,6 +5,8 @@ import ( "math" "time" + "go.uber.org/atomic" + "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" @@ -19,7 +21,7 @@ type BufferQueue struct { activeIssuers *shrinkingmap.ShrinkingMap[iotago.AccountID, *ring.Ring] ring *ring.Ring // size is the number of blocks in the buffer. - size int + size atomic.Int64 tokenBucket float64 lastScheduleTime time.Time @@ -57,7 +59,7 @@ func (b *BufferQueue) Clear() { // Size returns the total number of blocks in BufferQueue. func (b *BufferQueue) Size() int { - return b.size + return int(b.size.Load()) } // IssuerQueue returns the queue for the corresponding issuer. @@ -125,7 +127,7 @@ func (b *BufferQueue) RemoveIssuerQueue(issuerID iotago.AccountID) { if !isIQ { panic("buffer contains elements that are not issuer queues") } - b.size -= issuerQueue.Size() + b.size.Sub(int64(issuerQueue.Size())) b.ringRemove(element) b.activeIssuers.Delete(issuerID) @@ -156,7 +158,7 @@ func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu return nil, false } - b.size++ + b.size.Inc() // if max buffer size exceeded, drop from tail of the longest mana-scaled queue if b.Size() > maxBuffer { @@ -180,7 +182,7 @@ func (b *BufferQueue) Unsubmit(block *blocks.Block) bool { return false } - b.size-- + b.size.Dec() return true } @@ -268,7 +270,7 @@ func (b *BufferQueue) PopFront() *blocks.Block { } - b.size-- + b.size.Dec() return block } @@ -298,7 +300,7 @@ func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBu maxIssuerID := b.longestQueueIssuerID(quantumFunc) if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil { if tail := longestQueue.RemoveTail(); tail != nil { - b.size-- + b.size.Dec() droppedBlocks = append(droppedBlocks, tail) } } diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index 3bc5ac351..cb9a1da1c 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -171,25 +171,16 @@ func (s *Scheduler) Start() { // IssuerQueueBlockCount returns the number of blocks in the queue of the given issuer. func (s *Scheduler) IssuerQueueBlockCount(issuerID iotago.AccountID) int { - s.bufferMutex.RLock() - defer s.bufferMutex.RUnlock() - return s.basicBuffer.IssuerQueueBlockCount(issuerID) } // IssuerQueueWork returns the queue size of the given issuer in work units. func (s *Scheduler) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore { - s.bufferMutex.RLock() - defer s.bufferMutex.RUnlock() - return s.basicBuffer.IssuerQueueWork(issuerID) } // ValidatorQueueBlockCount returns the number of validation blocks in the validator queue of the given issuer. func (s *Scheduler) ValidatorQueueBlockCount(issuerID iotago.AccountID) int { - s.bufferMutex.RLock() - defer s.bufferMutex.RUnlock() - validatorQueue, exists := s.validatorBuffer.Get(issuerID) if !exists { return 0 @@ -198,18 +189,12 @@ func (s *Scheduler) ValidatorQueueBlockCount(issuerID iotago.AccountID) int { return validatorQueue.Size() } -// BufferSize returns the current buffer size of the Scheduler as block count. +// BasicBufferSize returns the current buffer size of the Scheduler as block count. func (s *Scheduler) BasicBufferSize() int { - s.bufferMutex.RLock() - defer s.bufferMutex.RUnlock() - return s.basicBuffer.Size() } func (s *Scheduler) ValidatorBufferSize() int { - s.bufferMutex.RLock() - defer s.bufferMutex.RUnlock() - return s.validatorBuffer.Size() } @@ -418,7 +403,7 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() { s.validatorBuffer.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool { if s.selectValidationBlockWithoutLocking(validatorQueue) { - s.validatorBuffer.size-- + s.validatorBuffer.size.Dec() } return true diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go index 56542a0c8..f327ae556 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/validatorqueue.go @@ -176,7 +176,7 @@ func (q *ValidatorQueue) deductTokens(tokens float64) { type ValidatorBuffer struct { buffer *shrinkingmap.ShrinkingMap[iotago.AccountID, *ValidatorQueue] - size int + size atomic.Int64 } func NewValidatorBuffer() *ValidatorBuffer { @@ -190,7 +190,7 @@ func (b *ValidatorBuffer) Size() int { return 0 } - return b.size + return int(b.size.Load()) } func (b *ValidatorBuffer) Get(accountID iotago.AccountID) (*ValidatorQueue, bool) { @@ -208,10 +208,10 @@ func (b *ValidatorBuffer) Submit(block *blocks.Block, maxBuffer int) (*blocks.Bl } droppedBlock, submitted := validatorQueue.Submit(block, maxBuffer) if submitted { - b.size++ + b.size.Inc() } if droppedBlock != nil { - b.size-- + b.size.Dec() } return droppedBlock, submitted @@ -222,7 +222,7 @@ func (b *ValidatorBuffer) Delete(accountID iotago.AccountID) { if !exists { return } - b.size -= validatorQueue.Size() + b.size.Sub(int64(validatorQueue.Size())) b.buffer.Delete(accountID) } diff --git a/pkg/storage/permanent/settings.go b/pkg/storage/permanent/settings.go index a9bf979b2..96c89497b 100644 --- a/pkg/storage/permanent/settings.go +++ b/pkg/storage/permanent/settings.go @@ -30,13 +30,14 @@ const ( ) type Settings struct { - mutex syncutils.RWMutex store kvstore.KVStore storeSnapshotImported *kvstore.TypedValue[bool] storeLatestCommitment *kvstore.TypedValue[*model.Commitment] storeLatestFinalizedSlot *kvstore.TypedValue[iotago.SlotIndex] storeLatestProcessedSlot *kvstore.TypedValue[iotago.SlotIndex] storeLatestIssuedValidationBlock *kvstore.TypedValue[*model.Block] + + mutex syncutils.RWMutex storeProtocolVersionEpochMapping *kvstore.TypedStore[iotago.Version, iotago.EpochIndex] storeFutureProtocolParameters *kvstore.TypedStore[iotago.Version, *types.Tuple[iotago.EpochIndex, iotago.Identifier]] storeProtocolParameters *kvstore.TypedStore[iotago.Version, iotago.ProtocolParameters] @@ -235,30 +236,18 @@ func (s *Settings) StoreFutureProtocolParametersHash(version iotago.Version, has } func (s *Settings) IsSnapshotImported() bool { - s.mutex.RLock() - defer s.mutex.RUnlock() - return lo.PanicOnErr(s.storeSnapshotImported.Has()) } func (s *Settings) SetSnapshotImported() (err error) { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.storeSnapshotImported.Set(true) } func (s *Settings) LatestCommitment() *model.Commitment { - s.mutex.RLock() - defer s.mutex.RUnlock() - return s.latestCommitment() } func (s *Settings) SetLatestCommitment(latestCommitment *model.Commitment) (err error) { - s.mutex.Lock() - defer s.mutex.Unlock() - s.apiProvider.SetCommittedSlot(latestCommitment.Slot()) // Delete the old future protocol parameters if they exist. @@ -280,13 +269,6 @@ func (s *Settings) latestCommitment() *model.Commitment { return commitment } -func (s *Settings) LatestFinalizedSlot() iotago.SlotIndex { - s.mutex.RLock() - defer s.mutex.RUnlock() - - return s.latestFinalizedSlot() -} - func (s *Settings) SetLatestFinalizedSlot(slot iotago.SlotIndex) (err error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -294,7 +276,7 @@ func (s *Settings) SetLatestFinalizedSlot(slot iotago.SlotIndex) (err error) { return s.storeLatestFinalizedSlot.Set(slot) } -func (s *Settings) latestFinalizedSlot() iotago.SlotIndex { +func (s *Settings) LatestFinalizedSlot() iotago.SlotIndex { latestFinalizedSlot, err := s.storeLatestFinalizedSlot.Get() if err != nil { if ierrors.Is(err, kvstore.ErrKeyNotFound) { @@ -307,22 +289,19 @@ func (s *Settings) latestFinalizedSlot() iotago.SlotIndex { } func (s *Settings) LatestStoredSlot() iotago.SlotIndex { - s.mutex.RLock() - defer s.mutex.RUnlock() - return read(s.storeLatestProcessedSlot) } func (s *Settings) SetLatestStoredSlot(slot iotago.SlotIndex) (err error) { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.storeLatestProcessedSlot.Set(slot) } func (s *Settings) AdvanceLatestStoredSlot(slot iotago.SlotIndex) (err error) { - s.mutex.Lock() - defer s.mutex.Unlock() + // We don't need to advance the latest stored slot if it's already ahead of the given slot. + // We check this before Compute to avoid contention inside the TypedValue. + if s.LatestStoredSlot() >= slot { + return nil + } if _, err = s.storeLatestProcessedSlot.Compute(func(latestStoredSlot iotago.SlotIndex, _ bool) (newValue iotago.SlotIndex, err error) { if latestStoredSlot >= slot { @@ -338,9 +317,6 @@ func (s *Settings) AdvanceLatestStoredSlot(slot iotago.SlotIndex) (err error) { } func (s *Settings) LatestIssuedValidationBlock() *model.Block { - s.mutex.RLock() - defer s.mutex.RUnlock() - return read(s.storeLatestIssuedValidationBlock) }