Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SFU Refactoring #52

Merged
merged 62 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
bf6b987
refactor: clean up main.go a bit
daniel-abramov Nov 17, 2022
c5c6206
refactor: renaming modules and functions names
daniel-abramov Nov 17, 2022
ba2fa4b
refactor: define a new skeleton for the project
daniel-abramov Nov 18, 2022
85aa12c
refactor: create a better structure for the sources
daniel-abramov Nov 21, 2022
ba6eb3f
refactor: finalize signaling and peer communication
daniel-abramov Nov 22, 2022
abef0c5
refactor: define a package for message sink
daniel-abramov Nov 24, 2022
532773f
general: rename `src` to `pkg`
daniel-abramov Nov 24, 2022
4d7970c
conference: define sane logic for new participants
daniel-abramov Nov 24, 2022
83a49b5
conference: handle publish messages from peers
daniel-abramov Nov 24, 2022
ce32481
paer: unsubscribe from obsolete tracks
daniel-abramov Nov 24, 2022
b47600c
peer: add naive handling of RTCPs
daniel-abramov Nov 24, 2022
a8312b0
router: remove useles checks for types
daniel-abramov Nov 25, 2022
0b75dfc
conference: rename peer message sink for clarity
daniel-abramov Nov 25, 2022
e0f4cb7
conference: ensure a proper conference lifetime
daniel-abramov Nov 25, 2022
34ae746
refactor(conference): rename matrix message struct
daniel-abramov Nov 26, 2022
877f7c2
conference: inform owner when the conference ended
daniel-abramov Nov 26, 2022
e2d73f9
general: remove custom logger
daniel-abramov Nov 28, 2022
bd023b6
logger: skip checking the TTY
daniel-abramov Nov 28, 2022
b53d197
router: fix a typo for the OnInvite event
daniel-abramov Nov 28, 2022
40cb5fe
router: fix a typo on the type of the printf
daniel-abramov Nov 28, 2022
c629c7e
refactor(conference): simplify new participant
daniel-abramov Nov 28, 2022
3274194
conference: add additional logging
daniel-abramov Nov 28, 2022
6b31a98
router: ignore messages for unknown conferences
daniel-abramov Nov 28, 2022
b991a3a
signaling: fix wrongly set party ID
daniel-abramov Nov 28, 2022
9d45674
ice: add an empty user fragment to the ICE
daniel-abramov Nov 28, 2022
1c3b702
peer: use GatheringCompletePromise from Pion
daniel-abramov Nov 28, 2022
82dac24
Revert "peer: use GatheringCompletePromise from Pion"
daniel-abramov Nov 28, 2022
415e242
conference: add additional call invite logging
daniel-abramov Nov 28, 2022
cc09692
conference: fix unsoundness in channel usage
daniel-abramov Nov 28, 2022
db900b2
signaling: add additional debug logs
daniel-abramov Nov 28, 2022
2f809b1
conference: add `call_id` handling
daniel-abramov Nov 28, 2022
01fef5c
conference: use SFU device ID on SelectAnswer
daniel-abramov Nov 28, 2022
479863d
config: make log level configurable
daniel-abramov Nov 28, 2022
7fbacad
channel: improve documentation and API surface
daniel-abramov Nov 28, 2022
a4f420e
conference: check for nil rtp tracks
daniel-abramov Nov 29, 2022
67e469e
conference: fix the subscribe/unsubscribe logic
daniel-abramov Nov 29, 2022
ff687e2
peer: fix a potential segfault
daniel-abramov Nov 29, 2022
d5ffe81
peer: don't fail on certain RTCP write errors
daniel-abramov Nov 30, 2022
5563dd5
peer: use sdpAnswer from crateAnswer
daniel-abramov Nov 30, 2022
ab53c8e
conference: handle metadata of streams properly
daniel-abramov Nov 30, 2022
0c8d528
conference: add hacky way to send unknown metadata
daniel-abramov Nov 30, 2022
bd313c0
conference: update TODOs and FIXMEs
daniel-abramov Nov 30, 2022
1b7bdbe
conference: attach stream metadata to offers
daniel-abramov Nov 30, 2022
39deed0
conference: accept the metadata from the DC offer
daniel-abramov Nov 30, 2022
74014f0
minor: fix typo in a comment in a signaling comment
daniel-abramov Nov 30, 2022
5fe188d
Update the documentation in a conference state
daniel-abramov Dec 1, 2022
88997da
minor: rename `UserID` to `userID`
daniel-abramov Dec 1, 2022
d2cce02
peer: implement heartbeat handling for keepalive
daniel-abramov Dec 1, 2022
ec67906
minor: rename `RunSync` -> `RunSyncing()`
daniel-abramov Dec 2, 2022
4ef47e0
Remove ugly RTCP handling
SimonBrandner Dec 3, 2022
ce110b5
Implement RTCP forwarding
SimonBrandner Dec 3, 2022
ae5da43
Further refactor code
SimonBrandner Dec 3, 2022
73a6947
Make `minimalPLIInterval` local
SimonBrandner Dec 5, 2022
5a78cca
Fix typo
SimonBrandner Dec 5, 2022
eecdde2
Handle missing track
SimonBrandner Dec 5, 2022
e208b4b
`ForwardRTCP` -> `RTCPReceived`
SimonBrandner Dec 5, 2022
70a4ede
Decapitalize
SimonBrandner Dec 5, 2022
fd545d0
Explain `lastPLITimestamp`
SimonBrandner Dec 5, 2022
2649e1f
Use `time.Time` as type
SimonBrandner Dec 5, 2022
0369070
Simplify `lastPLITimestamp`
SimonBrandner Dec 5, 2022
dc09318
Remove leftover
SimonBrandner Dec 5, 2022
537f4c0
general: resolve leftovers after rebase
daniel-abramov Dec 5, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ linters:
- gomnd # we use status code numbers and for our use case it's not practical
- godox # we have TODOs at this stage of the project, enable in future
- forbidigo # we use things like fmt.Printf for debugging, enable in future
- wsl # somehow this plugin causes more harm than use as it enables lots of things to be configured without causing spaghetti-code (grouping similar things together)
- nlreturn # not always practical, it was disabled before strict lints were introduced, then added, now it's clear why it was disabled at the first place :)
fast: true
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ COPY go.sum ./
# source code do not invalidate our downloaded layer.
RUN go mod download

COPY ./src ./src
COPY ./pkg ./pkg

RUN go build -o /waterfall ./src
RUN go build -o /waterfall ./pkg


##
Expand Down
10 changes: 6 additions & 4 deletions config.yaml.sample
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
timeout: 30
matrix:
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
conference:
timeout: 30
10 changes: 6 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ services:
environment:
# Set the `CONFIG` to the configuration you want.
CONFIG: |
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
timeout: 30
matrix:
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
conference:
timeout: 30
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require github.com/pion/webrtc/v3 v3.1.31
require (
github.com/pion/rtcp v1.2.9
github.com/sirupsen/logrus v1.9.0
golang.org/x/exp v0.0.0-20221114191408-850992195362
gopkg.in/yaml.v3 v3.0.1
maunium.net/go/mautrix v0.11.0
)
Expand Down Expand Up @@ -34,7 +35,7 @@ require (
github.com/tidwall/sjson v1.2.4 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20221114191408-850992195362 h1:NoHlPRbyl1VFI6FjwHtPQCN7wAMXI6cKcqrmXhOOfBQ=
golang.org/x/exp v0.0.0-20221114191408-850992195362/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down Expand Up @@ -134,8 +136,9 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
70 changes: 70 additions & 0 deletions pkg/common/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package common

import "sync/atomic"

// In Go, unbounded channel means something different than what it means in Rust.
// I.e. unlike Rust, "unbounded" in Go means that the channel has **no buffer**,
// meaning that each attempt to send will block the channel until the receiver
// reads it. Majority of primitives here in `waterfall` are designed under assumption
// that sending is not blocking.
const UnboundedChannelSize = 128

// Creates a new channel, returns two counterparts of it where one can only send and another can only receive.
// Unlike traditional Go channels, these allow the receiver to mark the channel as closed which would then fail
// to send any messages to the channel over `Send“.
func NewChannel[M any]() (Sender[M], Receiver[M]) {
channel := make(chan M, UnboundedChannelSize)
closed := &atomic.Bool{}
sender := Sender[M]{channel, closed}
receiver := Receiver[M]{channel, closed}
return sender, receiver
}

// Sender counterpart of the channel.
type Sender[M any] struct {
// The channel itself.
channel chan<- M
// Atomic variable that indicates whether the channel is closed.
receiverClosed *atomic.Bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that this is considered a bit of a code smell in Go, since a sender should never be trying to write to a closed channel. I guess this means we have something where we can't know this and need to check each time?

Copy link
Contributor Author

@daniel-abramov daniel-abramov Dec 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good that you've noticed it: tbh I did not like the part of writing this particular structure as it felt like it's not very elegant, but unfortunately I did not find a better way to write it.

The problem that I'm trying to solve is to indeed not write to the closed channel, but the problem is that from Go's standpoint the channel is not closed since the channels could only be closed in Go from the sender's side, but not from the receiver's side.

A practical example of where it happens in our code:

  • There is a router in the code: a structure that simply has a map of all conferences running on the SFU. The router receives signaling messages (To-Device messages), checks their conf_id, and simply sends the message to the conference that is responsible for this message. So there is a channel between the router and a particular conference.
  • Conference is listening to the channel and reacting to the messages. But at some point, all participants leave the conference which means that the conference must be ended. So the conference goroutine and the main loop of the conference end, sending a message to the router telling "Ok, I'm done, you can remove it".

The problem occurs because at the moment the conference is considered ended, the channel remains open (the conference can't close the channel because the conference only holds a receiver part of the channel). And in Go there is no way to check if the channel is alive or if there is someone listening on the channel. So this means that the router might have sent certain messages to the conference expecting that the conference will read them whereas, in reality, the conference has stopped listening on the channel by the moment the router gets to know that the conference is dead.

To sum up:

  • There is a way in Go to check if the channel is closed by the sender on the receiver's side.
  • There is no way in Go to check if there is any listener on the sender's side.
  • We need a way to know (on the sender side) if someone is listening to the channel (in order to know if it makes sense to post a message).

That's pretty much the problem that I tried to solve with this logic, i.e. create a wrapper around the channel where I could inform the sender that the receiver is not listening to the changes, so that if the sender tries to send the message to such a channel, it gets the message back and knows that there are no receivers anymore.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed explanation - I've also found Go is a bit light on actually explaining how you would write correct & non-racy code sticking to the given paradigms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! I've also spent some time trying to understand how to do things safely. In Rust, for instance, we always know if there is a sender and if there is a receiver, but in Go, an elegant implementation of this does not work because of garbage collection: the receivers are not deleted immediately after leaving the scope of a function, but only get removed when the GC kicks in which means that the lifetime of the objects is not strictly defined, hence the consequence of us not knowing when the other counterpart of a channel is dead.

}

// Tries to send a message if the channel is not closed.
// Returns the message back if the channel is closed.
func (s *Sender[M]) Send(message M) *M {
if !s.receiverClosed.Load() {
s.channel <- message
return nil
} else {
return &message
}
}

// The receiver counterpart of the channel.
type Receiver[M any] struct {
// The channel itself. It's public, so that we can combine it in `select` statements.
Channel <-chan M
// Atomic variable that indicates whether the channel is closed.
receiverClosed *atomic.Bool
}

// Marks the channel as closed, which means that no messages could be sent via this channel.
// Any attempt to send a message would result in an error. This is similar to closing the
// channel except that we don't close the underlying channel (since in Go receivers can't
// close the channel).
//
// This function reads (in a non-blocking way) all pending messages until blocking. Otherwise,
// they will stay forver in a channel and get lost.
func (r *Receiver[M]) Close() []M {
r.receiverClosed.Store(true)

messages := make([]M, 0)
for {
msg, ok := <-r.Channel
if !ok {
break
}
messages = append(messages, msg)
}

return messages
}
61 changes: 61 additions & 0 deletions pkg/common/message_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package common

import (
"errors"
"sync/atomic"
)

// MessageSink is a helper struct that allows to send messages to a message sink.
// The MessageSink abstracts the message sink which has a certain sender, so that
// the sender does not have to be specified every time a message is sent.
// At the same it guarantees that the caller can't alter the `sender`, which means that
// the sender can't impersonate another sender (and we guarantee this on a compile-time).
type MessageSink[SenderType comparable, MessageType any] struct {
// The sender of the messages. This is useful for multiple-producer-single-consumer scenarios.
sender SenderType
// The message sink to which the messages are sent.
messageSink chan<- Message[SenderType, MessageType]
// Atomic variable that indicates whether the message sink is sealed.
// This is used to prevent sending messages to a sealed message sink.
// The variable is atomic because it may be accessed from multiple goroutines.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity's sake, could we define what 'sealed' means here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will add a comment about that! Perhaps I need to rewrite the comment for this variable altogether.

sealed atomic.Bool
}

// Creates a new MessageSink. The function is generic allowing us to use it for multiple use cases.
func NewMessageSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *MessageSink[S, M] {
return &MessageSink[S, M]{
sender: sender,
messageSink: messageSink,
}
}

// Sends a message to the message sink.
func (s *MessageSink[S, M]) Send(message M) error {
if s.sealed.Load() {
return errors.New("The channel is sealed, you can't send any messages over it")
}

s.messageSink <- Message[S, M]{
Sender: s.sender,
Content: message,
}

return nil
}

// Seals the channel, which means that no messages could be sent via this channel.
// Any attempt to send a message would result in an error. This is similar to closing the
// channel except that we don't close the underlying channel (since there might be other
// senders that may want to use it).
func (s *MessageSink[S, M]) Seal() {
s.sealed.Store(true)
}

// Messages that are sent from the peer to the conference in order to communicate with other peers.
// Since each peer is isolated from others, it can't influence the state of other peers directly.
type Message[SenderType comparable, MessageType any] struct {
// The sender of the message.
Sender SenderType
// The content of the message.
Content MessageType
}
8 changes: 8 additions & 0 deletions pkg/conference/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package conference

// Configuration for the group conferences (calls).
type Config struct {
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
// from the client for this duration, the connection is considered dead (in seconds).
KeepAliveTimeout int `yaml:"timeout"`
}
78 changes: 78 additions & 0 deletions pkg/conference/data_channel_message_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package conference

import (
"github.com/pion/webrtc/v3"
"golang.org/x/exp/slices"
"maunium.net/go/mautrix/event"
)

// Handle the `SFUMessage` event from the DataChannel message.
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received select request over DC")

// Find tracks based on what we were asked for.
tracks := c.getTracks(msg.Start)

// Let's check if we have all the tracks that we were asked for are there.
// If not, we will list which are not available (later on we must inform participant
// about it unless the participant retries it).
if len(tracks) != len(msg.Start) {
for _, expected := range msg.Start {
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
return track.StreamID() == expected.StreamID && track.ID() == expected.TrackID
})

if found == -1 {
c.logger.Warnf("Track not found: %s", expected.TrackID)
}
}
}

// Subscribe to the found tracks.
for _, track := range tracks {
if err := participant.peer.SubscribeTo(track); err != nil {
participant.logger.Errorf("Failed to subscribe to track: %v", err)
return
}
}
}

func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received SDP answer over DC")

if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
participant.logger.Errorf("Failed to set SDP answer: %v", err)
return
}
}

func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received SDP offer over DC")

answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
if err != nil {
participant.logger.Errorf("Failed to set SDP offer: %v", err)
return
}

participant.streamMetadata = msg.Metadata

participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationAnswer,
SDP: answer.SDP,
Metadata: c.getAvailableStreamsFor(participant.id),
})
}

func (c *Conference) processUnpublishDCMessage(participant *Participant) {
participant.logger.Info("Received unpublish over DC")
}

func (c *Conference) processAliveDCMessage(participant *Participant) {
participant.peer.ProcessHeartbeat()
}

func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
participant.streamMetadata = msg.Metadata
c.resendMetadataToAllExcept(participant.id)
}
Loading