Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Impleted bufferedAmount interface
Browse files Browse the repository at this point in the history
Resolves pion#569
  • Loading branch information
enobufs committed Apr 6, 2019
1 parent 64eecd6 commit c7a5c7b
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 15 deletions.
50 changes: 36 additions & 14 deletions datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type DataChannel struct {
// "blob". This attribute controls how binary data is exposed to scripts.
// binaryType string

// OnBufferedAmountLow func()
// OnError func()

onMessageHandler func(DataChannelMessage)
onOpenHandler func()
onCloseHandler func()
onMessageHandler func(DataChannelMessage)
onOpenHandler func()
onCloseHandler func()
onBufferedAmountLow func()

sctpTransport *SCTPTransport
dataChannel *datachannel.DataChannel
Expand Down Expand Up @@ -141,6 +141,10 @@ func (d *DataChannel) open(sctpTransport *SCTPTransport) error {
return err
}

// bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
dc.SetBufferedAmountLowThreshold(d.bufferedAmountLowThreshold)
dc.OnBufferedAmountLow(d.onBufferedAmountLow)

d.readyState = DataChannelStateOpen
d.mu.Unlock()

Expand Down Expand Up @@ -458,32 +462,50 @@ func (d *DataChannel) BufferedAmount() uint64 {
d.mu.RLock()
defer d.mu.RUnlock()

// TODO: wire to SCTP (pion/sctp#11)
return 0
if d.dataChannel == nil {
return 0
}
return d.dataChannel.BufferedAmount()
}

// BufferedAmountLowThreshold represents the threshold at which the
// bufferedAmount is considered to be low. When the bufferedAmount decreases
// from above this threshold to equal or below it, the bufferedamountlow
// event fires. BufferedAmountLowThreshold is initially zero on each new
// DataChannel, but the application may change its value at any time.
// The threshold is set to 0 by default.
func (d *DataChannel) BufferedAmountLowThreshold() uint64 {
d.mu.RLock()
defer d.mu.RUnlock()

// TODO: wire to SCTP (pion/sctp#11)
return d.bufferedAmountLowThreshold
if d.dataChannel == nil {
return d.bufferedAmountLowThreshold
}
return d.dataChannel.BufferedAmountLowThreshold()
}

// SetBufferedAmountLowThreshold represents the threshold at which the
// bufferedAmount is considered to be low. When the bufferedAmount decreases
// from above this threshold to equal or below it, the bufferedamountlow
// event fires. BufferedAmountLowThreshold is initially zero on each new
// DataChannel, but the application may change its value at any time.
// SetBufferedAmountLowThreshold is used to update the threshold.
// See BufferedAmountLowThreshold().
func (d *DataChannel) SetBufferedAmountLowThreshold(th uint64) {
d.mu.Lock()
defer d.mu.Unlock()

// TODO: wire to SCTP (pion/sctp#11)
d.bufferedAmountLowThreshold = th

if d.dataChannel != nil {
d.dataChannel.SetBufferedAmountLowThreshold(th)
}
}

// OnBufferedAmountLow sets an event handler which is invoked when
// the number of bytes of outgoing data becomes lower than the
// BufferedAmountLowThreshold.
func (d *DataChannel) OnBufferedAmountLow(f func()) {
d.mu.Lock()
defer d.mu.Unlock()

d.onBufferedAmountLow = f
if d.dataChannel != nil {
d.dataChannel.OnBufferedAmountLow(f)
}
}
150 changes: 150 additions & 0 deletions datachannel_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,153 @@ func TestDataChannelParamters_Go(t *testing.T) {
assert.Equal(t, uint64(1500), dc.BufferedAmountLowThreshold(), "should match")
})
}

func TestDataChannelBufferedAmount(t *testing.T) {
t.Run("set before datachannel becomes open", func(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

var nCbs int
buf := make([]byte, 1000)

offerPC, answerPC, err := newPair()
if err != nil {
t.Fatalf("Failed to create a PC pair for testing")
}

done := make(chan bool)

answerPC.OnDataChannel(func(d *DataChannel) {
// Make sure this is the data channel we were looking for. (Not the one
// created in signalPair).
if d.Label() != expectedLabel {
return
}
var nPacketsReceived int
d.OnMessage(func(msg DataChannelMessage) {
nPacketsReceived++

if nPacketsReceived == 10 {
go func() {
time.Sleep(time.Second)
done <- true
}()
}
})
assert.True(t, d.Ordered(), "Ordered should be set to true")
})

dc, err := offerPC.CreateDataChannel(expectedLabel, nil)
if err != nil {
t.Fatalf("Failed to create a PC pair for testing")
}

assert.True(t, dc.Ordered(), "Ordered should be set to true")

dc.OnOpen(func() {
for i := 0; i < 10; i++ {
e := dc.Send(buf)
if e != nil {
t.Fatalf("Failed to send string on data channel")
}
assert.Equal(t, uint64(1500), dc.BufferedAmountLowThreshold(), "value mimatch")

//assert.Equal(t, (i+1)*len(buf), int(dc.BufferedAmount()), "unexpected bufferedAmount")
}
})

dc.OnMessage(func(msg DataChannelMessage) {
})

// The value is temporarily stored in the dc object
// until the dc gets opened
dc.SetBufferedAmountLowThreshold(1500)
// The callback function is temporarily stored in the dc object
// until the dc gets opened
dc.OnBufferedAmountLow(func() {
nCbs++
})

err = signalPair(offerPC, answerPC)
if err != nil {
t.Fatalf("Failed to signal our PC pair for testing")
}

closePair(t, offerPC, answerPC, done)

assert.True(t, nCbs > 0, "callback should be made at least once")
})

t.Run("set after datachannel becomes open", func(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

var nCbs int
buf := make([]byte, 1000)

offerPC, answerPC, err := newPair()
if err != nil {
t.Fatalf("Failed to create a PC pair for testing")
}

done := make(chan bool)

answerPC.OnDataChannel(func(d *DataChannel) {
// Make sure this is the data channel we were looking for. (Not the one
// created in signalPair).
if d.Label() != expectedLabel {
return
}
var nPacketsReceived int
d.OnMessage(func(msg DataChannelMessage) {
nPacketsReceived++

if nPacketsReceived == 10 {
go func() {
time.Sleep(time.Second)
done <- true
}()
}
})
assert.True(t, d.Ordered(), "Ordered should be set to true")
})

dc, err := offerPC.CreateDataChannel(expectedLabel, nil)
if err != nil {
t.Fatalf("Failed to create a PC pair for testing")
}

assert.True(t, dc.Ordered(), "Ordered should be set to true")

dc.OnOpen(func() {
// The value should directly be passed to sctp
dc.SetBufferedAmountLowThreshold(1500)
// The callback function should directly be passed to sctp
dc.OnBufferedAmountLow(func() {
nCbs++
})

for i := 0; i < 10; i++ {
e := dc.Send(buf)
if e != nil {
t.Fatalf("Failed to send string on data channel")
}
assert.Equal(t, uint64(1500), dc.BufferedAmountLowThreshold(), "value mimatch")

//assert.Equal(t, (i+1)*len(buf), int(dc.BufferedAmount()), "unexpected bufferedAmount")
}
})

dc.OnMessage(func(msg DataChannelMessage) {
})

err = signalPair(offerPC, answerPC)
if err != nil {
t.Fatalf("Failed to signal our PC pair for testing")
}

closePair(t, offerPC, answerPC, done)

assert.True(t, nCbs > 0, "callback should be made at least once")
})
}
2 changes: 1 addition & 1 deletion datachannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestDataChannel_Send(t *testing.T) {
closePair(t, offerPC, answerPC, done)
}

func TestDataChannelParamters(t *testing.T) {
func TestDataChannelParameters(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

Expand Down

0 comments on commit c7a5c7b

Please sign in to comment.