diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 7bb26f29..9c280b0d 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -244,7 +244,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo { logger.Debugw("closing sinks") for _, si := range c.sinks { for _, s := range si { - if err := s.Close(); err != nil && c.playing.IsBroken() { + if err := s.Close(); err != nil && c.playing.IsBroken() && c.FinalizationRequired { c.Info.SetFailed(err) return c.Info } diff --git a/pkg/pipeline/sink/websocket.go b/pkg/pipeline/sink/websocket.go index 4c9a12d5..078344be 100644 --- a/pkg/pipeline/sink/websocket.go +++ b/pkg/pipeline/sink/websocket.go @@ -206,11 +206,13 @@ func (s *WebsocketSink) Close() error { s.mu.Lock() defer s.mu.Unlock() if !s.closed.Swap(true) { + logger.Debugw("closing websocket connection") + // write close message for graceful disconnection _ = s.conn.WriteMessage(websocket.CloseMessage, nil) // terminate connection and close the `closed` channel - return s.conn.Close() + _ = s.conn.Close() } return nil