-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Close previous conn when switching to new conn. #351
Changes from all commits
04769f2
dc71bfb
7aa190c
b2c826c
0e1c8fe
9a727bc
0acef17
a5825d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
module github.com/livekit/server-sdk-go | ||
|
||
go 1.18 | ||
go 1.19 | ||
|
||
require ( | ||
github.com/bep/debounce v1.2.1 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,11 +36,11 @@ import ( | |
const PROTOCOL = 8 | ||
|
||
type SignalClient struct { | ||
conn atomic.Value // *websocket.Conn | ||
conn atomic.Pointer[websocket.Conn] | ||
lock sync.Mutex | ||
isClosed atomic.Bool | ||
isStarted atomic.Bool | ||
pendingResponse *livekit.SignalResponse | ||
readerClosedCh chan struct{} | ||
|
||
OnClose func() | ||
OnAnswer func(sd webrtc.SessionDescription) | ||
|
@@ -66,7 +66,8 @@ func (c *SignalClient) Start() { | |
if c.isStarted.Swap(true) { | ||
return | ||
} | ||
go c.readWorker() | ||
c.readerClosedCh = make(chan struct{}) | ||
go c.readWorker(c.readerClosedCh) | ||
} | ||
|
||
func (c *SignalClient) IsStarted() bool { | ||
|
@@ -139,7 +140,7 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam | |
return nil, errors.New(errString) | ||
} | ||
} | ||
c.isClosed.Store(false) | ||
c.Close() // close previous conn, if any | ||
c.conn.Store(conn) | ||
|
||
// server should send join as soon as connected | ||
|
@@ -172,12 +173,15 @@ func (c *SignalClient) Join(urlPrefix string, token string, params *ConnectParam | |
} | ||
|
||
func (c *SignalClient) Close() { | ||
if c.isClosed.Swap(true) { | ||
return | ||
} | ||
if conn := c.websocketConn(); conn != nil { | ||
isStarted := c.IsStarted() | ||
readerClosedCh := c.readerClosedCh | ||
conn := c.websocketConn() | ||
if conn != nil { | ||
_ = conn.Close() | ||
} | ||
if isStarted && readerClosedCh != nil { | ||
<-readerClosedCh | ||
} | ||
} | ||
|
||
func (c *SignalClient) SendICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error { | ||
|
@@ -263,10 +267,6 @@ func (c *SignalClient) SendUpdateParticipantMetadata(metadata *livekit.UpdatePar | |
} | ||
|
||
func (c *SignalClient) readResponse() (*livekit.SignalResponse, error) { | ||
if c.isClosed.Load() { | ||
return nil, io.EOF | ||
} | ||
|
||
conn := c.websocketConn() | ||
if conn == nil { | ||
return nil, errors.New("cannot read response before join") | ||
|
@@ -345,9 +345,12 @@ func (c *SignalClient) handleResponse(res *livekit.SignalResponse) { | |
} | ||
} | ||
|
||
func (c *SignalClient) readWorker() { | ||
func (c *SignalClient) readWorker(readerClosedCh chan struct{}) { | ||
defer func() { | ||
c.isStarted.Store(false) | ||
c.conn.Store(nil) | ||
close(readerClosedCh) | ||
|
||
if c.OnClose != nil { | ||
c.OnClose() | ||
} | ||
|
@@ -356,10 +359,10 @@ func (c *SignalClient) readWorker() { | |
c.handleResponse(pending) | ||
c.pendingResponse = nil | ||
} | ||
for !c.isClosed.Load() { | ||
for { | ||
res, err := c.readResponse() | ||
if err != nil { | ||
if !isIgnoredWebsocketError(err) && !c.isClosed.Load() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we'll also need to make sure readResponse returns EOF if it's expected close. otherwise it'll keep spamming logs about error while reading. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will look at this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On second thought, this does not do anything different. A unilateral close (by calling Close() method) will close the underlying conn and read will fail with Before this change, the code was only checking The remote close will be an error returned by Once an error is returned, the loop is exited. Am I missing something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe I'm reading this incorrectly.. here it would previously return io.EOF when It seems that we should not return an error if Close has been called. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's correct @davidzhao . You are reading it right. But, what I was saying is that But, the Only difference is that previously, the read loop would not have logged an error before exiting as it was checking for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I didn't mean it would be in a loop. Just that in server environments, it would be good to avoid logging errors when the user has closed the connection. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, I misunderstood. I will look at adding a change to filter out the error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in this PR - #353 |
||
if !isIgnoredWebsocketError(err) { | ||
logger.Infow("error while reading from signal client", "err", err) | ||
} | ||
return | ||
|
@@ -369,14 +372,7 @@ func (c *SignalClient) readWorker() { | |
} | ||
|
||
func (c *SignalClient) websocketConn() *websocket.Conn { | ||
obj := c.conn.Load() | ||
if obj == nil { | ||
return nil | ||
} | ||
if conn, ok := obj.(*websocket.Conn); ok { | ||
return conn | ||
} | ||
return nil | ||
return c.conn.Load() | ||
} | ||
|
||
func isIgnoredWebsocketError(err error) bool { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means
OnClose
will only be called whenClose
is called instead of connection is interrupted, and theEngine
relies this callback to handle signal disconnection and resume.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @cnderrauber . Will take a look. Was hitting a data race in the tests with the
OnClose()
in thedefer
. So, I serialised it :-( My bad. Will take a look at calling back on read loop exit.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cnderrauber Made changes in these two commits
OnClose()
callback back todefer
ofreadWorker
.readerCloserCh
inClose()
was being accessed in bothClose()
andStart()
(where it is changed to set up for next read worker). Getting around it by using a local variable inClose()
and read on that.Please have a look when you get a chance and let me know if you have concerns.