Performant RTP Publisher #134

Merged
merged 7 commits into from
Feb 22, 2023

@daniel-abramov daniel-abramov commented Feb 15, 2023

(Reviewing commit by commit is recommended, as it makes the changes easier to follow; only the latest commit is relatively large, because it moves many things around.)

The rough idea of the conference so far was that the conference owns its state and is the only entity that mutates it. That was convenient: no locks were needed, because access to the data was sequential (see the sketch: #52). This worked under the assumption that processing a single message takes very little time and does not really block the conference loop (i.e. the handling of all incoming events from the peers, Matrix signaling, etc.).

At some point, it became clear that certain functions were blocking, while others took longer to process than expected, so all blocking operations were gradually moved to their own workers (go-routines): the conference loop still received the messages, but offloaded the expensive processing of some of them to these workers while maintaining the state of the conference. That was implemented, for example, for the incoming RTP packets from the peers, where each subscription had its own worker to process packets. The same was then done for outgoing Matrix signaling messages, which were also moved into their own queue.

While this helped, it did not solve all the problems. Incoming RTP packets from peers were still sent to the conference (i.e. they shared the same message queue). The conference did not process the RTP packets itself (it just read each packet from the queue, identified which subscription it belonged to and dispatched it to the right worker), but this was still not optimal: any slight lag or hiccup in the processing of any event could delay the dispatching of all incoming RTP packets to their workers, which means that an occasional lag could cost us several seconds (or even minutes) of freezes.

Sending RTP packets to the conference was not the best idea, and it does not really belong there given that these packets don’t even mutate the conference state and are only relevant for their particular subscribers. So it was clear that RTP packet handling must be moved out of the main conference queue processing.

This PR changes that by introducing new logic where each peer only sends a message to the conference informing it about a newly available track (or simulcast quality). The conference processes this message by creating a published track (which in turn creates a publisher that runs in its own go-routine). The publisher then reads packets from the incoming track and sends them to the subscriber workers (as the conference did before). This means that RTP processing is now completely independent from the conference and runs in parallel with other packet handling. The published track (and the publisher) only share the list of available subscriptions with the conference (a common shared data structure), and access to it is synchronized. So the only moment we currently lock/synchronize access to the subscriptions is when the state of a subscription changes (layer changed, subscription starts/stops, etc.).

This means that existing publishers and subscribers are barely affected by the main conference loop and, once started, continue to run efficiently.
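
The shared, synchronized list of subscriptions described above could be sketched roughly as follows. This is a minimal illustration and not the code from this PR: the names `Packet`, `Subscriptions`, `Broadcast` and `countingSub` are hypothetical, and using a `sync.RWMutex` is an assumption about how the synchronization might be done.

```go
package main

import (
	"fmt"
	"sync"
)

// Packet stands in for an RTP packet (hypothetical type for this sketch).
type Packet struct{ Seq uint16 }

// Subscription is anything that can accept a packet.
type Subscription interface {
	WriteRTP(p Packet) error
}

// Subscriptions is the only state shared between the conference loop and a
// publisher go-routine; every access goes through the mutex.
type Subscriptions struct {
	mu   sync.RWMutex
	subs map[string]Subscription
}

func NewSubscriptions() *Subscriptions {
	return &Subscriptions{subs: make(map[string]Subscription)}
}

// Add is meant to be called from the conference loop when a subscription starts.
func (s *Subscriptions) Add(id string, sub Subscription) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.subs[id] = sub
}

// Remove is meant to be called from the conference loop when a subscription stops.
func (s *Subscriptions) Remove(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.subs, id)
}

// Broadcast is meant to be called from the publisher go-routine for every
// packet. It returns the number of subscriptions that accepted the packet.
func (s *Subscriptions) Broadcast(p Packet) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	delivered := 0
	for _, sub := range s.subs {
		if err := sub.WriteRTP(p); err == nil {
			delivered++
		}
	}
	return delivered
}

// countingSub is a trivial subscription that counts the packets it receives.
type countingSub struct{ n int }

func (c *countingSub) WriteRTP(Packet) error { c.n++; return nil }

func main() {
	subs := NewSubscriptions()
	a, b := &countingSub{}, &countingSub{}
	subs.Add("a", a)
	subs.Add("b", b)
	subs.Broadcast(Packet{Seq: 1})
	subs.Remove("b")
	subs.Broadcast(Packet{Seq: 2})
	fmt.Println(a.n, b.n) // prints "2 1"
}
```

With a read/write lock, the per-packet path only takes the read lock, while the conference takes the write lock on the comparatively rare subscription changes.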

Another thing this allowed us to do is regulate the size of the queue of incoming RTP packets that each subscription gets. Since each published track’s incoming RTP packets are no longer sent through a ‘global’ conference queue, we have more control over how backpressure is implemented. Currently, each publisher’s worker loop essentially looks like this:

loop {
    packet = track.read()
    if packet.error() {
        inform_conference_that_the_track_is_gone()
        return
    }

    for subscription in subscriptions {
        queue_full = subscription.enqueue(packet)
        if queue_full {
            // The packet was not delivered, so we drop it and log the drop.
            // TODO: we probably want to evict the oldest packets from the queue here.
        }
    }
}
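
For illustration, the pseudocode above might translate to Go roughly like this. It is a sketch under assumptions, not the implementation from this PR: `Packet`, `Track`, `queue`, `runPublisher` and `sliceTrack` are hypothetical names, and each subscription queue is modelled as a buffered channel whose `Enqueue` never blocks.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Packet stands in for an RTP packet (hypothetical type for this sketch).
type Packet struct{ Seq uint16 }

// Track is a hypothetical stand-in for the incoming remote track.
type Track interface {
	ReadRTP() (Packet, error)
}

// queue is a bounded per-subscription queue backed by a buffered channel.
type queue chan Packet

var errQueueFull = errors.New("subscription queue is full")

// Enqueue never blocks: if the buffer is full, the packet is rejected.
func (q queue) Enqueue(p Packet) error {
	select {
	case q <- p:
		return nil
	default:
		return errQueueFull
	}
}

// runPublisher reads packets until the track fails and fans each packet out
// to all subscription queues. It returns the number of dropped packets and
// the error that ended the loop.
func runPublisher(track Track, subs []queue) (int, error) {
	dropped := 0
	for {
		packet, err := track.ReadRTP()
		if err != nil {
			// A real publisher would inform the conference here that the track is gone.
			return dropped, err
		}
		for _, sub := range subs {
			if sub.Enqueue(packet) != nil {
				dropped++ // slow subscriber: drop instead of stalling everyone else
			}
		}
	}
}

// sliceTrack yields a fixed list of packets and then io.EOF.
type sliceTrack struct{ packets []Packet }

func (t *sliceTrack) ReadRTP() (Packet, error) {
	if len(t.packets) == 0 {
		return Packet{}, io.EOF
	}
	p := t.packets[0]
	t.packets = t.packets[1:]
	return p, nil
}

func main() {
	track := &sliceTrack{packets: []Packet{{1}, {2}, {3}}}
	fast, slow := make(queue, 8), make(queue, 2)
	dropped, err := runPublisher(track, []queue{fast, slow})
	fmt.Println(len(fast), len(slow), dropped, err) // prints "3 2 1 EOF"
}
```

The buffer size of each queue is the knob that bounds how far a slow subscriber may lag before its packets start being dropped.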

So now it looks almost exactly like the processing of publishers in LiveKit: https://blog.livekit.io/going-beyond-a-single-core-4a464d20d17a/ (except that we spawn a go-routine per publisher instead of creating a worker pool with a limited number of workers, as the latter seems a bit more complicated but does not bring that much of a performance advantage).


Fixes #117
Relates to #120 (but only for RTP packets)

A new generic publisher. It can't get simpler than that.
Now, the peer will just send the remote track to the conference and let
the conference create publishers and start go-routines for processing
publishers and managing the lifetime of the tracks.
Now the published track has its own package that encapsulates the logic
related to published tracks and manages the lifetime of the track
and its subscriptions.

This means that from now on, each published track (i.e. each Pion's
`TrackRemote`) is handled separately from other tracks and by its own
go-routine, which means that packets that belong to separate
tracks, or even to separate simulcast layers of a single track, are
processed in parallel.
@daniel-abramov daniel-abramov requested a review from a team February 15, 2023 11:13
@daniel-abramov daniel-abramov requested a review from dbkr February 18, 2023 22:54
}

return nil
}
Contributor

I really like the publisher file, because this is how Go deals with interfaces. The implementing package should return a concrete pointer or struct, and the interfaces belong here, in the package that uses them. ❤️

And when you move the method extractRemoteTrack from the track package to the publisher package, you can make the Track interface private.

Contributor Author

Thanks!

And when you move the method extractRemoteTrack from the track package to the publisher package, you can make the Track interface private.

But wouldn't that defeat the whole purpose of having an interface? 🙂 Currently the Publisher is quite generic, so we can use it with any track that can be read from and with any subscription that can be written to. Making Track private would mean that we would be bound to a specific RemoteTrack type as the only track that could be used with the Publisher. That is not bad in general, given that we don't use any other types of track at the moment, but it would arguably be less generic code, i.e. harder to unit test if the type is fixed, whereas with interfaces we can provide mock objects more easily.

Contributor

That's an important question you're asking. Go is not an object-oriented language like Java or C++. I would say Go is a structure-oriented language. In Java you hide the implementation behind an interface, so you can offer different implementations. In Go you try to build on minimal external structures and you have only one implementation.

Contributor

Maybe an interesting fact that underlines this: Go has no concept of inheritance. Behavior can only be reused through composition.

Contributor Author

I would say Go is a structure-oriented language. In Java you hide the implementation behind an interface, so you can offer different implementations. In Go you try to build on minimal external structures and you have only one implementation.

I think all languages that offer some concept of interfaces or traits use them primarily to confine/simplify what a certain entity represents. I.e. if you have a function foo() that accepts an entity a as a parameter, oftentimes foo() expects/requires only certain traits from a. To avoid coupling and mistakes, access to a is then confined via an interface: the interface describes only the traits of a that we require in a given context, which lets us hide the actual complexity of the entity being passed and makes the code more generic, so it produces safer and less coupled code.

Whether only one entity implements the interface or trait IMHO really depends on the use case. But more often than not, an interface has multiple implementations.

Essentially, by using interfaces in the publisher, I wanted to make sure that I don't import too many concrete types and instead provide a generic publisher implementation that can be used regardless of what type of tracks and subscriptions we pass, as long as they satisfy the requirements (interfaces). That way we can also unit test things more easily and exchange the transport type in the future without making any changes to the publisher.

Contributor

There are two purposes for interfaces:

  1. Describe the conditions your code needs: an interface as a method parameter.
  2. Hide implementations: for example, a method signature has an interface as its return value.

The second purpose leads more or less to inheritance, and inheritance often leads to strong coupling. This is the reason why Go has no concept of inheritance. So people conclude: in Go you should avoid the second purpose of interfaces. (Maybe someone else has a more accessible explanation than I do right now :D)

Contributor

@EnricoSchw EnricoSchw Feb 21, 2023

There is a rule called "accept interfaces, return structs". Maybe googling this helps more than my bad explanations.
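
As a rough illustration of "accept interfaces, return structs": in the sketch below the constructor returns a concrete `*Publisher`, while `AddSubscription` accepts a small interface declared next to the code that needs it. All names (`Packet`, `rtpWriter`, `Publisher`, `recorder`) are hypothetical and only loosely modelled on this discussion; everything lives in one package here, so the sketch says nothing about cross-package visibility.

```go
package main

import "fmt"

// Packet stands in for an RTP packet (hypothetical type for this sketch).
type Packet struct{ Seq uint16 }

// rtpWriter is declared by the consumer and describes only what the
// publisher needs from a subscription.
type rtpWriter interface {
	WriteRTP(p Packet) error
}

// Publisher is a concrete type.
type Publisher struct {
	writers []rtpWriter
}

// NewPublisher returns the concrete struct pointer, not an interface.
func NewPublisher() *Publisher { return &Publisher{} }

// AddSubscription accepts an interface: any type with a matching WriteRTP
// method can be passed in.
func (p *Publisher) AddSubscription(w rtpWriter) {
	p.writers = append(p.writers, w)
}

// Publish forwards the packet to every registered writer and stops at the
// first error.
func (p *Publisher) Publish(pkt Packet) error {
	for _, w := range p.writers {
		if err := w.WriteRTP(pkt); err != nil {
			return err
		}
	}
	return nil
}

// recorder is a test double. It never mentions rtpWriter; it satisfies the
// interface implicitly because its method set matches.
type recorder struct{ seqs []uint16 }

func (r *recorder) WriteRTP(p Packet) error {
	r.seqs = append(r.seqs, p.Seq)
	return nil
}

func main() {
	pub := NewPublisher()
	rec := &recorder{}
	pub.AddSubscription(rec)
	if err := pub.Publish(Packet{Seq: 7}); err != nil {
		fmt.Println("publish failed:", err)
	}
	fmt.Println(rec.seqs) // prints "[7]"
}
```

The test double shows the practical benefit mentioned earlier in the thread: the publisher can be unit tested without any real track or WebRTC type.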


var ErrSubscriptionExists = errors.New("subscription already exists")

type Subscription interface {
Contributor

this can be private like:

type subscription interface

Contributor Author

Will it work? I.e. the Publisher.AddSubscription() function and Publisher.RemoveSubscription() are both public functions that accept subscription Subscription.

Contributor

Yes, because a struct doesn’t have to declare that it implements an interface in Go. Go uses structural typing (duck typing). This follows the interface segregation principle. In Go, interfaces should be as small as possible, and they describe not what you offer but what you need.

Contributor Author

Right, yet I still don't think making the interface private would work. The Subscription interface is used in a publicly exposed function, which means that the types it uses must be public as well; otherwise we're using unexported/private types in exported/public functions.

I quickly tried changing the type to private and indeed it stopped compiling.

Contributor

I changed it this way, and it compiles:

type subscription interface {
	// WriteRTP **must not** block (wait on I/O).
	WriteRTP(packet rtp.Packet) error
}

Contributor Author

Cool!

If there is anything that I could improve, let me know, I would be glad to make the code better 🙂

From what I understood, you currently only suggest making the interfaces in the publisher package private, right? I'm not opposed to the change, but I wonder what consequences it would have, e.g. whether someone who uses the package would still be able to see the private interface in the documentation, to browse it and see which methods to implement. It seems that in the majority of cases, the interfaces of a package are exported (so that the caller can use the interface if there is a need for it).

E.g. I could imagine that if the caller has a hash-map of subscriptions, they might store it as map[string]publisher.Subscription or something like this, whereas if we make it private, we prohibit such usage of Subscription. I don't see a strong reason for doing that: after all, Subscription appears in public functions that we export from the package, so it feels like all types that appear in the public interface should ideally be readable. I would understand if it were an internal interface that is only used inside a private structure or function and nowhere else, but it seems like we have a slightly different case here.

Contributor

@EnricoSchw EnricoSchw Feb 22, 2023

The second point was how we deal with signal channels from parent go-routines.
A child should tell the parent routine that it is closing rather than react to the parent routine closing. But as we discussed, that can't be implemented right now, so we're postponing it.

Contributor

To your example:
I think a caller with a hash-map of subscriptions should know (define for themselves) the structure of what they store.
The publisher only has to know that a subscription has a method WriteRTP(packet rtp.Packet) error, that's it.
The caller should not have to ask the publisher what it knows about subscriptions.

Maybe the name of this subscription interface is misleading. In idiomatic Go we would name the interface like this:

type rtpWriter interface {
	// WriteRTP **must not** block (wait on I/O).
	WriteRTP(packet rtp.Packet) error
}
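
One way to read this suggestion: the caller declares its own interface for its own bookkeeping, and the publisher side only asks for the single method it needs. The sketch below assumes hypothetical names (`Packet`, `subscription`, `webrtcTrack`, `rtpSink`, `publish`, `kinds`) and keeps everything in one package for brevity, so it does not demonstrate cross-package visibility.

```go
package main

import (
	"fmt"
	"sort"
)

// Packet stands in for an RTP packet (hypothetical type for this sketch).
type Packet struct{ Seq uint16 }

// subscription is declared by the caller for its own collection; it may ask
// for more than the publisher needs.
type subscription interface {
	WriteRTP(p Packet) error
	Kind() string
}

// Two unrelated concrete types that both happen to satisfy subscription.
type webrtcTrack struct{}

func (webrtcTrack) WriteRTP(Packet) error { return nil }
func (webrtcTrack) Kind() string          { return "webrtc" }

type rtpSink struct{}

func (rtpSink) WriteRTP(Packet) error { return nil }
func (rtpSink) Kind() string          { return "sink" }

// rtpWriter is what the publisher side needs, and nothing more.
type rtpWriter interface {
	WriteRTP(p Packet) error
}

// publish stands in for the publisher: it accepts the small interface.
func publish(w rtpWriter, p Packet) error { return w.WriteRTP(p) }

// kinds returns the sorted kinds of all stored subscriptions.
func kinds(subs map[string]subscription) []string {
	out := make([]string, 0, len(subs))
	for _, s := range subs {
		out = append(out, s.Kind())
	}
	sort.Strings(out)
	return out
}

func main() {
	// The heterogeneous collection is typed with the caller's own interface.
	subs := map[string]subscription{"alice": webrtcTrack{}, "recorder": rtpSink{}}
	for _, s := range subs {
		// A subscription value converts implicitly to rtpWriter.
		if err := publish(s, Packet{Seq: 1}); err != nil {
			fmt.Println("write failed:", err)
		}
	}
	fmt.Println(kinds(subs)) // prints "[sink webrtc]"
}
```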

Contributor

Maybe another point on this view. (Even if I'm starting a fundamental discussion here 👀)

Patterns like Factory do not make sense in Go.

Contributor Author

The second point was how we deal with signal channels from parent go-routines.
A child should tell the parent routine that it is closing rather than react to the parent routine closing.

Do you mean that the parent should never inform the child that it is done, signaling the child to stop?

I think a caller with a hash-map of subscriptions should know (define for themselves) the structure of what they store.

That is true, yet the caller may (for whatever reason) store all available subscriptions in a collection, say a map or an array. That collection can hold heterogeneous types which have only one thing in common: they are all subscriptions. This way, the caller can store them as map[string]publisher.Subscription; otherwise there is no option other than map[string]interface{} for storing heterogeneous types in a single collection.

We as publishers don't care about the exact type or the implementation details of the subscriptions; we're OK accepting any type as long as it satisfies the contract (the interface). The actual subscription could be anything from a WebRTC track to a plain RTP sink or any other stream that accepts RTP packets 🙂

Patterns like Factory do not make sense in Go.

Sorry, I did not get what you meant by that (in particular, what it relates to 🙂).

select {
case t.publishedTrackStopped <- TrackStoppedMessage{remoteTrack.ID(), participantID}:
case <-t.conferenceEnded:
}
Contributor

Select chooses one case at random if both are ready. Is the second case there to prevent this goroutine from blocking indefinitely? Then the channel t.publishedTrackStopped must never be closed, or we'll panic at some point. On the other hand, an unclosed channel cannot be cleaned up by the garbage collector.
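
To make the behaviour of that select concrete, here is a small sketch. `notifyStopped` and the fields of `TrackStoppedMessage` are hypothetical stand-ins for the snippet under review: the send is attempted, but the go-routine is released as soon as the conference-ended channel is closed, even if nobody ever receives the message.

```go
package main

import "fmt"

// TrackStoppedMessage is a simplified, hypothetical version of the message
// sent to the conference.
type TrackStoppedMessage struct{ TrackID, ParticipantID string }

// notifyStopped tries to deliver msg to the conference. It returns true if
// the message was delivered and false if the conference ended first.
func notifyStopped(stopped chan<- TrackStoppedMessage, conferenceEnded <-chan struct{}, msg TrackStoppedMessage) bool {
	select {
	case stopped <- msg:
		return true
	case <-conferenceEnded:
		return false
	}
}

func main() {
	stopped := make(chan TrackStoppedMessage) // unbuffered and never read from
	ended := make(chan struct{})
	result := make(chan bool)

	go func() {
		result <- notifyStopped(stopped, ended, TrackStoppedMessage{"track-1", "alice"})
	}()

	// Nobody receives from stopped, so only closing ended can unblock the select.
	close(ended)
	fmt.Println(<-result) // prints "false"
}
```

Note that `ended` is closed, never sent on: a receive from a closed channel returns immediately for every waiter, whereas a send on a closed channel panics, which is why `stopped` must stay open while publishers may still send.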

Contributor

If we were sure that t.conferenceEnded is closed before t.publishedTrackStopped is closed and cleaned up, we could do something like this:

select {
case <-t.conferenceEnded:
	return
default:
	// Blocks until the conference receives the message.
	t.publishedTrackStopped <- TrackStoppedMessage{remoteTrack.ID(), participantID}
}

Contributor

A condition for this not to block forever is that all messages from t.publishedTrackStopped are read before close(t.publishedTrackStopped).

Contributor

We often have the situation that sending on a channel depends on some other state still being open (here: whether the conference still exists).

What do you think about this? Does it have side effects?

trackIsDone := c.tracker.AddPublishedTrack(sender, msg.RemoteTrack, trackMetadata)
go func() {
	select {
	case <-trackIsDone:
		// id is the track.TrackID of the published track.
		c.tracker.RemovePublishedTrack(id)
	case <-t.conferenceEnded:
		// stop track
	}
}()

Contributor Author

What do you think about this? Does it have side effects?

Yeah, this will work! However, it would also mean that the tracker and the related fields in the Conference and Tracker must be protected by a mutex, since we could now mutate the data from 2 different threads. That's actually the only reason why, in the current implementation, I send a message to the conference instead of calling a function from the go-routine: it ensures that conference-state events are handled within the same conference loop and that we don't need to synchronize access to the variables. If we chose to mutate things directly from a go-routine, we would need to ensure that all the data we access/mutate from different threads is synchronized.
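
The trade-off described here (message passing to a single owner instead of a mutex) can be sketched as follows. The names `trackStopped`, `conference`, `run` and `simulate` are hypothetical; the point is only that publisher go-routines never touch the map themselves and instead send events to the loop that owns it.

```go
package main

import (
	"fmt"
	"sync"
)

// trackStopped is the event a publisher go-routine sends when its track ends.
type trackStopped struct{ trackID string }

// conference owns the set of tracks. No mutex is needed because only the
// go-routine executing run reads or writes the map once the loop has started.
type conference struct {
	tracks map[string]struct{}
	events chan trackStopped
}

// run is the conference loop. It processes events until the channel is
// closed and returns the number of tracks that are still published.
func (c *conference) run() int {
	for ev := range c.events {
		delete(c.tracks, ev.trackID)
	}
	return len(c.tracks)
}

// simulate publishes the given tracks, lets one go-routine per stopped track
// report to the conference concurrently, and returns how many tracks remain.
func simulate(published, stopped []string) int {
	c := &conference{tracks: map[string]struct{}{}, events: make(chan trackStopped)}
	for _, id := range published {
		c.tracks[id] = struct{}{}
	}

	var publishers sync.WaitGroup
	for _, id := range stopped {
		publishers.Add(1)
		go func(id string) {
			defer publishers.Done()
			c.events <- trackStopped{trackID: id} // no direct access to c.tracks
		}(id)
	}

	// Close the event channel only after every sender has finished.
	go func() {
		publishers.Wait()
		close(c.events)
	}()

	return c.run()
}

func main() {
	fmt.Println(simulate([]string{"a", "b", "c"}, []string{"a", "c"})) // prints "1"
}
```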

On the other hand, an unclosed channel cannot be cleaned up by the garbage collector.

While I agree that closing channels is generally a good thing in the majority of cases (there are cases where Go channels are deliberately not closed, e.g. when there are multiple concurrent writers), I'm not sure about "an unclosed channel cannot be cleaned up by the garbage collector". Are you sure about that? AFAIK, channels are freed by the garbage collector once they are no longer used.

IIRC all "reported cases" of people complaining about leaked memory when a channel is not closed are not caused by the channel being left open. Rather, they are caused by an incorrect implementation on their side that keeps using the channel and hangs indefinitely on either the sender or the receiver side, which prevents the GC from cleaning up the channel since it is still in use.

Contributor

Are you sure about that? - AFAIK, the channels are freed by the garbage collector once they are not used.

No, I was wrong about that. I was talking about buffered channels. But here as well, when the channel is no longer referenced, the values in the channel buffer are also no longer referenced, and they get cleaned up by the GC (Ian Lance Taylor, Dave Cheney; I think we can trust them :D).

So from this point of view, we don't have to close channels.
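
The conclusion can be checked with a small experiment. The sketch below (hypothetical helpers `liveHeap` and `measure`) allocates a large buffered channel, leaves one unread value in it, never closes it, and compares the live heap before and after the last reference is dropped. The exact numbers depend on the Go runtime and platform, so treat the output as indicative only.

```go
package main

import (
	"fmt"
	"runtime"
)

// liveHeap forces a full garbage collection and returns the bytes of heap
// objects that are still allocated afterwards.
func liveHeap() uint64 {
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m.HeapAlloc
}

// measure returns the live heap while an unclosed buffered channel is still
// referenced, and again after the last reference to it has been dropped.
func measure() (held, released uint64) {
	// Roughly 32 MiB of buffer: 32768 slots of 1 KiB each.
	ch := make(chan [1024]byte, 32*1024)
	ch <- [1024]byte{} // leave an unread value in the buffer

	held = liveHeap()
	runtime.KeepAlive(ch) // keep the channel reachable until this point

	ch = nil // drop the only reference; the channel was never closed
	released = liveHeap()
	return held, released
}

func main() {
	held, released := measure()
	fmt.Printf("live heap with channel: %d MiB, after dropping it: %d MiB\n",
		held>>20, released>>20)
}
```

If the unclosed channel were not collectable, the second measurement would stay roughly as large as the first.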

@daniel-abramov daniel-abramov merged commit 401026e into main Feb 22, 2023
@daniel-abramov daniel-abramov deleted the performant-rtp-publisher branch February 22, 2023 21:48
Development

Successfully merging this pull request may close these issues.

Make each publisher send RTP packets directly to its subscribers
3 participants