From 65f60f39ed60ba4bd52c0aaa1bedb9a81af98bd9 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 20 Nov 2023 13:42:46 +0530 Subject: [PATCH] Close previous conn when switching to new conn. (#351) * Close previous conn when switching to new conn. This is a different attempt at https://github.com/livekit/server-sdk-go/pull/350. Feels a bit cleaner, i. e. if the internal `conn atomic.Value` is replaced, the previous `conn` is closed before doing that. * Prevent race * race * check nil * Use atomic.Pointer * move OnClose back to defer, need to deal with race differently * use a local variable * nil check, just being paranoid --- README.md | 2 ++ go.mod | 2 +- signalclient.go | 42 +++++++++++++++++++----------------------- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 2ef16fb9..2e9a871f 100644 --- a/README.md +++ b/README.md @@ -239,7 +239,9 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request) { ``` +
+ diff --git a/go.mod b/go.mod index 2c9ba010..d3140802 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/livekit/server-sdk-go -go 1.18 +go 1.19 require ( github.com/bep/debounce v1.2.1 diff --git a/signalclient.go b/signalclient.go index b4aea996..b8369385 100644 --- a/signalclient.go +++ b/signalclient.go @@ -36,11 +36,11 @@ import ( const PROTOCOL = 8 type SignalClient struct { - conn atomic.Value // *websocket.Conn + conn atomic.Pointer[websocket.Conn] lock sync.Mutex - isClosed atomic.Bool isStarted atomic.Bool pendingResponse *livekit.SignalResponse + readerClosedCh chan struct{} OnClose func() OnAnswer func(sd webrtc.SessionDescription) @@ -66,7 +66,8 @@ func (c *SignalClient) Start() { if c.isStarted.Swap(true) { return } - go c.readWorker() + c.readerClosedCh = make(chan struct{}) + go c.readWorker(c.readerClosedCh) } func (c *SignalClient) IsStarted() bool { @@ -139,7 +140,7 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam return nil, errors.New(errString) } } - c.isClosed.Store(false) + c.Close() // close previous conn, if any c.conn.Store(conn) // server should send join as soon as connected @@ -172,12 +173,15 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam } func (c *SignalClient) Close() { - if c.isClosed.Swap(true) { - return - } - if conn := c.websocketConn(); conn != nil { + isStarted := c.IsStarted() + readerClosedCh := c.readerClosedCh + conn := c.websocketConn() + if conn != nil { _ = conn.Close() } + if isStarted && readerClosedCh != nil { + <-readerClosedCh + } } func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error { @@ -263,10 +267,6 @@ func (c *SignalClient) SendUpdateParticipantMetadata(metadata *livekit.UpdatePar } func (c *SignalClient) readResponse() (*livekit.SignalResponse, error) { - if c.isClosed.Load() { - return nil, io.EOF - } - conn := c.websocketConn() if conn == nil { return nil, errors.New("cannot read response before join") @@ -345,9 +345,12 @@ func (c *SignalClient) handleResponse(res *livekit.SignalResponse) { } } -func (c *SignalClient) readWorker() { +func (c *SignalClient) readWorker(readerClosedCh chan struct{}) { defer func() { c.isStarted.Store(false) + c.conn.Store(nil) + close(readerClosedCh) + if c.OnClose != nil { c.OnClose() } @@ -356,10 +359,10 @@ func (c *SignalClient) readWorker() { c.handleResponse(pending) c.pendingResponse = nil } - for !c.isClosed.Load() { + for { res, err := c.readResponse() if err != nil { - if !isIgnoredWebsocketError(err) && !c.isClosed.Load() { + if !isIgnoredWebsocketError(err) { logger.Infow("error while reading from signal client", "err", err) } return @@ -369,14 +372,7 @@ func (c *SignalClient) readWorker() { } func (c *SignalClient) websocketConn() *websocket.Conn { - obj := c.conn.Load() - if obj == nil { - return nil - } - if conn, ok := obj.(*websocket.Conn); ok { - return conn - } - return nil + return c.conn.Load() } func isIgnoredWebsocketError(err error) bool {
LiveKit Ecosystem
Client SDKsComponents · JavaScript · iOS/macOS · Android · Flutter · React Native · Rust · Python · Unity (web) · Unity (beta)