diff --git a/pkg/lksdk_output/lksdk_output.go b/pkg/lksdk_output/lksdk_output.go index 87298dfc..3705b07d 100644 --- a/pkg/lksdk_output/lksdk_output.go +++ b/pkg/lksdk_output/lksdk_output.go @@ -18,6 +18,7 @@ import ( "context" "sync/atomic" + "github.com/frostbyte73/core" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" @@ -37,6 +38,7 @@ type VideoSampleProvider interface { type LKSDKOutput struct { logger logger.Logger room *lksdk.Room + closed core.Fuse params *params.Params } @@ -59,6 +61,7 @@ func NewLKSDKOutput(ctx context.Context, p *params.Params) (*LKSDKOutput, error) room: room, params: p, logger: logger.GetLogger().WithValues("ingressID", p.IngressId, "resourceID", p.State.ResourceId, "roomID", room.SID()), + closed: core.NewFuse(), } s.logger.Infow("connected to room") @@ -85,7 +88,8 @@ func (s *LKSDKOutput) AddAudioTrack(output lksdk.SampleProvider, mimeType string var pub *lksdk.LocalTrackPublication onComplete := func() { s.logger.Debugw("audio track write complete, unpublishing audio track") - if pub != nil { + // don't unpublish if the completion is due to the output closing + if pub != nil && !s.closed.IsBroken() { if err := s.room.LocalParticipant.UnpublishTrack(pub.SID()); err != nil { s.logger.Errorw("could not unpublish audio track", err) } @@ -124,7 +128,8 @@ func (s *LKSDKOutput) AddVideoTrack(outputs []VideoSampleProvider, layers []*liv output := outputs[i] onComplete := func() { s.logger.Debugw("video track layer write complete", "layer", layer.Quality.String()) - if pub != nil { + // don't unpublish if the completion is due to the output closing + if pub != nil && !s.closed.IsBroken() { if atomic.AddInt32(&activeLayerCount, -1) == 0 { s.logger.Debugw("unpublishing video track") if err := s.room.LocalParticipant.UnpublishTrack(pub.SID()); err != nil { @@ -175,5 +180,8 @@ func (s *LKSDKOutput) AddVideoTrack(outputs []VideoSampleProvider, layers []*liv func (s *LKSDKOutput) Close() { s.logger.Debugw("disconnecting from room") + + s.closed.Break() + s.room.Disconnect() }