Skip to content

Commit

Permalink
Lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Apr 3, 2024
1 parent 18610c6 commit 6772f59
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 137 deletions.
2 changes: 1 addition & 1 deletion association.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ func (a *Association) SRTT() float64 {
// getMaxTSNOffset returns the maximum offset over the current cummulative TSN that
// we are willing to enqueue. Limiting the maximum offset limits the number of
// tsns we have in the payloadQueue map. This ensures that we don't use too much space in
// the map itself. This also ensures that we keep the bytes utilised in the receive
// the map itself. This also ensures that we keep the bytes utilized in the receive
// buffer within a small multiple of the user provided max receive buffer size.
func (a *Association) getMaxTSNOffset() uint32 {
// 4 is a magic number here. There is no theory behind this.
Expand Down
216 changes: 80 additions & 136 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2748,89 +2748,96 @@ func (d *udpDiscardReader) Read(b []byte) (n int, err error) {
return d.Conn.Read(b)
}

func TestAssociationReceiveWindow(t *testing.T) {
udp1, udp2 := createUDPConnPair()
ctx, cancel := context.WithCancel(context.Background())
dudp1 := &udpDiscardReader{Conn: udp1, ctx: ctx}
createAssociations := func() (*Association, *Association, error) {
loggerFactory := logging.NewDefaultLoggerFactory()
func createAssociationPair(udpConn1 net.Conn, udpConn2 net.Conn) (*Association, *Association, error) {
loggerFactory := logging.NewDefaultLoggerFactory()

a1Chan := make(chan interface{})
a2Chan := make(chan interface{})
a1Chan := make(chan interface{})
a2Chan := make(chan interface{})

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

go func() {
a, err2 := createClientWithContext(ctx, Config{
NetConn: dudp1,
LoggerFactory: loggerFactory,
})
if err2 != nil {
a1Chan <- err2
} else {
a1Chan <- a
}
}()
go func() {
a, err2 := createClientWithContext(ctx, Config{
NetConn: udpConn1,
LoggerFactory: loggerFactory,
})
if err2 != nil {
a1Chan <- err2
} else {
a1Chan <- a
}
}()

go func() {
a, err2 := createClientWithContext(ctx, Config{
NetConn: udp2,
LoggerFactory: loggerFactory,
MaxReceiveBufferSize: 100_000,
})
if err2 != nil {
a2Chan <- err2
} else {
a2Chan <- a
}
}()
go func() {
a, err2 := createClientWithContext(ctx, Config{
NetConn: udpConn2,
LoggerFactory: loggerFactory,
MaxReceiveBufferSize: 100_000,
})
if err2 != nil {
a2Chan <- err2
} else {
a2Chan <- a
}
}()

var a1 *Association
var a2 *Association
var a1 *Association
var a2 *Association

loop:
for {
select {
case v1 := <-a1Chan:
switch v := v1.(type) {
case *Association:
a1 = v
if a2 != nil {
break loop
}
case error:
return nil, nil, v
loop:
for {
select {
case v1 := <-a1Chan:
switch v := v1.(type) {
case *Association:
a1 = v
if a2 != nil {
break loop
}
case v2 := <-a2Chan:
switch v := v2.(type) {
case *Association:
a2 = v
if a1 != nil {
break loop
}
case error:
return nil, nil, v
case error:
return nil, nil, v
}
case v2 := <-a2Chan:
switch v := v2.(type) {
case *Association:
a2 = v
if a1 != nil {
break loop
}
case error:
return nil, nil, v
}
}
return a1, a2, nil
}
return a1, a2, nil
}

func noErrorClose(t *testing.T, closeF func() error) {
t.Helper()
require.NoError(t, closeF())
}

func TestAssociationReceiveWindow(t *testing.T) {
udp1, udp2 := createUDPConnPair()
ctx, cancel := context.WithCancel(context.Background())
dudp1 := &udpDiscardReader{Conn: udp1, ctx: ctx}
// a1 is the association used for sending data
// a2 is the association with receive window of 100kB which we will
// try to bypass
a1, a2, err := createAssociations()

a1, a2, err := createAssociationPair(dudp1, udp2)
require.NoError(t, err)
defer a2.Close()
defer a1.Close()
defer noErrorClose(t, a2.Close)
defer noErrorClose(t, a1.Close)
s1, err := a1.OpenStream(1, PayloadTypeWebRTCBinary)
require.NoError(t, err)
defer s1.Close()
s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
defer noErrorClose(t, s1.Close)
_, err = s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
require.NoError(t, err)
dudp1.block.Store(true)

s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
_, err = s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
require.NoError(t, err)
s2, err := a2.AcceptStream()
require.NoError(t, err)
require.Equal(t, uint16(1), s2.streamIdentifier)
Expand All @@ -2848,7 +2855,7 @@ func TestAssociationReceiveWindow(t *testing.T) {
return
default:
}
chunk.tsn -= 1
chunk.tsn--
pp := a1.bundleDataChunksIntoPackets(chunks)
for _, p := range pp {
raw, err := p.marshal(true)
Expand Down Expand Up @@ -2881,83 +2888,20 @@ func TestAssociationReceiveWindow(t *testing.T) {

func TestAssociationMaxTSNOffset(t *testing.T) {
udp1, udp2 := createUDPConnPair()
createAssociations := func() (*Association, *Association, error) {
loggerFactory := logging.NewDefaultLoggerFactory()

a1Chan := make(chan interface{})
a2Chan := make(chan interface{})

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

go func() {
a, err2 := createClientWithContext(ctx, Config{
NetConn: udp1,
LoggerFactory: loggerFactory,
})
if err2 != nil {
a1Chan <- err2
} else {
a1Chan <- a
}
}()

go func() {
a, err2 := createClientWithContext(ctx, Config{
NetConn: udp2,
LoggerFactory: loggerFactory,
MaxReceiveBufferSize: 100_000,
})
if err2 != nil {
a2Chan <- err2
} else {
a2Chan <- a
}
}()

var a1 *Association
var a2 *Association

loop:
for {
select {
case v1 := <-a1Chan:
switch v := v1.(type) {
case *Association:
a1 = v
if a2 != nil {
break loop
}
case error:
return nil, nil, v
}
case v2 := <-a2Chan:
switch v := v2.(type) {
case *Association:
a2 = v
if a1 != nil {
break loop
}
case error:
return nil, nil, v
}
}
}
return a1, a2, nil
}
// a1 is the association used for sending data
// a2 is the association with receive window of 100kB which we will
// try to bypass
a1, a2, err := createAssociations()

a1, a2, err := createAssociationPair(udp1, udp2)
require.NoError(t, err)
defer a2.Close()
defer a1.Close()
defer noErrorClose(t, a2.Close)
defer noErrorClose(t, a1.Close)
s1, err := a1.OpenStream(1, PayloadTypeWebRTCBinary)
require.NoError(t, err)
defer s1.Close()
s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
defer noErrorClose(t, s1.Close)
_, err = s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
require.NoError(t, err)
_, err = s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
require.NoError(t, err)
s2, err := a2.AcceptStream()
require.NoError(t, err)
require.Equal(t, uint16(1), s2.streamIdentifier)
Expand Down

0 comments on commit 6772f59

Please sign in to comment.