From 2d9be9b7bc6a4daf616b1b74c60590986d5dc534 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 16 Sep 2024 23:59:57 +0530 Subject: [PATCH] Add round trip time measurement to candidate pair (#731) * Add round trip time measurement to candidate pair Use the round trip time measurement to populate RTT fields in CandidatePairStats. Atomic and tests * Use int64 nanosecnods to make atomic easier --- agent.go | 6 +++--- agent_stats.go | 6 +++--- agent_test.go | 21 +++++++++++++++++++++ candidatepair.go | 34 ++++++++++++++++++++++++++++++++++ selection.go | 8 ++++++-- 5 files changed, 67 insertions(+), 8 deletions(-) diff --git a/agent.go b/agent.go index c7970a51..b9268b8b 100644 --- a/agent.go +++ b/agent.go @@ -973,16 +973,16 @@ func (a *Agent) invalidatePendingBindingRequests(filterTime time.Time) { // Assert that the passed TransactionID is in our pendingBindingRequests and returns the destination // If the bindingRequest was valid remove it from our pending cache -func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest) { +func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest, time.Duration) { a.invalidatePendingBindingRequests(time.Now()) for i := range a.pendingBindingRequests { if a.pendingBindingRequests[i].transactionID == id { validBindingRequest := a.pendingBindingRequests[i] a.pendingBindingRequests = append(a.pendingBindingRequests[:i], a.pendingBindingRequests[i+1:]...) - return true, &validBindingRequest + return true, &validBindingRequest, time.Since(validBindingRequest.timestamp) } } - return false, nil + return false, nil, 0 } // handleInbound processes STUN traffic from a remote candidate diff --git a/agent_stats.go b/agent_stats.go index 035c652b..785e7ff7 100644 --- a/agent_stats.go +++ b/agent_stats.go @@ -29,14 +29,14 @@ func (a *Agent) GetCandidatePairsStats() []CandidatePairStats { // FirstRequestTimestamp time.Time // LastRequestTimestamp time.Time // LastResponseTimestamp time.Time - // TotalRoundTripTime float64 - // CurrentRoundTripTime float64 + TotalRoundTripTime: cp.TotalRoundTripTime(), + CurrentRoundTripTime: cp.CurrentRoundTripTime(), // AvailableOutgoingBitrate float64 // AvailableIncomingBitrate float64 // CircuitBreakerTriggerCount uint32 // RequestsReceived uint64 // RequestsSent uint64 - // ResponsesReceived uint64 + ResponsesReceived: cp.ResponsesReceived(), // ResponsesSent uint64 // RetransmissionsReceived uint64 // RetransmissionsSent uint64 diff --git a/agent_test.go b/agent_test.go index b53c7d0d..5622ec15 100644 --- a/agent_test.go +++ b/agent_test.go @@ -721,6 +721,10 @@ func TestCandidatePairStats(t *testing.T) { p := a.findPair(hostLocal, prflxRemote) p.state = CandidatePairStateFailed + for i := 0; i < 10; i++ { + p.UpdateRoundTripTime(time.Duration(i+1) * time.Second) + } + stats := a.GetCandidatePairsStats() if len(stats) != 4 { t.Fatal("expected 4 candidate pairs stats") @@ -766,6 +770,23 @@ func TestCandidatePairStats(t *testing.T) { t.Fatalf("expected host-prflx pair to have state failed, it has state %s instead", prflxPairStat.State.String()) } + + expectedCurrentRoundTripTime := time.Duration(10) * time.Second + if prflxPairStat.CurrentRoundTripTime != expectedCurrentRoundTripTime.Seconds() { + t.Fatalf("expected current round trip time to be %f, it is %f instead", + expectedCurrentRoundTripTime.Seconds(), prflxPairStat.CurrentRoundTripTime) + } + + expectedTotalRoundTripTime := time.Duration(55) * time.Second + if prflxPairStat.TotalRoundTripTime != expectedTotalRoundTripTime.Seconds() { + t.Fatalf("expected total round trip time to be %f, it is %f instead", + expectedTotalRoundTripTime.Seconds(), prflxPairStat.TotalRoundTripTime) + } + + if prflxPairStat.ResponsesReceived != 10 { + t.Fatalf("expected responses received to be 10, it is %d instead", + prflxPairStat.ResponsesReceived) + } } func TestLocalCandidateStats(t *testing.T) { diff --git a/candidatepair.go b/candidatepair.go index 2139209e..2b27eb18 100644 --- a/candidatepair.go +++ b/candidatepair.go @@ -5,6 +5,8 @@ package ice import ( "fmt" + "sync/atomic" + "time" "github.com/pion/stun/v3" ) @@ -28,6 +30,11 @@ type CandidatePair struct { state CandidatePairState nominated bool nominateOnBindingSuccess bool + + // stats + currentRoundTripTime int64 // in ns + totalRoundTripTime int64 // in ns + responsesReceived uint64 } func (p *CandidatePair) String() string { @@ -100,3 +107,30 @@ func (a *Agent) sendSTUN(msg *stun.Message, local, remote Candidate) { a.log.Tracef("Failed to send STUN message: %s", err) } } + +// UpdateRoundTripTime sets the current round time of this pair and +// accumulates total round trip time and responses received +func (p *CandidatePair) UpdateRoundTripTime(rtt time.Duration) { + rttNs := rtt.Nanoseconds() + atomic.StoreInt64(&p.currentRoundTripTime, rttNs) + atomic.AddInt64(&p.totalRoundTripTime, rttNs) + atomic.AddUint64(&p.responsesReceived, 1) +} + +// CurrentRoundTripTime returns the current round trip time in seconds +// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-currentroundtriptime +func (p *CandidatePair) CurrentRoundTripTime() float64 { + return time.Duration(atomic.LoadInt64(&p.currentRoundTripTime)).Seconds() +} + +// TotalRoundTripTime returns the current round trip time in seconds +// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-totalroundtriptime +func (p *CandidatePair) TotalRoundTripTime() float64 { + return time.Duration(atomic.LoadInt64(&p.totalRoundTripTime)).Seconds() +} + +// ResponsesReceived returns the total number of connectivity responses received +// https://www.w3.org/TR/webrtc-stats/#dom-rtcicecandidatepairstats-responsesreceived +func (p *CandidatePair) ResponsesReceived() uint64 { + return atomic.LoadUint64(&p.responsesReceived) +} diff --git a/selection.go b/selection.go index f386d51c..d3105301 100644 --- a/selection.go +++ b/selection.go @@ -120,7 +120,7 @@ func (s *controllingSelector) HandleBindingRequest(m *stun.Message, local, remot } func (s *controllingSelector) HandleSuccessResponse(m *stun.Message, local, remote Candidate, remoteAddr net.Addr) { - ok, pendingRequest := s.agent.handleInboundBindingSuccess(m.TransactionID) + ok, pendingRequest, rtt := s.agent.handleInboundBindingSuccess(m.TransactionID) if !ok { s.log.Warnf("Discard message from (%s), unknown TransactionID 0x%x", remote, m.TransactionID) return @@ -149,6 +149,8 @@ func (s *controllingSelector) HandleSuccessResponse(m *stun.Message, local, remo if pendingRequest.isUseCandidate && s.agent.getSelectedPair() == nil { s.agent.setSelectedPair(p) } + + p.UpdateRoundTripTime(rtt) } func (s *controllingSelector) PingCandidate(local, remote Candidate) { @@ -211,7 +213,7 @@ func (s *controlledSelector) HandleSuccessResponse(m *stun.Message, local, remot // request with an appropriate error code response (e.g., 400) // [RFC5389]. - ok, pendingRequest := s.agent.handleInboundBindingSuccess(m.TransactionID) + ok, pendingRequest, rtt := s.agent.handleInboundBindingSuccess(m.TransactionID) if !ok { s.log.Warnf("Discard message from (%s), unknown TransactionID 0x%x", remote, m.TransactionID) return @@ -245,6 +247,8 @@ func (s *controlledSelector) HandleSuccessResponse(m *stun.Message, local, remot s.log.Tracef("Ignore nominate new pair %s, already nominated pair %s", p, selectedPair) } } + + p.UpdateRoundTripTime(rtt) } func (s *controlledSelector) HandleBindingRequest(m *stun.Message, local, remote Candidate) {