Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MM-50294] IPv6 support #106

Merged
merged 2 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ jobs:
go mod download
go mod verify
make test
env:
CI: true
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# Variables

## General Variables

# CI
CI ?= false

# Branch Variables
PROTECTED_BRANCH := master
CURRENT_BRANCH := $(shell git rev-parse --abbrev-ref HEAD)
Expand Down Expand Up @@ -290,6 +294,7 @@ go-test: ## to run tests
$(AT)$(DOCKER) run ${DOCKER_OPTS} \
-v $(PWD):/app -w /app \
-e GOCACHE="/tmp" \
-e CI=${CI} \
$(DOCKER_IMAGE_GO) \
/bin/sh -c \
"cd /app && \
Expand Down
4 changes: 4 additions & 0 deletions config/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ ice_address_tcp = ""
# The TCP port used to route media (audio/screen/video tracks). This is used to
# generate TCP candidates.
ice_port_tcp = 8443
# Enables experimental IPv6 support. When this setting is true the RTC service
# will work in dual-stack mode, listening for IPv6 connections and generating
# candidates in addition to IPv4 ones.
enable_ipv6 = false
# An optional hostname used to override the default value. By default, the
# service will try to guess its own public IP through STUN (if configured).
#
Expand Down
2 changes: 2 additions & 0 deletions service/rtc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type ServerConfig struct {
// A list of ICE server (STUN/TURN) configurations to use.
ICEServers ICEServers `toml:"ice_servers"`
TURNConfig TURNConfig `toml:"turn"`
// EnableIPv6 specifies whether or not IPv6 should be used.
EnableIPv6 bool `toml:"enable_ipv6"`
}

func (c ServerConfig) IsValid() error {
Expand Down
40 changes: 26 additions & 14 deletions service/rtc/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net"
"net/netip"
"runtime"
"syscall"
"time"
Expand All @@ -22,9 +23,9 @@ const (
tcpSocketWriteBufferSize = 1024 * 1024 * 4 // 4MB
)

// getSystemIPs returns a list of all the available IPv4 addresses.
func getSystemIPs(log mlog.LoggerIFace) ([]string, error) {
var ips []string
// getSystemIPs returns a list of all the available local addresses.
func getSystemIPs(log mlog.LoggerIFace, dualStack bool) ([]netip.Addr, error) {
var ips []netip.Addr

interfaces, err := net.Interfaces()
if err != nil {
Expand All @@ -34,36 +35,43 @@ func getSystemIPs(log mlog.LoggerIFace) ([]string, error) {
for _, iface := range interfaces {
// filter out inactive interfaces
if iface.Flags&net.FlagUp == 0 {
log.Debug("skipping inactive interface", mlog.String("interface", iface.Name))
log.Info("skipping inactive interface", mlog.String("interface", iface.Name))
continue
}

addrs, err := iface.Addrs()
if err != nil {
log.Debug("failed to get addresses for interface", mlog.String("interface", iface.Name))
log.Warn("failed to get addresses for interface", mlog.String("interface", iface.Name))
continue
}

for _, addr := range addrs {
ip, _, err := net.ParseCIDR(addr.String())
prefix, err := netip.ParsePrefix(addr.String())
if err != nil {
log.Debug("failed to parse address", mlog.Err(err), mlog.String("addr", addr.String()))
log.Warn("failed to parse prefix", mlog.Err(err), mlog.String("prefix", prefix.String()))
continue
}

// IPv4 only (for the time being at least, see MM-50294)
if ip.To4() == nil {
ip := prefix.Addr()

if !dualStack && ip.Is6() {
log.Debug("ignoring IPv6 address: dual stack support is disabled by config", mlog.String("addr", ip.String()))
continue
}

if ip.Is6() && !ip.IsGlobalUnicast() {
log.Debug("ignoring non global IPv6 address", mlog.String("addr", ip.String()))
continue
}

ips = append(ips, ip.String())
ips = append(ips, ip)
}
}

return ips, nil
}

func createUDPConnsForAddr(log mlog.LoggerIFace, listenAddress string) ([]net.PacketConn, error) {
func createUDPConnsForAddr(log mlog.LoggerIFace, network, listenAddress string) ([]net.PacketConn, error) {
var conns []net.PacketConn

for i := 0; i < runtime.NumCPU(); i++ {
Expand All @@ -84,7 +92,7 @@ func createUDPConnsForAddr(log mlog.LoggerIFace, listenAddress string) ([]net.Pa
},
}

udpConn, err := listenConfig.ListenPacket(context.Background(), "udp4", listenAddress)
udpConn, err := listenConfig.ListenPacket(context.Background(), network, listenAddress)
if err != nil {
return nil, fmt.Errorf("failed to listen on udp: %w", err)
}
Expand Down Expand Up @@ -132,12 +140,12 @@ func createUDPConnsForAddr(log mlog.LoggerIFace, listenAddress string) ([]net.Pa
return conns, nil
}

func resolveHost(host string, timeout time.Duration) (string, error) {
func resolveHost(host, network string, timeout time.Duration) (string, error) {
var ip string
r := net.Resolver{}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
addrs, err := r.LookupIP(ctx, "ip4", host)
addrs, err := r.LookupIP(ctx, network, host)
if err != nil {
return ip, fmt.Errorf("failed to resolve host %q: %w", host, err)
}
Expand All @@ -146,3 +154,7 @@ func resolveHost(host string, timeout time.Duration) (string, error) {
}
return ip, err
}

func areAddressesSameStack(addrA, addrB netip.Addr) bool {
return (addrA.Is4() && addrB.Is4()) || (addrA.Is6() && addrB.Is6())
}
79 changes: 67 additions & 12 deletions service/rtc/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package rtc

import (
"net/netip"
"os"
"runtime"
"testing"

Expand All @@ -20,9 +22,40 @@ func TestGetSystemIPs(t *testing.T) {
require.NoError(t, err)
}()

ips, err := getSystemIPs(log)
require.NoError(t, err)
require.NotEmpty(t, ips)
t.Run("ipv4", func(t *testing.T) {
ips, err := getSystemIPs(log, false)
require.NoError(t, err)
require.NotEmpty(t, ips)

for _, ip := range ips {
require.True(t, ip.Is4())
}
})

t.Run("dual stack", func(t *testing.T) {
// Skipping this test in CI since IPv6 is not yet supported by Github actions.
if os.Getenv("CI") != "" {
t.Skip()
}

ips, err := getSystemIPs(log, true)
require.NoError(t, err)
require.NotEmpty(t, ips)

var hasIPv4 bool
var hasIPv6 bool
for _, ip := range ips {
if ip.Is4() {
hasIPv4 = true
}
if ip.Is6() {
hasIPv6 = true
}
}

require.True(t, hasIPv4)
require.True(t, hasIPv6)
})
}

func TestCreateUDPConnsForAddr(t *testing.T) {
Expand All @@ -33,16 +66,38 @@ func TestCreateUDPConnsForAddr(t *testing.T) {
require.NoError(t, err)
}()

ips, err := getSystemIPs(log)
require.NoError(t, err)
require.NotEmpty(t, ips)
t.Run("IPv4", func(t *testing.T) {
ips, err := getSystemIPs(log, false)
require.NoError(t, err)
require.NotEmpty(t, ips)

for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, ip+":30443")
for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, "udp4", netip.AddrPortFrom(ip, 30443).String())
require.NoError(t, err)
require.Len(t, conns, runtime.NumCPU())
for _, conn := range conns {
require.NoError(t, conn.Close())
}
}
})

t.Run("dual stack", func(t *testing.T) {
// Skipping this test in CI since IPv6 is not yet supported by Github actions.
if os.Getenv("CI") != "" {
t.Skip()
}

ips, err := getSystemIPs(log, false)
require.NoError(t, err)
require.Len(t, conns, runtime.NumCPU())
for _, conn := range conns {
require.NoError(t, conn.Close())
require.NotEmpty(t, ips)

for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, "udp", netip.AddrPortFrom(ip, 30443).String())
require.NoError(t, err)
require.Len(t, conns, runtime.NumCPU())
for _, conn := range conns {
require.NoError(t, conn.Close())
}
}
}
})
}
46 changes: 28 additions & 18 deletions service/rtc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"net"
"net/netip"
"sync"
"time"

Expand All @@ -32,8 +33,8 @@ type Server struct {

udpMux ice.UDPMux
tcpMux ice.TCPMux
publicAddrsMap map[string]string
localIPs []string
publicAddrsMap map[netip.Addr]string
localIPs []netip.Addr

sendCh chan Message
receiveCh chan Message
Expand Down Expand Up @@ -63,7 +64,7 @@ func NewServer(cfg ServerConfig, log mlog.LoggerIFace, metrics Metrics) (*Server
sendCh: make(chan Message, msgChSize),
receiveCh: make(chan Message, msgChSize),
bufPool: &sync.Pool{New: func() interface{} { return make([]byte, receiveMTU) }},
publicAddrsMap: make(map[string]string),
publicAddrsMap: make(map[netip.Addr]string),
}

return s, nil
Expand All @@ -83,9 +84,16 @@ func (s *Server) ReceiveCh() <-chan Message {
}

func (s *Server) Start() error {
var err error
udpNetwork := "udp4"
tcpNetwork := "tcp4"

localIPs, err := getSystemIPs(s.log)
if s.cfg.EnableIPv6 {
s.log.Info("rtc: experimental IPv6 support enabled")
udpNetwork = "udp"
tcpNetwork = "tcp"
}

localIPs, err := getSystemIPs(s.log, s.cfg.EnableIPv6)
if err != nil {
return fmt.Errorf("failed to get system IPs: %w", err)
}
Expand All @@ -95,34 +103,36 @@ func (s *Server) Start() error {

s.localIPs = localIPs

s.log.Debug("rtc: found local IPs", mlog.Any("ips", s.localIPs))

// Populate public IP addresses map if override is not set and STUN is provided.
if s.cfg.ICEHostOverride == "" && len(s.cfg.ICEServers) > 0 {
for _, ip := range localIPs {
udpListenAddr := fmt.Sprintf("%s:%d", ip, s.cfg.ICEPortUDP)
udpAddr, err := net.ResolveUDPAddr("udp4", udpListenAddr)
udpListenAddr := netip.AddrPortFrom(ip, uint16(s.cfg.ICEPortUDP)).String()
udpAddr, err := net.ResolveUDPAddr(udpNetwork, udpListenAddr)
if err != nil {
s.log.Error("failed to resolve UDP address", mlog.Err(err))
continue
}

// TODO: consider making this logic concurrent to lower total time taken
// in case of multiple interfaces.
addr, err := getPublicIP(udpAddr, s.cfg.ICEServers.getSTUN())
addr, err := getPublicIP(udpAddr, udpNetwork, s.cfg.ICEServers.getSTUN())
if err != nil {
s.log.Warn("failed to get public IP address for local interface", mlog.String("localAddr", ip), mlog.Err(err))
s.log.Warn("failed to get public IP address for local interface", mlog.String("localAddr", ip.String()), mlog.Err(err))
} else {
s.log.Info("got public IP address for local interface", mlog.String("localAddr", ip), mlog.String("remoteAddr", addr))
s.log.Info("got public IP address for local interface", mlog.String("localAddr", ip.String()), mlog.String("remoteAddr", addr))
}

s.publicAddrsMap[ip] = addr
}
}

if err := s.initUDP(localIPs); err != nil {
if err := s.initUDP(localIPs, udpNetwork); err != nil {
return err
}

if err := s.initTCP(); err != nil {
if err := s.initTCP(tcpNetwork); err != nil {
return err
}

Expand Down Expand Up @@ -291,11 +301,11 @@ func (s *Server) msgReader() {
}
}

func (s *Server) initUDP(localIPs []string) error {
func (s *Server) initUDP(localIPs []netip.Addr, network string) error {
var udpMuxes []ice.UDPMux

initUDPMux := func(addr string) error {
conns, err := createUDPConnsForAddr(s.log, addr)
conns, err := createUDPConnsForAddr(s.log, network, addr)
if err != nil {
return fmt.Errorf("failed to create UDP connections: %w", err)
}
Expand All @@ -315,7 +325,7 @@ func (s *Server) initUDP(localIPs []string) error {

// If an address is specified we create a single udp mux.
if s.cfg.ICEAddressUDP != "" {
if err := initUDPMux(fmt.Sprintf("%s:%d", s.cfg.ICEAddressUDP, s.cfg.ICEPortUDP)); err != nil {
if err := initUDPMux(net.JoinHostPort(s.cfg.ICEAddressUDP, fmt.Sprintf("%d", s.cfg.ICEPortUDP))); err != nil {
return err
}
s.udpMux = udpMuxes[0]
Expand All @@ -324,7 +334,7 @@ func (s *Server) initUDP(localIPs []string) error {

// If no address is specified we create a mux for each interface we find.
for _, ip := range localIPs {
if err := initUDPMux(fmt.Sprintf("%s:%d", ip, s.cfg.ICEPortUDP)); err != nil {
if err := initUDPMux(netip.AddrPortFrom(ip, uint16(s.cfg.ICEPortUDP)).String()); err != nil {
return err
}
}
Expand All @@ -334,8 +344,8 @@ func (s *Server) initUDP(localIPs []string) error {
return nil
}

func (s *Server) initTCP() error {
tcpListener, err := net.Listen("tcp4", fmt.Sprintf("%s:%d", s.cfg.ICEAddressTCP, s.cfg.ICEPortTCP))
func (s *Server) initTCP(network string) error {
tcpListener, err := net.Listen(network, net.JoinHostPort(s.cfg.ICEAddressTCP, fmt.Sprintf("%d", s.cfg.ICEPortTCP)))
if err != nil {
return fmt.Errorf("failed to create TCP listener: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion service/rtc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestStartServer(t *testing.T) {
require.NoError(t, err)
defer udpConn.Close()

ips, err := getSystemIPs(log)
ips, err := getSystemIPs(log, false)
require.NoError(t, err)
require.NotEmpty(t, ips)

Expand Down
Loading