Skip to content

Commit

Permalink
Add mutex for client.send
Browse files Browse the repository at this point in the history
  • Loading branch information
drauggres committed Oct 6, 2020
1 parent 778006f commit a9a487a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
6 changes: 5 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus"
"net/http"
"net/url"
"sync"
"time"
)

Expand Down Expand Up @@ -37,6 +38,7 @@ type Client struct {
stopSignal chan interface{}
receiver *ReceiverHub
wda *WdaHub
mutex *sync.Mutex
}

func (c *Client) readPump() {
Expand Down Expand Up @@ -115,11 +117,13 @@ func (c *Client) stop() {
log.Warn("Client.stop() called more then once")
return
}
c.mutex.Lock()
close(*c.send)
c.send = nil
if c.stopSignal != nil {
c.stopSignal <- nil
}
c.mutex.Unlock()
}

func (c *Client) runWda(udid string) {
Expand All @@ -143,7 +147,7 @@ func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
return
}
send := make(chan []byte, 256)
client := &Client{hub: hub, conn: conn, send: &send}
client := &Client{hub: hub, conn: conn, send: &send, mutex: &sync.Mutex{}}
client.hub.register <- client

go client.writePump()
Expand Down
7 changes: 4 additions & 3 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,17 @@ func (h *Hub) run(stopSignal chan interface{}) {
h.unregisterClient(client)
case message := <-h.broadcast:
for client := range h.clients {
send := client.send
if send == nil {
if client.send == nil {
continue
}
client.mutex.Lock()
select {
case *send <- message:
case *client.send <- message:
default:
client.stop()
delete(h.clients, client)
}
client.mutex.Unlock()
}
}
log.Debug("Clients count: ", len(h.clients))
Expand Down
18 changes: 11 additions & 7 deletions receiverHub.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ func (r *ReceiverHub) run() {
}
case data := <-r.send:
for client, status := range r.clients {
send := client.send
if send == nil {
if client.send == nil {
continue
}
client.mutex.Lock()
nalUnitType := data[4] & 31
if nalUnitType == PPS {
r.storeNalUnit(&r.pps, &data)
Expand All @@ -151,37 +151,41 @@ func (r *ReceiverHub) run() {
r.storeNalUnit(&r.sei, &data)
}
if status.gotIFrame {
*send <- data
*client.send <- data
} else {
if !status.gotPPS && r.pps != nil {
status.gotPPS = true
*send <- r.pps
*client.send <- r.pps
if nalUnitType == PPS {
client.mutex.Unlock()
continue
}
}
if !status.gotSPS && r.sps != nil {
status.gotSPS = true
*send <- r.sps
*client.send <- r.sps
if nalUnitType == SPS {
client.mutex.Unlock()
continue
}
}
if !status.gotSEI && r.sei != nil {
status.gotSEI = true
*send <- r.sei
*client.send <- r.sei
if nalUnitType == SEI {
client.mutex.Unlock()
continue
}
}
isIframe := nalUnitType == IDR
if status.gotPPS && status.gotSPS && status.gotSEI && isIframe {
status.gotIFrame = true
*send <- data
*client.send <- data
} else {
// log.Info("Receiver. ", "skipping frame for client: ", nalUnitType)
}
}
client.mutex.Unlock()
}
}
}
Expand Down

0 comments on commit a9a487a

Please sign in to comment.