Skip to content

Commit

Permalink
Merge pull request #2452 from iotaledger/discardable-pipes-no-lock
Browse files Browse the repository at this point in the history
Discardable pipes, no lock
  • Loading branch information
kape1395 authored May 10, 2023
2 parents 74a1588 + 81e196a commit 5b8ec05
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 18 deletions.
2 changes: 1 addition & 1 deletion packages/chain/cons/cons_gr/gr.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func New(
cgr.log.Warnf("Unexpected message, type=%v", recv.MsgType)
return
}
cgr.netRecvPipe.TryAdd(recv)
cgr.netRecvPipe.TryAdd(recv, cgr.log.Debugf)
})
cgr.netDisconnect = unhook

Expand Down
2 changes: 1 addition & 1 deletion packages/util/pipe/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ type Pipe[E any] interface {
Len() int
Close()
Discard()
TryAdd(e E) bool
TryAdd(e E, log func(msg string, args ...interface{}))
}
27 changes: 11 additions & 16 deletions packages/util/pipe/pipe.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package pipe

import "sync"

// InfinitePipe provides deserialised sender and receiver: it queues messages
// sent by the sender and returns them to the receiver whenever it is ready,
// without blocking the sender process. Depending on the backing queue, the pipe
Expand All @@ -12,8 +10,6 @@ type InfinitePipe[E any] struct {
length chan int
buffer Queue[E]
discardCh chan struct{}
closeLock *sync.RWMutex
closed bool
}

var _ Pipe[Hashable] = &InfinitePipe[Hashable]{}
Expand Down Expand Up @@ -57,8 +53,6 @@ func newInfinitePipe[E any](queue Queue[E]) *InfinitePipe[E] {
length: make(chan int),
buffer: queue,
discardCh: make(chan struct{}),
closeLock: &sync.RWMutex{},
closed: false,
}
go ch.infiniteBuffer()
return ch
Expand All @@ -77,25 +71,22 @@ func (ch *InfinitePipe[E]) Len() int {
}

func (ch *InfinitePipe[E]) Close() {
ch.closeLock.Lock()
defer ch.closeLock.Unlock()
close(ch.input)
ch.closed = true
}

func (ch *InfinitePipe[E]) Discard() {
ch.Close()
close(ch.discardCh)
}

func (ch *InfinitePipe[E]) TryAdd(e E) bool {
ch.closeLock.RLock()
defer ch.closeLock.RUnlock()
if ch.closed {
return false
}
func (ch *InfinitePipe[E]) TryAdd(e E, log func(msg string, args ...interface{})) {
defer func() {
if err := recover(); err != nil {
log("Attempt to write to a closed channel: %v", e)
return
}
}()
ch.In() <- e
return true
}

func (ch *InfinitePipe[E]) infiniteBuffer() {
Expand All @@ -117,6 +108,10 @@ func (ch *InfinitePipe[E]) infiniteBuffer() {
case ch.length <- ch.buffer.Length():
case <-ch.discardCh:
// Close the pipe without waiting for the values to be consumed.
for range ch.input { //nolint:revive
// Just clear the channel, to avoid blocking the senders.
// The channel itself should be closed already.
}
close(ch.output)
close(ch.length)
return
Expand Down

0 comments on commit 5b8ec05

Please sign in to comment.