Skip to content

Commit

Permalink
Merge pull request #52 from matrix-org/refactoring
Browse files Browse the repository at this point in the history
SFU Refactoring
  • Loading branch information
daniel-abramov authored Dec 5, 2022
2 parents 50b12b2 + 537f4c0 commit 2b0597d
Show file tree
Hide file tree
Showing 39 changed files with 1,928 additions and 1,583 deletions.
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ linters:
- gomnd # we use status code numbers and for our use case it's not practical
- godox # we have TODOs at this stage of the project, enable in future
- forbidigo # we use things like fmt.Printf for debugging, enable in future
- wsl # somehow this plugin causes more harm than use as it enables lots of things to be configured without causing spaghetti-code (grouping similar things together)
- nlreturn # not always practical, it was disabled before strict lints were introduced, then added, now it's clear why it was disabled at the first place :)
fast: true
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ COPY go.sum ./
# source code do not invalidate our downloaded layer.
RUN go mod download

COPY ./src ./src
COPY ./pkg ./pkg

RUN go build -o /waterfall ./src
RUN go build -o /waterfall ./pkg


##
Expand Down
10 changes: 6 additions & 4 deletions config.yaml.sample
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
timeout: 30
matrix:
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
conference:
timeout: 30
10 changes: 6 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ services:
environment:
# Set the `CONFIG` to the configuration you want.
CONFIG: |
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
timeout: 30
matrix:
homeserverurl: "http://localhost:8008"
userid: "@sfu:shadowfax"
accesstoken: "..."
conference:
timeout: 30
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require github.com/pion/webrtc/v3 v3.1.31
require (
github.com/pion/rtcp v1.2.9
github.com/sirupsen/logrus v1.9.0
golang.org/x/exp v0.0.0-20221114191408-850992195362
gopkg.in/yaml.v3 v3.0.1
maunium.net/go/mautrix v0.11.0
)
Expand Down Expand Up @@ -34,7 +35,7 @@ require (
github.com/tidwall/sjson v1.2.4 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20221114191408-850992195362 h1:NoHlPRbyl1VFI6FjwHtPQCN7wAMXI6cKcqrmXhOOfBQ=
golang.org/x/exp v0.0.0-20221114191408-850992195362/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down Expand Up @@ -134,8 +136,9 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
70 changes: 70 additions & 0 deletions pkg/common/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package common

import "sync/atomic"

// In Go, unbounded channel means something different than what it means in Rust.
// I.e. unlike Rust, "unbounded" in Go means that the channel has **no buffer**,
// meaning that each attempt to send will block the channel until the receiver
// reads it. Majority of primitives here in `waterfall` are designed under assumption
// that sending is not blocking.
const UnboundedChannelSize = 128

// Creates a new channel, returns two counterparts of it where one can only send and another can only receive.
// Unlike traditional Go channels, these allow the receiver to mark the channel as closed which would then fail
// to send any messages to the channel over `Send“.
func NewChannel[M any]() (Sender[M], Receiver[M]) {
channel := make(chan M, UnboundedChannelSize)
closed := &atomic.Bool{}
sender := Sender[M]{channel, closed}
receiver := Receiver[M]{channel, closed}
return sender, receiver
}

// Sender counterpart of the channel.
type Sender[M any] struct {
// The channel itself.
channel chan<- M
// Atomic variable that indicates whether the channel is closed.
receiverClosed *atomic.Bool
}

// Tries to send a message if the channel is not closed.
// Returns the message back if the channel is closed.
func (s *Sender[M]) Send(message M) *M {
if !s.receiverClosed.Load() {
s.channel <- message
return nil
} else {
return &message
}
}

// The receiver counterpart of the channel.
type Receiver[M any] struct {
// The channel itself. It's public, so that we can combine it in `select` statements.
Channel <-chan M
// Atomic variable that indicates whether the channel is closed.
receiverClosed *atomic.Bool
}

// Marks the channel as closed, which means that no messages could be sent via this channel.
// Any attempt to send a message would result in an error. This is similar to closing the
// channel except that we don't close the underlying channel (since in Go receivers can't
// close the channel).
//
// This function reads (in a non-blocking way) all pending messages until blocking. Otherwise,
// they will stay forver in a channel and get lost.
func (r *Receiver[M]) Close() []M {
r.receiverClosed.Store(true)

messages := make([]M, 0)
for {
msg, ok := <-r.Channel
if !ok {
break
}
messages = append(messages, msg)
}

return messages
}
61 changes: 61 additions & 0 deletions pkg/common/message_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package common

import (
"errors"
"sync/atomic"
)

// MessageSink is a helper struct that allows to send messages to a message sink.
// The MessageSink abstracts the message sink which has a certain sender, so that
// the sender does not have to be specified every time a message is sent.
// At the same it guarantees that the caller can't alter the `sender`, which means that
// the sender can't impersonate another sender (and we guarantee this on a compile-time).
type MessageSink[SenderType comparable, MessageType any] struct {
// The sender of the messages. This is useful for multiple-producer-single-consumer scenarios.
sender SenderType
// The message sink to which the messages are sent.
messageSink chan<- Message[SenderType, MessageType]
// Atomic variable that indicates whether the message sink is sealed.
// This is used to prevent sending messages to a sealed message sink.
// The variable is atomic because it may be accessed from multiple goroutines.
sealed atomic.Bool
}

// Creates a new MessageSink. The function is generic allowing us to use it for multiple use cases.
func NewMessageSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *MessageSink[S, M] {
return &MessageSink[S, M]{
sender: sender,
messageSink: messageSink,
}
}

// Sends a message to the message sink.
func (s *MessageSink[S, M]) Send(message M) error {
if s.sealed.Load() {
return errors.New("The channel is sealed, you can't send any messages over it")
}

s.messageSink <- Message[S, M]{
Sender: s.sender,
Content: message,
}

return nil
}

// Seals the channel, which means that no messages could be sent via this channel.
// Any attempt to send a message would result in an error. This is similar to closing the
// channel except that we don't close the underlying channel (since there might be other
// senders that may want to use it).
func (s *MessageSink[S, M]) Seal() {
s.sealed.Store(true)
}

// Messages that are sent from the peer to the conference in order to communicate with other peers.
// Since each peer is isolated from others, it can't influence the state of other peers directly.
type Message[SenderType comparable, MessageType any] struct {
// The sender of the message.
Sender SenderType
// The content of the message.
Content MessageType
}
8 changes: 8 additions & 0 deletions pkg/conference/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package conference

// Configuration for the group conferences (calls).
type Config struct {
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
// from the client for this duration, the connection is considered dead (in seconds).
KeepAliveTimeout int `yaml:"timeout"`
}
78 changes: 78 additions & 0 deletions pkg/conference/data_channel_message_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package conference

import (
"github.com/pion/webrtc/v3"
"golang.org/x/exp/slices"
"maunium.net/go/mautrix/event"
)

// Handle the `SFUMessage` event from the DataChannel message.
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received select request over DC")

// Find tracks based on what we were asked for.
tracks := c.getTracks(msg.Start)

// Let's check if we have all the tracks that we were asked for are there.
// If not, we will list which are not available (later on we must inform participant
// about it unless the participant retries it).
if len(tracks) != len(msg.Start) {
for _, expected := range msg.Start {
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
return track.StreamID() == expected.StreamID && track.ID() == expected.TrackID
})

if found == -1 {
c.logger.Warnf("Track not found: %s", expected.TrackID)
}
}
}

// Subscribe to the found tracks.
for _, track := range tracks {
if err := participant.peer.SubscribeTo(track); err != nil {
participant.logger.Errorf("Failed to subscribe to track: %v", err)
return
}
}
}

func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received SDP answer over DC")

if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
participant.logger.Errorf("Failed to set SDP answer: %v", err)
return
}
}

func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
participant.logger.Info("Received SDP offer over DC")

answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
if err != nil {
participant.logger.Errorf("Failed to set SDP offer: %v", err)
return
}

participant.streamMetadata = msg.Metadata

participant.sendDataChannelMessage(event.SFUMessage{
Op: event.SFUOperationAnswer,
SDP: answer.SDP,
Metadata: c.getAvailableStreamsFor(participant.id),
})
}

func (c *Conference) processUnpublishDCMessage(participant *Participant) {
participant.logger.Info("Received unpublish over DC")
}

func (c *Conference) processAliveDCMessage(participant *Participant) {
participant.peer.ProcessHeartbeat()
}

func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
participant.streamMetadata = msg.Metadata
c.resendMetadataToAllExcept(participant.id)
}
Loading

0 comments on commit 2b0597d

Please sign in to comment.