From aeaeabaca8898fafe2387ef5e86c2a8cf5763f15 Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Wed, 4 Sep 2019 17:43:03 -0400 Subject: [PATCH] Smooth c2s (#179) * Eliminate application-level quantization of throughput measurement. * Added websocket c2s unit tests and added better logging for flaky e2e test. * Clarified measurement code. --- ndt-server_test.go | 4 +- ndt5/c2s/c2s.go | 75 +++++++++------------- ndt5/c2s/c2s_test.go | 122 ++++++++++++++++++++++++++++++++++++ ndt5/protocol/protocol.go | 44 ++++++++----- ndt5/web100/web100_linux.go | 58 +++++++++-------- ndt5/web100/web100_stub.go | 3 +- 6 files changed, 215 insertions(+), 91 deletions(-) create mode 100644 ndt5/c2s/c2s_test.go diff --git a/ndt-server_test.go b/ndt-server_test.go index e4d810ec..05f8b827 100644 --- a/ndt-server_test.go +++ b/ndt-server_test.go @@ -286,8 +286,8 @@ func Test_MainIntegrationTest(t *testing.T) { preFileCount := countFiles(dataDir) stdout, stderr, err := pipe.DividedOutput(pipe.Script(tc.name, pipe.System(tc.cmd))) if err != nil { - t.Errorf("ERROR %s (Command: %s)\nStdout: %s\nStderr: %s\n", - tc.name, tc.cmd, string(stdout), string(stderr)) + t.Errorf("ERROR %s gave error %q (Command: %s)\nStdout: %s\nStderr: %s\n", + tc.name, err, tc.cmd, string(stdout), string(stderr)) } postFileCount := countFiles(dataDir) if !tc.ignoreData { diff --git a/ndt5/c2s/c2s.go b/ndt5/c2s/c2s.go index cf5f2e04..361cf8db 100644 --- a/ndt5/c2s/c2s.go +++ b/ndt5/c2s/c2s.go @@ -6,11 +6,11 @@ import ( "strconv" "time" - "github.com/m-lab/ndt-server/ndt5/metrics" - "github.com/m-lab/go/warnonerror" + "github.com/m-lab/ndt-server/ndt5/metrics" "github.com/m-lab/ndt-server/ndt5/ndt" "github.com/m-lab/ndt-server/ndt5/protocol" + "github.com/m-lab/ndt-server/ndt5/web100" ) // ArchivalData is the data saved by the C2S test. If a researcher wants deeper @@ -94,7 +94,7 @@ func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Serv } record.StartTime = time.Now() - byteCount, err := drainForeverButMeasureFor(testConn, 10*time.Second) + byteCount, err := drainForeverButMeasureFor(ctx, testConn, 10*time.Second) record.EndTime = time.Now() seconds := record.EndTime.Sub(record.StartTime).Seconds() log.Println("Ended C2S test on", testConn) @@ -139,58 +139,39 @@ func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Serv // measuring the connection for the first part of the drain. This method does // not close the passed-in Connection, and starts a goroutine which runs until // that Connection is closed. -func drainForeverButMeasureFor(conn protocol.Connection, d time.Duration) (int64, error) { - type measurement struct { - totalByteCount int64 - err error - } - measurements := make(chan measurement) +func drainForeverButMeasureFor(ctx context.Context, conn protocol.MeasuredConnection, d time.Duration) (int64, error) { + derivedCtx, derivedCancel := context.WithTimeout(ctx, d) + defer derivedCancel() + conn.StartMeasuring(derivedCtx) + + errs := make(chan error, 1) // This is the "drain forever" part of this function. Read the passed-in - // connection until the passed-in connection is closed. Only send measurements - // on the measurement channel if the channel can be written to without - // blocking. + // connection until the passed-in connection is closed. go func() { - var totalByteCount int64 - var err error + var connErr error // Read the connections until the connection is closed. Reading on a closed // connection returns an error, which terminates the loop and the goroutine. - for err == nil { - var byteCount int64 - byteCount, err = conn.ReadBytes() - totalByteCount += byteCount - // Only write to the channel if it won't block, to ensure the reading process - // goes as fast as possible. - select { - case measurements <- measurement{totalByteCount, err}: - default: - } - } - // After we get an error, drain the channel and then close it. - fullChannel := true - for fullChannel { - select { - case <-measurements: - default: - fullChannel = false - } + for connErr == nil { + _, connErr = conn.ReadBytes() } - close(measurements) + errs <- connErr }() - // Read the measurements channel until the timer goes off. - timer := time.NewTimer(d) - var bytesRead int64 + var socketStats *web100.Metrics var err error - timerActive := true - for timerActive { - select { - case m := <-measurements: - bytesRead = m.totalByteCount - err = m.err - case <-timer.C: - timerActive = false - } + select { + case <-derivedCtx.Done(): // Wait for timeout + log.Println("Timed out") + socketStats, err = conn.StopMeasuring() + case err = <-errs: // Error in c2s transfer + log.Println("C2S error:", err) + socketStats, _ = conn.StopMeasuring() + } + if socketStats == nil { + return 0, err } - return bytesRead, err + // The TCPInfo element of socketstats is a value not a pointer, so this is safe + // if socketStats is not nil. + return socketStats.TCPInfo.BytesReceived, err } diff --git a/ndt5/c2s/c2s_test.go b/ndt5/c2s/c2s_test.go new file mode 100644 index 00000000..05ca7210 --- /dev/null +++ b/ndt5/c2s/c2s_test.go @@ -0,0 +1,122 @@ +package c2s + +import ( + "context" + "net" + "net/http" + "strconv" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/m-lab/ndt-server/ndt5/singleserving" + + "github.com/m-lab/ndt-server/ndt5/tcplistener" + + "github.com/m-lab/go/rtx" + "github.com/m-lab/ndt-server/ndt5/protocol" +) + +func MustMakeNetConnection(ctx context.Context) (protocol.MeasuredConnection, net.Conn) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + rtx.Must(err, "Could not listen") + tl := &tcplistener.RawListener{TCPListener: listener.(*net.TCPListener)} + conns := make(chan net.Conn) + defer close(conns) + go func() { + clientConn, err := net.Dial("tcp", listener.Addr().String()) + rtx.Must(err, "Could not dial temp conn") + conns <- clientConn + }() + conn, err := tl.Accept() + rtx.Must(err, "Could not accept") + return protocol.AdaptNetConn(conn, conn), <-conns +} + +func Test_DrainForeverButMeasureFor_NormalOperation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sConn, cConn := MustMakeNetConnection(ctx) + defer sConn.Close() + defer cConn.Close() + // Send for longer than we measure. + go func() { + ctx2, cancel2 := context.WithTimeout(ctx, 10*time.Second) + defer cancel2() // Useless, but makes the linter happpy. + for ctx2.Err() == nil { + cConn.Write([]byte("hello")) + } + cConn.Close() + }() + count, err := drainForeverButMeasureFor(ctx, sConn, time.Duration(100*time.Millisecond)) + if err != nil { + t.Error("Should not have gotten error:", err) + } + if count <= 0 { + t.Errorf("Expected positive byte count but got %d", count) + } +} + +func Test_DrainForeverButMeasureFor_EarlyClientQuit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sConn, cConn := MustMakeNetConnection(ctx) + defer sConn.Close() + defer cConn.Close() + // Measure longer than we send. + go func() { + cConn.Write([]byte("hello")) + time.Sleep(100 * time.Millisecond) // Give the drainForever process time to get going + cConn.Close() + }() + count, err := drainForeverButMeasureFor(ctx, sConn, time.Duration(1*time.Second)) + if err == nil { + t.Error("Should have gotten an error") + } + if count <= 0 { + t.Errorf("Expected positive byte count but got %d", count) + } +} + +func MustMakeWsConnection(ctx context.Context) (protocol.MeasuredConnection, *websocket.Conn) { + srv, err := singleserving.ListenWS("c2s") + rtx.Must(err, "Could not listen") + conns := make(chan *websocket.Conn) + defer close(conns) + go func() { + d := websocket.Dialer{} + // This will actually result in a failed websocket connection attempt because + // we aren't setting any headers. That's okay for testing purposes, as we are + // trying to make sure that the underlying socket stats are counted, and the + // failed upgrade will still result in non-zero socket stats. + clientConn, _, err := d.Dial("ws://localhost:"+strconv.Itoa(srv.Port())+"/ndt_protocol", http.Header{}) + rtx.Must(err, "Could not dial temp conn") + conns <- clientConn + }() + conn, err := srv.ServeOnce(ctx) + rtx.Must(err, "Could not accept") + return conn, <-conns +} + +func Test_DrainForeverButMeasureFor_CountsAllBytesNotJustWsGoodput(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sConn, cConn := MustMakeWsConnection(ctx) + defer sConn.Close() + defer cConn.Close() + // Send for longer than we measure. + go func() { + // Send nothing. But the websocket handshake used some bytes, so the underlying socket should not measure zero. + ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second) + defer cancel2() // Useless, but makes the linter happpy. + <-ctx2.Done() + cConn.Close() + }() + count, err := drainForeverButMeasureFor(ctx, sConn, time.Duration(1*time.Millisecond)) + if err != nil { + t.Error("Should not have gotten error:", err) + } + if count <= 0 { + t.Errorf("Expected positive byte count but got %d", count) + } +} diff --git a/ndt5/protocol/protocol.go b/ndt5/protocol/protocol.go index 46b86f62..675d6b94 100644 --- a/ndt5/protocol/protocol.go +++ b/ndt5/protocol/protocol.go @@ -126,32 +126,46 @@ type MeasuredConnection interface { Measurable } -// The measurer struct is a hack to ensure that we only have to write the -// complicated measurement code at most once. +// measurer allows all types of connections to embed this struct and be measured +// in the same way. It also means that we have to write the complicated +// measurement code at most once. type measurer struct { - measurements chan *web100.Metrics + summaryC <-chan *web100.Metrics cancelMeasurementContext context.CancelFunc } +// newMeasurer creates a measurer struct with sensible and safe defaults. +func newMeasurer() *measurer { + // We want the channel to be closed by default, not nil. A read on a closed + // channel returns immediately, while a read on a nil channel blocks forever. + c := make(chan *web100.Metrics) + close(c) + return &measurer{ + summaryC: c, + // We want the cancel function to always be safe to call. + cancelMeasurementContext: func() {}, + } +} + // StartMeasuring starts a polling measurement goroutine that runs until the ctx // expires. After measurement is complete, the given `fd` is closed. func (m *measurer) StartMeasuring(ctx context.Context, fd *os.File) { - m.measurements = make(chan *web100.Metrics) var newctx context.Context newctx, m.cancelMeasurementContext = context.WithCancel(ctx) - go func() { - defer fd.Close() - web100.MeasureViaPolling(newctx, fd, m.measurements) - }() + m.summaryC = web100.MeasureViaPolling(newctx, fd) } +// StopMeasuring stops the measurement process and returns the collected +// measurements. The measurement process can also be stopped by cancelling the +// context that was passed in to StartMeasuring(). func (m *measurer) StopMeasuring() (*web100.Metrics, error) { - m.cancelMeasurementContext() - info, ok := <-m.measurements - if !ok { - return nil, errors.New("No data") + m.cancelMeasurementContext() // Start the channel close process. + + summary := <-m.summaryC + if summary == nil { + return nil, errors.New("No data returned from web100.MeasureViaPolling due to nil") } - return info, nil + return summary, nil } // wsConnection wraps a websocket connection to allow it to be used as a @@ -163,7 +177,7 @@ type wsConnection struct { // AdaptWsConn turns a websocket Connection into a struct which implements both Measurer and Connection func AdaptWsConn(ws *websocket.Conn) MeasuredConnection { - return &wsConnection{Conn: ws, measurer: &measurer{}} + return &wsConnection{Conn: ws, measurer: newMeasurer()} } func (ws *wsConnection) FillUntil(t time.Time, bytes []byte) (bytesWritten int64, err error) { @@ -320,7 +334,7 @@ type MeasuredFlexibleConnection interface { // AdaptNetConn turns a non-WS-based TCP connection into a protocol.MeasuredConnection that can have its encoding set on the fly. func AdaptNetConn(conn net.Conn, input io.Reader) MeasuredFlexibleConnection { - return &netConnection{Conn: conn, measurer: &measurer{}, input: input, c2sBuffer: make([]byte, 8192)} + return &netConnection{Conn: conn, measurer: newMeasurer(), input: input, c2sBuffer: make([]byte, 8192)} } // ReadTLVMessage reads a single NDT message out of the connection. diff --git a/ndt5/web100/web100_linux.go b/ndt5/web100/web100_linux.go index e1fe92c4..6c9f5321 100644 --- a/ndt5/web100/web100_linux.go +++ b/ndt5/web100/web100_linux.go @@ -22,45 +22,51 @@ func summarize(snaps []*tcp.LinuxTCPInfo) (*Metrics, error) { } } info := &Metrics{ - TCPInfo: *snaps[len(snaps)-1], // Save the last snapshot into the metric struct. + TCPInfo: *snaps[len(snaps)-1], // Save the last snapshot of TCPInfo data into the metric struct. MinRTT: minrtt / 1000, // Convert microseconds to milliseconds for legacy compatibility. } - log.Println("Summarized data:", info) return info, nil } -// MeasureViaPolling collects all required data by polling. It is required for -// non-BBR connections because MinRTT is one of our critical metrics. -func MeasureViaPolling(ctx context.Context, fp *os.File, c chan *Metrics) { - defer close(c) +func measureUntilContextCancellation(ctx context.Context, fp *os.File) (*Metrics, error) { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - snaps := make([]*tcp.LinuxTCPInfo, 0, 100) - // Poll until the context is canceled. - for { + + snaps := make([]*tcp.LinuxTCPInfo, 0, 200) // Enough space for 20 seconds of data. + + // Poll until the context is canceled, but never more than once per ticker-firing. + // + // This slightly-funny way of writing the loop ensures that one last + // measurement occurs after the context is canceled (unless the most recent + // measurement and the context cancellation happened simultaneously, in which + // case the most recent measurement should count as the last measurement). + for ; ctx.Err() == nil; <-ticker.C { // Get the tcp_cc metrics - info, err := tcpinfox.GetTCPInfo(fp) + snapshot, err := tcpinfox.GetTCPInfo(fp) if err == nil { - snaps = append(snaps, info) + snaps = append(snaps, snapshot) } else { log.Println("Getsockopt error:", err) } - select { - case <-ticker.C: - continue - case <-ctx.Done(): - info, err := summarize(snaps) - if err == nil { - c <- info - } - return - } } + return summarize(snaps) } -// TODO: Implement BBR support for ndt5 clients. -/* -func MeasureBBR(ctx context.Context, fp *os.File) (Metrics, error) { - return Metrics{}, errors.New("MeasureBBR is unimplemented") +// MeasureViaPolling collects all required data by polling and returns a channel +// for the results. This function may or may not send socket information along +// the channel, depending on whether or not an error occurred. The value is sent +// along the channel sometime after the context is canceled. +func MeasureViaPolling(ctx context.Context, fp *os.File) <-chan *Metrics { + // Give a capacity of 1 because we will only ever send one message and the + // buffer allows the component goroutine to exit when done, no matter what the + // client does. + c := make(chan *Metrics, 1) + go func() { + summary, err := measureUntilContextCancellation(ctx, fp) + if err == nil { + c <- summary + } + close(c) + }() + return c } -*/ diff --git a/ndt5/web100/web100_stub.go b/ndt5/web100/web100_stub.go index f4eeea7e..f00add0d 100644 --- a/ndt5/web100/web100_stub.go +++ b/ndt5/web100/web100_stub.go @@ -9,6 +9,7 @@ import ( // MeasureViaPolling collects all required data by polling. It is required for // non-BBR connections because MinRTT is one of our critical metrics. -func MeasureViaPolling(ctx context.Context, fp *os.File, c chan *Metrics) { +func MeasureViaPolling(ctx context.Context, fp *os.File) <-chan *Metrics { // Just a stub. + return nil }