-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Performant RTP Publisher #134
Conversation
A new generic publisher. It can't get more simple than that.
Now, the peer will just send the remote track to the conference and let the conference to create publishers and start go-routines for processing publishers and managing the lifetime of the tracks.
Now published track has its own package that encapsulated the logic related to the published tracks and manages the lifetime of the track and its subscriptions. This means that from now on, each published track (i.e. each Pion's `TrackRemote`) is handled separately from other tracks and by its own go-routine, which means that handling of packets that belong to separate tracks or even separate simulcast layers on a single track, are processed in parallel.
Co-authored-by: David Baker <[email protected]>
} | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like the publisher file, because it is how go deals with interfaces. The implementing package should return concrete pointer or struct and the interfaces belong here in the package that uses this. ❤️
An when you move the method: extractRemoteTrack from the track
pkg to the publisher
pkg you can make the Track
interface private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
An when you move the method: extractRemoteTrack from the track pkg to the publisher pkg you can make the Track interface private.
But wouldn't it defeat the whole purpose of having an interface? 🙂 I.e. currently the Publisher
is quite generic, so we can use it with any track that provides capabilities of reading from and to any subscription that provides the capability of writing to it. Making Track
private would mean that we would be bound to a specific type of a RemoteTrack
as the track that could be used with the Publisher
(which is not bad in general given that we don't use any other types of track at the moment, but would arguably be a less generic code, i.e. harder to write a unit test for if the type is fixed, while with interfaces we could provide mock objects easier)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's an important key question you're asking. Go is not an object oriented language such as Java or C++. I would say Go is structure orientated language. In Java you hide the implementation behind an interface. So you can offer different implementations. In Go you try to base on minimal external structures and have you have only one implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe an interesting fact that underlines this. In Go there is no inheritance concept. Behavior reuse can only be achieved through composition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say Go is structure orientated language. In Java you hide the implementation behind an interface. So you can offer different implementations. In Go you try to base on minimal external structures and have you have only one implementation.
I think all languages that offer some concept of interfaces or traits use them to primarily allow confining/simplifying what a certain entity represents. I.e. if you have a function foo()
that accepts a parameter of an entity a
, oftentimes a
expects/requires only certain traits from the entity a
, so to avoid code being coupled and avoiding mistakes, oftentimes the access to a
is confined via an interface, i.e. the interface describes the only traits of an entity a
that we require in a given context, which allows us to hide the actual complexity of the entity being passed and make it more generic, so it produces a safer and less coupled code.
As for whether or not only one entity implements the interface or trait IMHO really depends on the use case. But more often than not, an interface may have multiple implementations.
Essentially, by using interfaces in publisher
, I wanted to make sure that I don't import too many concrete types and instead provide a generic publisher implementation that could be used regardless of what type of tracks and subscriptions we pass as long as they satisfy the requirements (interfaces). That way we can also unit test things easier and exchange the transport type in the future without making any changes to the publisher.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two purpose for interfaces:
- Describe conditions your code need. An interface as method parameters.
- Hide implementations. For example a method signature has an interface as return value.
The second purpose leads more or less to inheritance. Inheritance leads often to strong coupling. This is the reason in Go no inheritance concept is existing. So, People do the conclusion: In Go you should avoid the second purpose of interfaces. (Maybe someone else has a more accessible explanation than I do right now :D)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a rule called: accept interfaces return structs
. Maybe if you google this its helps more as my bad explanations.
|
||
var ErrSubscriptionExists = errors.New("subscription already exists") | ||
|
||
type Subscription interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be private like:
type subscription interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will it work? I.e. the Publisher.AddSubscription()
function and Publisher.RemoveSubscription()
are both public functions that accept subscription Subscription
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because a struct doesn’t have to implement an interface in Go. Go use structural typing (duck typing). This follows the Interface segregation principle. In Go interfaces should be small as possible and they not describe what you offer they describe what you need.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, yet I still don't think making an interface private would work. Subscription
interface is used in a publicly-exposed function which means that the types that it uses must be public as well, otherwise we're using unexported/private types in the exported/public functions.
I quickly tried to change the type to private and indeed it stopped compiling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this in this way, and it compiling.
type subscription interface {
// WriteRTP **must not** block (wait on I/O).
WriteRTP(packet rtp.Packet) error
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool!
If there is anything that I could improve, let me know, I would be glad to make the code better 🙂
From what I understood, currently, you only suggest changing to make interfaces in the publisher
package private, right? - I'm not opposed to the change, but I wonder what consequences it would have. I.e. if someone who uses the package would be able to see the private interface in documentation to browse it and see which methods to implement etc. It seems like, in the majority of the cases, interfaces for the packages are exported, and made public (so that the caller can use the interface if there is a need for it).
E.g. I could imagine that if the caller has a hash-map of subscriptions, they might store it as map[string]publisher.Subscription
or something like this, whereas if we make it private, we prohibit such usage of a Subscription
(I don't see a strong reason of doing it, after all, Subscription
is featured in public functions that we export in the package, so it feels like all types that are featured in the public interface, should be readable ideally?). I would understand if it was an internal interface that is only used inside a private structure or function and nowhere else, but it seems like we have a slightly different case here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second point was, how we deal with signal channels from parent go routines.
A child should tell parent routine about close than reaction on parent routine closing. But that's what we discussed, it can't be implemented right now and we're pushing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To your example:
I think a caller with a hash-map of subscriptions should knows (define by him self) the structure about what the he save.
The publisher only have to know thats a subscriptions has a method: WriteRTP(packet rtp.Packet) error
thats it.
The Caller should not ask the Publisher what he knows about subscriptions?
Maybe the name of this subscription interface is miss leading. In idiomatic Go we would name the interface like:
type rtpWriter interface {
// WriteRTP **must not** block (wait on I/O).
WriteRTP(packet rtp.Packet) error
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe another point to this view. (Even if I'm starting a fundamental discussion here 👀 )
Patterns like Factory do not makes sense in Go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second point was, how we deal with signal channels from parent go routines.
A child should tell parent routine about close than reaction on parent routine closing.
Do you mean that the parent should never inform the child that they're done, signaling the child to stop?
I think a caller with a hash-map of subscriptions should knows (define by him self) the structure about what the he save.
That is true, yet the caller may (for whatever reason) store the list of all available subscriptions in a collection, let's say in a map or in an array. The map or an array can be a collection of heterogenous types, all of which only have one thing in common: they are all subscriptions. This way, the caller may store them map[string]publisher.Subscription
, otherwise there will be no other option apart from map[string]interface{}
to store heterogeneous types in a single collection.
We as publishers don't care about the exact type or the implementation details of the subscriptions, we're ok accepting any type as long as it satisfies the contract (the interface), the actual subscription could be anything from a WebRTC track to just plain RTP sink or any other stream that accepts rtp packets 🙂
Patterns like Factory do not makes sense in Go.
Sorry, I did not get what you meant by it (in particular, what it related to 🙂).
select { | ||
case t.publishedTrackStopped <- TrackStoppedMessage{remoteTrack.ID(), participantID}: | ||
case <-t.conferenceEnded: | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Select chooses only one case at random if both are ready. The secound case is to to prevent this goroutine from blocking indefinitely? Then the channel t.publishedTrackStopped
should never be closed, or we'll panic at some point. on the other hand an unclosed channel cannot be cleaned up by the garbage collector.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we were sure thats t.conferenceEnded
is closed before t.publishedTrackStopped
is closed and cleaned up, we could doing something like this:
select {
case <-t.conferenceEnded:
return
default:
t.publishedTrackStopped <- TrackStoppedMessage{remoteTrack.ID(), participantID}:
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A condition thats this not blocking endless is thats all messages from t.publishedTrackStopped
are read before close(t.publishedTrackStopped)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We often have that a send channel depends on another open state (here is conference still existsing)
What do you think about this? Has this side effects?
trackIsDone = c.tracker.AddPublishedTrack(sender, msg.RemoteTrack, trackMetadata)
go func() {
select {
case <- trackIsDone:
c.tracker.RemovePublishedTrack(id track.TrackID)
case <- t.conferenceEnded:
// stop track
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about this? Has these side effects?
Yeah, this will work! However, it would also mean that now the tracker and related fields in the Conference
and Tracker
must be protected by a mutex since now we have the possibility of mutating the data from 2 different threads. That's actually the only reason why I'm not calling a function from the go-routine in the current implementation, but sending a message to the conference, to ensure that handling of conference-state events is processed within the same conference loop and that we don't need to synchronize access to the variables. If we were to choose to mutate things directly from a go-routine, we need to ensure that all the data that we access/mutate from different threads is synchronized.
on the other hand an unclosed channel cannot be cleaned up by the garbage collector.
While I agree that generally closing channels is a good thing for the majority of the cases (there are cases when the Go channels are not closed sometimes, i.e. when there are multiple concurrent writers to it), I'm not sure about "unclosed channel cannot be cleaned up by the garbage collector". Are you sure about that? - AFAIK, the channels are freed by the garbage collector once they are not used.
IIRC all "reported cases" of people complaining that their memory is leaked when the channel is not closed are not caused by the channel not being closed, rather they are caused by the incorrect implementation on their side that does continue to use the channel (the sending part) indefinitely and hangs either on the sender or receiver side indefinitely effectively preventing the GC to clean up the channel since they are in use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure about that? - AFAIK, the channels are freed by the garbage collector once they are not used.
No. I was wrong with that. I was talking about buffered channels. But here as well, when the channel is not
referenced, the values in the channel buffer are also not referenced. An thy get cleaned from the gcc (Ian Lance Taylor, Dave Cheney, I think we can trust them :D)
So we have not to close channels, from this point of few.
(per commit review is recommended to follow the changes easier; only one of the commits [the latest one] is relatively large due to moving many things around)
A rough idea of the current conference is that the conference owns the state of the conference and was the only entity that modified and mutated the state of the conference. That was convenient as there is no need for any locks and the access to the data was sequentialised (see the sketch: #52). This worked under the assumption that processing a single message takes a very short time and does not really block the conference (i.e. handling all incoming events from the peers, Matrix signaling, etc).
At some, it was clear that certain functions were blocking, while others require more time to get processed than expected, so all blocking operations have been gradually moved to their separate workers/go-routines where the conference loop still received the messages, but offloaded the expensive processing of some of them to these workers while maintaining the state of the conference. That was implemented for e.g. processing the incoming RTP packets from the peers where each subscription had its own worker to process messages. Then the same was repeated for outgoing matrix signaling messages that also were moved into their queue.
While this helped, this did not really solve all the problems since the incoming RTP packets from peers were still sent to the conference (i.e. shared the same message queue) and while the RTP packets were not processed by the conference (the conference just read the packet from the queue, identified which subscription it belonged and dispatched it to the right worker), it still was not optimal as any slight lag or hiccup in the processing of any of the events would potentially delay the dispatching of all incoming RTP packets to their workers which means that a seldom lag could cost us several seconds (or even minutes) of freezes.
Sending RTP packets to the conference was not the best idea and it looked like it does not really belong there given that these packets don’t even mutate the conference state and are only relevant for their particular subscribers, so it was clear that the RTP packet handling must be moved out of the main conference queue processing.
This PR changes this by introducing a new logic where each peer only sends a message to the conference informing it about a new available track (or a simulcast quality). The conference then processes this message by creating a published track (that in turn creates a publisher that runs in its own go-routine). The publisher then reads packets from the incoming track and sends them to the subscriber workers (as the conference did before). This means that the RTP processing is now completely independent from the conference and runs in parallel with another packet handling. The published track (and the publisher) only share the list of available subscriptions with the conference (common shared data structure) the access to which is synchronized, which means that the only moment we currently lock/synchronize the access to the subscriptions is when the state of the subscription changes (layer changed, subscription starts/stops, etc).
This means that the existing publishers and subscribers are not really affected by the main conference loop much and once started, will continue to run efficiently.
Another thing that it allowed us to make is to regulate the size of the queue for the incoming RTP packets that each subscription gets. Since now each published track’s message about the incoming RTP is not sent on a ‘global’ conference queue, we have more control when it comes to the implementation of the backpressure. Currently, each publisher’s worker loop essentially looks like this:
So now it looks almost exactly the same as the processing of publishers in LiveKit: https://blog.livekit.io/going-beyond-a-single-core-4a464d20d17a/ (except that we went for the approach where we spawn a go-routine per publisher instead of creating a work-pool with a limited amount of workers, as it seems like the last one is a bit more complicated, but does not bring that much of a performance advantage).
Fixes #117
Relates to #120 (but only for RTP packets)