Skip to content

Commit

Permalink
feat: add mad filter
Browse files Browse the repository at this point in the history
  • Loading branch information
b4nst committed Sep 4, 2024
1 parent 9cdd5be commit a44f916
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 10 deletions.
2 changes: 0 additions & 2 deletions pkg/recorder/record.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package recorder

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -61,7 +60,6 @@ func (r *Record) Stats() []*Stat {
rtt += packets[i].Rtt
localbw := float64(packets[i].Nbytes-packets[i-1].Nbytes) / (packets[i].Rtt - packets[i-1].Rtt).Seconds()
locallat := packets[i].Rtt.Seconds() - (float64(packets[i].Nbytes) / localbw)
fmt.Println(localbw, locallat)

bandwidth += localbw
latency += locallat
Expand Down
44 changes: 39 additions & 5 deletions pkg/recorder/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package recorder

import (
"fmt"
"math"
"time"

"github.com/dustin/go-humanize"
"github.com/montanaflynn/stats"
)

Expand All @@ -19,29 +21,61 @@ type Stat struct {
Seq int
}

func (s *Stat) String() string {
return fmt.Sprintf("Latency: %s, Rtt: %s, Bandwidth: %s/s", s.Latency, s.Rtt, humanize.Bytes(uint64(s.Bandwidth)))
}

func ProcessStats(stats []*Stat) (*Stat, error) {
sanitized := Sanitize(stats)
fmt.Println(sanitized)
// Apply IQR filter
sanitized, err := IQRFilter(sanitized)
sanitized, err := MADFilter(sanitized)
if err != nil {
return nil, err
}
fmt.Println(sanitized)
// Return average
return Average(sanitized), nil
}

func Sanitize(stats []*Stat) []*Stat {
sanitized := make([]*Stat, 0, len(stats))
for _, stat := range stats {
if stat.Bandwidth > 0 {
if stat.Bandwidth >= 0 {
sanitized = append(sanitized, stat)
}
}
return sanitized
}

func MADFilter(s []*Stat) ([]*Stat, error) {
bandwidths := make(stats.Float64Data, 0, len(s))
for _, stat := range s {
bandwidths = append(bandwidths, stat.Bandwidth)
}

mad, err := stats.MedianAbsoluteDeviation(bandwidths)
if err != nil {
return nil, err
}
if mad == 0 { // If all bandwidths are identical, return the input unchanged
return s, nil
}

median, err := bandwidths.Median()
if err != nil {
return nil, err
}
zthresh := 3.5
adjust := 0.6745

filtered := make([]*Stat, 0, len(s))
for _, stat := range s {
zscore := adjust * math.Abs(stat.Bandwidth-median) / mad
if zscore < zthresh {
filtered = append(filtered, stat)
}
}
return filtered, nil
}

func IQRFilter(s []*Stat) ([]*Stat, error) {
bandwidths := make(stats.Float64Data, 0, len(s))
for _, stat := range s {
Expand Down
50 changes: 47 additions & 3 deletions pkg/recorder/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,52 @@ func TestIQRFilter(t *testing.T) {
})
}

func TestMADFilter(t *testing.T) {
t.Parallel()

t.Run("no outliers", func(t *testing.T) {
t.Parallel()
stats := []*Stat{
{Bandwidth: 1.0},
{Bandwidth: 2.0},
{Bandwidth: 3.0},
{Bandwidth: 4.0},
{Bandwidth: 5.0},
}

filtered, err := MADFilter(stats)
assert.NoError(t, err)
assert.ElementsMatch(t, stats, filtered)
})

t.Run("outliers", func(t *testing.T) {
t.Parallel()
stats := []*Stat{
{Bandwidth: 1.0},
{Bandwidth: 31.0},
{Bandwidth: 30.0},
{Bandwidth: 35.0},
{Bandwidth: 29.0},
{Bandwidth: 100.0},
}

filtered, err := MADFilter(stats)
assert.NoError(t, err)
assert.Len(t, filtered, len(stats)-2)
assert.NotContains(t, filtered, &Stat{Bandwidth: 1.0})
assert.NotContains(t, filtered, &Stat{Bandwidth: 100.0})
})

t.Run("error", func(t *testing.T) {
t.Parallel()
stats := []*Stat{}
_, err := MADFilter(stats)
if ok := assert.Error(t, err); ok {
assert.EqualError(t, err, "Input must not be empty.")
}
})
}

func TestSanitize(t *testing.T) {
t.Parallel()
stats := []*Stat{
Expand All @@ -80,8 +126,7 @@ func TestSanitize(t *testing.T) {
}

sanitized := Sanitize(stats)
assert.Len(t, sanitized, 3)
assert.NotContains(t, sanitized, &Stat{Bandwidth: 0.0})
assert.Len(t, sanitized, 4)
assert.NotContains(t, sanitized, &Stat{Bandwidth: -1.0})
}

Expand All @@ -99,7 +144,6 @@ func TestProcessStats(t *testing.T) {
t.Run("stats", func(t *testing.T) {
stats := []*Stat{
{Bandwidth: -30.0},
{Bandwidth: 0.0},
{Bandwidth: 1.0},
{Bandwidth: 100.0},
{Bandwidth: 30.0, Latency: 1 * time.Second, Rtt: 1 * time.Second, Seq: 1},
Expand Down

0 comments on commit a44f916

Please sign in to comment.