Skip to content

Commit

Permalink
subscription: don't assume muted tracks stalled
Browse files Browse the repository at this point in the history
When the user mutes the video, we don't get any RTPs. We don't assume
this error anymore and don't change the status of the track and its
telemetry for that.

This removes false-positives related to the
#121
  • Loading branch information
daniel-abramov committed Mar 24, 2023
1 parent 7a1f226 commit bcfebdb
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 15 deletions.
14 changes: 12 additions & 2 deletions pkg/conference/participant/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ func (t *Tracker) RemovePublishedTrack(id track.TrackID) {
}

// Subscribes a given participant to the track.
func (t *Tracker) Subscribe(participantID ID, trackID track.TrackID, requirements track.TrackMetadata) error {
func (t *Tracker) Subscribe(
participantID ID,
trackID track.TrackID,
desiredWidth, desiredHeight int,
) error {
// Check if the participant exists that wants to subscribe exists.
participant := t.participants[participantID]
if participant == nil {
Expand All @@ -175,7 +179,13 @@ func (t *Tracker) Subscribe(participantID ID, trackID track.TrackID, requirement
}

// Subscribe to the track.
if err := published.Subscribe(participantID, participant.Peer, requirements, participant.Logger); err != nil {
if err := published.Subscribe(
participantID,
participant.Peer,
desiredWidth,
desiredHeight,
participant.Logger,
); err != nil {
return err
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/conference/peer_message_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ func (c *Conference) processTrackSubscriptionMessage(

// Now let's handle the subscribe commands.
for _, track := range msg.Subscribe {
requirements := published.TrackMetadata{track.Width, track.Height}
if err := c.tracker.Subscribe(p.ID, track.TrackID, requirements); err != nil {
if err := c.tracker.Subscribe(p.ID, track.TrackID, track.Width, track.Height); err != nil {
p.Logger.Errorf("Failed to subscribe to track %s: %v", track.TrackID, err)
continue
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/conference/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,19 @@ func streamIntoTrackMetadata(
tracksMetadata := make(map[published.TrackID]published.TrackMetadata)
for _, metadata := range streamMetadata {
for id, track := range metadata.Tracks {
// Determine if a given track is muted.
var muted bool
switch track.Kind {
case "audio":
muted = metadata.AudioMuted
case "video":
muted = metadata.VideoMuted
}

tracksMetadata[id] = published.TrackMetadata{
MaxWidth: track.Width,
MaxHeight: track.Height,
Muted: muted,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/conference/subscription/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (s *AudioSubscription) Simulcast() webrtc_ext.SimulcastLayer {
return webrtc_ext.SimulcastLayerNone
}

func (s *AudioSubscription) UpdateMuteState(muted bool) {
// We don't have any business logic at the moment for audio subscriptions.
}

func (s *AudioSubscription) readRTCP() {
// Read incoming RTCP packets. Before these packets are returned they are processed by interceptors.
// For things like NACK this needs to be called.
Expand Down
1 change: 1 addition & 0 deletions pkg/conference/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Subscription interface {
WriteRTP(packet rtp.Packet) error
SwitchLayer(simulcast webrtc_ext.SimulcastLayer)
Simulcast() webrtc_ext.SimulcastLayer
UpdateMuteState(muted bool)
}

type SubscriptionController interface {
Expand Down
29 changes: 20 additions & 9 deletions pkg/conference/subscription/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type VideoSubscription struct {

info webrtc_ext.TrackInfo
currentLayer atomic.Int32 // atomic webrtc_ext.SimulcastLayer
muted atomic.Bool

controller SubscriptionController
requestKeyFrameFn RequestKeyFrameFn
Expand All @@ -37,6 +38,7 @@ type VideoSubscription struct {
func NewVideoSubscription(
info webrtc_ext.TrackInfo,
simulcast webrtc_ext.SimulcastLayer,
muted bool,
controller SubscriptionController,
requestKeyFrameFn RequestKeyFrameFn,
logger *logrus.Entry,
Expand All @@ -57,11 +59,16 @@ func NewVideoSubscription(
var currentLayer atomic.Int32
currentLayer.Store(int32(simulcast))

// By default we assume that the track is not muted.
var mutedState atomic.Bool
mutedState.Store(muted)

// Create a subscription.
subscription := &VideoSubscription{
rtpSender,
info,
currentLayer,
mutedState,
controller,
requestKeyFrameFn,
nil,
Expand All @@ -77,16 +84,16 @@ func NewVideoSubscription(

// Configure the worker for the subscription.
workerConfig := worker.Config[rtp.Packet]{
ChannelSize: 16, // We really don't need a large buffer here, just to account for spikes.
Timeout: 10 * time.Second, // When do we assume the subscription is stalled.
ChannelSize: 16, // We really don't need a large buffer here, just to account for spikes.
Timeout: 3 * time.Second, // When do we assume the subscription is stalled.
OnTimeout: func() {
// Not receiving RTP packets for 10 seconds can happen either if the video is muted.
// Or if something is wrong with the subscription (i.e. this quality is not being sent anymore).
layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load())
logger.Infof("No RTP on subscription to %s (%s) for 10 seconds", subscription.info.TrackID, layer)

// This is susceptible to false-positives for muted videos!
subscription.telemetry.Fail(fmt.Errorf("No incoming RTP packets for 10 seconds"))
// Not receiving RTP packets for 3 seconds can happen either if we're muted (not an error),
// or if the peer does not send any data (that's a problem that potentially means a freeze).
if !subscription.muted.Load() {
layer := webrtc_ext.SimulcastLayer(subscription.currentLayer.Load())
logger.Infof("No RTP on subscription to %s (%s) for 3 seconds", subscription.info.TrackID, layer)
subscription.telemetry.Fail(fmt.Errorf("No incoming RTP packets for 3 seconds"))
}
},
OnTask: workerState.handlePacket,
}
Expand Down Expand Up @@ -130,6 +137,10 @@ func (s *VideoSubscription) Simulcast() webrtc_ext.SimulcastLayer {
return webrtc_ext.SimulcastLayer(s.currentLayer.Load())
}

func (s *VideoSubscription) UpdateMuteState(muted bool) {
s.muted.Store(muted)
}

// Read incoming RTCP packets. Before these packets are returned they are processed by interceptors.
func (s *VideoSubscription) readRTCP() {
for {
Expand Down
1 change: 1 addition & 0 deletions pkg/conference/track/simulcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
// This metadata is only set for video tracks at the moment.
type TrackMetadata struct {
MaxWidth, MaxHeight int
Muted bool
}

// Calculate the layer that we can use based on the requirements passed as parameters and available layers.
Expand Down
10 changes: 8 additions & 2 deletions pkg/conference/track/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ func (p *PublishedTrack[SubscriberID]) Stop() {
func (p *PublishedTrack[SubscriberID]) Subscribe(
subscriberID SubscriberID,
controller subscription.SubscriptionController,
requirements TrackMetadata,
desiredWidth int,
desiredHeight int,
logger *logrus.Entry,
) error {
if p.isClosed() {
Expand All @@ -200,7 +201,7 @@ func (p *PublishedTrack[SubscriberID]) Subscribe(
for key := range p.video.publishers {
layers[key] = struct{}{}
}
layer = getOptimalLayer(layers, p.metadata, requirements.MaxWidth, requirements.MaxHeight)
layer = getOptimalLayer(layers, p.metadata, desiredWidth, desiredHeight)
}

// If the subscription exists, let's see if we need to update it.
Expand Down Expand Up @@ -232,6 +233,7 @@ func (p *PublishedTrack[SubscriberID]) Subscribe(
sub, err = subscription.NewVideoSubscription(
p.info,
layer,
p.metadata.Muted,
controller,
handler,
logger,
Expand Down Expand Up @@ -297,4 +299,8 @@ func (p *PublishedTrack[SubscriberID]) SetMetadata(metadata TrackMetadata) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.metadata = metadata

for _, sub := range p.subscriptions {
sub.UpdateMuteState(metadata.Muted)
}
}

0 comments on commit bcfebdb

Please sign in to comment.