From 75f2dabe7051316bc972aa6678271c703c16c7e9 Mon Sep 17 00:00:00 2001 From: sterlingdeng Date: Sat, 23 Nov 2024 16:35:44 -0800 Subject: [PATCH] Fixes bug in group accumulator --- pkg/gcc/arrival_group.go | 9 ++++- pkg/gcc/arrival_group_accumulator.go | 24 ++++++----- pkg/gcc/arrival_group_accumulator_test.go | 4 +- pkg/gcc/arrival_group_test.go | 49 +++++++++++++++++++++-- 4 files changed, 70 insertions(+), 16 deletions(-) diff --git a/pkg/gcc/arrival_group.go b/pkg/gcc/arrival_group.go index e0300f1b..5aa1d89d 100644 --- a/pkg/gcc/arrival_group.go +++ b/pkg/gcc/arrival_group.go @@ -16,10 +16,17 @@ type arrivalGroup struct { arrival time.Time } +func newArrivalGroup(a cc.Acknowledgment) arrivalGroup { + return arrivalGroup{ + packets: []cc.Acknowledgment{a}, + departure: a.Departure, + arrival: a.Arrival, + } +} + func (g *arrivalGroup) add(a cc.Acknowledgment) { g.packets = append(g.packets, a) g.arrival = a.Arrival - g.departure = a.Departure } func (g arrivalGroup) String() string { diff --git a/pkg/gcc/arrival_group_accumulator.go b/pkg/gcc/arrival_group_accumulator.go index df170a12..c568ffdc 100644 --- a/pkg/gcc/arrival_group_accumulator.go +++ b/pkg/gcc/arrival_group_accumulator.go @@ -29,7 +29,7 @@ func (a *arrivalGroupAccumulator) run(in <-chan []cc.Acknowledgment, agWriter fu for acks := range in { for _, next := range acks { if !init { - group.add(next) + group = newArrivalGroup(next) init = true continue } @@ -38,11 +38,16 @@ func (a *arrivalGroupAccumulator) run(in <-chan []cc.Acknowledgment, agWriter fu continue } if next.Departure.After(group.departure) { + // A sequence of packets which are sent within a burst_time interval + // constitute a group. if interDepartureTimePkt(group, next) <= a.interDepartureThreshold { group.add(next) continue } + // A Packet which has an inter-arrival time less than burst_time and + // an inter-group delay variation d(i) less than 0 is considered + // being part of the current group of packets. if interArrivalTimePkt(group, next) <= a.interArrivalThreshold && interGroupDelayVariationPkt(group, next) < a.interGroupDelayVariationTreshold { group.add(next) @@ -50,24 +55,23 @@ func (a *arrivalGroupAccumulator) run(in <-chan []cc.Acknowledgment, agWriter fu } agWriter(group) - group = arrivalGroup{} - group.add(next) + group = newArrivalGroup(next) } } } } -func interArrivalTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration { - return b.Arrival.Sub(a.arrival) +func interArrivalTimePkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration { + return ack.Arrival.Sub(group.arrival) } -func interDepartureTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration { - if len(a.packets) == 0 { +func interDepartureTimePkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration { + if len(group.packets) == 0 { return 0 } - return b.Departure.Sub(a.packets[len(a.packets)-1].Departure) + return ack.Departure.Sub(group.departure) } -func interGroupDelayVariationPkt(a arrivalGroup, b cc.Acknowledgment) time.Duration { - return b.Arrival.Sub(a.arrival) - b.Departure.Sub(a.departure) +func interGroupDelayVariationPkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration { + return ack.Arrival.Sub(group.arrival) - ack.Departure.Sub(group.departure) } diff --git a/pkg/gcc/arrival_group_accumulator_test.go b/pkg/gcc/arrival_group_accumulator_test.go index 7dc1a0ce..be48962a 100644 --- a/pkg/gcc/arrival_group_accumulator_test.go +++ b/pkg/gcc/arrival_group_accumulator_test.go @@ -70,7 +70,7 @@ func TestArrivalGroupAccumulator(t *testing.T) { }, }, arrival: time.Time{}.Add(20 * time.Millisecond), - departure: time.Time{}.Add(3 * time.Millisecond), + departure: time.Time{}, }}, }, { @@ -102,7 +102,7 @@ func TestArrivalGroupAccumulator(t *testing.T) { }, }, arrival: time.Time{}.Add(20 * time.Millisecond), - departure: time.Time{}.Add(3 * time.Millisecond), + departure: time.Time{}.Add(0 * time.Millisecond), }, { packets: []cc.Acknowledgment{ diff --git a/pkg/gcc/arrival_group_test.go b/pkg/gcc/arrival_group_test.go index 6857085d..cf24f5a7 100644 --- a/pkg/gcc/arrival_group_test.go +++ b/pkg/gcc/arrival_group_test.go @@ -71,7 +71,46 @@ func TestArrivalGroup(t *testing.T) { Arrival: time.Time{}.Add(time.Second), }}, arrival: time.Time{}.Add(time.Second), - departure: time.Time{}.Add(time.Second), + departure: time.Time{}, + }, + }, + { + name: "departure time of group is the departure time of the first packet in the group", + acks: []cc.Acknowledgment{{ + SequenceNumber: 0, + Size: 0, + Departure: time.Time{}.Add(27 * time.Millisecond), + Arrival: time.Time{}, + }, { + SequenceNumber: 1, + Size: 1, + Departure: time.Time{}.Add(32 * time.Millisecond), + Arrival: time.Time{}.Add(37 * time.Millisecond), + }, { + SequenceNumber: 2, + Size: 2, + Departure: time.Time{}.Add(50 * time.Millisecond), + Arrival: time.Time{}.Add(56 * time.Millisecond), + }}, + expected: arrivalGroup{ + packets: []cc.Acknowledgment{{ + SequenceNumber: 0, + Size: 0, + Departure: time.Time{}.Add(27 * time.Millisecond), + Arrival: time.Time{}, + }, { + SequenceNumber: 1, + Size: 1, + Departure: time.Time{}.Add(32 * time.Millisecond), + Arrival: time.Time{}.Add(37 * time.Millisecond), + }, { + SequenceNumber: 2, + Size: 2, + Departure: time.Time{}.Add(50 * time.Millisecond), + Arrival: time.Time{}.Add(56 * time.Millisecond), + }}, + arrival: time.Time{}.Add(56 * time.Millisecond), + departure: time.Time{}.Add(27 * time.Millisecond), }, }, } @@ -80,8 +119,12 @@ func TestArrivalGroup(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { ag := arrivalGroup{} - for _, ack := range tc.acks { - ag.add(ack) + for i, ack := range tc.acks { + if i == 0 { + ag = newArrivalGroup(ack) + } else { + ag.add(ack) + } } assert.Equal(t, tc.expected, ag) })