From a9a487ad1da187b0fa40b7888907f3e93d34fc35 Mon Sep 17 00:00:00 2001 From: Sergey Volkov Date: Tue, 6 Oct 2020 21:56:09 +0300 Subject: [PATCH] Add mutex for client.send --- client.go | 6 +++++- hub.go | 7 ++++--- receiverHub.go | 18 +++++++++++------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index 1568f72..fe00a61 100644 --- a/client.go +++ b/client.go @@ -7,6 +7,7 @@ import ( log "github.com/sirupsen/logrus" "net/http" "net/url" + "sync" "time" ) @@ -37,6 +38,7 @@ type Client struct { stopSignal chan interface{} receiver *ReceiverHub wda *WdaHub + mutex *sync.Mutex } func (c *Client) readPump() { @@ -115,11 +117,13 @@ func (c *Client) stop() { log.Warn("Client.stop() called more then once") return } + c.mutex.Lock() close(*c.send) c.send = nil if c.stopSignal != nil { c.stopSignal <- nil } + c.mutex.Unlock() } func (c *Client) runWda(udid string) { @@ -143,7 +147,7 @@ func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { return } send := make(chan []byte, 256) - client := &Client{hub: hub, conn: conn, send: &send} + client := &Client{hub: hub, conn: conn, send: &send, mutex: &sync.Mutex{}} client.hub.register <- client go client.writePump() diff --git a/hub.go b/hub.go index b1b5193..cb0921e 100644 --- a/hub.go +++ b/hub.go @@ -114,16 +114,17 @@ func (h *Hub) run(stopSignal chan interface{}) { h.unregisterClient(client) case message := <-h.broadcast: for client := range h.clients { - send := client.send - if send == nil { + if client.send == nil { continue } + client.mutex.Lock() select { - case *send <- message: + case *client.send <- message: default: client.stop() delete(h.clients, client) } + client.mutex.Unlock() } } log.Debug("Clients count: ", len(h.clients)) diff --git a/receiverHub.go b/receiverHub.go index 5ce13ff..d1c2047 100644 --- a/receiverHub.go +++ b/receiverHub.go @@ -138,10 +138,10 @@ func (r *ReceiverHub) run() { } case data := <-r.send: for client, status := range r.clients { - send := client.send - if send == nil { + if client.send == nil { continue } + client.mutex.Lock() nalUnitType := data[4] & 31 if nalUnitType == PPS { r.storeNalUnit(&r.pps, &data) @@ -151,37 +151,41 @@ func (r *ReceiverHub) run() { r.storeNalUnit(&r.sei, &data) } if status.gotIFrame { - *send <- data + *client.send <- data } else { if !status.gotPPS && r.pps != nil { status.gotPPS = true - *send <- r.pps + *client.send <- r.pps if nalUnitType == PPS { + client.mutex.Unlock() continue } } if !status.gotSPS && r.sps != nil { status.gotSPS = true - *send <- r.sps + *client.send <- r.sps if nalUnitType == SPS { + client.mutex.Unlock() continue } } if !status.gotSEI && r.sei != nil { status.gotSEI = true - *send <- r.sei + *client.send <- r.sei if nalUnitType == SEI { + client.mutex.Unlock() continue } } isIframe := nalUnitType == IDR if status.gotPPS && status.gotSPS && status.gotSEI && isIframe { status.gotIFrame = true - *send <- data + *client.send <- data } else { // log.Info("Receiver. ", "skipping frame for client: ", nalUnitType) } } + client.mutex.Unlock() } } }