From 7793888c941d7772a45332219a96b9acc4808fd0 Mon Sep 17 00:00:00 2001 From: Karolis Petrauskas Date: Wed, 10 May 2023 10:45:39 +0300 Subject: [PATCH 1/2] Avoid using locks in the pipe. --- packages/chain/cons/cons_gr/gr.go | 2 +- packages/util/pipe/interface.go | 2 +- packages/util/pipe/pipe.go | 23 +++++++---------------- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/packages/chain/cons/cons_gr/gr.go b/packages/chain/cons/cons_gr/gr.go index 3ad47b80af..58086dd5e7 100644 --- a/packages/chain/cons/cons_gr/gr.go +++ b/packages/chain/cons/cons_gr/gr.go @@ -173,7 +173,7 @@ func New( cgr.log.Warnf("Unexpected message, type=%v", recv.MsgType) return } - cgr.netRecvPipe.TryAdd(recv) + cgr.netRecvPipe.TryAdd(recv, cgr.log.Debugf) }) cgr.netDisconnect = unhook diff --git a/packages/util/pipe/interface.go b/packages/util/pipe/interface.go index f772b0557b..38208042d7 100644 --- a/packages/util/pipe/interface.go +++ b/packages/util/pipe/interface.go @@ -23,5 +23,5 @@ type Pipe[E any] interface { Len() int Close() Discard() - TryAdd(e E) bool + TryAdd(e E, log func(msg string, args ...interface{})) } diff --git a/packages/util/pipe/pipe.go b/packages/util/pipe/pipe.go index 05c12db4f2..f63ce9e3dd 100644 --- a/packages/util/pipe/pipe.go +++ b/packages/util/pipe/pipe.go @@ -1,7 +1,5 @@ package pipe -import "sync" - // InfinitePipe provides deserialised sender and receiver: it queues messages // sent by the sender and returns them to the receiver whenever it is ready, // without blocking the sender process. Depending on the backing queue, the pipe @@ -12,8 +10,6 @@ type InfinitePipe[E any] struct { length chan int buffer Queue[E] discardCh chan struct{} - closeLock *sync.RWMutex - closed bool } var _ Pipe[Hashable] = &InfinitePipe[Hashable]{} @@ -57,8 +53,6 @@ func newInfinitePipe[E any](queue Queue[E]) *InfinitePipe[E] { length: make(chan int), buffer: queue, discardCh: make(chan struct{}), - closeLock: &sync.RWMutex{}, - closed: false, } go ch.infiniteBuffer() return ch @@ -77,10 +71,7 @@ func (ch *InfinitePipe[E]) Len() int { } func (ch *InfinitePipe[E]) Close() { - ch.closeLock.Lock() - defer ch.closeLock.Unlock() close(ch.input) - ch.closed = true } func (ch *InfinitePipe[E]) Discard() { @@ -88,14 +79,14 @@ func (ch *InfinitePipe[E]) Discard() { close(ch.discardCh) } -func (ch *InfinitePipe[E]) TryAdd(e E) bool { - ch.closeLock.RLock() - defer ch.closeLock.RUnlock() - if ch.closed { - return false - } +func (ch *InfinitePipe[E]) TryAdd(e E, log func(msg string, args ...interface{})) { + defer func() { + if err := recover(); err != nil { + log("Attempt to write to a closed channel: %v", e) + return + } + }() ch.In() <- e - return true } func (ch *InfinitePipe[E]) infiniteBuffer() { From 81e196a11a28012f31b580b0e1d13fad7d1e2801 Mon Sep 17 00:00:00 2001 From: Karolis Petrauskas Date: Wed, 10 May 2023 10:56:37 +0300 Subject: [PATCH 2/2] Clean the input channel when discarding a pipe. --- packages/util/pipe/pipe.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/util/pipe/pipe.go b/packages/util/pipe/pipe.go index f63ce9e3dd..7a6f546209 100644 --- a/packages/util/pipe/pipe.go +++ b/packages/util/pipe/pipe.go @@ -108,6 +108,10 @@ func (ch *InfinitePipe[E]) infiniteBuffer() { case ch.length <- ch.buffer.Length(): case <-ch.discardCh: // Close the pipe without waiting for the values to be consumed. + for range ch.input { //nolint:revive + // Just clear the channel, to avoid blocking the senders. + // The channel itself should be closed already. + } close(ch.output) close(ch.length) return