Skip to content

Commit

Permalink
internal/mux: Implement pending handler
Browse files Browse the repository at this point in the history
  • Loading branch information
lactyy committed Jul 19, 2024
1 parent b874788 commit a1c1347
Showing 1 changed file with 55 additions and 7 deletions.
62 changes: 55 additions & 7 deletions internal/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net"
"sync"
"time"

"github.com/pion/ice/v3"
"github.com/pion/logging"
Expand All @@ -34,20 +35,25 @@ type Mux struct {
bufferSize int
closedCh chan struct{}

pendingPackets map[*pendingPacket]struct{}
pendingPacketsLock sync.Mutex

log logging.LeveledLogger
}

// NewMux creates a new Mux
func NewMux(config Config) *Mux {
m := &Mux{
nextConn: config.Conn,
endpoints: make(map[*Endpoint]MatchFunc),
bufferSize: config.BufferSize,
closedCh: make(chan struct{}),
log: config.LoggerFactory.NewLogger("mux"),
nextConn: config.Conn,
endpoints: make(map[*Endpoint]MatchFunc),
bufferSize: config.BufferSize,
closedCh: make(chan struct{}),
log: config.LoggerFactory.NewLogger("mux"),
pendingPackets: make(map[*pendingPacket]struct{}),
}

go m.readLoop()
go m.pendingPacketsHandler()

return m
}
Expand Down Expand Up @@ -140,10 +146,17 @@ func (m *Mux) dispatch(buf []byte) error {

if endpoint == nil {
if len(buf) > 0 {
m.log.Warnf("Warning: mux: no endpoint for packet starting with %d", buf[0])
m.log.Warnf("Warning: mux: no endpoint for packet starting with %d, queueing packet as pending", buf[0])
} else {
m.log.Warnf("Warning: mux: no endpoint for zero length packet")
m.log.Warnf("Warning: mux: no endpoint for zero length packet, queueing packet as pending")

Check warning on line 151 in internal/mux/mux.go

View check run for this annotation

Codecov / codecov/patch

internal/mux/mux.go#L151

Added line #L151 was not covered by tests
}

m.pendingPacketsLock.Lock()
m.pendingPackets[&pendingPacket{
t: time.Now(),
data: buf,
}] = struct{}{}
m.pendingPacketsLock.Unlock()
return nil
}

Expand All @@ -157,3 +170,38 @@ func (m *Mux) dispatch(buf []byte) error {

return err
}

func (m *Mux) pendingPacketsHandler() {
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
m.lock.RLock()
m.pendingPacketsLock.Lock()
for p := range m.pendingPackets {
if time.Since(p.t) > time.Second*5 {
m.log.Warnf("Warning: mux: dropping packet after 5 seconds in pending queue")
delete(m.pendingPackets, p)

Check warning on line 186 in internal/mux/mux.go

View check run for this annotation

Codecov / codecov/patch

internal/mux/mux.go#L185-L186

Added lines #L185 - L186 were not covered by tests
}
for endpoint, f := range m.endpoints {
if f(p.data) {
_, _ = endpoint.buffer.Write(p.data)
delete(m.pendingPackets, p)
m.log.Warnf("Warning: mux: found endpoint for packet after %s in pending queue", time.Since(p.t))

Check warning on line 192 in internal/mux/mux.go

View check run for this annotation

Codecov / codecov/patch

internal/mux/mux.go#L190-L192

Added lines #L190 - L192 were not covered by tests
}
}
}
m.pendingPacketsLock.Unlock()
m.lock.RUnlock()
case <-m.closedCh:
return
}
}
}

type pendingPacket struct {
t time.Time
data []byte
}

0 comments on commit a1c1347

Please sign in to comment.