Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performant RTP Publisher #134

Merged
merged 7 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 67 additions & 115 deletions pkg/conference/participant/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,34 @@ package participant
import (
"fmt"

"github.com/matrix-org/waterfall/pkg/conference/subscription"
pub "github.com/matrix-org/waterfall/pkg/conference/track"
dbkr marked this conversation as resolved.
Show resolved Hide resolved
"github.com/matrix-org/waterfall/pkg/webrtc_ext"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
)

type TrackStoppedMessage struct {
TrackID pub.TrackID
OwnerID ID
}

// Tracks participants and their corresponding tracks.
// These are grouped together as the field in this structure must be kept synchronized.
type Tracker struct {
participants map[ID]*Participant
publishedTracks map[TrackID]*PublishedTrack
publishedTracks map[pub.TrackID]*pub.PublishedTrack[ID]

publishedTrackStopped chan<- TrackStoppedMessage
conferenceEnded <-chan struct{}
dbkr marked this conversation as resolved.
Show resolved Hide resolved
}

func NewParticipantTracker() *Tracker {
func NewParticipantTracker(conferenceEnded <-chan struct{}) (*Tracker, <-chan TrackStoppedMessage) {
publishedTrackStopped := make(chan TrackStoppedMessage)
return &Tracker{
participants: make(map[ID]*Participant),
publishedTracks: make(map[TrackID]*PublishedTrack),
}
participants: make(map[ID]*Participant),
publishedTracks: make(map[pub.TrackID]*pub.PublishedTrack[ID]),
publishedTrackStopped: publishedTrackStopped,
conferenceEnded: conferenceEnded,
}, publishedTrackStopped
}

// Adds a new participant in the list.
Expand Down Expand Up @@ -62,20 +70,17 @@ func (t *Tracker) RemoveParticipant(participantID ID) map[string]bool {
// Remove the participant's tracks from all participants who might have subscribed to them.
streamIdentifiers := make(map[string]bool)
for trackID, track := range t.publishedTracks {
if track.Owner == participantID {
if track.Owner() == participantID {
// Odd way to add to a set in Go.
streamIdentifiers[track.Info.StreamID] = true
streamIdentifiers[track.Info().StreamID] = true
t.RemovePublishedTrack(trackID)
}
}

// Go over all subscriptions and remove the participant from them.
// TODO: Perhaps we could simply react to the subscriptions dying and remove them from the list.
for _, publishedTrack := range t.publishedTracks {
if subscription, found := publishedTrack.Subscriptions[participantID]; found {
subscription.Unsubscribe()
delete(publishedTrack.Subscriptions, participantID)
}
publishedTrack.Unsubscribe(participantID)
}

return streamIdentifiers
Expand All @@ -85,151 +90,98 @@ func (t *Tracker) RemoveParticipant(participantID ID) map[string]bool {
// that has been published and that we must take into account from now on.
func (t *Tracker) AddPublishedTrack(
participantID ID,
info webrtc_ext.TrackInfo,
simulcast webrtc_ext.SimulcastLayer,
metadata TrackMetadata,
outputTrack *webrtc.TrackLocalStaticRTP,
) {
// If this is a new track, let's add it to the list of published and inform participants.
track, found := t.publishedTracks[info.TrackID]
if !found {
layers := []webrtc_ext.SimulcastLayer{}
if simulcast != webrtc_ext.SimulcastLayerNone {
layers = append(layers, simulcast)
}
track *webrtc.TrackRemote,
metadata pub.TrackMetadata,
) error {
participant := t.participants[participantID]
if participant == nil {
return fmt.Errorf("participant %s does not exist", participantID)
}

t.publishedTracks[info.TrackID] = &PublishedTrack{
Owner: participantID,
Info: info,
Layers: layers,
Metadata: metadata,
OutputTrack: outputTrack,
Subscriptions: make(map[ID]subscription.Subscription),
// If this is a new track, let's add it to the list of published and inform participants.
if published, found := t.publishedTracks[track.ID()]; found {
if err := published.AddPublisher(track); err != nil {
return err
}

return
return nil
}

// If it's just a new layer, let's add it to the list of layers of the existing published track.
fn := func(layer webrtc_ext.SimulcastLayer) bool { return layer == simulcast }
if simulcast != webrtc_ext.SimulcastLayerNone && slices.IndexFunc(track.Layers, fn) == -1 {
track.Layers = append(track.Layers, simulcast)
t.publishedTracks[info.TrackID] = track
published, err := pub.NewPublishedTrack(
participantID,
participant.Peer.RequestKeyFrame,
track,
metadata,
participant.Logger,
)
if err != nil {
return err
}

// Wait for the track to complete and inform the conference about it.
go func() {
// Wait for the track to complete.
<-published.Done()

// Inform the conference that the track is gone. Or stop the go-routine if the conference stopped.
select {
case t.publishedTrackStopped <- TrackStoppedMessage{track.ID(), participantID}:
case <-t.conferenceEnded:
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Select chooses only one case at random if both are ready. The second case is to prevent this goroutine from blocking indefinitely? Then the channel t.publishedTrackStopped should never be closed, or we'll panic at some point. On the other hand, an unclosed channel cannot be cleaned up by the garbage collector.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were sure that t.conferenceEnded is closed before t.publishedTrackStopped is closed and cleaned up, we could do something like this:

select {
   case <-t.conferenceEnded: 
      return
   default: 
      t.publishedTrackStopped <- TrackStoppedMessage{remoteTrack.ID(), participantID}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A condition for this not to block endlessly is that all messages from t.publishedTrackStopped are read before close(t.publishedTrackStopped)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We often have a send channel whose use depends on another channel's open state (here: whether the conference still exists)

What do you think about this? Does this have side effects?

trackIsDone = c.tracker.AddPublishedTrack(sender, msg.RemoteTrack, trackMetadata)
go func() {
   select {
      case <- trackIsDone: 
           c.tracker.RemovePublishedTrack(id track.TrackID)
      case <- t.conferenceEnded:
          // stop track
   }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about this? Does this have side effects?

Yeah, this will work! However, it would also mean that now the tracker and related fields in the Conference and Tracker must be protected by a mutex since now we have the possibility of mutating the data from 2 different threads. That's actually the only reason why I'm not calling a function from the go-routine in the current implementation, but sending a message to the conference, to ensure that handling of conference-state events is processed within the same conference loop and that we don't need to synchronize access to the variables. If we were to choose to mutate things directly from a go-routine, we need to ensure that all the data that we access/mutate from different threads is synchronized.

on the other hand an unclosed channel cannot be cleaned up by the garbage collector.

While I agree that generally closing channels is a good thing for the majority of the cases (there are cases when the Go channels are not closed sometimes, i.e. when there are multiple concurrent writers to it), I'm not sure about "unclosed channel cannot be cleaned up by the garbage collector". Are you sure about that? - AFAIK, the channels are freed by the garbage collector once they are not used.

IIRC all "reported cases" of people complaining that their memory is leaked when the channel is not closed are not caused by the channel not being closed, rather they are caused by the incorrect implementation on their side that does continue to use the channel (the sending part) indefinitely and hangs either on the sender or receiver side indefinitely effectively preventing the GC to clean up the channel since they are in use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure about that? - AFAIK, the channels are freed by the garbage collector once they are not used.

No. I was wrong about that. I was talking about buffered channels. But here as well, when the channel is not
referenced, the values in the channel buffer are also not referenced. And they get cleaned up by the GC (Ian Lance Taylor, Dave Cheney, I think we can trust them :D)

So we do not have to close channels, from this point of view.

}()

t.publishedTracks[track.ID()] = published
return nil
}

// Iterates over published tracks and calls a closure upon each track info.
func (t *Tracker) ForEachPublishedTrackInfo(fn func(ID, webrtc_ext.TrackInfo)) {
for _, track := range t.publishedTracks {
fn(track.Owner, track.Info)
fn(track.Owner(), track.Info())
}
}

// Updates metadata associated with a given track.
func (t *Tracker) UpdatePublishedTrackMetadata(id TrackID, metadata TrackMetadata) {
func (t *Tracker) UpdatePublishedTrackMetadata(id pub.TrackID, metadata pub.TrackMetadata) {
if track, found := t.publishedTracks[id]; found {
track.Metadata = metadata
track.SetMetadata(metadata)
t.publishedTracks[id] = track
}
}

// Informs the tracker that one of the previously published tracks is gone.
func (t *Tracker) RemovePublishedTrack(id TrackID) {
func (t *Tracker) RemovePublishedTrack(id pub.TrackID) {
if publishedTrack, found := t.publishedTracks[id]; found {
// Iterate over all subscriptions and end them.
for subscriberID, subscription := range publishedTrack.Subscriptions {
subscription.Unsubscribe()
delete(publishedTrack.Subscriptions, subscriberID)
}

publishedTrack.Stop()
delete(t.publishedTracks, id)
}
}

// Subscribes a given participant to the track.
func (t *Tracker) Subscribe(participantID ID, trackID TrackID, requirements TrackMetadata) error {
func (t *Tracker) Subscribe(participantID ID, trackID pub.TrackID, requirements pub.TrackMetadata) error {
// Check if the participant that wants to subscribe exists.
participant := t.participants[participantID]
if participant == nil {
return fmt.Errorf("participant %s does not exist", participantID)
}

// Check if the track that we want to subscribe to exists.
// Check if the track that we want to subscribe to exists.
published := t.publishedTracks[trackID]
if published == nil {
return fmt.Errorf("track %s does not exist", trackID)
}

// Calculate the desired simulcast layer.
desiredLayer := published.GetOptimalLayer(requirements.MaxWidth, requirements.MaxHeight)

// If the subscription exists, let's see if we need to update it.
if sub := published.Subscriptions[participantID]; sub != nil {
if sub.Simulcast() != desiredLayer {
sub.SwitchLayer(desiredLayer)
return nil
}

return fmt.Errorf("subscription already exists and up-to-date")
}

// Find the owner of the track that we're trying to subscribe to.
owner := t.participants[published.Owner]
if owner == nil {
return fmt.Errorf("owner of the track %s does not exist", published.Info.TrackID)
}

var (
sub subscription.Subscription
err error
)

// Subscription does not exist, so let's create it.
switch published.Info.Kind {
case webrtc.RTPCodecTypeVideo:
sub, err = subscription.NewVideoSubscription(
published.Info,
desiredLayer,
participant.Peer,
func(track webrtc_ext.TrackInfo, simulcast webrtc_ext.SimulcastLayer) error {
return owner.Peer.RequestKeyFrame(track, simulcast)
},
participant.Logger,
)
case webrtc.RTPCodecTypeAudio:
sub, err = subscription.NewAudioSubscription(published.OutputTrack, participant.Peer)
}

// If there was an error, let's return it.
if err != nil {
// Subscribe to the track.
if err := published.Subscribe(participantID, participant.Peer, requirements, participant.Logger); err != nil {
return err
}

// Add the subscription to the list of subscriptions.
published.Subscriptions[participantID] = sub

return nil
}

// Unsubscribes a given `participantID` from the track.
func (t *Tracker) Unsubscribe(participantID ID, trackID TrackID) {
func (t *Tracker) Unsubscribe(participantID ID, trackID pub.TrackID) {
if published := t.publishedTracks[trackID]; published != nil {
if sub := published.Subscriptions[participantID]; sub != nil {
sub.Unsubscribe()
delete(published.Subscriptions, participantID)
}
}
}

// Processes an RTP packet received on a given track.
func (t *Tracker) ProcessRTP(info webrtc_ext.TrackInfo, simulcast webrtc_ext.SimulcastLayer, packet *rtp.Packet) {
if published := t.publishedTracks[info.TrackID]; published != nil {
for _, sub := range published.Subscriptions {
if sub.Simulcast() == simulcast {
if err := sub.WriteRTP(*packet); err != nil {
logrus.Errorf("Dropping an RTP packet on %s (%s): %s", info.TrackID, simulcast, err)
}
}
}
published.Unsubscribe(participantID)
}
}
20 changes: 9 additions & 11 deletions pkg/conference/peer_message_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package conference

import (
"github.com/matrix-org/waterfall/pkg/conference/participant"
published "github.com/matrix-org/waterfall/pkg/conference/track"
"github.com/matrix-org/waterfall/pkg/peer"
"github.com/matrix-org/waterfall/pkg/signaling"
"maunium.net/go/mautrix/event"
Expand All @@ -23,23 +24,20 @@ func (c *Conference) processLeftTheCallMessage(sender participant.ID, msg peer.L
}

func (c *Conference) processNewTrackPublishedMessage(sender participant.ID, msg peer.NewTrackPublished) {
c.newLogger(sender).Infof("Published new track: %s (%v)", msg.TrackID, msg.SimulcastLayer)
id := msg.RemoteTrack.ID()
c.newLogger(sender).Infof("Published new track: %s (%v)", id, msg.RemoteTrack.RID())

// Find metadata for a given track.
trackMetadata := streamIntoTrackMetadata(c.streamsMetadata)[msg.TrackID]
trackMetadata := streamIntoTrackMetadata(c.streamsMetadata)[id]

// If a new track has been published, we inform everyone about new track available.
c.tracker.AddPublishedTrack(sender, msg.TrackInfo, msg.SimulcastLayer, trackMetadata, msg.OutputTrack)
c.tracker.AddPublishedTrack(sender, msg.RemoteTrack, trackMetadata)
c.resendMetadataToAllExcept(sender)
}

func (c *Conference) processRTPPacketReceivedMessage(msg peer.RTPPacketReceived) {
c.tracker.ProcessRTP(msg.TrackInfo, msg.SimulcastLayer, msg.Packet)
}

func (c *Conference) processPublishedTrackFailedMessage(sender participant.ID, msg peer.PublishedTrackFailed) {
c.newLogger(sender).Infof("Failed published track: %s", msg.TrackID)
c.tracker.RemovePublishedTrack(msg.TrackID)
func (c *Conference) processPublishedTrackFailedMessage(sender participant.ID, trackID published.TrackID) {
c.newLogger(sender).Infof("Failed published track: %s", trackID)
c.tracker.RemovePublishedTrack(trackID)
c.resendMetadataToAllExcept(sender)
}

Expand Down Expand Up @@ -163,7 +161,7 @@ func (c *Conference) processTrackSubscriptionMessage(
for _, track := range msg.Subscribe {
p.Logger.Debugf("Subscribing to track %s", track.TrackID)

requirements := participant.TrackMetadata{track.Width, track.Height}
requirements := published.TrackMetadata{track.Width, track.Height}
if err := c.tracker.Subscribe(p.ID, track.TrackID, requirements); err != nil {
p.Logger.Errorf("Failed to subscribe to track %s: %v", track.TrackID, err)
continue
Expand Down
6 changes: 2 additions & 4 deletions pkg/conference/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func (c *Conference) processMessages(signalDone chan struct{}) {
c.processPeerMessage(msg)
case msg := <-c.matrixEvents:
c.processMatrixMessage(msg)
case msg := <-c.publishedTrackStopped:
c.processPublishedTrackFailedMessage(msg.OwnerID, msg.TrackID)
}

// If there are no more participants, stop the conference.
Expand All @@ -42,10 +44,6 @@ func (c *Conference) processPeerMessage(message channel.Message[participant.ID,
c.processLeftTheCallMessage(message.Sender, msg)
case peer.NewTrackPublished:
c.processNewTrackPublishedMessage(message.Sender, msg)
case peer.RTPPacketReceived:
c.processRTPPacketReceivedMessage(msg)
case peer.PublishedTrackFailed:
c.processPublishedTrackFailedMessage(message.Sender, msg)
case peer.NewICECandidate:
c.processNewICECandidateMessage(message.Sender, msg)
case peer.ICEGatheringComplete:
Expand Down
Loading