SFU Refactoring #52
```diff
@@ -1,4 +1,6 @@
-homeserverurl: "http://localhost:8008"
-userid: "@sfu:shadowfax"
-accesstoken: "..."
-timeout: 30
+matrix:
+  homeserverurl: "http://localhost:8008"
+  userid: "@sfu:shadowfax"
+  accesstoken: "..."
+conference:
+  timeout: 30
```
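For reference, a minimal sketch of how the new nested layout might be parsed in Go with `gopkg.in/yaml.v3`. The struct names, the `main` wrapper, and the `config.yaml` file name are assumptions for illustration, not code from this PR:

```go
package main

import (
	"fmt"
	"os"

	"gopkg.in/yaml.v3"
)

// Hypothetical structs mirroring the new nested layout: the Matrix
// credentials and the conference settings now live under their own keys
// instead of sitting at the top level.
type MatrixConfig struct {
	HomeserverURL string `yaml:"homeserverurl"`
	UserID        string `yaml:"userid"`
	AccessToken   string `yaml:"accesstoken"`
}

type ConferenceConfig struct {
	Timeout int `yaml:"timeout"`
}

type Config struct {
	Matrix     MatrixConfig     `yaml:"matrix"`
	Conference ConferenceConfig `yaml:"conference"`
}

func main() {
	raw, err := os.ReadFile("config.yaml") // file name is an assumption
	if err != nil {
		panic(err)
	}

	var config Config
	if err := yaml.Unmarshal(raw, &config); err != nil {
		panic(err)
	}

	fmt.Printf("%+v\n", config)
}
```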
@@ -0,0 +1,70 @@

```go
package common

import "sync/atomic"

// Note that Go has no real equivalent of Rust's unbounded channels: an
// "unbuffered" Go channel is the opposite, i.e. it has **no buffer**, so each
// attempt to send blocks until the receiver reads the message. The majority
// of the primitives here in `waterfall` are designed under the assumption
// that sending does not block, so we approximate an unbounded channel with a
// generously sized buffer.
const UnboundedChannelSize = 128

// Creates a new channel and returns its two counterparts, where one can only
// send and the other can only receive. Unlike traditional Go channels, these
// allow the receiver to mark the channel as closed, after which any attempt
// to send a message over `Send` fails.
func NewChannel[M any]() (Sender[M], Receiver[M]) {
	channel := make(chan M, UnboundedChannelSize)
	closed := &atomic.Bool{}
	sender := Sender[M]{channel, closed}
	receiver := Receiver[M]{channel, closed}
	return sender, receiver
}

// The sender counterpart of the channel.
type Sender[M any] struct {
	// The channel itself.
	channel chan<- M
	// Atomic variable that indicates whether the channel is closed.
	receiverClosed *atomic.Bool
}

// Tries to send a message if the channel is not closed.
// Returns the message back if the channel is closed.
func (s *Sender[M]) Send(message M) *M {
	if !s.receiverClosed.Load() {
		s.channel <- message
		return nil
	}

	return &message
}

// The receiver counterpart of the channel.
type Receiver[M any] struct {
	// The channel itself. It's public, so that we can combine it in `select` statements.
	Channel <-chan M
	// Atomic variable that indicates whether the channel is closed.
	receiverClosed *atomic.Bool
}

// Marks the channel as closed, meaning that no new messages can be sent over
// it: any attempt to send one results in an error. This is similar to closing
// the channel, except that we don't close the underlying channel (since in Go
// receivers can't close a channel).
//
// This function also drains all pending messages in a non-blocking way (a
// plain receive would block once the buffer is empty, because the underlying
// channel is never actually closed). Otherwise, they would stay in the
// channel forever and get lost.
func (r *Receiver[M]) Close() []M {
	r.receiverClosed.Store(true)

	messages := make([]M, 0)
	for {
		select {
		case msg := <-r.Channel:
			messages = append(messages, msg)
		default:
			return messages
		}
	}
}
```
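A brief usage sketch (not part of the diff; the function name and the messages are arbitrary) of how the two counterparts behave around `Close`:

```go
package common

import "fmt"

func ExampleChannel() {
	sender, receiver := NewChannel[string]()

	// While the receiver is listening, Send accepts the message and returns nil.
	if rejected := sender.Send("hello"); rejected != nil {
		panic("unexpected: the receiver has not closed the channel yet")
	}

	// The receiver stops listening and drains whatever was already queued.
	pending := receiver.Close()
	fmt.Println(pending) // [hello]

	// Further sends are rejected: the message is handed back to the caller.
	if rejected := sender.Send("world"); rejected != nil {
		fmt.Println("receiver is gone, got back:", *rejected)
	}
}
```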
@@ -0,0 +1,61 @@

```go
package common

import (
	"errors"
	"sync/atomic"
)

// MessageSink is a helper struct that allows sending messages to a message sink.
// The MessageSink abstracts a message sink with a configured sender, so that
// the sender does not have to be specified every time a message is sent.
// At the same time, it guarantees that the caller can't alter the `sender`,
// which means that the sender can't impersonate another sender (and we
// guarantee this at compile time).
type MessageSink[SenderType comparable, MessageType any] struct {
	// The sender of the messages. This is useful for multiple-producer-single-consumer scenarios.
	sender SenderType
	// The message sink to which the messages are sent.
	messageSink chan<- Message[SenderType, MessageType]
	// Atomic variable that indicates whether the message sink is sealed,
	// i.e. whether it still accepts new messages (see `Seal` below).
	// This is used to prevent sending messages to a sealed message sink.
	// The variable is atomic because it may be accessed from multiple goroutines.
	sealed atomic.Bool
}

// Creates a new MessageSink. The function is generic, allowing us to use it for multiple use cases.
func NewMessageSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *MessageSink[S, M] {
	return &MessageSink[S, M]{
		sender:      sender,
		messageSink: messageSink,
	}
}

// Sends a message to the message sink.
func (s *MessageSink[S, M]) Send(message M) error {
	if s.sealed.Load() {
		return errors.New("the message sink is sealed, no messages can be sent over it")
	}

	s.messageSink <- Message[S, M]{
		Sender:  s.sender,
		Content: message,
	}

	return nil
}

// Seals the message sink, which means that no messages can be sent via it
// anymore: any attempt to send one results in an error. This is similar to
// closing the channel, except that we don't close the underlying channel
// (since there might be other senders that still want to use it).
func (s *MessageSink[S, M]) Seal() {
	s.sealed.Store(true)
}

// Messages that are sent from the peer to the conference in order to communicate with other peers.
// Since each peer is isolated from the others, it can't influence the state of other peers directly.
type Message[SenderType comparable, MessageType any] struct {
	// The sender of the message.
	Sender SenderType
	// The content of the message.
	Content MessageType
}
```

For clarity's sake, could we define what 'sealed' means here?

Good point, will add a comment about that! Perhaps I need to rewrite the comment for this variable altogether.
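A usage sketch (not from the diff; the names are arbitrary): two producers share one consumer channel, each through its own sink, so the consumer can tell them apart without the producers being able to forge the sender:

```go
package common

import "fmt"

func ExampleMessageSink() {
	sink := make(chan Message[string, int], 16)

	alice := NewMessageSink[string, int]("alice", sink)
	bob := NewMessageSink[string, int]("bob", sink)

	_ = alice.Send(1)
	_ = bob.Send(2)

	// Alice leaves: her sink is sealed and rejects further sends...
	alice.Seal()
	if err := alice.Send(3); err != nil {
		fmt.Println("alice:", err)
	}

	// ...while Bob's sink keeps working, since the shared channel stays open.
	_ = bob.Send(4)

	for i := 0; i < 3; i++ {
		msg := <-sink
		fmt.Printf("from %s: %d\n", msg.Sender, msg.Content)
	}
}
```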
@@ -0,0 +1,8 @@

```go
package conference

// Configuration for the group conferences (calls).
type Config struct {
	// Keep-alive timeout (in seconds) for WebRTC connections. If no keep-alive
	// has been received from the client for this duration, the connection is
	// considered dead.
	KeepAliveTimeout int `yaml:"timeout"`
}
```
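A hypothetical sketch (function and parameter names are assumptions, not code from this PR) of how such a timeout could be enforced: a timer is reset on every keep-alive, and the connection is declared dead once the timer fires:

```go
package conference

import "time"

// Watches a stream of keep-alive signals and calls onDead if none arrives
// within the configured timeout. Returns when the heartbeat channel is
// closed or the timeout fires.
func watchKeepAlive(config Config, heartbeats <-chan struct{}, onDead func()) {
	timeout := time.Duration(config.KeepAliveTimeout) * time.Second
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		select {
		case _, ok := <-heartbeats:
			if !ok {
				return // the peer is gone, nothing to watch anymore
			}
			// A keep-alive arrived in time: rewind the timer, draining it
			// first if it has already fired.
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(timeout)
		case <-timer.C:
			onDead() // no keep-alive within the timeout
			return
		}
	}
}
```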
@@ -0,0 +1,78 @@

```go
package conference

import (
	"github.com/pion/webrtc/v3"
	"golang.org/x/exp/slices"
	"maunium.net/go/mautrix/event"
)

// Handle the `SFUMessage` select request received over the DataChannel.
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
	participant.logger.Info("Received select request over DC")

	// Find the tracks that we were asked for.
	tracks := c.getTracks(msg.Start)

	// Check that all the tracks that we were asked for are there.
	// If not, log those that are not available (later on we must inform the
	// participant about it, unless the participant retries the request).
	if len(tracks) != len(msg.Start) {
		for _, expected := range msg.Start {
			found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
				return track.StreamID() == expected.StreamID && track.ID() == expected.TrackID
			})

			if found == -1 {
				c.logger.Warnf("Track not found: %s", expected.TrackID)
			}
		}
	}

	// Subscribe to the tracks that were found.
	for _, track := range tracks {
		if err := participant.peer.SubscribeTo(track); err != nil {
			participant.logger.Errorf("Failed to subscribe to track: %v", err)
			return
		}
	}
}

func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
	participant.logger.Info("Received SDP answer over DC")

	if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
		participant.logger.Errorf("Failed to set SDP answer: %v", err)
		return
	}
}

func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
	participant.logger.Info("Received SDP offer over DC")

	answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
	if err != nil {
		participant.logger.Errorf("Failed to set SDP offer: %v", err)
		return
	}

	participant.streamMetadata = msg.Metadata

	participant.sendDataChannelMessage(event.SFUMessage{
		Op:       event.SFUOperationAnswer,
		SDP:      answer.SDP,
		Metadata: c.getAvailableStreamsFor(participant.id),
	})
}

func (c *Conference) processUnpublishDCMessage(participant *Participant) {
	participant.logger.Info("Received unpublish over DC")
}

func (c *Conference) processAliveDCMessage(participant *Participant) {
	participant.peer.ProcessHeartbeat()
}

func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
	participant.streamMetadata = msg.Metadata
	c.resendMetadataToAllExcept(participant.id)
}
```
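For context, a hypothetical dispatcher is sketched below. Only `event.SFUOperationAnswer` appears in this diff; the other operation constants are assumed names for the `event` package, not confirmed API:

```go
// Hypothetical glue code: routes an incoming data-channel message to one of
// the handlers above based on its operation. All constants except
// event.SFUOperationAnswer are assumed names.
func (c *Conference) processDataChannelMessage(participant *Participant, msg event.SFUMessage) {
	switch msg.Op {
	case event.SFUOperationSelect:
		c.processSelectDCMessage(participant, msg)
	case event.SFUOperationAnswer:
		c.processAnswerDCMessage(participant, msg)
	case event.SFUOperationPublish:
		c.processPublishDCMessage(participant, msg)
	case event.SFUOperationUnpublish:
		c.processUnpublishDCMessage(participant)
	case event.SFUOperationAlive:
		c.processAliveDCMessage(participant)
	case event.SFUOperationMetadata:
		c.processMetadataDCMessage(participant, msg)
	default:
		participant.logger.Warnf("Unknown SFU operation: %v", msg.Op)
	}
}
```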
My understanding is that this is considered a bit of a code smell in Go, since a sender should never be trying to write to a closed channel. I guess this means we have something where we can't know this and need to check each time?
Good that you've noticed it: tbh I did not like writing this particular structure, as it did not feel very elegant, but unfortunately I did not find a better way to do it.
The problem I'm trying to solve is indeed to avoid writing to a closed channel; the trouble is that from Go's standpoint the channel is not closed, since in Go a channel can only be closed from the sender's side, not from the receiver's side.
A practical example of where it happens in our code: the router reads the `conf_id` of an incoming message and simply sends the message to the conference that is responsible for it. So there is a channel between the router and a particular conference. The problem occurs because at the moment the conference is considered ended, the channel remains open (the conference can't close the channel because it only holds the receiver part of it). And in Go there is no way to check if the channel is alive or if there is someone listening on it. So the router might have sent certain messages to the conference expecting that the conference will read them whereas, in reality, the conference has stopped listening on the channel by the moment the router gets to know that the conference is dead.
To sum up: that's pretty much the problem that I tried to solve with this logic, i.e. create a wrapper around the channel through which I can inform the sender that the receiver is not listening anymore, so that if the sender tries to send a message to such a channel, it gets the message back and knows that there are no receivers anymore.
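To illustrate, a hypothetical sketch of the router side (`Router`, its fields, and `RouterMessage` are assumed names, not code from this PR) reacting to a rejected send:

```go
// If Send hands the message back, the receiver (the conference) has stopped
// listening, so the router can clean up its reference instead of leaking
// messages into a channel that nobody reads.
func (r *Router) routeToConference(confID string, msg RouterMessage) {
	conf, found := r.conferences[confID]
	if !found {
		r.logger.Warnf("no such conference: %s", confID)
		return
	}

	if rejected := conf.sender.Send(msg); rejected != nil {
		// The conference has closed its receiver: forget about it and
		// decide what to do with the returned message (retry, drop, ...).
		delete(r.conferences, confID)
		r.logger.Infof("conference %s is dead, dropping its messages", confID)
	}
}
```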
Thanks for the detailed explanation - I've also found Go a bit light on actually explaining how you would write correct & non-racy code while sticking to the given paradigms.
Right! I've also spent some time trying to understand how to do things safely. In Rust, for instance, we always know if there is a sender and if there is a receiver, but in Go an elegant implementation of this does not work because of garbage collection: the receivers are not deleted immediately after leaving the scope of a function, but only get removed when the GC kicks in. This means that the lifetime of the objects is not strictly defined, hence we can't know when the other counterpart of a channel is dead.