diff --git a/pkg/conference/participant/tracker.go b/pkg/conference/participant/tracker.go index 9be34f7..d1208c4 100644 --- a/pkg/conference/participant/tracker.go +++ b/pkg/conference/participant/tracker.go @@ -161,7 +161,11 @@ func (t *Tracker) RemovePublishedTrack(id track.TrackID) { } // Subscribes a given participant to the track. -func (t *Tracker) Subscribe(participantID ID, trackID track.TrackID, requirements track.TrackMetadata) error { +func (t *Tracker) Subscribe( + participantID ID, + trackID track.TrackID, + desiredWidth, desiredHeight int, +) error { // Check if the participant exists that wants to subscribe exists. participant := t.participants[participantID] if participant == nil { @@ -175,7 +179,13 @@ func (t *Tracker) Subscribe(participantID ID, trackID track.TrackID, requirement } // Subscribe to the track. - if err := published.Subscribe(participantID, participant.Peer, requirements, participant.Logger); err != nil { + if err := published.Subscribe( + participantID, + participant.Peer, + desiredWidth, + desiredHeight, + participant.Logger, + ); err != nil { return err } diff --git a/pkg/conference/peer_message_processing.go b/pkg/conference/peer_message_processing.go index a2f9369..169a2b5 100644 --- a/pkg/conference/peer_message_processing.go +++ b/pkg/conference/peer_message_processing.go @@ -172,8 +172,7 @@ func (c *Conference) processTrackSubscriptionMessage( // Now let's handle the subscribe commands. for _, track := range msg.Subscribe { - requirements := published.TrackMetadata{track.Width, track.Height} - if err := c.tracker.Subscribe(p.ID, track.TrackID, requirements); err != nil { + if err := c.tracker.Subscribe(p.ID, track.TrackID, track.Width, track.Height); err != nil { p.Logger.Errorf("Failed to subscribe to track %s: %v", track.TrackID, err) continue } diff --git a/pkg/conference/state.go b/pkg/conference/state.go index c6a7400..14c9129 100644 --- a/pkg/conference/state.go +++ b/pkg/conference/state.go @@ -122,9 +122,19 @@ func streamIntoTrackMetadata( tracksMetadata := make(map[published.TrackID]published.TrackMetadata) for _, metadata := range streamMetadata { for id, track := range metadata.Tracks { + // Determine if a given track is muted. + var muted bool + switch track.Kind { + case "audio": + muted = metadata.AudioMuted + case "video": + muted = metadata.VideoMuted + } + tracksMetadata[id] = published.TrackMetadata{ MaxWidth: track.Width, MaxHeight: track.Height, + Muted: muted, } } } diff --git a/pkg/conference/subscription/audio.go b/pkg/conference/subscription/audio.go index c1b40c3..ceaab15 100644 --- a/pkg/conference/subscription/audio.go +++ b/pkg/conference/subscription/audio.go @@ -49,6 +49,10 @@ func (s *AudioSubscription) Simulcast() webrtc_ext.SimulcastLayer { return webrtc_ext.SimulcastLayerNone } +func (s *AudioSubscription) UpdateMuteState(muted bool) { + // We don't have any business logic at the moment for audio subscriptions. +} + func (s *AudioSubscription) readRTCP() { // Read incoming RTCP packets. Before these packets are returned they are processed by interceptors. // For things like NACK this needs to be called. diff --git a/pkg/conference/subscription/subscription.go b/pkg/conference/subscription/subscription.go index bf6f6b4..1664184 100644 --- a/pkg/conference/subscription/subscription.go +++ b/pkg/conference/subscription/subscription.go @@ -11,6 +11,7 @@ type Subscription interface { WriteRTP(packet rtp.Packet) error SwitchLayer(simulcast webrtc_ext.SimulcastLayer) Simulcast() webrtc_ext.SimulcastLayer + UpdateMuteState(muted bool) } type SubscriptionController interface { diff --git a/pkg/conference/subscription/video.go b/pkg/conference/subscription/video.go index 908f902..54d1b4c 100644 --- a/pkg/conference/subscription/video.go +++ b/pkg/conference/subscription/video.go @@ -25,6 +25,7 @@ type VideoSubscription struct { info webrtc_ext.TrackInfo currentLayer atomic.Int32 // atomic webrtc_ext.SimulcastLayer + muted atomic.Bool controller SubscriptionController requestKeyFrameFn RequestKeyFrameFn @@ -37,6 +38,7 @@ type VideoSubscription struct { func NewVideoSubscription( info webrtc_ext.TrackInfo, simulcast webrtc_ext.SimulcastLayer, + muted bool, controller SubscriptionController, requestKeyFrameFn RequestKeyFrameFn, logger *logrus.Entry, @@ -57,11 +59,16 @@ func NewVideoSubscription( var currentLayer atomic.Int32 currentLayer.Store(int32(simulcast)) + // By default we assume that the track is not muted. + var mutedState atomic.Bool + mutedState.Store(muted) + // Create a subscription. subscription := &VideoSubscription{ rtpSender, info, currentLayer, + mutedState, controller, requestKeyFrameFn, nil, @@ -77,16 +84,16 @@ func NewVideoSubscription( // Configure the worker for the subscription. workerConfig := worker.Config[rtp.Packet]{ - ChannelSize: 16, // We really don't need a large buffer here, just to account for spikes. - Timeout: 10 * time.Second, // When do we assume the subscription is stalled. + ChannelSize: 16, // We really don't need a large buffer here, just to account for spikes. + Timeout: 3 * time.Second, // When do we assume the subscription is stalled. OnTimeout: func() { - // Not receiving RTP packets for 10 seconds can happen either if the video is muted. - // Or if something is wrong with the subscription (i.e. this quality is not being sent anymore). - layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load()) - logger.Infof("No RTP on subscription to %s (%s) for 10 seconds", subscription.info.TrackID, layer) - - // This is susceptible to false-positives for muted videos! - subscription.telemetry.Fail(fmt.Errorf("No incoming RTP packets for 10 seconds")) + // Not receiving RTP packets for 3 seconds can happen either if we're muted (not an error), + // or if the peer does not send any data (that's a problem that potentially means a freeze). + if !subscription.muted.Load() { + layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load()) + logger.Infof("No RTP on subscription to %s (%s) for 3 seconds", subscription.info.TrackID, layer) + subscription.telemetry.Fail(fmt.Errorf("No incoming RTP packets for 3 seconds")) + } }, OnTask: workerState.handlePacket, } @@ -130,6 +137,10 @@ func (s *VideoSubscription) Simulcast() webrtc_ext.SimulcastLayer { return webrtc_ext.SimulcastLayer(s.currentLayer.Load()) } +func (s *VideoSubscription) UpdateMuteState(muted bool) { + s.muted.Store(muted) +} + // Read incoming RTCP packets. Before these packets are returned they are processed by interceptors. func (s *VideoSubscription) readRTCP() { for { diff --git a/pkg/conference/track/simulcast.go b/pkg/conference/track/simulcast.go index b481f9e..9b3faca 100644 --- a/pkg/conference/track/simulcast.go +++ b/pkg/conference/track/simulcast.go @@ -8,6 +8,7 @@ import ( // This metadata is only set for video tracks at the moment. type TrackMetadata struct { MaxWidth, MaxHeight int + Muted bool } // Calculate the layer that we can use based on the requirements passed as parameters and available layers. diff --git a/pkg/conference/track/track.go b/pkg/conference/track/track.go index e8a99f0..5e94bde 100644 --- a/pkg/conference/track/track.go +++ b/pkg/conference/track/track.go @@ -182,7 +182,8 @@ func (p *PublishedTrack[SubscriberID]) Stop() { func (p *PublishedTrack[SubscriberID]) Subscribe( subscriberID SubscriberID, controller subscription.SubscriptionController, - requirements TrackMetadata, + desiredWidth int, + desiredHeight int, logger *logrus.Entry, ) error { if p.isClosed() { @@ -200,7 +201,7 @@ func (p *PublishedTrack[SubscriberID]) Subscribe( for key := range p.video.publishers { layers[key] = struct{}{} } - layer = getOptimalLayer(layers, p.metadata, requirements.MaxWidth, requirements.MaxHeight) + layer = getOptimalLayer(layers, p.metadata, desiredWidth, desiredHeight) } // If the subscription exists, let's see if we need to update it. @@ -232,6 +233,7 @@ func (p *PublishedTrack[SubscriberID]) Subscribe( sub, err = subscription.NewVideoSubscription( p.info, layer, + p.metadata.Muted, controller, handler, logger, @@ -297,4 +299,8 @@ func (p *PublishedTrack[SubscriberID]) SetMetadata(metadata TrackMetadata) { p.mutex.Lock() defer p.mutex.Unlock() p.metadata = metadata + + for _, sub := range p.subscriptions { + sub.UpdateMuteState(metadata.Muted) + } }