Skip to content

Commit

Permalink
Gracefully shutdown of the websocket client (#213)
Browse files Browse the repository at this point in the history
Resolves #163 

Co-authored-by: Evan Bradley <[email protected]>
Co-authored-by: Srikanth Chekuri <[email protected]>
Co-authored-by: Tigran Najaryan <[email protected]>
  • Loading branch information
4 people authored Mar 26, 2024
1 parent c319cfa commit c7fc585
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 35 deletions.
20 changes: 19 additions & 1 deletion client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type wsReceiver struct {
sender *WSSender
callbacks types.Callbacks
processor receivedProcessor

// Indicates that the receiver has fully stopped.
stopped chan struct{}
}

// NewWSReceiver creates a new Receiver that uses WebSocket to receive
Expand All @@ -36,25 +39,40 @@ func NewWSReceiver(
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
stopped: make(chan struct{}),
}

return w
}

// ReceiverLoop runs the receiver loop. To stop the receiver cancel the context.
// Start starts the receiver loop.
func (r *wsReceiver) Start(ctx context.Context) {
go r.ReceiverLoop(ctx)
}

// IsStopped returns a channel that's closed when the receiver is stopped.
func (r *wsReceiver) IsStopped() <-chan struct{} {
return r.stopped
}

// ReceiverLoop runs the receiver loop.
// To stop the receiver cancel the context and close the websocket connection
func (r *wsReceiver) ReceiverLoop(ctx context.Context) {
type receivedMessage struct {
message *protobufs.ServerToAgent
err error
}

defer func() { close(r.stopped) }()

for {
select {
case <-ctx.Done():
return
default:
result := make(chan receivedMessage, 1)

// To stop this goroutine, close the websocket connection
go func() {
var message protobufs.ServerToAgent
err := r.receiveMessage(&message)
Expand Down
33 changes: 29 additions & 4 deletions client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"time"

"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
Expand All @@ -11,13 +12,19 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

const (
defaultSendCloseMessageTimeout = 5 * time.Second
)

// WSSender implements the WebSocket client's sending portion of OpAMP protocol.
type WSSender struct {
SenderCommon
conn *websocket.Conn
logger types.Logger

// Indicates that the sender has fully stopped.
stopped chan struct{}
err error
}

// NewSender creates a new Sender that uses WebSocket to send
Expand All @@ -37,15 +44,22 @@ func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error {

// Run the sender in the background.
s.stopped = make(chan struct{})
s.err = nil
go s.run(ctx)

return err
}

// WaitToStop blocks until the sender is stopped. To stop the sender cancel the context
// that was passed to Start().
func (s *WSSender) WaitToStop() {
<-s.stopped
// IsStopped returns a channel that's closed when the sender is stopped.
func (s *WSSender) IsStopped() <-chan struct{} {
return s.stopped
}

// StoppingErr returns an error if there was a problem with stopping the sender.
// If stopping was successful will return nil.
// StoppingErr() can be called only after IsStopped() is signalled.
func (s *WSSender) StoppingErr() error {
return s.err
}

func (s *WSSender) run(ctx context.Context) {
Expand All @@ -56,13 +70,24 @@ out:
s.sendNextMessage(ctx)

case <-ctx.Done():
if err := s.sendCloseMessage(); err != nil && err != websocket.ErrCloseSent {
s.err = err
}
break out
}
}

close(s.stopped)
}

func (s *WSSender) sendCloseMessage() error {
return s.conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Normal closure"),
time.Now().Add(defaultSendCloseMessageTimeout),
)
}

func (s *WSSender) sendNextMessage(ctx context.Context) error {
msgToSend := s.nextMessage.PopPending()
if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) {
Expand Down
87 changes: 57 additions & 30 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

const (
defaultShutdownTimeout = 5 * time.Second
)

// wsClient is an OpAMP Client implementation for WebSocket transport.
// See specification: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#websocket-transport
type wsClient struct {
Expand All @@ -40,6 +44,10 @@ type wsClient struct {
// last non-nil internal error that was encountered in the conn retry loop,
// currently used only for testing.
lastInternalErr atomic.Pointer[error]

// Network connection timeout used for the WebSocket closing handshake.
// This field is currently only modified during testing.
connShutdownTimeout time.Duration
}

// NewWebSocket creates a new OpAMP Client that uses WebSocket transport.
Expand All @@ -50,8 +58,9 @@ func NewWebSocket(logger types.Logger) *wsClient {

sender := internal.NewSender(logger)
w := &wsClient{
common: internal.NewClientCommon(logger, sender),
sender: sender,
common: internal.NewClientCommon(logger, sender),
sender: sender,
connShutdownTimeout: defaultShutdownTimeout,
}
return w
}
Expand Down Expand Up @@ -85,15 +94,6 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro
}

func (c *wsClient) Stop(ctx context.Context) error {
// Close connection if any.
c.connMutex.RLock()
conn := c.conn
c.connMutex.RUnlock()

if conn != nil {
_ = conn.Close()
}

return c.common.Stop(ctx)
}

Expand Down Expand Up @@ -232,19 +232,25 @@ func (c *wsClient) ensureConnected(ctx context.Context) error {
// runOneCycle performs the following actions:
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
// 3. start the sender to wait for scheduled messages and send them to the server.
// 4. start the receiver to receive and process messages until an error happens.
// 5. wait until both the sender and receiver are stopped.
//
// If it encounters an error it closes the connection and returns.
// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set).
// runOneCycle will close the connection it created before it return.
//
// When Stop() is called (ctx is cancelled, isStopping is set), wsClient will shutdown gracefully:
// 1. sender will be cancelled by the ctx, send the close message to server and return the error via sender.Err().
// 2. runOneCycle will handle that error and wait for the close message from server until timeout.
func (c *wsClient) runOneCycle(ctx context.Context) {
if err := c.ensureConnected(ctx); err != nil {
// Can't connect, so can't move forward. This currently happens when we
// are being stopped.
return
}
// Close the underlying connection.
defer c.conn.Close()

if c.common.IsStopping() {
_ = c.conn.Close()
return
}

Expand All @@ -256,15 +262,14 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
}

// Create a cancellable context for background processors.
procCtx, procCancel := context.WithCancel(ctx)
senderCtx, stopSender := context.WithCancel(ctx)
defer stopSender()

// Connected successfully. Start the sender. This will also send the first
// status report.
if err := c.sender.Start(procCtx, c.conn); err != nil {
c.common.Logger.Errorf(procCtx, "Failed to send first status report: %v", err)
if err := c.sender.Start(senderCtx, c.conn); err != nil {
c.common.Logger.Errorf(senderCtx, "Failed to send first status report: %v", err)
// We could not send the report, the only thing we can do is start over.
_ = c.conn.Close()
procCancel()
return
}

Expand All @@ -278,19 +283,41 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
c.common.PackagesStateProvider,
c.common.Capabilities,
)
r.ReceiverLoop(ctx)

// Stop the background processors.
procCancel()

// If we exited receiverLoop it means there is a connection error, we cannot
// read messages anymore. We need to start over.
// When the wsclient is closed, the context passed to runOneCycle will be canceled.
// The receiver should keep running and processing messages
// until it received a Close message from the server which means the server has no more messages.
receiverCtx, stopReceiver := context.WithCancel(context.Background())
defer stopReceiver()
r.Start(receiverCtx)

select {
case <-c.sender.IsStopped():
// sender will send close message to initiate the close handshake
if err := c.sender.StoppingErr(); err != nil {
c.common.Logger.Debugf(ctx, "Error stopping the sender: %v", err)

stopReceiver()
<-r.IsStopped()
break
}

// Close the connection to unblock the WSSender as well.
_ = c.conn.Close()
c.common.Logger.Debugf(ctx, "Waiting for receiver to stop.")
select {
case <-r.IsStopped():
c.common.Logger.Debugf(ctx, "Receiver stopped.")
case <-time.After(c.connShutdownTimeout):
c.common.Logger.Debugf(ctx, "Timeout waiting for receiver to stop.")
stopReceiver()
<-r.IsStopped()
}
case <-r.IsStopped():
// If we exited receiverLoop it means there is a connection error, we cannot
// read messages anymore. We need to start over.

// Wait for WSSender to stop.
c.sender.WaitToStop()
stopSender()
<-c.sender.IsStopped()
}
}

func (c *wsClient) runUntilStopped(ctx context.Context) {
Expand Down
Loading

0 comments on commit c7fc585

Please sign in to comment.