Skip to content

Commit

Permalink
added prometheus stats for rtp packets
Browse files Browse the repository at this point in the history
  • Loading branch information
Robin Raymond committed Dec 14, 2020
1 parent 0997471 commit 320b4cf
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 7 deletions.
36 changes: 31 additions & 5 deletions cmd/signal/json-rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ package main
import (
"flag"
"fmt"
"net"
"net/http"
"os"

"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sourcegraph/jsonrpc2"
websocketjsonrpc2 "github.com/sourcegraph/jsonrpc2/websocket"
"github.com/spf13/viper"
Expand All @@ -18,11 +20,12 @@ import (
)

var (
conf = sfu.Config{}
file string
cert string
key string
addr string
conf = sfu.Config{}
file string
cert string
key string
addr string
metricsAddr string
)

const (
Expand Down Expand Up @@ -77,6 +80,7 @@ func parse() bool {
flag.StringVar(&cert, "cert", "", "cert file")
flag.StringVar(&key, "key", "", "key file")
flag.StringVar(&addr, "a", ":7000", "address to use")
flag.StringVar(&metricsAddr, "m", ":8100", "merics to use")
help := flag.Bool("h", false, "help info")
flag.Parse()
if !load() {
Expand All @@ -89,6 +93,26 @@ func parse() bool {
return true
}

func startMetrics(addr string) {
// start metrics server
m := http.NewServeMux()
m.Handle("/metrics", promhttp.Handler())
srv := &http.Server{
Handler: m,
}

metricsLis, err := net.Listen("tcp", addr)
if err != nil {
log.Panicf("cannot bind to metrics endpoint %s. err: %s", addr, err)
}
log.Infof("Metrics Listening at %s", addr)

err = srv.Serve(metricsLis)
if err != nil {
log.Errorf("debug server stopped. got err: %s", err)
}
}

func main() {
if !parse() {
showHelp()
Expand Down Expand Up @@ -123,6 +147,8 @@ func main() {
<-jc.DisconnectNotify()
}))

go startMetrics(metricsAddr)

var err error
if key != "" && cert != "" {
log.Infof("Listening at https://[%s]", addr)
Expand Down
32 changes: 32 additions & 0 deletions pkg/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"
"sync/atomic"
"time"
"unsafe"

"github.com/pion/sdp/v3"

Expand Down Expand Up @@ -65,6 +66,15 @@ type Buffer struct {
feedbackTWCC func(sn uint16, timeNS int64, marker bool)
}

type BufferStats struct {
LastExpected uint32
LastReceived uint32
LostRate float32
PacketCount uint32 // Number of packets received from this source.
Jitter float64 // An estimate of the statistical variance of the RTP data packet inter-arrival time.
TotalByte uint64
}

// BufferOptions provides configuration options for the buffer
type Options struct {
BufferTime int
Expand Down Expand Up @@ -272,14 +282,17 @@ func (b *Buffer) onNack(fn func(fb *rtcp.TransportLayerNack)) {
b.pktQueue.onLost = fn
}

// GetMediaSSRC returns the associated SSRC of the RTP stream
func (b *Buffer) GetMediaSSRC() uint32 {
return b.mediaSSRC
}

// GetClockRate returns the RTP clock rate
func (b *Buffer) GetClockRate() uint32 {
return b.clockRate
}

// GetSenderReportData returns the rtp, ntp and nanos of the last sender report
func (b *Buffer) GetSenderReportData() (rtpTime uint32, ntpTime uint64, lastReceivedTimeInNanosSinceEpoch int64) {
rtpTime = atomic.LoadUint32(&b.lastSRRTPTime)
ntpTime = atomic.LoadUint64(&b.lastSRNTPTime)
Expand All @@ -288,17 +301,36 @@ func (b *Buffer) GetSenderReportData() (rtpTime uint32, ntpTime uint64, lastRece
return rtpTime, ntpTime, lastReceivedTimeInNanosSinceEpoch
}

// GetStats returns the raw statistics about a particular buffer state
func (b *Buffer) GetStats() (stats BufferStats) {
stats.LastExpected = atomic.LoadUint32(&b.lastExpected)
stats.LastReceived = atomic.LoadUint32(&b.lastReceived)
raw32Lost := atomic.LoadUint32((*uint32)(unsafe.Pointer(&b.lostRate)))
stats.LostRate = *((*float32)(unsafe.Pointer(&raw32Lost)))
stats.PacketCount = atomic.LoadUint32(&b.packetCount)

raw64Jitter := atomic.LoadUint64((*uint64)(unsafe.Pointer(&b.lostRate)))
stats.Jitter = *((*float64)(unsafe.Pointer(&raw64Jitter)))

stats.TotalByte = atomic.LoadUint64(&b.totalByte)

return stats
}

// GetLatestTimestamp returns the latest RTP timestamp factoring in potential RTP timestamp wrap-around
func (b *Buffer) GetLatestTimestamp() (latestTimestamp uint32, latestTimestampTimeInNanosSinceEpoch int64) {
latestTimestamp = atomic.LoadUint32(&b.latestTimestamp)
latestTimestampTimeInNanosSinceEpoch = atomic.LoadInt64(&b.latestTimestampTime)

return latestTimestamp, latestTimestampTimeInNanosSinceEpoch
}

// IsTimestampWrapAround returns true if wrap around happens from timestamp1 to timestamp2
func IsTimestampWrapAround(timestamp1 uint32, timestamp2 uint32) bool {
return (timestamp1&0xC000000 == 0) && (timestamp2&0xC000000 == 0xC000000)
}

// IsLaterTimestamp returns true if timestamp1 is later in time than timestamp2 factoring in timestamp wrap-around
func IsLaterTimestamp(timestamp1 uint32, timestamp2 uint32) bool {
if timestamp1 > timestamp2 {
if IsTimestampWrapAround(timestamp2, timestamp1) {
Expand Down
114 changes: 114 additions & 0 deletions pkg/stats/interceptor.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,72 @@
package stats

import (
"math"
"sync"

"github.com/pion/interceptor"
"github.com/pion/ion-sfu/pkg/buffer"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/prometheus/client_golang/prometheus"
)

var (
driftBuckets = []float64{5, 10, 20, 40, 80, 160, math.Inf(+1)}

drift = prometheus.NewHistogram(prometheus.HistogramOpts{
Subsystem: "rtp",
Name: "drift_millis",
Buckets: driftBuckets,
})

expectedCount = prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "rtp",
Name: "expected",
})

receivedCount = prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "rtp",
Name: "received",
})

packetCount = prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "rtp",
Name: "packets",
})

totalBytes = prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "rtp",
Name: "bytes",
})

expectedMinusReceived = prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: "rtp",
Name: "expected_minus_received",
})

lostRate = prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: "rtp",
Name: "lostRate",
})

jitter = prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: "rtp",
Name: "jitter",
})
)

func init() {
prometheus.MustRegister(drift)
prometheus.MustRegister(expectedCount)
prometheus.MustRegister(receivedCount)
prometheus.MustRegister(packetCount)
prometheus.MustRegister(totalBytes)
prometheus.MustRegister(expectedMinusReceived)
prometheus.MustRegister(lostRate)
prometheus.MustRegister(jitter)
}

type Interceptor struct {
sync.RWMutex
bufferInterceptor *buffer.Interceptor
Expand Down Expand Up @@ -76,6 +134,33 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
}
}
}
case *rtcp.ReceiverReport:
calculateStats := func(ssrc uint32) {
i.RLock()
defer i.RUnlock()

for _, s := range i.streams {
if s.Buffer.GetMediaSSRC() != ssrc {
continue
}
bufferStats := s.Buffer.GetStats()

hadStats, diffStats := s.updateStats(bufferStats)

if hadStats {
expectedCount.Add(float64(diffStats.LastExpected))
receivedCount.Add(float64(diffStats.LastReceived))
packetCount.Add(float64(diffStats.PacketCount))
totalBytes.Add(float64(diffStats.TotalByte))
}

expectedMinusReceived.Observe(float64(bufferStats.LastExpected - bufferStats.LastReceived))
lostRate.Observe(float64(bufferStats.LostRate))
jitter.Observe(float64(bufferStats.Jitter))
}
}
calculateStats(pkt.SSRC)

case *rtcp.SenderReport:
findRelatedCName := func(ssrc uint32) string {
i.RLock()
Expand Down Expand Up @@ -134,13 +219,42 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
}
}

calculateStats := func(ssrc uint32) {
i.RLock()
defer i.RUnlock()

for _, s := range i.streams {
if s.Buffer.GetMediaSSRC() != ssrc {
continue
}

bufferStats := s.Buffer.GetStats()
driftInMillis := s.getDriftInMillis()

hadStats, diffStats := s.updateStats(bufferStats)

drift.Observe(float64(driftInMillis))
if hadStats {
expectedCount.Add(float64(diffStats.LastExpected))
receivedCount.Add(float64(diffStats.LastReceived))
packetCount.Add(float64(diffStats.PacketCount))
totalBytes.Add(float64(diffStats.TotalByte))
}

expectedMinusReceived.Observe(float64(bufferStats.LastExpected - bufferStats.LastReceived))
lostRate.Observe(float64(bufferStats.LostRate))
jitter.Observe(float64(bufferStats.Jitter))
}
}

cname := findRelatedCName(pkt.SSRC)

minPacketNtpTimeInMillisSinceSenderEpoch, maxPacketNtpTimeInMillisSinceSenderEpoch := calculateLatestMinMaxSenderNtpTime(cname)

driftInMillis := maxPacketNtpTimeInMillisSinceSenderEpoch - minPacketNtpTimeInMillisSinceSenderEpoch

setDrift(cname, driftInMillis)
calculateStats(pkt.SSRC)
}
}

Expand Down
32 changes: 30 additions & 2 deletions pkg/stats/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ import (
log "github.com/pion/ion-log"
)

// Stream contains buffer with statistics
// Stream contains buffer statistics
type Stream struct {
sync.RWMutex
Buffer *buffer.Buffer
cname string
driftInMillis uint64
hasStats bool
lastStats buffer.BufferStats
diffStats buffer.BufferStats
}

// NewBuffer constructs a new Buffer
// NewStream constructs a new Stream
func NewStream(buffer *buffer.Buffer, _ *interceptor.StreamInfo) *Stream {
s := &Stream{
Buffer: buffer,
Expand All @@ -29,6 +32,7 @@ func NewStream(buffer *buffer.Buffer, _ *interceptor.StreamInfo) *Stream {
return s
}

// GetCName returns the cname for a given stream
func (s *Stream) GetCName() string {
s.RLock()
defer s.RUnlock()
Expand All @@ -46,3 +50,27 @@ func (s *Stream) setCName(cname string) {
func (s *Stream) setDriftInMillis(driftInMillis uint64) {
atomic.StoreUint64(&s.driftInMillis, driftInMillis)
}

func (s *Stream) getDriftInMillis() uint64 {
return atomic.LoadUint64(&s.driftInMillis)
}

func (s *Stream) updateStats(stats buffer.BufferStats) (hasDiff bool, diffStats buffer.BufferStats) {
s.Lock()
defer s.Unlock()

hadStats := false

if s.hasStats {
s.diffStats.LastExpected = stats.LastExpected - s.lastStats.LastExpected
s.diffStats.LastReceived = stats.LastReceived - s.lastStats.LastReceived
s.diffStats.PacketCount = stats.PacketCount - s.lastStats.PacketCount
s.diffStats.TotalByte = stats.TotalByte - s.lastStats.TotalByte
hadStats = true
}

s.lastStats = stats
s.hasStats = true

return hadStats, s.diffStats
}

0 comments on commit 320b4cf

Please sign in to comment.