diff --git a/pkg/network/p2p/neighbor.go b/pkg/network/p2p/neighbor.go index 56511270d..a265481c6 100644 --- a/pkg/network/p2p/neighbor.go +++ b/pkg/network/p2p/neighbor.go @@ -3,6 +3,7 @@ package p2p import ( "context" "sync" + "sync/atomic" "time" "github.com/libp2p/go-libp2p/core/protocol" @@ -15,7 +16,8 @@ import ( ) const ( - NeighborsSendQueueSize = 20_000 + NeighborsSendQueueSize = 20_000 + DroppedPacketDisconnectThreshold = 100 ) type queuedPacket struct { @@ -31,7 +33,8 @@ type ( // neighbor describes the established p2p connection to another peer. type neighbor struct { - peer *network.Peer + peer *network.Peer + droppedPacketCounter atomic.Uint32 logger log.Logger @@ -84,7 +87,12 @@ func (n *neighbor) Peer() *network.Peer { func (n *neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) { select { case n.sendQueue <- &queuedPacket{protocolID: protocolID, packet: packet}: + n.droppedPacketCounter.Store(0) default: + // Drop a neighbor that does not read from the full queue. + if n.droppedPacketCounter.Add(1) >= DroppedPacketDisconnectThreshold { + n.Close() + } n.logger.LogWarn("Dropped packet due to SendQueue being full") } } diff --git a/pkg/protocol/engine/blockdag/events.go b/pkg/protocol/engine/blockdag/events.go index ef95f5f70..d6ccd6223 100644 --- a/pkg/protocol/engine/blockdag/events.go +++ b/pkg/protocol/engine/blockdag/events.go @@ -3,6 +3,7 @@ package blockdag import ( "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" + iotago "github.com/iotaledger/iota.go/v4" ) // Events is a collection of Tangle related Events. @@ -19,6 +20,9 @@ type Events struct { // MissingBlockAppended is triggered when a previously missing Block was appended. MissingBlockAppended *event.Event1[*blocks.Block] + // BlockNotAppended is triggered when an incoming Block could not be successfully appended. + BlockNotAppended *event.Event1[iotago.BlockID] + // BlockInvalid is triggered when a Block is found to be invalid. BlockInvalid *event.Event2[*blocks.Block, error] @@ -32,6 +36,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) { BlockSolid: event.New1[*blocks.Block](), BlockMissing: event.New1[*blocks.Block](), MissingBlockAppended: event.New1[*blocks.Block](), + BlockNotAppended: event.New1[iotago.BlockID](), BlockInvalid: event.New2[*blocks.Block, error](), } }) diff --git a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go index 0067c0a97..c20df6f91 100644 --- a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go +++ b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go @@ -47,6 +47,7 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin e.Events.PreSolidFilter.BlockPreAllowed.Hook(func(block *model.Block) { if _, _, err := b.Append(block); err != nil { + b.events.BlockNotAppended.Trigger(block.ID()) b.LogError("failed to append block", "blockID", block.ID(), "issuer", block.ProtocolBlock().Header.IssuerID, "err", err) } }, event.WithWorkerPool(wp)) diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go index dac3020dc..aa7166d32 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go @@ -371,7 +371,7 @@ func (b *BasicBuffer) ringInsert(v interface{}) *ring.Ring { func (b *BasicBuffer) waitTime(rate float64, block *blocks.Block) time.Duration { tokensRequired := float64(block.WorkScore()) - (b.tokenBucket + rate*time.Since(b.lastScheduleTime).Seconds()) - return lo.Max(0, time.Duration(tokensRequired/rate)) + return lo.Max(0, time.Duration(tokensRequired/rate)*time.Second) } func (b *BasicBuffer) updateTokenBucket(rate float64, tokenBucketSize float64) { diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index 6d576ab90..0ee6afcee 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -370,10 +370,11 @@ loop: case blockToSchedule = <-s.basicBuffer.blockChan: currentAPI := s.apiProvider.CommittedAPI() rate := currentAPI.ProtocolParameters().CongestionControlParameters().SchedulerRate - if waitTime := s.basicBuffer.waitTime(float64(rate), blockToSchedule); waitTime > 0 { + for waitTime := s.basicBuffer.waitTime(float64(rate), blockToSchedule); waitTime > 0; { timer := time.NewTimer(waitTime) <-timer.C } + s.basicBuffer.updateTokenBucket(float64(rate), float64(currentAPI.MaxBlockWork())) s.scheduleBasicBlock(blockToSchedule) diff --git a/pkg/protocol/engine/engine.go b/pkg/protocol/engine/engine.go index ca993ed0c..4f5c6f47b 100644 --- a/pkg/protocol/engine/engine.go +++ b/pkg/protocol/engine/engine.go @@ -585,15 +585,23 @@ func (e *Engine) setupEvictionState() { func (e *Engine) setupBlockRequester() { e.Events.BlockRequester.LinkTo(e.BlockRequester.Events) - + wp := e.Workers.CreatePool("BlockRequester", workerpool.WithWorkerCount(1)) // Using just 1 worker to avoid contention // We need to hook to make sure that the request is created before the block arrives to avoid a race condition // where we try to delete the request again before it is created. Thus, continuing to request forever. e.Events.BlockDAG.BlockMissing.Hook(func(block *blocks.Block) { e.BlockRequester.StartTicker(block.ID()) }) + e.Events.BlockDAG.MissingBlockAppended.Hook(func(block *blocks.Block) { e.BlockRequester.StopTicker(block.ID()) - }, event.WithWorkerPool(e.Workers.CreatePool("BlockRequester", workerpool.WithWorkerCount(1)))) // Using just 1 worker to avoid contention + }, event.WithWorkerPool(wp)) + + // Remove the block from the ticker if it failed to be appended. + // It's executed for all blocks to avoid locking twice: + // once to check if the block has the ticker and then again to remove it. + e.Events.BlockDAG.BlockNotAppended.Hook(func(blockID iotago.BlockID) { + e.BlockRequester.StopTicker(blockID) + }, event.WithWorkerPool(wp)) } func (e *Engine) setupPruning() {