Close previous conn when switching to new conn. #351

Merged 8 commits on Nov 20, 2023
2 changes: 2 additions & 0 deletions README.md
@@ -239,7 +239,9 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request) {
```

<!--BEGIN_REPO_NAV-->

<br/><table>

<thead><tr><th colspan="2">LiveKit Ecosystem</th></tr></thead>
<tbody>
<tr><td>Client SDKs</td><td><a href="https://github.com/livekit/components-js">Components</a> · <a href="https://github.com/livekit/client-sdk-js">JavaScript</a> · <a href="https://github.com/livekit/client-sdk-swift">iOS/macOS</a> · <a href="https://github.com/livekit/client-sdk-android">Android</a> · <a href="https://github.com/livekit/client-sdk-flutter">Flutter</a> · <a href="https://github.com/livekit/client-sdk-react-native">React Native</a> · <a href="https://github.com/livekit/client-sdk-rust">Rust</a> · <a href="https://github.com/livekit/client-sdk-python">Python</a> · <a href="https://github.com/livekit/client-sdk-unity-web">Unity (web)</a> · <a href="https://github.com/livekit/client-sdk-unity">Unity (beta)</a></td></tr><tr></tr>
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/livekit/server-sdk-go

-go 1.18
+go 1.19

require (
github.com/bep/debounce v1.2.1
42 changes: 19 additions & 23 deletions signalclient.go
@@ -36,11 +36,11 @@ import (
 const PROTOCOL = 8

 type SignalClient struct {
-	conn atomic.Value // *websocket.Conn
+	conn atomic.Pointer[websocket.Conn]
 	lock sync.Mutex
-	isClosed atomic.Bool
 	isStarted atomic.Bool
 	pendingResponse *livekit.SignalResponse
+	readerClosedCh chan struct{}

 	OnClose func()
 	OnAnswer func(sd webrtc.SessionDescription)
@@ -66,7 +66,8 @@ func (c *SignalClient) Start() {
 	if c.isStarted.Swap(true) {
 		return
 	}
-	go c.readWorker()
+	c.readerClosedCh = make(chan struct{})
+	go c.readWorker(c.readerClosedCh)
 }

 func (c *SignalClient) IsStarted() bool {
@@ -139,7 +140,7 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam
 			return nil, errors.New(errString)
 		}
 	}
-	c.isClosed.Store(false)
+	c.Close() // close previous conn, if any
 	c.conn.Store(conn)

 	// server should send join as soon as connected
@@ -172,12 +173,15 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam
 }

 func (c *SignalClient) Close() {
-	if c.isClosed.Swap(true) {
-		return
-	}
-	if conn := c.websocketConn(); conn != nil {
+	isStarted := c.IsStarted()
+	readerClosedCh := c.readerClosedCh
+	conn := c.websocketConn()
+	if conn != nil {
 		_ = conn.Close()
 	}
+	if isStarted && readerClosedCh != nil {
+		<-readerClosedCh
+	}
 }

 func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error {
@@ -263,10 +267,6 @@ func (c *SignalClient) SendUpdateParticipantMetadata(metadata *livekit.UpdatePar
 }

 func (c *SignalClient) readResponse() (*livekit.SignalResponse, error) {
-	if c.isClosed.Load() {
-		return nil, io.EOF
-	}
-
 	conn := c.websocketConn()
 	if conn == nil {
 		return nil, errors.New("cannot read response before join")
@@ -345,9 +345,12 @@ func (c *SignalClient) handleResponse(res *livekit.SignalResponse) {
 	}
 }

-func (c *SignalClient) readWorker() {
+func (c *SignalClient) readWorker(readerClosedCh chan struct{}) {
 	defer func() {
 		c.isStarted.Store(false)
+		c.conn.Store(nil)
+		close(readerClosedCh)
+
 		if c.OnClose != nil {
 			c.OnClose()
Contributor

This means OnClose will only be called when Close is called, not when the connection is interrupted, and the Engine relies on this callback to handle signal disconnection and resume.
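
To illustrate the concern, here is a minimal sketch of a read loop whose callback lives in a `defer`, so it also runs when the peer drops the connection and nobody calls a local close. It uses an in-memory `net.Pipe` rather than a websocket, and `readWorker`, `onClose`, and `onCloseFiresOnRemoteClose` are hypothetical names for this example, not the SDK's code.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// readWorker is a hypothetical stand-in for a signal read loop: it reads
// until the first error and always runs onClose on the way out.
func readWorker(conn net.Conn, onClose func()) {
	defer onClose() // fires on every exit path of the read loop
	buf := make([]byte, 64)
	for {
		if _, err := conn.Read(buf); err != nil {
			return
		}
	}
}

// onCloseFiresOnRemoteClose reports whether the callback ran after the
// remote end of the pipe was closed, with no local Close() call.
func onCloseFiresOnRemoteClose() bool {
	local, remote := net.Pipe()
	defer local.Close()

	fired := make(chan struct{})
	go readWorker(local, func() { close(fired) })

	_ = remote.Close() // simulate the peer dropping the connection

	select {
	case <-fired:
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println("OnClose fired on remote close:", onCloseFiresOnRemoteClose())
}
```

If the callback were invoked only from a `Close()` method, the remote-close path in this sketch would never notify the caller.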

Contributor Author

Thank you @cnderrauber. Will take a look. I was hitting a data race in the tests with the OnClose() in the defer, so I serialised it :-( My bad. Will take a look at calling back on read loop exit.

Contributor Author (@boks1971, Nov 20, 2023)

@cnderrauber Made changes in these two commits:

  • 9a727bc - moves the OnClose() callback back to the defer of readWorker.
  • 0acef17 - uses a local variable to avoid the race. The race reported before went something like this: the test called the simulate-scenario reconnect, which called the signal client's Close(). Close() read readerClosedCh to wait on it while Start() could reassign that field to set up the next read worker. I am getting around it by copying the field into a local variable in Close() and reading from that.

Please have a look when you get a chance and let me know if you have concerns.
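
The local-variable approach described above can be sketched roughly as follows. This is a simplified model, not the SDK's code: `fakeConn`, `client`, and `reconnectCycles` are hypothetical names, and the fake connection only blocks in `Read` until it is closed. The points it tries to show are that the worker receives its channel as an argument and that `Close()` snapshots the field before closing the connection.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// fakeConn is a hypothetical stand-in for *websocket.Conn: Read blocks
// until Close is called, then returns an error.
type fakeConn struct {
	done chan struct{}
	once sync.Once
}

func newFakeConn() *fakeConn { return &fakeConn{done: make(chan struct{})} }

func (f *fakeConn) Close() { f.once.Do(func() { close(f.done) }) }

func (f *fakeConn) Read() error {
	<-f.done
	return errors.New("read on closed conn")
}

type client struct {
	conn           atomic.Pointer[fakeConn]
	isStarted      atomic.Bool
	readerClosedCh chan struct{}
}

func (c *client) Start() {
	if c.isStarted.Swap(true) {
		return
	}
	c.readerClosedCh = make(chan struct{})
	go c.readWorker(c.readerClosedCh) // the worker gets its own copy of the channel
}

func (c *client) readWorker(readerClosedCh chan struct{}) {
	defer func() {
		c.isStarted.Store(false)
		c.conn.Store(nil)
		close(readerClosedCh)
	}()
	if conn := c.conn.Load(); conn != nil {
		_ = conn.Read()
	}
}

func (c *client) Close() {
	isStarted := c.isStarted.Load()
	readerClosedCh := c.readerClosedCh // snapshot before the worker can exit
	if conn := c.conn.Load(); conn != nil {
		conn.Close()
	}
	if isStarted && readerClosedCh != nil {
		<-readerClosedCh // wait on the snapshot, not on the field
	}
}

// reconnectCycles connects, starts, and closes n times and reports whether
// the client ended every cycle stopped and with no connection.
func reconnectCycles(n int) bool {
	c := &client{}
	for i := 0; i < n; i++ {
		c.conn.Store(newFakeConn())
		c.Start()
		c.Close()
		if c.isStarted.Load() || c.conn.Load() != nil {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println("clean after every cycle:", reconnectCycles(3))
}
```

Because the worker clears `isStarted` before closing the channel, a later `Start()` can only reassign the field after the snapshot in `Close()` has already been taken.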

 		}
@@ -356,10 +359,10 @@ func (c *SignalClient) readWorker() {
 		c.handleResponse(pending)
 		c.pendingResponse = nil
 	}
-	for !c.isClosed.Load() {
+	for {
 		res, err := c.readResponse()
 		if err != nil {
-			if !isIgnoredWebsocketError(err) && !c.isClosed.Load() {
Member

We'll also need to make sure readResponse returns EOF if it's an expected close. Otherwise it'll keep spamming logs about errors while reading.

Contributor Author

Will look at this.

Contributor Author

On second thought, this does not do anything different. A unilateral close (by calling the Close() method) will close the underlying conn, and the read will fail with an error for reading a closed conn. This loop will then exit/return.

Before this change, the code was only checking isClosed, which is set by the unilateral Close().

A remote close will surface as an error returned by ReadMessage. Yes, this change will log one more message, because it will log an error on a unilateral Close() too. But nothing more.

Once an error is returned, the loop is exited.

Am I missing something?
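
The argument above (a local close unblocks the pending read, and the loop returns on the first error) can be checked with a small sketch. It uses `net.Pipe` from the standard library instead of a websocket connection, and `readLoop` and `errorsLoggedOnLocalClose` are hypothetical names for this example.

```go
package main

import (
	"fmt"
	"net"
)

// readLoop mirrors the shape of the loop discussed above: it returns on the
// first read error, so at most one error can be reported per run.
func readLoop(conn net.Conn, logged chan<- int) {
	count := 0
	buf := make([]byte, 64)
	for {
		if _, err := conn.Read(buf); err != nil {
			count++ // stands in for the single log line
			logged <- count
			return
		}
	}
}

// errorsLoggedOnLocalClose closes the local end while the loop is reading
// and returns how many errors the loop reported before exiting.
func errorsLoggedOnLocalClose() int {
	local, remote := net.Pipe()
	defer remote.Close()

	logged := make(chan int, 1)
	go readLoop(local, logged)

	_ = local.Close() // unilateral close, like calling Close() on the client
	return <-logged
}

func main() {
	fmt.Println("errors reported before exit:", errorsLoggedOnLocalClose())
}
```

The sketch says nothing about whether that one log line is desirable, which is the point raised in the replies that follow.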

Member

Maybe I'm reading this incorrectly. Here it would previously return io.EOF when Close has been called, due to the c.isClosed.Load() check. Currently I think it'll go into errors.New("cannot read response before join")?

It seems that we should not return an error if Close has been called.

Contributor Author (@boks1971, Nov 20, 2023)

That's correct @davidzhao. You are reading it right. But what I was saying is that Close() in this PR sets the conn to nil (regarding the other discussion about atomic.Value == nil, I will change that to an atomic.Pointer). So yes, readResponse will return errors.New("cannot read response before join").

But the read loop will exit because of that error return.

The only difference is that previously the read loop would not have logged an error before exiting, as it was checking isClosed; with this PR, it will log an error before exiting. So what I was saying is that I could not spot any runaway loop repeatedly logging an error.

Member

Sorry, I didn't mean it would be in a loop. Just that in server environments, it would be good to avoid logging errors when the user has closed the connection.

Contributor Author

Got it, I misunderstood. I will look at adding a change to filter out the error.

Contributor Author

Addressed in this PR - #353
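
This page does not show how #353 filters the error, so the following is only one possible shape for such a filter, not the approach actually taken. It assumes the caller tracks whether it initiated the close; `shouldLogReadError`, `closedByCaller`, and `filterDemo` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
)

// shouldLogReadError decides whether a read error deserves a log line.
// closedByCaller is an assumed flag meaning the local side requested the close.
func shouldLogReadError(err error, closedByCaller bool) bool {
	if err == nil || closedByCaller {
		return false
	}
	// Errors that normally just mean "the connection is gone".
	if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || errors.Is(err, io.ErrClosedPipe) {
		return false
	}
	return true
}

// filterDemo runs the filter over a few representative cases.
func filterDemo() [4]bool {
	unexpected := errors.New("unexpected frame")
	return [4]bool{
		shouldLogReadError(io.EOF, false),        // remote closed cleanly
		shouldLogReadError(net.ErrClosed, false), // read on a closed conn
		shouldLogReadError(unexpected, false),    // genuine failure
		shouldLogReadError(unexpected, true),     // failure after a local close
	}
}

func main() {
	fmt.Println(filterDemo())
}
```

A design question this leaves open is whether genuine failures that race with a local close should still be logged; the sketch suppresses them.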

+			if !isIgnoredWebsocketError(err) {
 				logger.Infow("error while reading from signal client", "err", err)
 			}
 			return
@@ -369,14 +372,7 @@ func (c *SignalClient) readWorker() {
 }

 func (c *SignalClient) websocketConn() *websocket.Conn {
-	obj := c.conn.Load()
-	if obj == nil {
-		return nil
-	}
-	if conn, ok := obj.(*websocket.Conn); ok {
-		return conn
-	}
-	return nil
+	return c.conn.Load()
 }

 func isIgnoredWebsocketError(err error) bool {