From db5d7ea72b110ed3fbd1843b1a484ce91add933a Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Thu, 15 Jun 2023 14:40:19 -0400 Subject: [PATCH] Implement Active ICE TCP Candidates Co-authored-by: Steffen Vogel Co-authored-by: Artur Shellunts --- active_tcp.go | 158 ++++++++++++++++++++++++++++++++++ active_tcp_test.go | 210 +++++++++++++++++++++++++++++++++++++++++++++ agent.go | 74 ++++++++++++++-- agent_config.go | 20 +++++ agent_test.go | 11 ++- candidate_base.go | 26 +++++- candidate_test.go | 64 ++++++++++++-- gather_test.go | 5 +- 8 files changed, 547 insertions(+), 21 deletions(-) create mode 100644 active_tcp.go create mode 100644 active_tcp_test.go diff --git a/active_tcp.go b/active_tcp.go new file mode 100644 index 00000000..07383947 --- /dev/null +++ b/active_tcp.go @@ -0,0 +1,158 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package ice + +import ( + "context" + "io" + "net" + "sync/atomic" + "time" + + "github.com/pion/logging" + "github.com/pion/transport/v2/packetio" +) + +type activeTCPConn struct { + readBuffer, writeBuffer *packetio.Buffer + localAddr, remoteAddr atomic.Value + closed int32 +} + +func newActiveTCPConn(ctx context.Context, localAddress, remoteAddress string, log logging.LeveledLogger) (a *activeTCPConn) { + a = &activeTCPConn{ + readBuffer: packetio.NewBuffer(), + writeBuffer: packetio.NewBuffer(), + } + + laddr, err := getTCPAddrOnInterface(localAddress) + if err != nil { + atomic.StoreInt32(&a.closed, 1) + log.Infof("Failed to dial TCP address %s: %v", remoteAddress, err) + return + } + a.localAddr.Store(laddr) + + go func() { + defer func() { + atomic.StoreInt32(&a.closed, 1) + }() + + dialer := &net.Dialer{ + LocalAddr: laddr, + } + conn, err := dialer.DialContext(ctx, "tcp", remoteAddress) + if err != nil { + log.Infof("Failed to dial TCP address %s: %v", remoteAddress, err) + return + } + + a.remoteAddr.Store(conn.RemoteAddr()) + + go func() { + buff := make([]byte, receiveMTU) + + for atomic.LoadInt32(&a.closed) == 0 { + n, err := readStreamingPacket(conn, buff) + if err != nil { + log.Infof("%v: %s", errReadingStreamingPacket, err) + break + } + + if _, err := a.readBuffer.Write(buff[:n]); err != nil { + log.Infof("%v: %s", errReadingStreamingPacket, err) + break + } + } + }() + + buff := make([]byte, receiveMTU) + + for atomic.LoadInt32(&a.closed) == 0 { + n, err := a.writeBuffer.Read(buff) + if err != nil { + log.Infof("%v: %s", errReadingStreamingPacket, err) + break + } + + if _, err = writeStreamingPacket(conn, buff[:n]); err != nil { + log.Infof("%v: %s", errReadingStreamingPacket, err) + break + } + } + + if err := conn.Close(); err != nil { + log.Infof("%v: %s", errReadingStreamingPacket, err) + } + }() + + return a +} + +func (a *activeTCPConn) ReadFrom(buff []byte) (n int, srcAddr net.Addr, err error) { + if atomic.LoadInt32(&a.closed) == 1 { + return 0, nil, io.ErrClosedPipe + } + + srcAddr = a.RemoteAddr() + n, err = a.readBuffer.Read(buff) + return +} + +func (a *activeTCPConn) WriteTo(buff []byte, _ net.Addr) (n int, err error) { + if atomic.LoadInt32(&a.closed) == 1 { + return 0, io.ErrClosedPipe + } + + return a.writeBuffer.Write(buff) +} + +func (a *activeTCPConn) Close() error { + atomic.StoreInt32(&a.closed, 1) + _ = a.readBuffer.Close() + _ = a.writeBuffer.Close() + return nil +} + +func (a *activeTCPConn) LocalAddr() net.Addr { + if v, ok := a.localAddr.Load().(*net.TCPAddr); ok { + return v + } + + return &net.TCPAddr{} +} + +func (a *activeTCPConn) RemoteAddr() net.Addr { + if v, ok := a.remoteAddr.Load().(*net.TCPAddr); ok { + return v + } + + return &net.TCPAddr{} +} + +func (a *activeTCPConn) SetDeadline(time.Time) error { return io.EOF } +func (a *activeTCPConn) SetReadDeadline(time.Time) error { return io.EOF } +func (a *activeTCPConn) SetWriteDeadline(time.Time) error { return io.EOF } + +func getTCPAddrOnInterface(address string) (*net.TCPAddr, error) { + addr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return nil, err + } + defer func() { + _ = l.Close() + }() + + tcpAddr, ok := l.Addr().(*net.TCPAddr) + if !ok { + return nil, errInvalidAddress + } + + return tcpAddr, nil +} diff --git a/active_tcp_test.go b/active_tcp_test.go new file mode 100644 index 00000000..730db907 --- /dev/null +++ b/active_tcp_test.go @@ -0,0 +1,210 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +//go:build !js +// +build !js + +package ice + +import ( + "net" + "testing" + "time" + + "github.com/pion/logging" + "github.com/pion/transport/v2/stdnet" + "github.com/pion/transport/v2/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func getLocalIPAddress(t *testing.T, networkType NetworkType) net.IP { + net, err := stdnet.NewNet() + require.NoError(t, err) + localIPs, err := localInterfaces(net, nil, nil, []NetworkType{networkType}, false) + require.NoError(t, err) + require.NotEmpty(t, localIPs) + return localIPs[0] +} + +func ipv6Available(t *testing.T) bool { + net, err := stdnet.NewNet() + require.NoError(t, err) + localIPs, err := localInterfaces(net, nil, nil, []NetworkType{NetworkTypeTCP6}, false) + require.NoError(t, err) + return len(localIPs) > 0 +} + +func TestActiveTCP(t *testing.T) { + report := test.CheckRoutines(t) + defer report() + + lim := test.TimeOut(time.Second * 5) + defer lim.Stop() + + const listenPort = 7686 + type testCase struct { + name string + networkTypes []NetworkType + listenIPAddress net.IP + selectedPairNetworkType string + } + + testCases := []testCase{ + { + name: "TCP4 connection", + networkTypes: []NetworkType{NetworkTypeTCP4}, + listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4), + selectedPairNetworkType: tcp, + }, + { + name: "UDP is preferred over TCP4", // This fails some time + networkTypes: supportedNetworkTypes(), + listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4), + selectedPairNetworkType: udp, + }, + } + + if ipv6Available(t) { + testCases = append(testCases, + testCase{ + name: "TCP6 connection", + networkTypes: []NetworkType{NetworkTypeTCP6}, + listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6), + selectedPairNetworkType: tcp, + }, + testCase{ + name: "UDP is preferred over TCP6", // This fails some time + networkTypes: supportedNetworkTypes(), + listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6), + selectedPairNetworkType: udp, + }, + ) + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + r := require.New(t) + + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: testCase.listenIPAddress, + Port: listenPort, + }) + r.NoError(err) + defer func() { + _ = listener.Close() + }() + + loggerFactory := logging.NewDefaultLoggerFactory() + + tcpMux := NewTCPMuxDefault(TCPMuxParams{ + Listener: listener, + Logger: loggerFactory.NewLogger("passive-ice-tcp-mux"), + ReadBufferSize: 20, + }) + + defer func() { + _ = tcpMux.Close() + }() + + r.NotNil(tcpMux.LocalAddr(), "tcpMux.LocalAddr() is nil") + + hostAcceptanceMinWait := 100 * time.Millisecond + passiveAgent, err := NewAgent(&AgentConfig{ + TCPMux: tcpMux, + CandidateTypes: []CandidateType{CandidateTypeHost}, + NetworkTypes: testCase.networkTypes, + LoggerFactory: loggerFactory, + IncludeLoopback: true, + HostAcceptanceMinWait: &hostAcceptanceMinWait, + }) + r.NoError(err) + r.NotNil(passiveAgent) + + activeAgent, err := NewAgent(&AgentConfig{ + CandidateTypes: []CandidateType{CandidateTypeHost}, + NetworkTypes: testCase.networkTypes, + LoggerFactory: loggerFactory, + HostAcceptanceMinWait: &hostAcceptanceMinWait, + }) + r.NoError(err) + r.NotNil(activeAgent) + + passiveAgentConn, activeAgenConn := connect(passiveAgent, activeAgent) + r.NotNil(passiveAgentConn) + r.NotNil(activeAgenConn) + + pair := passiveAgent.getSelectedPair() + r.NotNil(pair) + r.Equal(testCase.selectedPairNetworkType, pair.Local.NetworkType().NetworkShort()) + + foo := []byte("foo") + _, err = passiveAgentConn.Write(foo) + r.NoError(err) + + buffer := make([]byte, 1024) + n, err := activeAgenConn.Read(buffer) + r.NoError(err) + r.Equal(foo, buffer[:n]) + + bar := []byte("bar") + _, err = activeAgenConn.Write(bar) + r.NoError(err) + + n, err = passiveAgentConn.Read(buffer) + r.NoError(err) + r.Equal(bar, buffer[:n]) + + r.NoError(activeAgenConn.Close()) + r.NoError(passiveAgentConn.Close()) + }) + } +} + +// Assert that Active TCP connectivity isn't established inside +// the main thread of the Agent +func TestActiveTCP_NonBlocking(t *testing.T) { + report := test.CheckRoutines(t) + defer report() + + lim := test.TimeOut(time.Second * 5) + defer lim.Stop() + + cfg := &AgentConfig{ + NetworkTypes: supportedNetworkTypes(), + } + + aAgent, err := NewAgent(cfg) + if err != nil { + t.Error(err) + } + + bAgent, err := NewAgent(cfg) + if err != nil { + t.Error(err) + } + + isConnected := make(chan interface{}) + err = aAgent.OnConnectionStateChange(func(c ConnectionState) { + if c == ConnectionStateConnected { + close(isConnected) + } + }) + if err != nil { + t.Error(err) + } + + // Add a invalid ice-tcp candidate to each + invalidCandidate, err := UnmarshalCandidate("1052353102 1 tcp 1675624447 192.0.2.1 8080 typ host tcptype passive") + if err != nil { + t.Fatal(err) + } + assert.NoError(t, aAgent.AddRemoteCandidate(invalidCandidate)) + assert.NoError(t, bAgent.AddRemoteCandidate(invalidCandidate)) + + connect(aAgent, bAgent) + + <-isConnected + assert.NoError(t, aAgent.Close()) + assert.NoError(t, bAgent.Close()) +} diff --git a/agent.go b/agent.go index 5350330f..c69ffad2 100644 --- a/agent.go +++ b/agent.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "net" + "strconv" "strings" "sync" "sync/atomic" @@ -72,6 +73,9 @@ type Agent struct { prflxAcceptanceMinWait time.Duration relayAcceptanceMinWait time.Duration + tcpPriorityOffset uint16 + disableActiveTCP bool + portMin uint16 portMax uint16 @@ -315,6 +319,8 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit insecureSkipVerify: config.InsecureSkipVerify, includeLoopback: config.IncludeLoopback, + + disableActiveTCP: config.DisableActiveTCP, } if a.net == nil { @@ -651,11 +657,9 @@ func (a *Agent) AddRemoteCandidate(c Candidate) error { return nil } - // Cannot check for network yet because it might not be applied - // when mDNS hostname is used. + // TCP Candidates with TCP type active will probe server passive ones, so + // no need to do anything with them. if c.TCPType() == TCPTypeActive { - // TCP Candidates with TCP type active will probe server passive ones, so - // no need to do anything with them. a.log.Infof("Ignoring remote candidate with tcpType active: %s", c) return nil } @@ -678,6 +682,7 @@ func (a *Agent) AddRemoteCandidate(c Candidate) error { go func() { if err := a.run(a.context(), func(ctx context.Context, agent *Agent) { + // nolint: contextcheck agent.addRemoteCandidate(c) }); err != nil { a.log.Warnf("Failed to add remote candidate %s: %v", c.Address(), err) @@ -709,6 +714,7 @@ func (a *Agent) resolveAndAddMulticastCandidate(c *CandidateHost) { } if err = a.run(a.context(), func(ctx context.Context, agent *Agent) { + // nolint: contextcheck agent.addRemoteCandidate(c) }); err != nil { a.log.Warnf("Failed to add mDNS candidate %s: %v", c.Address(), err) @@ -723,6 +729,47 @@ func (a *Agent) requestConnectivityCheck() { } } +func (a *Agent) addRemotePassiveTCPCandidate(remoteCandidate Candidate) { + localIPs, err := localInterfaces(a.net, a.interfaceFilter, a.ipFilter, []NetworkType{remoteCandidate.NetworkType()}, a.includeLoopback) + if err != nil { + a.log.Warnf("Failed to iterate local interfaces, host candidates will not be gathered %s", err) + return + } + + for i := range localIPs { + conn := newActiveTCPConn( + a.context(), + net.JoinHostPort(localIPs[i].String(), "0"), + net.JoinHostPort(remoteCandidate.Address(), strconv.Itoa(remoteCandidate.Port())), + a.log, + ) + + tcpAddr, ok := conn.LocalAddr().(*net.TCPAddr) + if !ok { + closeConnAndLog(conn, a.log, "Failed to create Active ICE-TCP Candidate: %v", errInvalidAddress) + continue + } + + localCandidate, err := NewCandidateHost(&CandidateHostConfig{ + Network: remoteCandidate.NetworkType().String(), + Address: localIPs[i].String(), + Port: tcpAddr.Port, + Component: ComponentRTP, + TCPType: TCPTypeActive, + }) + if err != nil { + closeConnAndLog(conn, a.log, "Failed to create Active ICE-TCP Candidate: %v", err) + continue + } + + localCandidate.start(a, conn, a.startedCh) + a.localCandidates[localCandidate.NetworkType()] = append(a.localCandidates[localCandidate.NetworkType()], localCandidate) + a.chanCandidate <- localCandidate + + a.addPair(localCandidate, remoteCandidate) + } +} + // addRemoteCandidate assumes you are holding the lock (must be execute using a.run) func (a *Agent) addRemoteCandidate(c Candidate) { set := a.remoteCandidates[c.NetworkType()] @@ -733,12 +780,25 @@ func (a *Agent) addRemoteCandidate(c Candidate) { } } + tcpNetworkTypeFound := false + for _, networkType := range a.networkTypes { + if networkType.IsTCP() { + tcpNetworkTypeFound = true + } + } + + if !a.disableActiveTCP && tcpNetworkTypeFound && c.TCPType() == TCPTypePassive { + a.addRemotePassiveTCPCandidate(c) + } + set = append(set, c) a.remoteCandidates[c.NetworkType()] = set - if localCandidates, ok := a.localCandidates[c.NetworkType()]; ok { - for _, localCandidate := range localCandidates { - a.addPair(localCandidate, c) + if c.TCPType() != TCPTypePassive { + if localCandidates, ok := a.localCandidates[c.NetworkType()]; ok { + for _, localCandidate := range localCandidates { + a.addPair(localCandidate, c) + } } } diff --git a/agent_config.go b/agent_config.go index f5897cd4..9629fd5a 100644 --- a/agent_config.go +++ b/agent_config.go @@ -41,6 +41,10 @@ const ( // defaultMaxBindingRequests is the maximum number of binding requests before considering a pair failed defaultMaxBindingRequests = 7 + // TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference + // for host, srflx and prfx candidate types. + defaultTCPPriorityOffset = 27 + // maxBufferSize is the number of bytes that can be buffered before we start to error maxBufferSize = 1000 * 1000 // 1MB @@ -174,6 +178,16 @@ type AgentConfig struct { // Include loopback addresses in the candidate list. IncludeLoopback bool + + // TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference + // for host, srflx and prfx candidate types. It helps to configure relative preference of UDP candidates + // against TCP ones. Relay candidates for TCP and UDP are always 0 and not affected by this setting. + // When this is nil, defaultTCPPriorityOffset is used. + TCPPriorityOffset *uint16 + + // DisableActiveTCP can be used to disable Active TCP candidates. Otherwise when TCP is enabled + // Active TCP candidates will be created when a new passive TCP remote candidate is added. + DisableActiveTCP bool } // initWithDefaults populates an agent and falls back to defaults if fields are unset @@ -208,6 +222,12 @@ func (config *AgentConfig) initWithDefaults(a *Agent) { a.relayAcceptanceMinWait = *config.RelayAcceptanceMinWait } + if config.TCPPriorityOffset == nil { + a.tcpPriorityOffset = defaultTCPPriorityOffset + } else { + a.tcpPriorityOffset = *config.TCPPriorityOffset + } + if config.DisconnectedTimeout == nil { a.disconnectedTimeout = defaultDisconnectedTimeout } else { diff --git a/agent_test.go b/agent_test.go index ba24f533..6374cae7 100644 --- a/agent_test.go +++ b/agent_test.go @@ -9,7 +9,6 @@ package ice import ( "context" "errors" - "fmt" "net" "strconv" "sync" @@ -87,6 +86,7 @@ func TestHandlePeerReflexive(t *testing.T) { t.Fatal(err) } + // nolint: contextcheck a.handleInbound(msg, local, remote) // Length of remote candidate list must be one now @@ -134,6 +134,7 @@ func TestHandlePeerReflexive(t *testing.T) { remote := &BadAddr{} + // nolint: contextcheck a.handleInbound(nil, local, remote) if len(a.remoteCandidates) != 0 { @@ -173,6 +174,7 @@ func TestHandlePeerReflexive(t *testing.T) { t.Fatal(err) } + // nolint: contextcheck a.handleInbound(msg, local, remote) if len(a.remoteCandidates) != 0 { t.Fatal("unknown remote was able to create a candidate") @@ -440,6 +442,7 @@ func TestInboundValidity(t *testing.T) { err = a.run(context.Background(), func(ctx context.Context, a *Agent) { a.selector = &controllingSelector{agent: a, log: a.log} + // nolint: contextcheck a.handleInbound(buildMsg(stun.ClassRequest, a.localUfrag+":"+a.remoteUfrag, a.localPwd), local, remote) if len(a.remoteCandidates) != 1 { t.Fatal("Binding with valid values was unable to create prflx candidate") @@ -462,6 +465,7 @@ func TestInboundValidity(t *testing.T) { t.Fatal(err) } + // nolint: contextcheck a.handleInbound(msg, local, remote) if len(a.remoteCandidates) != 1 { t.Fatal("Binding with valid values (but no fingerprint) was unable to create prflx candidate") @@ -1507,7 +1511,6 @@ func TestLiteLifecycle(t *testing.T) { bFailed := make(chan interface{}) require.NoError(t, bAgent.OnConnectionStateChange(func(c ConnectionState) { - fmt.Println(c) switch c { case ConnectionStateConnected: close(bConnected) @@ -1637,7 +1640,7 @@ func TestAcceptAggressiveNomination(t *testing.T) { KeepaliveInterval := time.Hour cfg0 := &AgentConfig{ - NetworkTypes: supportedNetworkTypes(), + NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}, MulticastDNSMode: MulticastDNSModeDisabled, Net: net0, @@ -1652,7 +1655,7 @@ func TestAcceptAggressiveNomination(t *testing.T) { require.NoError(t, aAgent.OnConnectionStateChange(aNotifier)) cfg1 := &AgentConfig{ - NetworkTypes: supportedNetworkTypes(), + NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}, MulticastDNSMode: MulticastDNSModeDisabled, Net: net1, KeepaliveInterval: &KeepaliveInterval, diff --git a/candidate_base.go b/candidate_base.go index 8e551d8f..bbe04eb5 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -268,6 +268,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) { } if err := a.run(c, func(ctx context.Context, a *Agent) { + // nolint: contextcheck a.handleInbound(m, c, srcAddr) }); err != nil { a.log.Warnf("Failed to handle message: %v", err) @@ -343,7 +344,27 @@ func (c *candidateBase) writeTo(raw []byte, dst Candidate) (int, error) { return n, nil } +// TypePreference returns the type preference for this candidate +func (c *candidateBase) TypePreference() uint16 { + pref := c.Type().Preference() + if pref == 0 { + return 0 + } + + if c.NetworkType().IsTCP() { + var tcpPriorityOffset uint16 = defaultTCPPriorityOffset + if c.agent() != nil { + tcpPriorityOffset = c.agent().tcpPriorityOffset + } + + pref -= tcpPriorityOffset + } + + return pref +} + // Priority computes the priority for this ICE Candidate +// See: https://www.rfc-editor.org/rfc/rfc8445#section-5.1.2.1 func (c *candidateBase) Priority() uint32 { if c.priorityOverride != 0 { return c.priorityOverride @@ -355,9 +376,10 @@ func (c *candidateBase) Priority() uint32 { // candidates for a particular component for a particular data stream // that have the same type, the local preference MUST be unique for each // one. - return (1<<24)*uint32(c.Type().Preference()) + + + return (1<<24)*uint32(c.TypePreference()) + (1<<8)*uint32(c.LocalPreference()) + - uint32(256-c.Component()) + (1<<0)*uint32(256-c.Component()) } // Equal is used to compare two candidateBases diff --git a/candidate_test.go b/candidate_test.go index d668936c..9fb7ebc5 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -13,6 +13,58 @@ import ( "github.com/stretchr/testify/require" ) +func TestCandidateTypePreference(t *testing.T) { + r := require.New(t) + + hostDefaultPreference := uint16(126) + prflxDefaultPreference := uint16(110) + srflxDefaultPreference := uint16(100) + relayDefaultPreference := uint16(0) + + tcpOffsets := []uint16{0, 10} + + for _, tcpOffset := range tcpOffsets { + agent := &Agent{ + tcpPriorityOffset: tcpOffset, + } + + for _, networkType := range supportedNetworkTypes() { + hostCandidate := candidateBase{ + candidateType: CandidateTypeHost, + networkType: networkType, + currAgent: agent, + } + prflxCandidate := candidateBase{ + candidateType: CandidateTypePeerReflexive, + networkType: networkType, + currAgent: agent, + } + srflxCandidate := candidateBase{ + candidateType: CandidateTypeServerReflexive, + networkType: networkType, + currAgent: agent, + } + relayCandidate := candidateBase{ + candidateType: CandidateTypeRelay, + networkType: networkType, + currAgent: agent, + } + + if networkType.IsTCP() { + r.Equal(hostDefaultPreference-tcpOffset, hostCandidate.TypePreference()) + r.Equal(prflxDefaultPreference-tcpOffset, prflxCandidate.TypePreference()) + r.Equal(srflxDefaultPreference-tcpOffset, srflxCandidate.TypePreference()) + } else { + r.Equal(hostDefaultPreference, hostCandidate.TypePreference()) + r.Equal(prflxDefaultPreference, prflxCandidate.TypePreference()) + r.Equal(srflxDefaultPreference, srflxCandidate.TypePreference()) + } + + r.Equal(relayDefaultPreference, relayCandidate.TypePreference()) + } + } +} + func TestCandidatePriority(t *testing.T) { for _, test := range []struct { Candidate Candidate @@ -36,7 +88,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypeActive, }, }, - WantPriority: 2128609279, + WantPriority: 1675624447, }, { Candidate: &CandidateHost{ @@ -47,7 +99,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypePassive, }, }, - WantPriority: 2124414975, + WantPriority: 1671430143, }, { Candidate: &CandidateHost{ @@ -58,7 +110,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypeSimultaneousOpen, }, }, - WantPriority: 2120220671, + WantPriority: 1667235839, }, { Candidate: &CandidatePeerReflexive{ @@ -78,7 +130,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypeSimultaneousOpen, }, }, - WantPriority: 1860173823, + WantPriority: 1407188991, }, { Candidate: &CandidatePeerReflexive{ @@ -89,7 +141,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypeActive, }, }, - WantPriority: 1855979519, + WantPriority: 1402994687, }, { Candidate: &CandidatePeerReflexive{ @@ -100,7 +152,7 @@ func TestCandidatePriority(t *testing.T) { tcpType: TCPTypePassive, }, }, - WantPriority: 1851785215, + WantPriority: 1398800383, }, { Candidate: &CandidateServerReflexive{ diff --git a/gather_test.go b/gather_test.go index 9baf11b7..ecafd509 100644 --- a/gather_test.go +++ b/gather_test.go @@ -675,7 +675,7 @@ func TestMultiUDPMuxUsage(t *testing.T) { } a, err := NewAgent(&AgentConfig{ - NetworkTypes: supportedNetworkTypes(), + NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}, CandidateTypes: []CandidateType{CandidateTypeHost}, UDPMux: NewMultiUDPMuxDefault(udpMuxInstances...), }) @@ -751,7 +751,8 @@ func TestMultiTCPMuxUsage(t *testing.T) { portFound := make(map[int]bool) for c := range candidateCh { - if c.NetworkType().IsTCP() { + activeCandidate := c.Port() == 0 + if c.NetworkType().IsTCP() && !activeCandidate { portFound[c.Port()] = true } }