From bcfebdb2280431ddcecf6c565999e7b7c6732cfe Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Fri, 24 Mar 2023 14:42:30 +0100 Subject: [PATCH] subscription: don't assume muted tracks stalled When the user mutes the video, we don't get any RTPs. We don't assume this error anymore and don't change the status of the track and its telemetry for that. This removes false-positives related to the https://github.com/matrix-org/waterfall/issues/121 --- pkg/conference/participant/tracker.go | 14 ++++++++-- pkg/conference/peer_message_processing.go | 3 +-- pkg/conference/state.go | 10 +++++++ pkg/conference/subscription/audio.go | 4 +++ pkg/conference/subscription/subscription.go | 1 + pkg/conference/subscription/video.go | 29 ++++++++++++++------- pkg/conference/track/simulcast.go | 1 + pkg/conference/track/track.go | 10 +++++-- 8 files changed, 57 insertions(+), 15 deletions(-) diff --git a/pkg/conference/participant/tracker.go b/pkg/conference/participant/tracker.go index 9be34f7..d1208c4 100644 --- a/pkg/conference/participant/tracker.go +++ b/pkg/conference/participant/tracker.go @@ -161,7 +161,11 @@ func (t *Tracker) RemovePublishedTrack(id track.TrackID) { } // Subscribes a given participant to the track. -func (t *Tracker) Subscribe(participantID ID, trackID track.TrackID, requirements track.TrackMetadata) error { +func (t *Tracker) Subscribe( + participantID ID, + trackID track.TrackID, + desiredWidth, desiredHeight int, +) error { // Check if the participant exists that wants to subscribe exists. participant := t.participants[participantID] if participant == nil { @@ -175,7 +179,13 @@ func (t *Tracker) Subscribe(participantID ID, trackID track.TrackID, requirement } // Subscribe to the track. - if err := published.Subscribe(participantID, participant.Peer, requirements, participant.Logger); err != nil { + if err := published.Subscribe( + participantID, + participant.Peer, + desiredWidth, + desiredHeight, + participant.Logger, + ); err != nil { return err } diff --git a/pkg/conference/peer_message_processing.go b/pkg/conference/peer_message_processing.go index a2f9369..169a2b5 100644 --- a/pkg/conference/peer_message_processing.go +++ b/pkg/conference/peer_message_processing.go @@ -172,8 +172,7 @@ func (c *Conference) processTrackSubscriptionMessage( // Now let's handle the subscribe commands. for _, track := range msg.Subscribe { - requirements := published.TrackMetadata{track.Width, track.Height} - if err := c.tracker.Subscribe(p.ID, track.TrackID, requirements); err != nil { + if err := c.tracker.Subscribe(p.ID, track.TrackID, track.Width, track.Height); err != nil { p.Logger.Errorf("Failed to subscribe to track %s: %v", track.TrackID, err) continue } diff --git a/pkg/conference/state.go b/pkg/conference/state.go index c6a7400..14c9129 100644 --- a/pkg/conference/state.go +++ b/pkg/conference/state.go @@ -122,9 +122,19 @@ func streamIntoTrackMetadata( tracksMetadata := make(map[published.TrackID]published.TrackMetadata) for _, metadata := range streamMetadata { for id, track := range metadata.Tracks { + // Determine if a given track is muted. + var muted bool + switch track.Kind { + case "audio": + muted = metadata.AudioMuted + case "video": + muted = metadata.VideoMuted + } + tracksMetadata[id] = published.TrackMetadata{ MaxWidth: track.Width, MaxHeight: track.Height, + Muted: muted, } } } diff --git a/pkg/conference/subscription/audio.go b/pkg/conference/subscription/audio.go index c1b40c3..ceaab15 100644 --- a/pkg/conference/subscription/audio.go +++ b/pkg/conference/subscription/audio.go @@ -49,6 +49,10 @@ func (s *AudioSubscription) Simulcast() webrtc_ext.SimulcastLayer { return webrtc_ext.SimulcastLayerNone } +func (s *AudioSubscription) UpdateMuteState(muted bool) { + // We don't have any business logic at the moment for audio subscriptions. +} + func (s *AudioSubscription) readRTCP() { // Read incoming RTCP packets. Before these packets are returned they are processed by interceptors. // For things like NACK this needs to be called. diff --git a/pkg/conference/subscription/subscription.go b/pkg/conference/subscription/subscription.go index bf6f6b4..1664184 100644 --- a/pkg/conference/subscription/subscription.go +++ b/pkg/conference/subscription/subscription.go @@ -11,6 +11,7 @@ type Subscription interface { WriteRTP(packet rtp.Packet) error SwitchLayer(simulcast webrtc_ext.SimulcastLayer) Simulcast() webrtc_ext.SimulcastLayer + UpdateMuteState(muted bool) } type SubscriptionController interface { diff --git a/pkg/conference/subscription/video.go b/pkg/conference/subscription/video.go index 908f902..54d1b4c 100644 --- a/pkg/conference/subscription/video.go +++ b/pkg/conference/subscription/video.go @@ -25,6 +25,7 @@ type VideoSubscription struct { info webrtc_ext.TrackInfo currentLayer atomic.Int32 // atomic webrtc_ext.SimulcastLayer + muted atomic.Bool controller SubscriptionController requestKeyFrameFn RequestKeyFrameFn @@ -37,6 +38,7 @@ type VideoSubscription struct { func NewVideoSubscription( info webrtc_ext.TrackInfo, simulcast webrtc_ext.SimulcastLayer, + muted bool, controller SubscriptionController, requestKeyFrameFn RequestKeyFrameFn, logger *logrus.Entry, @@ -57,11 +59,16 @@ func NewVideoSubscription( var currentLayer atomic.Int32 currentLayer.Store(int32(simulcast)) + // By default we assume that the track is not muted. + var mutedState atomic.Bool + mutedState.Store(muted) + // Create a subscription. subscription := &VideoSubscription{ rtpSender, info, currentLayer, + mutedState, controller, requestKeyFrameFn, nil, @@ -77,16 +84,16 @@ func NewVideoSubscription( // Configure the worker for the subscription. workerConfig := worker.Config[rtp.Packet]{ - ChannelSize: 16, // We really don't need a large buffer here, just to account for spikes. - Timeout: 10 * time.Second, // When do we assume the subscription is stalled. + ChannelSize: 16, // We really don't need a large buffer here, just to account for spikes. + Timeout: 3 * time.Second, // When do we assume the subscription is stalled. OnTimeout: func() { - // Not receiving RTP packets for 10 seconds can happen either if the video is muted. - // Or if something is wrong with the subscription (i.e. this quality is not being sent anymore). - layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load()) - logger.Infof("No RTP on subscription to %s (%s) for 10 seconds", subscription.info.TrackID, layer) - - // This is susceptible to false-positives for muted videos! - subscription.telemetry.Fail(fmt.Errorf("No incoming RTP packets for 10 seconds")) + // Not receiving RTP packets for 3 seconds can happen either if we're muted (not an error), + // or if the peer does not send any data (that's a problem that potentially means a freeze). + if !subscription.muted.Load() { + layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load()) + logger.Infof("No RTP on subscription to %s (%s) for 3 seconds", subscription.info.TrackID, layer) + subscription.telemetry.Fail(fmt.Errorf("No incoming RTP packets for 3 seconds")) + } }, OnTask: workerState.handlePacket, } @@ -130,6 +137,10 @@ func (s *VideoSubscription) Simulcast() webrtc_ext.SimulcastLayer { return webrtc_ext.SimulcastLayer(s.currentLayer.Load()) } +func (s *VideoSubscription) UpdateMuteState(muted bool) { + s.muted.Store(muted) +} + // Read incoming RTCP packets. Before these packets are returned they are processed by interceptors. func (s *VideoSubscription) readRTCP() { for { diff --git a/pkg/conference/track/simulcast.go b/pkg/conference/track/simulcast.go index b481f9e..9b3faca 100644 --- a/pkg/conference/track/simulcast.go +++ b/pkg/conference/track/simulcast.go @@ -8,6 +8,7 @@ import ( // This metadata is only set for video tracks at the moment. type TrackMetadata struct { MaxWidth, MaxHeight int + Muted bool } // Calculate the layer that we can use based on the requirements passed as parameters and available layers. diff --git a/pkg/conference/track/track.go b/pkg/conference/track/track.go index e8a99f0..5e94bde 100644 --- a/pkg/conference/track/track.go +++ b/pkg/conference/track/track.go @@ -182,7 +182,8 @@ func (p *PublishedTrack[SubscriberID]) Stop() { func (p *PublishedTrack[SubscriberID]) Subscribe( subscriberID SubscriberID, controller subscription.SubscriptionController, - requirements TrackMetadata, + desiredWidth int, + desiredHeight int, logger *logrus.Entry, ) error { if p.isClosed() { @@ -200,7 +201,7 @@ func (p *PublishedTrack[SubscriberID]) Subscribe( for key := range p.video.publishers { layers[key] = struct{}{} } - layer = getOptimalLayer(layers, p.metadata, requirements.MaxWidth, requirements.MaxHeight) + layer = getOptimalLayer(layers, p.metadata, desiredWidth, desiredHeight) } // If the subscription exists, let's see if we need to update it. @@ -232,6 +233,7 @@ func (p *PublishedTrack[SubscriberID]) Subscribe( sub, err = subscription.NewVideoSubscription( p.info, layer, + p.metadata.Muted, controller, handler, logger, @@ -297,4 +299,8 @@ func (p *PublishedTrack[SubscriberID]) SetMetadata(metadata TrackMetadata) { p.mutex.Lock() defer p.mutex.Unlock() p.metadata = metadata + + for _, sub := range p.subscriptions { + sub.UpdateMuteState(metadata.Muted) + } }