Skip to content

Commit

Permalink
Smooth c2s (#179)
Browse files Browse the repository at this point in the history
* Eliminate application-level quantization of throughput measurement.
* Added websocket c2s unit tests and added better logging for flaky e2e test.
* Clarified measurement code.
  • Loading branch information
pboothe authored Sep 4, 2019
1 parent 1a6d6ce commit aeaeaba
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 91 deletions.
4 changes: 2 additions & 2 deletions ndt-server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ func Test_MainIntegrationTest(t *testing.T) {
preFileCount := countFiles(dataDir)
stdout, stderr, err := pipe.DividedOutput(pipe.Script(tc.name, pipe.System(tc.cmd)))
if err != nil {
t.Errorf("ERROR %s (Command: %s)\nStdout: %s\nStderr: %s\n",
tc.name, tc.cmd, string(stdout), string(stderr))
t.Errorf("ERROR %s gave error %q (Command: %s)\nStdout: %s\nStderr: %s\n",
tc.name, err, tc.cmd, string(stdout), string(stderr))
}
postFileCount := countFiles(dataDir)
if !tc.ignoreData {
Expand Down
75 changes: 28 additions & 47 deletions ndt5/c2s/c2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"strconv"
"time"

"github.com/m-lab/ndt-server/ndt5/metrics"

"github.com/m-lab/go/warnonerror"
"github.com/m-lab/ndt-server/ndt5/metrics"
"github.com/m-lab/ndt-server/ndt5/ndt"
"github.com/m-lab/ndt-server/ndt5/protocol"
"github.com/m-lab/ndt-server/ndt5/web100"
)

// ArchivalData is the data saved by the C2S test. If a researcher wants deeper
Expand Down Expand Up @@ -94,7 +94,7 @@ func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Serv
}

record.StartTime = time.Now()
byteCount, err := drainForeverButMeasureFor(testConn, 10*time.Second)
byteCount, err := drainForeverButMeasureFor(ctx, testConn, 10*time.Second)
record.EndTime = time.Now()
seconds := record.EndTime.Sub(record.StartTime).Seconds()
log.Println("Ended C2S test on", testConn)
Expand Down Expand Up @@ -139,58 +139,39 @@ func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Serv
// measuring the connection for the first part of the drain. This method does
// not close the passed-in Connection, and starts a goroutine which runs until
// that Connection is closed.
func drainForeverButMeasureFor(conn protocol.Connection, d time.Duration) (int64, error) {
type measurement struct {
totalByteCount int64
err error
}
measurements := make(chan measurement)
func drainForeverButMeasureFor(ctx context.Context, conn protocol.MeasuredConnection, d time.Duration) (int64, error) {
derivedCtx, derivedCancel := context.WithTimeout(ctx, d)
defer derivedCancel()

conn.StartMeasuring(derivedCtx)

errs := make(chan error, 1)
// This is the "drain forever" part of this function. Read the passed-in
// connection until the passed-in connection is closed. Only send measurements
// on the measurement channel if the channel can be written to without
// blocking.
// connection until the passed-in connection is closed.
go func() {
var totalByteCount int64
var err error
var connErr error
// Read the connections until the connection is closed. Reading on a closed
// connection returns an error, which terminates the loop and the goroutine.
for err == nil {
var byteCount int64
byteCount, err = conn.ReadBytes()
totalByteCount += byteCount
// Only write to the channel if it won't block, to ensure the reading process
// goes as fast as possible.
select {
case measurements <- measurement{totalByteCount, err}:
default:
}
}
// After we get an error, drain the channel and then close it.
fullChannel := true
for fullChannel {
select {
case <-measurements:
default:
fullChannel = false
}
for connErr == nil {
_, connErr = conn.ReadBytes()
}
close(measurements)
errs <- connErr
}()

// Read the measurements channel until the timer goes off.
timer := time.NewTimer(d)
var bytesRead int64
var socketStats *web100.Metrics
var err error
timerActive := true
for timerActive {
select {
case m := <-measurements:
bytesRead = m.totalByteCount
err = m.err
case <-timer.C:
timerActive = false
}
select {
case <-derivedCtx.Done(): // Wait for timeout
log.Println("Timed out")
socketStats, err = conn.StopMeasuring()
case err = <-errs: // Error in c2s transfer
log.Println("C2S error:", err)
socketStats, _ = conn.StopMeasuring()
}
if socketStats == nil {
return 0, err
}
return bytesRead, err
// The TCPInfo element of socketStats is a value, not a pointer, so this is safe
// if socketStats is not nil.
return socketStats.TCPInfo.BytesReceived, err
}
122 changes: 122 additions & 0 deletions ndt5/c2s/c2s_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package c2s

import (
"context"
"net"
"net/http"
"strconv"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/m-lab/ndt-server/ndt5/singleserving"

"github.com/m-lab/ndt-server/ndt5/tcplistener"

"github.com/m-lab/go/rtx"
"github.com/m-lab/ndt-server/ndt5/protocol"
)

// MustMakeNetConnection builds a connected TCP pair over the loopback
// interface for use in tests. It returns the accepted (server) side wrapped by
// protocol.AdaptNetConn and the dialed (client) side as a plain net.Conn. Any
// failure to listen, dial, or accept is passed to rtx.Must.
//
// The ctx parameter is currently unused; it is kept so the signature parallels
// MustMakeWsConnection.
func MustMakeNetConnection(ctx context.Context) (protocol.MeasuredConnection, net.Conn) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	rtx.Must(err, "Could not listen")
	// Exactly one connection is accepted, so release the listening socket when
	// we return instead of leaking it for the rest of the test binary's life.
	// Closing a listener does not close connections it has already accepted.
	defer listener.Close()

	tl := &tcplistener.RawListener{TCPListener: listener.(*net.TCPListener)}
	conns := make(chan net.Conn)
	defer close(conns)
	// Dial from a separate goroutine so that the Accept below can proceed
	// concurrently; the client side is handed back over the channel.
	go func() {
		clientConn, err := net.Dial("tcp", listener.Addr().String())
		rtx.Must(err, "Could not dial temp conn")
		conns <- clientConn
	}()
	conn, err := tl.Accept()
	rtx.Must(err, "Could not accept")
	// The return operands (including the channel receive) are evaluated before
	// the deferred close calls run.
	return protocol.AdaptNetConn(conn, conn), <-conns
}

// Test_DrainForeverButMeasureFor_NormalOperation covers the case where the
// client keeps writing past the end of the measurement window. The call is
// expected to return no error and a positive byte count.
func Test_DrainForeverButMeasureFor_NormalOperation(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	serverSide, clientSide := MustMakeNetConnection(ctx)
	defer serverSide.Close()
	defer clientSide.Close()

	// The sender is allowed to run for up to 10 seconds, far longer than the
	// 100ms measurement window below.
	go func() {
		sendCtx, stopSending := context.WithTimeout(ctx, 10*time.Second)
		defer stopSending() // Useless, but makes the linter happy.
		payload := []byte("hello")
		for {
			if sendCtx.Err() != nil {
				break
			}
			clientSide.Write(payload)
		}
		clientSide.Close()
	}()

	const window = 100 * time.Millisecond
	count, err := drainForeverButMeasureFor(ctx, serverSide, window)
	if err != nil {
		t.Error("Should not have gotten error:", err)
	}
	if count <= 0 {
		t.Errorf("Expected positive byte count but got %d", count)
	}
}

// Test_DrainForeverButMeasureFor_EarlyClientQuit covers the case where the
// client closes its side before the measurement window ends. The call is
// expected to return an error and still report a positive byte count.
func Test_DrainForeverButMeasureFor_EarlyClientQuit(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	serverSide, clientSide := MustMakeNetConnection(ctx)
	defer serverSide.Close()
	defer clientSide.Close()

	// The client writes once, pauses, and hangs up well before the one-second
	// measurement window below has elapsed.
	go func() {
		clientSide.Write([]byte("hello"))
		// Give the drainForever process time to get going before closing.
		time.Sleep(100 * time.Millisecond)
		clientSide.Close()
	}()

	const window = 1 * time.Second
	count, err := drainForeverButMeasureFor(ctx, serverSide, window)
	if err == nil {
		t.Error("Should have gotten an error")
	}
	if count <= 0 {
		t.Errorf("Expected positive byte count but got %d", count)
	}
}

// MustMakeWsConnection starts a single-serving websocket listener for "c2s",
// dials it from a goroutine, and returns the server-side connection produced
// by ServeOnce together with the client-side websocket connection. Any failure
// to listen, dial, or serve is passed to rtx.Must.
func MustMakeWsConnection(ctx context.Context) (protocol.MeasuredConnection, *websocket.Conn) {
	srv, err := singleserving.ListenWS("c2s")
	rtx.Must(err, "Could not listen")

	clientConns := make(chan *websocket.Conn)
	defer close(clientConns)
	go func() {
		// Original author's note: this will actually result in a failed
		// websocket connection attempt because we aren't setting any headers.
		// That's okay for testing purposes, as we are trying to make sure that
		// the underlying socket stats are counted, and the failed upgrade will
		// still result in non-zero socket stats.
		target := "ws://localhost:" + strconv.Itoa(srv.Port()) + "/ndt_protocol"
		var dialer websocket.Dialer
		ws, _, dialErr := dialer.Dial(target, http.Header{})
		rtx.Must(dialErr, "Could not dial temp conn")
		clientConns <- ws
	}()

	serverConn, err := srv.ServeOnce(ctx)
	rtx.Must(err, "Could not accept")
	// The channel receive is evaluated before the deferred close runs.
	return serverConn, <-clientConns
}

// Test_DrainForeverButMeasureFor_CountsAllBytesNotJustWsGoodput checks that
// the reported byte count is positive even though the client never sends any
// websocket payload after connecting.
func Test_DrainForeverButMeasureFor_CountsAllBytesNotJustWsGoodput(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	serverSide, clientSide := MustMakeWsConnection(ctx)
	defer serverSide.Close()
	defer clientSide.Close()

	// Keep the client connection open (but idle) for longer than we measure.
	go func() {
		// Send nothing. But the websocket handshake used some bytes, so the
		// underlying socket should not measure zero.
		idleCtx, stopIdling := context.WithTimeout(ctx, 1*time.Second)
		defer stopIdling() // Useless, but makes the linter happy.
		<-idleCtx.Done()
		clientSide.Close()
	}()

	const window = 1 * time.Millisecond
	count, err := drainForeverButMeasureFor(ctx, serverSide, window)
	if err != nil {
		t.Error("Should not have gotten error:", err)
	}
	if count <= 0 {
		t.Errorf("Expected positive byte count but got %d", count)
	}
}
44 changes: 29 additions & 15 deletions ndt5/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,32 +126,46 @@ type MeasuredConnection interface {
Measurable
}

// The measurer struct is a hack to ensure that we only have to write the
// complicated measurement code at most once.
// measurer allows all types of connections to embed this struct and be measured
// in the same way. It also means that we have to write the complicated
// measurement code at most once.
type measurer struct {
measurements chan *web100.Metrics
summaryC <-chan *web100.Metrics
cancelMeasurementContext context.CancelFunc
}

// newMeasurer returns a measurer whose fields are safe to use before any
// measurement has been started: summaryC is an already-closed channel and
// cancelMeasurementContext is a no-op.
func newMeasurer() *measurer {
	// Receiving from a closed channel returns immediately (yielding nil here),
	// whereas receiving from a nil channel would block forever. So hand out a
	// closed channel rather than leaving the field at its zero value.
	closed := make(chan *web100.Metrics)
	close(closed)

	m := &measurer{}
	m.summaryC = closed
	// A do-nothing cancel function keeps the field always safe to call.
	m.cancelMeasurementContext = func() {}
	return m
}

// StartMeasuring starts a polling measurement goroutine that runs until the ctx
// expires. After measurement is complete, the given `fd` is closed.
func (m *measurer) StartMeasuring(ctx context.Context, fd *os.File) {
m.measurements = make(chan *web100.Metrics)
var newctx context.Context
newctx, m.cancelMeasurementContext = context.WithCancel(ctx)
go func() {
defer fd.Close()
web100.MeasureViaPolling(newctx, fd, m.measurements)
}()
m.summaryC = web100.MeasureViaPolling(newctx, fd)
}

// StopMeasuring stops the measurement process and returns the collected
// measurements. The measurement process can also be stopped by cancelling the
// context that was passed in to StartMeasuring().
func (m *measurer) StopMeasuring() (*web100.Metrics, error) {
m.cancelMeasurementContext()
info, ok := <-m.measurements
if !ok {
return nil, errors.New("No data")
m.cancelMeasurementContext() // Start the channel close process.

summary := <-m.summaryC
if summary == nil {
return nil, errors.New("No data returned from web100.MeasureViaPolling due to nil")
}
return info, nil
return summary, nil
}

// wsConnection wraps a websocket connection to allow it to be used as a
Expand All @@ -163,7 +177,7 @@ type wsConnection struct {

// AdaptWsConn turns a websocket Connection into a struct which implements both Measurer and Connection
func AdaptWsConn(ws *websocket.Conn) MeasuredConnection {
return &wsConnection{Conn: ws, measurer: &measurer{}}
return &wsConnection{Conn: ws, measurer: newMeasurer()}
}

func (ws *wsConnection) FillUntil(t time.Time, bytes []byte) (bytesWritten int64, err error) {
Expand Down Expand Up @@ -320,7 +334,7 @@ type MeasuredFlexibleConnection interface {

// AdaptNetConn turns a non-WS-based TCP connection into a protocol.MeasuredConnection that can have its encoding set on the fly.
func AdaptNetConn(conn net.Conn, input io.Reader) MeasuredFlexibleConnection {
return &netConnection{Conn: conn, measurer: &measurer{}, input: input, c2sBuffer: make([]byte, 8192)}
return &netConnection{Conn: conn, measurer: newMeasurer(), input: input, c2sBuffer: make([]byte, 8192)}
}

// ReadTLVMessage reads a single NDT message out of the connection.
Expand Down
58 changes: 32 additions & 26 deletions ndt5/web100/web100_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,51 @@ func summarize(snaps []*tcp.LinuxTCPInfo) (*Metrics, error) {
}
}
info := &Metrics{
TCPInfo: *snaps[len(snaps)-1], // Save the last snapshot into the metric struct.
TCPInfo: *snaps[len(snaps)-1], // Save the last snapshot of TCPInfo data into the metric struct.
MinRTT: minrtt / 1000, // Convert microseconds to milliseconds for legacy compatibility.
}
log.Println("Summarized data:", info)
return info, nil
}

// MeasureViaPolling collects all required data by polling. It is required for
// non-BBR connections because MinRTT is one of our critical metrics.
func MeasureViaPolling(ctx context.Context, fp *os.File, c chan *Metrics) {
defer close(c)
func measureUntilContextCancellation(ctx context.Context, fp *os.File) (*Metrics, error) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
snaps := make([]*tcp.LinuxTCPInfo, 0, 100)
// Poll until the context is canceled.
for {

snaps := make([]*tcp.LinuxTCPInfo, 0, 200) // Enough space for 20 seconds of data.

// Poll until the context is canceled, but never more than once per ticker-firing.
//
// This slightly-funny way of writing the loop ensures that one last
// measurement occurs after the context is canceled (unless the most recent
// measurement and the context cancellation happened simultaneously, in which
// case the most recent measurement should count as the last measurement).
for ; ctx.Err() == nil; <-ticker.C {
// Get the tcp_cc metrics
info, err := tcpinfox.GetTCPInfo(fp)
snapshot, err := tcpinfox.GetTCPInfo(fp)
if err == nil {
snaps = append(snaps, info)
snaps = append(snaps, snapshot)
} else {
log.Println("Getsockopt error:", err)
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
info, err := summarize(snaps)
if err == nil {
c <- info
}
return
}
}
return summarize(snaps)
}

// TODO: Implement BBR support for ndt5 clients.
/*
func MeasureBBR(ctx context.Context, fp *os.File) (Metrics, error) {
return Metrics{}, errors.New("MeasureBBR is unimplemented")
// MeasureViaPolling collects all required data by polling and returns a channel
// for the results. This function may or may not send socket information along
// the channel, depending on whether or not an error occurred. The value is sent
// along the channel sometime after the context is canceled.
func MeasureViaPolling(ctx context.Context, fp *os.File) <-chan *Metrics {
// Give a capacity of 1 because we will only ever send one message and the
// buffer allows the component goroutine to exit when done, no matter what the
// client does.
c := make(chan *Metrics, 1)
go func() {
summary, err := measureUntilContextCancellation(ctx, fp)
if err == nil {
c <- summary
}
close(c)
}()
return c
}
*/
Loading

0 comments on commit aeaeaba

Please sign in to comment.