diff --git a/signalclient.go b/signalclient.go index b4aea996..3ef0a3f3 100644 --- a/signalclient.go +++ b/signalclient.go @@ -38,9 +38,9 @@ const PROTOCOL = 8 type SignalClient struct { conn atomic.Value // *websocket.Conn lock sync.Mutex - isClosed atomic.Bool isStarted atomic.Bool pendingResponse *livekit.SignalResponse + readerClosedCh chan struct{} OnClose func() OnAnswer func(sd webrtc.SessionDescription) @@ -66,6 +66,7 @@ func (c *SignalClient) Start() { if c.isStarted.Swap(true) { return } + c.readerClosedCh = make(chan struct{}) go c.readWorker() } @@ -139,7 +140,7 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam return nil, errors.New(errString) } } - c.isClosed.Store(false) + c.Close() // close previous conn, if any c.conn.Store(conn) // server should send join as soon as connected @@ -172,12 +173,13 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam } func (c *SignalClient) Close() { - if c.isClosed.Swap(true) { - return - } + isStarted := c.IsStarted() if conn := c.websocketConn(); conn != nil { _ = conn.Close() } + if isStarted { + <-c.readerClosedCh + } } func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error { @@ -263,10 +265,6 @@ func (c *SignalClient) SendUpdateParticipantMetadata(metadata *livekit.UpdatePar } func (c *SignalClient) readResponse() (*livekit.SignalResponse, error) { - if c.isClosed.Load() { - return nil, io.EOF - } - conn := c.websocketConn() if conn == nil { return nil, errors.New("cannot read response before join") @@ -351,15 +349,17 @@ func (c *SignalClient) readWorker() { if c.OnClose != nil { c.OnClose() } + c.conn.Store((*websocket.Conn)(nil)) + close(c.readerClosedCh) }() if pending := c.pendingResponse; pending != nil { c.handleResponse(pending) c.pendingResponse = nil } - for !c.isClosed.Load() { + for { res, err := c.readResponse() if err != nil { - if !isIgnoredWebsocketError(err) && !c.isClosed.Load() { + if !isIgnoredWebsocketError(err) { logger.Infow("error while reading from signal client", "err", err) } return