Skip to content

Commit

Permalink
Close previous conn when switching to new conn.
Browse files Browse the repository at this point in the history
This is a different attempt at
#350. Feels a bit cleaner,
i. e. if the internal `conn atomic.Value` is replaced, the previous
`conn` is closed before doing that.
  • Loading branch information
boks1971 committed Nov 19, 2023
1 parent fa53741 commit 04769f2
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions signalclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ const PROTOCOL = 8
type SignalClient struct {
conn atomic.Value // *websocket.Conn
lock sync.Mutex
isClosed atomic.Bool
isStarted atomic.Bool
pendingResponse *livekit.SignalResponse
readerClosedCh chan struct{}

OnClose func()
OnAnswer func(sd webrtc.SessionDescription)
Expand All @@ -66,6 +66,7 @@ func (c *SignalClient) Start() {
if c.isStarted.Swap(true) {
return
}
c.readerClosedCh = make(chan struct{})
go c.readWorker()
}

Expand Down Expand Up @@ -139,7 +140,7 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam
return nil, errors.New(errString)
}
}
c.isClosed.Store(false)
c.Close() // close previous conn, if any
c.conn.Store(conn)

// server should send join as soon as connected
Expand Down Expand Up @@ -172,12 +173,13 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam
}

func (c *SignalClient) Close() {
if c.isClosed.Swap(true) {
return
}
isStarted := c.IsStarted()
if conn := c.websocketConn(); conn != nil {
_ = conn.Close()
}
if isStarted {
<-c.readerClosedCh
}
}

func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error {
Expand Down Expand Up @@ -263,10 +265,6 @@ func (c *SignalClient) SendUpdateParticipantMetadata(metadata *livekit.UpdatePar
}

func (c *SignalClient) readResponse() (*livekit.SignalResponse, error) {
if c.isClosed.Load() {
return nil, io.EOF
}

conn := c.websocketConn()
if conn == nil {
return nil, errors.New("cannot read response before join")
Expand Down Expand Up @@ -351,15 +349,17 @@ func (c *SignalClient) readWorker() {
if c.OnClose != nil {
c.OnClose()
}
c.conn.Store((*websocket.Conn)(nil))
close(c.readerClosedCh)
}()
if pending := c.pendingResponse; pending != nil {
c.handleResponse(pending)
c.pendingResponse = nil
}
for !c.isClosed.Load() {
for {
res, err := c.readResponse()
if err != nil {
if !isIgnoredWebsocketError(err) && !c.isClosed.Load() {
if !isIgnoredWebsocketError(err) {
logger.Infow("error while reading from signal client", "err", err)
}
return
Expand Down

0 comments on commit 04769f2

Please sign in to comment.