Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
craymond12 authored Sep 10, 2024
2 parents 179397b + 28adb77 commit c6e7fdb
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions internal/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@ type Config struct {

// Mux allows multiplexing
type Mux struct {
lock sync.RWMutex
nextConn net.Conn
endpoints map[*Endpoint]MatchFunc
bufferSize int
closedCh chan struct{}
lock sync.Mutex
endpoints map[*Endpoint]MatchFunc
isClosed bool

pendingPackets [][]byte

log logging.LeveledLogger
closedCh chan struct{}
log logging.LeveledLogger
}

// NewMux creates a new Mux
Expand Down Expand Up @@ -96,6 +97,7 @@ func (m *Mux) Close() error {

delete(m.endpoints, e)
}
m.isClosed = true
m.lock.Unlock()

err := m.nextConn.Close()
Expand Down Expand Up @@ -154,22 +156,21 @@ func (m *Mux) dispatch(buf []byte) error {
break
}
}
m.lock.Unlock()

if endpoint == nil {
m.lock.Lock()
defer m.lock.Unlock()

if len(m.pendingPackets) >= maxPendingPackets {
m.log.Warnf("Warning: mux: no endpoint for packet starting with %d, not adding to queue size(%d)", buf[0], len(m.pendingPackets))
} else {
m.log.Warnf("Warning: mux: no endpoint for packet starting with %d, adding to queue size(%d)", buf[0], len(m.pendingPackets))
m.pendingPackets = append(m.pendingPackets, append([]byte{}, buf...))
if !m.isClosed {
if len(m.pendingPackets) >= maxPendingPackets {
m.log.Warnf("Warning: mux: no endpoint for packet starting with %d, not adding to queue size(%d)", buf[0], len(m.pendingPackets))
} else {
m.log.Warnf("Warning: mux: no endpoint for packet starting with %d, adding to queue size(%d)", buf[0], len(m.pendingPackets))
m.pendingPackets = append(m.pendingPackets, append([]byte{}, buf...))
}
}

return nil
}

m.lock.Unlock()
_, err := endpoint.buffer.Write(buf)

// Expected when bytes are received faster than the endpoint can process them (#2152, #2180)
Expand Down

0 comments on commit c6e7fdb

Please sign in to comment.