From 7ae23aead5deebd45ca99d64fcccde4ddee7b297 Mon Sep 17 00:00:00 2001 From: OrlandoCo Date: Tue, 5 Jan 2021 22:08:55 -0600 Subject: [PATCH] feat(turn): Add turn server (#362) * feat(turn): Add turn server * feat(turn): Fix merge conflicts * Tidy port range setting Co-authored-by: Tarrence van As * Add TCP listener * Add TLS/DTLS listener * Set fixed relay port range on turn enabled * Disable turn server by default Co-authored-by: Tarrence van As --- config.toml | 14 +++++ go.mod | 2 + pkg/sfu/sfu.go | 37 +++++++----- pkg/sfu/turn.go | 152 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 192 insertions(+), 13 deletions(-) create mode 100644 pkg/sfu/turn.go diff --git a/config.toml b/config.toml index 02e21e777..299d1ec6e 100644 --- a/config.toml +++ b/config.toml @@ -52,5 +52,19 @@ sdpsemantics = "unified-plan" # nat1to1 = ["1.2.3.4"] # icelite = true +[turn] +# Enables embeded turn server +enabled = false +# Sets the realm for turn server +realm ="ion" +# The address the TURN server will listen on. +address= "0.0.0.0:3478" +# Sets the credentials pairs +credentials= "pion=ion,pion2=ion2" +# Certs path to config tls/dtls +# cert="path/to/cert.pem" +# key="path/to/key.pem" + + [log] level = "trace" diff --git a/go.mod b/go.mod index 82c36bc02..a2a55bfcf 100644 --- a/go.mod +++ b/go.mod @@ -11,11 +11,13 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/improbable-eng/grpc-web v0.13.0 github.com/lucsky/cuid v1.0.2 + github.com/pion/dtls/v2 v2.0.4 github.com/pion/ion-log v1.0.0 github.com/pion/rtcp v1.2.6 github.com/pion/rtp v1.6.2 github.com/pion/sdp/v3 v3.0.3 github.com/pion/transport v0.12.2 + github.com/pion/turn/v2 v2.0.5 github.com/pion/webrtc/v3 v3.0.3 github.com/prometheus/client_golang v1.9.0 github.com/rs/cors v1.7.0 // indirect diff --git a/pkg/sfu/sfu.go b/pkg/sfu/sfu.go index 2db5c566c..e759c284c 100644 --- a/pkg/sfu/sfu.go +++ b/pkg/sfu/sfu.go @@ -6,13 +6,11 @@ import ( "sync" "time" - "github.com/pion/ion-sfu/pkg/stats" - + log "github.com/pion/ion-log" "github.com/pion/ion-sfu/pkg/buffer" - + "github.com/pion/ion-sfu/pkg/stats" + "github.com/pion/turn/v2" "github.com/pion/webrtc/v3" - - log "github.com/pion/ion-log" ) // ICEServerConfig defines parameters for ice servers @@ -51,6 +49,7 @@ type Config struct { WebRTC WebRTCConfig `mapstructure:"webrtc"` Log log.Config `mapstructure:"log"` Router RouterConfig `mapstructure:"router"` + Turn TurnConfig `mapstructure:"turn"` } var ( @@ -60,9 +59,10 @@ var ( // SFU represents an sfu instance type SFU struct { + sync.RWMutex webrtc WebRTCTransportConfig router RouterConfig - mu sync.RWMutex + turn *turn.Server sessions map[string]*Session withStats bool } @@ -73,7 +73,10 @@ func NewWebRTCTransportConfig(c Config) WebRTCTransportConfig { var icePortStart, icePortEnd uint16 - if len(c.WebRTC.ICEPortRange) == 2 { + if c.Turn.Enabled { + icePortStart = sfuMinPort + icePortEnd = sfuMaxPort + } else if len(c.WebRTC.ICEPortRange) == 2 { icePortStart = c.WebRTC.ICEPortRange[0] icePortEnd = c.WebRTC.ICEPortRange[1] } @@ -152,6 +155,14 @@ func NewSFU(c Config) *SFU { withStats: c.Router.WithStats, } + if c.Turn.Enabled { + ts, err := initTurnServer(c.Turn, nil) + if err != nil { + log.Panicf("Could not init turn server err: %v", err) + } + s.turn = ts + } + runtime.KeepAlive(ballast) return s } @@ -161,18 +172,18 @@ func (s *SFU) newSession(id string) *Session { session := NewSession(id) session.OnClose(func() { - s.mu.Lock() + s.Lock() delete(s.sessions, id) - s.mu.Unlock() + s.Unlock() if s.withStats { stats.Sessions.Dec() } }) - s.mu.Lock() + s.Lock() s.sessions[id] = session - s.mu.Unlock() + s.Unlock() if s.withStats { stats.Sessions.Inc() @@ -183,8 +194,8 @@ func (s *SFU) newSession(id string) *Session { // GetSession by id func (s *SFU) getSession(id string) *Session { - s.mu.RLock() - defer s.mu.RUnlock() + s.RLock() + defer s.RUnlock() return s.sessions[id] } diff --git a/pkg/sfu/turn.go b/pkg/sfu/turn.go new file mode 100644 index 000000000..38abd93f4 --- /dev/null +++ b/pkg/sfu/turn.go @@ -0,0 +1,152 @@ +package sfu + +import ( + "context" + "crypto/rand" + "crypto/tls" + "net" + "regexp" + "strconv" + "strings" + "time" + + "github.com/pion/dtls/v2" + log "github.com/pion/ion-log" + "github.com/pion/turn/v2" +) + +const ( + turnMinPort = 32768 + turnMaxPort = 46883 + sfuMinPort = 46884 + sfuMaxPort = 60999 +) + +// WebRTCConfig defines parameters for ice +type TurnConfig struct { + Enabled bool `mapstructure:"enabled"` + Realm string `mapstructure:"realm"` + Address string `mapstructure:"address"` + Credentials string `mapstructure:"credentials"` + Cert string `mapstructure:"cert"` + Key string `mapstructure:"key"` +} + +func initTurnServer(conf TurnConfig, auth func(username, real string, srcAddr net.Addr) ([]byte, bool)) (*turn.Server, error) { + var listeners []turn.ListenerConfig + + // Create a UDP listener to pass into pion/turn + // pion/turn itself doesn't allocate any UDP sockets, but lets the user pass them in + // this allows us to add logging, storage or modify inbound/outbound traffic + udpListener, err := net.ListenPacket("udp4", conf.Address) + if err != nil { + return nil, err + } + // Create a TCP listener to pass into pion/turn + // pion/turn itself doesn't allocate any TCP listeners, but lets the user pass them in + // this allows us to add logging, storage or modify inbound/outbound traffic + tcpListener, err := net.Listen("tcp4", conf.Address) + if err != nil { + return nil, err + } + + addr := strings.Split(conf.Address, ":") + + if len(conf.Cert) > 0 && len(conf.Key) > 0 { + // Create a TLS listener to pass into pion/turn + cert, err := tls.LoadX509KeyPair(conf.Cert, conf.Key) + if err != nil { + return nil, err + } + config := tls.Config{Certificates: []tls.Certificate{cert}} + config.Rand = rand.Reader + tlsListener, err := tls.Listen("tcp4", conf.Address, &config) + if err != nil { + return nil, err + } + listeners = append(listeners, turn.ListenerConfig{ + Listener: tlsListener, + RelayAddressGenerator: &turn.RelayAddressGeneratorPortRange{ + RelayAddress: net.ParseIP(addr[0]), + Address: "0.0.0.0", + MinPort: turnMinPort, + MaxPort: turnMaxPort, + }, + }) + // Create a DTLS listener to pass into pion/turn + ctx := context.Background() + dtlsConf := &dtls.Config{ + Certificates: []tls.Certificate{cert}, + ExtendedMasterSecret: dtls.RequireExtendedMasterSecret, + // Create timeout context for accepted connection. + ConnectContextMaker: func() (context.Context, func()) { + return context.WithTimeout(ctx, 30*time.Second) + }, + } + port, err := strconv.ParseInt(addr[1], 10, 64) + if err != nil { + return nil, err + } + a := &net.UDPAddr{IP: net.ParseIP(addr[0]), Port: int(port)} + dtlsListener, err := dtls.Listen("udp4", a, dtlsConf) + if err != nil { + return nil, err + } + listeners = append(listeners, turn.ListenerConfig{ + Listener: dtlsListener, + RelayAddressGenerator: &turn.RelayAddressGeneratorPortRange{ + RelayAddress: net.ParseIP(addr[0]), + Address: "0.0.0.0", + MinPort: turnMinPort, + MaxPort: turnMaxPort, + }, + }) + } + + if auth == nil { + usersMap := map[string][]byte{} + for _, kv := range regexp.MustCompile(`(\w+)=(\w+)`).FindAllStringSubmatch(conf.Credentials, -1) { + usersMap[kv[1]] = turn.GenerateAuthKey(kv[1], conf.Realm, kv[2]) + } + if len(usersMap) == 0 { + log.Panicf("No turn auth provided") + } + auth = func(username string, realm string, srcAddr net.Addr) ([]byte, bool) { + if key, ok := usersMap[username]; ok { + return key, true + } + return nil, false + } + } + + return turn.NewServer(turn.ServerConfig{ + Realm: conf.Realm, + // Set AuthHandler callback + // This is called everytime a user tries to authenticate with the TURN server + // Return the key for that user, or false when no user is found + AuthHandler: auth, + // ListenerConfig is a list of Listeners and the configuration around them + ListenerConfigs: append(listeners, turn.ListenerConfig{ + Listener: tcpListener, + RelayAddressGenerator: &turn.RelayAddressGeneratorPortRange{ + RelayAddress: net.ParseIP(addr[0]), + Address: "0.0.0.0", + MinPort: turnMinPort, + MaxPort: turnMaxPort, + }, + }, + ), + // PacketConnConfigs is a list of UDP Listeners and the configuration around them + PacketConnConfigs: []turn.PacketConnConfig{ + { + PacketConn: udpListener, + RelayAddressGenerator: &turn.RelayAddressGeneratorPortRange{ + RelayAddress: net.ParseIP(addr[0]), + Address: "0.0.0.0", + MinPort: turnMinPort, + MaxPort: turnMaxPort, + }, + }, + }, + }) +}