From 04769f232eab9c7c5e19b8c7edacb4a5c714dd0a Mon Sep 17 00:00:00 2001 From: boks1971 Date: Sun, 19 Nov 2023 09:07:48 +0530 Subject: [PATCH 1/8] Close previous conn when switching to new conn. This is a different attempt at https://github.com/livekit/server-sdk-go/pull/350. Feels a bit cleaner, i. e. if the internal `conn atomic.Value` is replaced, the previous `conn` is closed before doing that. --- signalclient.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/signalclient.go b/signalclient.go index b4aea996..3ef0a3f3 100644 --- a/signalclient.go +++ b/signalclient.go @@ -38,9 +38,9 @@ const PROTOCOL = 8 type SignalClient struct { conn atomic.Value // *websocket.Conn lock sync.Mutex - isClosed atomic.Bool isStarted atomic.Bool pendingResponse *livekit.SignalResponse + readerClosedCh chan struct{} OnClose func() OnAnswer func(sd webrtc.SessionDescription) @@ -66,6 +66,7 @@ func (c *SignalClient) Start() { if c.isStarted.Swap(true) { return } + c.readerClosedCh = make(chan struct{}) go c.readWorker() } @@ -139,7 +140,7 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam return nil, errors.New(errString) } } - c.isClosed.Store(false) + c.Close() // close previous conn, if any c.conn.Store(conn) // server should send join as soon as connected @@ -172,12 +173,13 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam } func (c *SignalClient) Close() { - if c.isClosed.Swap(true) { - return - } + isStarted := c.IsStarted() if conn := c.websocketConn(); conn != nil { _ = conn.Close() } + if isStarted { + <-c.readerClosedCh + } } func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error { @@ -263,10 +265,6 @@ func (c *SignalClient) SendUpdateParticipantMetadata(metadata *livekit.UpdatePar } func (c *SignalClient) readResponse() (*livekit.SignalResponse, error) { - if c.isClosed.Load() { - return nil, io.EOF - } - conn := c.websocketConn() if conn == nil { return nil, errors.New("cannot read response before join") @@ -351,15 +349,17 @@ func (c *SignalClient) readWorker() { if c.OnClose != nil { c.OnClose() } + c.conn.Store((*websocket.Conn)(nil)) + close(c.readerClosedCh) }() if pending := c.pendingResponse; pending != nil { c.handleResponse(pending) c.pendingResponse = nil } - for !c.isClosed.Load() { + for { res, err := c.readResponse() if err != nil { - if !isIgnoredWebsocketError(err) && !c.isClosed.Load() { + if !isIgnoredWebsocketError(err) { logger.Infow("error while reading from signal client", "err", err) } return From dc71bfbd24d691ca5bf8f9b60a7593d8d8fe6639 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Sun, 19 Nov 2023 09:28:03 +0530 Subject: [PATCH 2/8] Prevent race --- signalclient.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/signalclient.go b/signalclient.go index 3ef0a3f3..a1b6119b 100644 --- a/signalclient.go +++ b/signalclient.go @@ -346,11 +346,12 @@ func (c *SignalClient) handleResponse(res *livekit.SignalResponse) { func (c *SignalClient) readWorker() { defer func() { c.isStarted.Store(false) + c.conn.Store((*websocket.Conn)(nil)) + close(c.readerClosedCh) + if c.OnClose != nil { c.OnClose() } - c.conn.Store((*websocket.Conn)(nil)) - close(c.readerClosedCh) }() if pending := c.pendingResponse; pending != nil { c.handleResponse(pending) From 7aa190c377c9d1fe1275829337a7d752f76f1a6f Mon Sep 17 00:00:00 2001 From: boks1971 Date: Sun, 19 Nov 2023 09:38:50 +0530 Subject: [PATCH 3/8] race --- signalclient.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/signalclient.go b/signalclient.go index a1b6119b..b6d8cd53 100644 --- a/signalclient.go +++ b/signalclient.go @@ -174,12 +174,16 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam func (c *SignalClient) Close() { isStarted := c.IsStarted() - if conn := c.websocketConn(); conn != nil { + conn := c.websocketConn() + if conn != nil { _ = conn.Close() } if isStarted { <-c.readerClosedCh } + if conn != nil { + c.OnClose() + } } func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error { @@ -348,10 +352,6 @@ func (c *SignalClient) readWorker() { c.isStarted.Store(false) c.conn.Store((*websocket.Conn)(nil)) close(c.readerClosedCh) - - if c.OnClose != nil { - c.OnClose() - } }() if pending := c.pendingResponse; pending != nil { c.handleResponse(pending) From b2c826ce7a891098bc180dca4b23dae27bf613b4 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Sun, 19 Nov 2023 09:41:27 +0530 Subject: [PATCH 4/8] check nil --- signalclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/signalclient.go b/signalclient.go index b6d8cd53..0097ba2e 100644 --- a/signalclient.go +++ b/signalclient.go @@ -181,7 +181,7 @@ func (c *SignalClient) Close() { if isStarted { <-c.readerClosedCh } - if conn != nil { + if conn != nil && c.OnClose != nil { c.OnClose() } } From 0e1c8fe0ebcd88598b16d75e0cde2ee29513ddc1 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Mon, 20 Nov 2023 10:25:30 +0530 Subject: [PATCH 5/8] Use atomic.Pointer --- README.md | 2 ++ go.mod | 2 +- signalclient.go | 13 +++---------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 2ef16fb9..2e9a871f 100644 --- a/README.md +++ b/README.md @@ -239,7 +239,9 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request) { ``` +
+ diff --git a/go.mod b/go.mod index 2c9ba010..d3140802 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/livekit/server-sdk-go -go 1.18 +go 1.19 require ( github.com/bep/debounce v1.2.1 diff --git a/signalclient.go b/signalclient.go index 0097ba2e..c8741c93 100644 --- a/signalclient.go +++ b/signalclient.go @@ -36,7 +36,7 @@ import ( const PROTOCOL = 8 type SignalClient struct { - conn atomic.Value // *websocket.Conn + conn atomic.Pointer[websocket.Conn] lock sync.Mutex isStarted atomic.Bool pendingResponse *livekit.SignalResponse @@ -350,7 +350,7 @@ func (c *SignalClient) handleResponse(res *livekit.SignalResponse) { func (c *SignalClient) readWorker() { defer func() { c.isStarted.Store(false) - c.conn.Store((*websocket.Conn)(nil)) + c.conn.Store(nil) close(c.readerClosedCh) }() if pending := c.pendingResponse; pending != nil { @@ -370,14 +370,7 @@ func (c *SignalClient) readWorker() { } func (c *SignalClient) websocketConn() *websocket.Conn { - obj := c.conn.Load() - if obj == nil { - return nil - } - if conn, ok := obj.(*websocket.Conn); ok { - return conn - } - return nil + return c.conn.Load() } func isIgnoredWebsocketError(err error) bool { From 9a727bc78b3482d2a8e4177e8391f80902d19618 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Mon, 20 Nov 2023 11:38:07 +0530 Subject: [PATCH 6/8] move OnClose back to defer, need to deal with race differently --- signalclient.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/signalclient.go b/signalclient.go index c8741c93..c3f387a0 100644 --- a/signalclient.go +++ b/signalclient.go @@ -181,9 +181,6 @@ func (c *SignalClient) Close() { if isStarted { <-c.readerClosedCh } - if conn != nil && c.OnClose != nil { - c.OnClose() - } } func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error { @@ -352,6 +349,10 @@ func (c *SignalClient) readWorker() { c.isStarted.Store(false) c.conn.Store(nil) close(c.readerClosedCh) + + if c.OnClose != nil { + c.OnClose() + } }() if pending := c.pendingResponse; pending != nil { c.handleResponse(pending) From 0acef170f0908065abd1546e9cee319bbeb0ea19 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Mon, 20 Nov 2023 11:52:42 +0530 Subject: [PATCH 7/8] use a local variable --- signalclient.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/signalclient.go b/signalclient.go index c3f387a0..2a355d0f 100644 --- a/signalclient.go +++ b/signalclient.go @@ -67,7 +67,7 @@ func (c *SignalClient) Start() { return } c.readerClosedCh = make(chan struct{}) - go c.readWorker() + go c.readWorker(c.readerClosedCh) } func (c *SignalClient) IsStarted() bool { @@ -174,12 +174,13 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam func (c *SignalClient) Close() { isStarted := c.IsStarted() + readerClosedCh := c.readerClosedCh conn := c.websocketConn() if conn != nil { _ = conn.Close() } if isStarted { - <-c.readerClosedCh + <-readerClosedCh } } @@ -344,11 +345,11 @@ func (c *SignalClient) handleResponse(res *livekit.SignalResponse) { } } -func (c *SignalClient) readWorker() { +func (c *SignalClient) readWorker(readerClosedCh chan struct{}) { defer func() { c.isStarted.Store(false) c.conn.Store(nil) - close(c.readerClosedCh) + close(readerClosedCh) if c.OnClose != nil { c.OnClose() From a5825d6b8ead3bd60212659869d49bc9542cb8ff Mon Sep 17 00:00:00 2001 From: boks1971 Date: Mon, 20 Nov 2023 12:00:14 +0530 Subject: [PATCH 8/8] nil check, just being paranoid --- signalclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/signalclient.go b/signalclient.go index 2a355d0f..b8369385 100644 --- a/signalclient.go +++ b/signalclient.go @@ -179,7 +179,7 @@ func (c *SignalClient) Close() { if conn != nil { _ = conn.Close() } - if isStarted { + if isStarted && readerClosedCh != nil { <-readerClosedCh } }
LiveKit Ecosystem
Client SDKsComponents · JavaScript · iOS/macOS · Android · Flutter · React Native · Rust · Python · Unity (web) · Unity (beta)