Close previous conn when switching to new conn. #351

Merged
merged 8 commits into from
Nov 20, 2023
Changes from 4 commits
31 changes: 16 additions & 15 deletions signalclient.go
@@ -38,9 +38,9 @@ const PROTOCOL = 8
 type SignalClient struct {
 	conn atomic.Value // *websocket.Conn
 	lock sync.Mutex
-	isClosed atomic.Bool
 	isStarted atomic.Bool
 	pendingResponse *livekit.SignalResponse
+	readerClosedCh chan struct{}

 	OnClose func()
 	OnAnswer func(sd webrtc.SessionDescription)
@@ -66,6 +66,7 @@ func (c *SignalClient) Start() {
 	if c.isStarted.Swap(true) {
 		return
 	}
+	c.readerClosedCh = make(chan struct{})
 	go c.readWorker()
 }

@@ -139,7 +140,7 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam
 			return nil, errors.New(errString)
 		}
 	}
-	c.isClosed.Store(false)
+	c.Close() // close previous conn, if any
 	c.conn.Store(conn)

 	// server should send join as soon as connected
@@ -172,12 +173,17 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam
 }

 func (c *SignalClient) Close() {
-	if c.isClosed.Swap(true) {
-		return
-	}
-	if conn := c.websocketConn(); conn != nil {
+	isStarted := c.IsStarted()
+	conn := c.websocketConn()
+	if conn != nil {
 		_ = conn.Close()
 	}
+	if isStarted {
+		<-c.readerClosedCh
+	}
+	if conn != nil && c.OnClose != nil {
+		c.OnClose()
+	}
 }

 func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error {
@@ -263,10 +269,6 @@ func (c *SignalClient) SendUpdateParticipantMetadata(metadata *livekit.UpdatePar
 }

 func (c *SignalClient) readResponse() (*livekit.SignalResponse, error) {
-	if c.isClosed.Load() {
-		return nil, io.EOF
-	}
-
 	conn := c.websocketConn()
 	if conn == nil {
 		return nil, errors.New("cannot read response before join")
@@ -348,18 +350,17 @@ func (c *SignalClient) handleResponse(res *livekit.SignalResponse) {
 func (c *SignalClient) readWorker() {
 	defer func() {
 		c.isStarted.Store(false)
-		if c.OnClose != nil {
-			c.OnClose()
Contributor

This means OnClose will only be called when Close is called, not when the connection is interrupted, and the Engine relies on this callback to handle signal disconnection and resume.

Contributor Author

Thank you @cnderrauber. Will take a look. I was hitting a data race in the tests with the OnClose() in the defer, so I serialised it :-( My bad. Will take a look at calling back on read loop exit.

Contributor Author

@boks1971 Nov 20, 2023

@cnderrauber Made changes in these two commits:

  • 9a727bc - moving the OnClose() callback back to the defer of readWorker.
  • 0acef17 - using a local variable to avoid the race. The race reported earlier looks like this: the test called the simulate-scenario reconnect, which called Close() on the signal client. readerClosedCh was then read in Close() while Start() was replacing it to set up the next read worker. I am getting around it by copying the channel into a local variable in Close() and reading from that.

Please have a look when you get a chance and let me know if you have concerns.
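
The snapshot idea described above can be sketched in isolation. This is a minimal illustration, not the code from 0acef17: the `worker` type, its `start`/`stop` methods, and the use of a mutex to guard the channel fields are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a client that owns one reader goroutine.
type worker struct {
	mu     sync.Mutex
	stopCh chan struct{} // closed by stop() to ask the reader to exit
	doneCh chan struct{} // closed by the reader once it has exited
}

// start installs fresh channels and launches a reader, unless one is running.
func (w *worker) start() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.doneCh != nil {
		return // already running
	}
	stop, done := make(chan struct{}), make(chan struct{})
	w.stopCh, w.doneCh = stop, done
	go func() {
		defer close(done)
		<-stop // stands in for a blocking read that fails once the conn closes
	}()
}

// stop copies the channels into locals under the lock, then waits on the
// local copy, so a later start() cannot swap the field out from under it.
func (w *worker) stop() {
	w.mu.Lock()
	stop, done := w.stopCh, w.doneCh
	w.stopCh, w.doneCh = nil, nil
	w.mu.Unlock()
	if stop == nil {
		return // nothing running
	}
	close(stop)
	<-done
}

func main() {
	w := &worker{}
	for i := 0; i < 3; i++ {
		w.start()
		w.stop()
	}
	w.stop() // safe: nothing is running
	fmt.Println("restarted 3 times without racing on the channel fields")
}
```

Running a sketch like this under `go run -race` is one way to check that the wait in `stop` no longer touches a field that `start` may be rewriting.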

-		}
+		c.conn.Store((*websocket.Conn)(nil))
Member

we cannot store nil into an atomic.Value; it'll panic: https://pkg.go.dev/sync/atomic#Value.Store

would be good to use typed atomic.Pointer instead.

Contributor Author

Casting nil to the concrete type does not cause a panic. Learnt that today. I can't find that post now, but here is a Go Playground example - https://go.dev/play/p/-O0qiTGJxJs

But I will look at changing this to atomic.Pointer.
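
The difference between the two kinds of nil discussed above can be checked with a small program. This is a sketch, not the linked Playground example: `conn` is a hypothetical stand-in for `websocket.Conn`, and `storePanics` is a helper invented for the illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn is a hypothetical stand-in for websocket.Conn.
type conn struct{}

// storePanics reports whether storing x into a fresh atomic.Value panics.
func storePanics(x any) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	var v atomic.Value
	v.Store(x)
	return false
}

func main() {
	fmt.Println(storePanics(nil))          // true: untyped nil is rejected
	fmt.Println(storePanics((*conn)(nil))) // false: typed nil pointer is accepted
}
```

The typed nil is accepted because the interface passed to `Store` still carries the concrete type `*conn`, even though the pointer inside it is nil.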

Contributor

Right, because an interface value that holds a type but a nil pointer is not itself nil, which makes nil checks really confusing at times. Ask me how I know...
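
A short sketch of that nil-check confusion, with hypothetical names (`conn`, `interfaceIsNil`, `holdsNilConn`) chosen only for the illustration:

```go
package main

import "fmt"

// conn is a hypothetical stand-in for a connection type.
type conn struct{}

// interfaceIsNil reports whether the interface value itself is nil.
func interfaceIsNil(x any) bool { return x == nil }

// holdsNilConn reports whether x holds a *conn whose pointer is nil.
func holdsNilConn(x any) bool {
	c, ok := x.(*conn)
	return ok && c == nil
}

func main() {
	var c *conn // a nil pointer

	fmt.Println(c == nil)          // true: comparing the pointer directly
	fmt.Println(interfaceIsNil(c)) // false: the interface carries type *conn
	fmt.Println(holdsNilConn(c))   // true: assert back to *conn, then compare
}
```

Code that loads from an `atomic.Value` has to do the type assertion before the nil check for the same reason.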

Contributor Author

How do you know @biglittlebigben 😊?

Contributor

Panic in prod at 3am. It's always a panic in prod at 3am...

Contributor Author

@boks1971 Nov 20, 2023

Using atomic.Pointer in this commit.

NOTE: There is a comment in README.md that this module requires Go 1.18+ since version 1.0. We need to update that to 1.19+ when we cut a release, because atomic.Pointer was added in Go 1.19.
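
For readers following along, here is a rough sketch of how a typed `atomic.Pointer` can hold the connection and close the previous one on replacement. It assumes Go 1.19+; `conn`, `client`, and `replaceConn` are hypothetical names and this is not the SDK's actual implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn is a hypothetical stand-in for *websocket.Conn.
type conn struct {
	id     int
	closed bool
}

// Close marks the connection closed; a real conn would release its socket.
func (c *conn) Close() { c.closed = true }

// client is a hypothetical holder, not the SDK's SignalClient.
type client struct {
	conn atomic.Pointer[conn] // zero value holds nil; storing nil is allowed
}

// replaceConn installs next and closes the previous connection, if any.
func (c *client) replaceConn(next *conn) {
	if prev := c.conn.Swap(next); prev != nil {
		prev.Close()
	}
}

func main() {
	var c client
	first, second := &conn{id: 1}, &conn{id: 2}

	c.replaceConn(first)
	c.replaceConn(second) // closes first
	c.replaceConn(nil)    // closes second; no panic, unlike atomic.Value.Store(nil)

	fmt.Println(first.closed, second.closed, c.conn.Load() == nil) // true true true
}
```

With the typed pointer, `Load` returns a plain `*conn`, so an ordinary `== nil` check behaves as expected.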

+		close(c.readerClosedCh)
 	}()
 	if pending := c.pendingResponse; pending != nil {
 		c.handleResponse(pending)
 		c.pendingResponse = nil
 	}
-	for !c.isClosed.Load() {
+	for {
 		res, err := c.readResponse()
 		if err != nil {
-			if !isIgnoredWebsocketError(err) && !c.isClosed.Load() {
Member

We'll also need to make sure readResponse returns EOF if it's an expected close. Otherwise it'll keep spamming logs about errors while reading.

Contributor Author

Will look at this.

Contributor Author

On second thought, this does not do anything different. A unilateral close (calling the Close() method) closes the underlying conn, so the read fails with a read-on-closed-conn error and this loop exits.

Before this change, the code was only checking isClosed, which is set by the unilateral Close().

A remote close shows up as an error returned by ReadMessage. Yes, this change will log one more message, because it will also log an error on a unilateral Close(), but nothing more.

Once an error is returned, the loop is exited.

Am I missing something?

Member

Maybe I'm reading this incorrectly. Previously, readResponse would return io.EOF here once Close had been called, because of the c.isClosed.Load() check. Now I think it'll fall through to errors.New("cannot read response before join")?

It seems that we should not return an error if Close has been called.

Contributor Author

@boks1971 Nov 20, 2023

That's correct @davidzhao. You are reading it right. What I was saying is that Close() in this PR sets the conn to nil (per the other discussion about storing nil in atomic.Value, I will change that to an atomic.Pointer). So yes, readResponse will return errors.New("cannot read response before join").

But the read loop will exit because of that error return.

The only difference is that previously the read loop would not have logged an error before exiting, since it was checking isClosed, and with this PR it will log an error before exiting. So my point was that I could not spot any runaway loop repeatedly logging an error.

Member

Sorry, I didn't mean it would be in a loop. Just that in server environments, it would be good to avoid logging errors when the user has closed the connection.

Contributor Author

Got it, I misunderstood. I will look at adding a change to filter out the error

Contributor Author

Addressed in this PR - #353
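
The contents of #353 are not shown in this thread. As a rough sketch of the idea discussed above (do not log a read error when the user closed the connection), one option is to record the local close and consult it before logging. Everything here is an assumption for illustration: the `reader` type, the `closedLocally` flag, the `shouldLog` helper, the `errReset` example error, and the choice to treat `io.EOF` and `net.ErrClosed` as ignorable.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync/atomic"
)

// errReset is a hypothetical example of an unexpected read failure.
var errReset = errors.New("connection reset by peer")

// reader is a hypothetical stand-in for the read side of a signal client.
type reader struct {
	closedLocally atomic.Bool // set by the local Close() before the conn is torn down
}

// shouldLog reports whether a read error is worth logging. Errors seen after
// a local close, and plain end-of-stream errors, are treated as expected.
func (r *reader) shouldLog(err error) bool {
	if err == nil || r.closedLocally.Load() {
		return false
	}
	return !errors.Is(err, io.EOF) && !errors.Is(err, net.ErrClosed)
}

func main() {
	r := &reader{}
	fmt.Println(r.shouldLog(errReset))      // true: unexpected failure
	fmt.Println(r.shouldLog(net.ErrClosed)) // false: conn already closed

	r.closedLocally.Store(true)
	fmt.Println(r.shouldLog(errReset)) // false: the user closed the connection
}
```

The read loop would still return on the first error either way; only the logging decision changes.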

+			if !isIgnoredWebsocketError(err) {
 				logger.Infow("error while reading from signal client", "err", err)
 			}
 			return