Skip to content

Commit

Permalink
Merge pull request #608 from iotaledger/feat/reduce-contention
Browse files Browse the repository at this point in the history
Reduce contention
  • Loading branch information
karimodm authored Dec 11, 2023
2 parents fce8007 + dc1abd0 commit abf24f2
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 65 deletions.
12 changes: 10 additions & 2 deletions components/debugapi/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,17 @@ func prepareCommitmentGraph(g *graphviz.Graphviz, rootCommitment *protocol.Commi
}

func createNode(graph *cgraph.Graph, commitment *protocol.Commitment) (*cgraph.Node, error) {
node, err := graph.Node(fmt.Sprintf("%d: %s", commitment.ID().Slot(), commitment.ID().String()[:8]))
node, err := graph.CreateNode(fmt.Sprintf("%d-%s", commitment.ID().Slot(), commitment.ID().Identifier().String()[:8]))
if err != nil {
return nil, ierrors.Wrapf(err, "could not create node %s", commitment.ID().String()[:8])
return nil, ierrors.Wrapf(err, "could not retrieve node %s", commitment.ID().Identifier().String()[:8])
}
if node != nil {
return node, nil
}

node, err = graph.CreateNode(fmt.Sprintf("%d-%s", commitment.ID().Slot(), commitment.ID().Identifier().String()[:8]))
if err != nil {
return nil, ierrors.Wrapf(err, "could not create node %s", commitment.ID().Identifier().String()[:8])
}

return node, nil
Expand Down
3 changes: 2 additions & 1 deletion components/inx/server_commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/workerpool"
inx "github.com/iotaledger/inx/go"
"github.com/iotaledger/iota-core/pkg/model"
Expand Down Expand Up @@ -142,7 +143,7 @@ func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_List
case done:
cancel()
}
}).Unhook
}, event.WithWorkerPool(wp)).Unhook

<-ctx.Done()
unhook()
Expand Down
2 changes: 1 addition & 1 deletion components/inx/server_utxo.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (s *Server) ListenToLedgerUpdates(req *inx.SlotRangeRequest, srv inx.INX_Li
case done:
cancel()
}
}).Unhook
}, event.WithWorkerPool(wp)).Unhook

<-ctx.Done()
unhook()
Expand Down
16 changes: 9 additions & 7 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"math"
"time"

"go.uber.org/atomic"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
Expand All @@ -19,7 +21,7 @@ type BufferQueue struct {
activeIssuers *shrinkingmap.ShrinkingMap[iotago.AccountID, *ring.Ring]
ring *ring.Ring
// size is the number of blocks in the buffer.
size int
size atomic.Int64

tokenBucket float64
lastScheduleTime time.Time
Expand Down Expand Up @@ -57,7 +59,7 @@ func (b *BufferQueue) Clear() {

// Size returns the total number of blocks in BufferQueue.
func (b *BufferQueue) Size() int {
return b.size
return int(b.size.Load())
}

// IssuerQueue returns the queue for the corresponding issuer.
Expand Down Expand Up @@ -125,7 +127,7 @@ func (b *BufferQueue) RemoveIssuerQueue(issuerID iotago.AccountID) {
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}
b.size -= issuerQueue.Size()
b.size.Sub(int64(issuerQueue.Size()))

b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
Expand Down Expand Up @@ -156,7 +158,7 @@ func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu
return nil, false
}

b.size++
b.size.Inc()

// if max buffer size exceeded, drop from tail of the longest mana-scaled queue
if b.Size() > maxBuffer {
Expand All @@ -180,7 +182,7 @@ func (b *BufferQueue) Unsubmit(block *blocks.Block) bool {
return false
}

b.size--
b.size.Dec()

return true
}
Expand Down Expand Up @@ -268,7 +270,7 @@ func (b *BufferQueue) PopFront() *blocks.Block {

}

b.size--
b.size.Dec()

return block
}
Expand Down Expand Up @@ -298,7 +300,7 @@ func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBu
maxIssuerID := b.longestQueueIssuerID(quantumFunc)
if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil {
if tail := longestQueue.RemoveTail(); tail != nil {
b.size--
b.size.Dec()
droppedBlocks = append(droppedBlocks, tail)
}
}
Expand Down
19 changes: 2 additions & 17 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,25 +171,16 @@ func (s *Scheduler) Start() {

// IssuerQueueBlockCount returns the number of blocks in the queue of the given issuer.
func (s *Scheduler) IssuerQueueBlockCount(issuerID iotago.AccountID) int {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.IssuerQueueBlockCount(issuerID)
}

// IssuerQueueWork returns the queue size of the given issuer in work units.
func (s *Scheduler) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.IssuerQueueWork(issuerID)
}

// ValidatorQueueBlockCount returns the number of validation blocks in the validator queue of the given issuer.
func (s *Scheduler) ValidatorQueueBlockCount(issuerID iotago.AccountID) int {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

validatorQueue, exists := s.validatorBuffer.Get(issuerID)
if !exists {
return 0
Expand All @@ -198,18 +189,12 @@ func (s *Scheduler) ValidatorQueueBlockCount(issuerID iotago.AccountID) int {
return validatorQueue.Size()
}

// BufferSize returns the current buffer size of the Scheduler as block count.
// BasicBufferSize returns the current buffer size of the Scheduler as block count.
func (s *Scheduler) BasicBufferSize() int {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.Size()
}

func (s *Scheduler) ValidatorBufferSize() int {
s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.validatorBuffer.Size()
}

Expand Down Expand Up @@ -418,7 +403,7 @@ func (s *Scheduler) selectBlockToScheduleWithLocking() {

s.validatorBuffer.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool {
if s.selectValidationBlockWithoutLocking(validatorQueue) {
s.validatorBuffer.size--
s.validatorBuffer.size.Dec()
}

return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (q *ValidatorQueue) deductTokens(tokens float64) {

type ValidatorBuffer struct {
buffer *shrinkingmap.ShrinkingMap[iotago.AccountID, *ValidatorQueue]
size int
size atomic.Int64
}

func NewValidatorBuffer() *ValidatorBuffer {
Expand All @@ -190,7 +190,7 @@ func (b *ValidatorBuffer) Size() int {
return 0
}

return b.size
return int(b.size.Load())
}

func (b *ValidatorBuffer) Get(accountID iotago.AccountID) (*ValidatorQueue, bool) {
Expand All @@ -208,10 +208,10 @@ func (b *ValidatorBuffer) Submit(block *blocks.Block, maxBuffer int) (*blocks.Bl
}
droppedBlock, submitted := validatorQueue.Submit(block, maxBuffer)
if submitted {
b.size++
b.size.Inc()
}
if droppedBlock != nil {
b.size--
b.size.Dec()
}

return droppedBlock, submitted
Expand All @@ -222,7 +222,7 @@ func (b *ValidatorBuffer) Delete(accountID iotago.AccountID) {
if !exists {
return
}
b.size -= validatorQueue.Size()
b.size.Sub(int64(validatorQueue.Size()))

b.buffer.Delete(accountID)
}
Expand Down
40 changes: 8 additions & 32 deletions pkg/storage/permanent/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ const (
)

type Settings struct {
mutex syncutils.RWMutex
store kvstore.KVStore
storeSnapshotImported *kvstore.TypedValue[bool]
storeLatestCommitment *kvstore.TypedValue[*model.Commitment]
storeLatestFinalizedSlot *kvstore.TypedValue[iotago.SlotIndex]
storeLatestProcessedSlot *kvstore.TypedValue[iotago.SlotIndex]
storeLatestIssuedValidationBlock *kvstore.TypedValue[*model.Block]

mutex syncutils.RWMutex
storeProtocolVersionEpochMapping *kvstore.TypedStore[iotago.Version, iotago.EpochIndex]
storeFutureProtocolParameters *kvstore.TypedStore[iotago.Version, *types.Tuple[iotago.EpochIndex, iotago.Identifier]]
storeProtocolParameters *kvstore.TypedStore[iotago.Version, iotago.ProtocolParameters]
Expand Down Expand Up @@ -235,30 +236,18 @@ func (s *Settings) StoreFutureProtocolParametersHash(version iotago.Version, has
}

func (s *Settings) IsSnapshotImported() bool {
s.mutex.RLock()
defer s.mutex.RUnlock()

return lo.PanicOnErr(s.storeSnapshotImported.Has())
}

func (s *Settings) SetSnapshotImported() (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.storeSnapshotImported.Set(true)
}

func (s *Settings) LatestCommitment() *model.Commitment {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.latestCommitment()
}

func (s *Settings) SetLatestCommitment(latestCommitment *model.Commitment) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.apiProvider.SetCommittedSlot(latestCommitment.Slot())

// Delete the old future protocol parameters if they exist.
Expand All @@ -280,21 +269,14 @@ func (s *Settings) latestCommitment() *model.Commitment {
return commitment
}

func (s *Settings) LatestFinalizedSlot() iotago.SlotIndex {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.latestFinalizedSlot()
}

func (s *Settings) SetLatestFinalizedSlot(slot iotago.SlotIndex) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.storeLatestFinalizedSlot.Set(slot)
}

func (s *Settings) latestFinalizedSlot() iotago.SlotIndex {
func (s *Settings) LatestFinalizedSlot() iotago.SlotIndex {
latestFinalizedSlot, err := s.storeLatestFinalizedSlot.Get()
if err != nil {
if ierrors.Is(err, kvstore.ErrKeyNotFound) {
Expand All @@ -307,22 +289,19 @@ func (s *Settings) latestFinalizedSlot() iotago.SlotIndex {
}

func (s *Settings) LatestStoredSlot() iotago.SlotIndex {
s.mutex.RLock()
defer s.mutex.RUnlock()

return read(s.storeLatestProcessedSlot)
}

func (s *Settings) SetLatestStoredSlot(slot iotago.SlotIndex) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.storeLatestProcessedSlot.Set(slot)
}

func (s *Settings) AdvanceLatestStoredSlot(slot iotago.SlotIndex) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// We don't need to advance the latest stored slot if it's already ahead of the given slot.
// We check this before Compute to avoid contention inside the TypedValue.
if s.LatestStoredSlot() >= slot {
return nil
}

if _, err = s.storeLatestProcessedSlot.Compute(func(latestStoredSlot iotago.SlotIndex, _ bool) (newValue iotago.SlotIndex, err error) {
if latestStoredSlot >= slot {
Expand All @@ -338,9 +317,6 @@ func (s *Settings) AdvanceLatestStoredSlot(slot iotago.SlotIndex) (err error) {
}

func (s *Settings) LatestIssuedValidationBlock() *model.Block {
s.mutex.RLock()
defer s.mutex.RUnlock()

return read(s.storeLatestIssuedValidationBlock)
}

Expand Down

0 comments on commit abf24f2

Please sign in to comment.