Skip to content

Commit

Permalink
Apply TxController to ndt5 "plain" socket Accept calls (#268)
Browse files Browse the repository at this point in the history
* Use TxController after Accept()
* Add Accepter interface to ListenAndServe
* Apply TxController to raw ndt5 port, remove from local WS ndt5 port.
  • Loading branch information
stephen-soltesz authored Mar 3, 2020
1 parent c725cb2 commit 3ff6fc6
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 27 deletions.
89 changes: 73 additions & 16 deletions access/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,28 @@ import (
"flag"
"fmt"
"log"
"math"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/prometheus/procfs"
)

var (
procPath = "/proc"
device string
accessRequests = promauto.NewCounterVec(
procPath = "/proc"
device string
txAccessRequests = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ndt_access_txcontroller_requests_total",
Help: "Total number of requests handled by the access txcontroller.",
},
[]string{"request"},
[]string{"request", "protocol"},
)
// ErrNoDevice is returned when device is empty or not found in procfs.
ErrNoDevice = errors.New("no device found")
)

Expand Down Expand Up @@ -61,24 +63,61 @@ func NewTxController(rate uint64) (*TxController, error) {
device: device,
limit: rate,
pfs: pfs,
period: time.Second,
period: 100 * time.Millisecond,
}
return tx, nil
}

// Accept wraps the call to listener's Accept. If the TxController is
// limited, then Accept immediately closes the connection and returns an error.
func (tx *TxController) Accept(l net.Listener) (net.Conn, error) {
conn, err := l.Accept()
if tx == nil {
// Simple pass-through.
return conn, err
}
if err != nil {
// No need to check isLimited, the accept failed.
return nil, err
}
if tx.isLimited("raw") {
defer conn.Close()
return nil, fmt.Errorf("TxController rejected connection %s", conn.RemoteAddr())
}
// err was nil, so the conn is good.
return conn, nil
}

// Current exports the current rate. Useful for diagnostics.
func (tx *TxController) Current() uint64 {
return atomic.LoadUint64(&tx.current)
}
func (tx *TxController) set(value uint64) {
atomic.StoreUint64(&tx.current, value)
}

// isLimited checks the current tx rate and returns whether the connection should
// be accepted or rejected.
func (tx *TxController) isLimited(proto string) bool {
cur := tx.Current()
if tx.limit > 0 && cur > tx.limit {
txAccessRequests.WithLabelValues("rejected", proto).Inc()
return true
}
return tx, err
txAccessRequests.WithLabelValues("accepted", proto).Inc()
return false
}

// Limit enforces that the TxController rate limit is respected before running
// the next handler. If the rate is unspecified (zero), all requests are accepted.
func (tx *TxController) Limit(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cur := atomic.LoadUint64(&tx.current)
if tx.limit > 0 && cur > tx.limit {
accessRequests.WithLabelValues("rejected").Inc()
if tx.isLimited("http") {
// 503 - https://tools.ietf.org/html/rfc7231#section-6.6.4
w.WriteHeader(http.StatusServiceUnavailable)
// Return without additional response.
return
}
accessRequests.WithLabelValues("accepted").Inc() // accepted != success.
next.ServeHTTP(w, r)
})
}
Expand All @@ -100,16 +139,34 @@ func (tx *TxController) Watch(ctx context.Context) error {
return err
}

// Setup.
ratePrev := 0.0
prevTxBytes := v.TxBytes
tickNow := <-t.C // Read first time from ticker.
tickPrev := tickNow.Add(-tx.period) // Initialize difference to expected sample period.
alpha := tx.period.Seconds() / 2 // Alpha controls the decay rate based on configured period.

// Check the device every period until the context returns an error.
for prev := v.TxBytes; ctx.Err() == nil; <-t.C {
v, err := readNetDevLine(tx.pfs, tx.device)
for ; ctx.Err() == nil; tickNow = <-t.C {
cur, err := readNetDevLine(tx.pfs, tx.device)
if err != nil {
log.Println("Error reading /proc/net/dev:", err)
continue
}
cur := (v.TxBytes - prev) * 8
atomic.StoreUint64(&tx.current, cur)
prev = v.TxBytes

// Under heavy load, tickers may fire slow (and then early). Only update
// values when interval is long enough, i.e. more than half the tx.period.
if tickNow.Sub(tickPrev).Seconds() > tx.period.Seconds()/2 {
// Calculate the new rate in bits-per-second, using the actual interval.
rateNow := float64(8*(cur.TxBytes-prevTxBytes)) / tickNow.Sub(tickPrev).Seconds()
// A few seconds for decreases and rapid response for increases.
ratePrev = math.Max(rateNow, (1-alpha)*ratePrev+alpha*rateNow)
tx.set(uint64(ratePrev))

// Save the total bytes sent from this round for the next.
prevTxBytes = cur.TxBytes
tickPrev = tickNow
}
}
return ctx.Err()
}
Expand Down
98 changes: 96 additions & 2 deletions access/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package access

import (
"context"
"errors"
"net"
"net/http"
"time"

"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/m-lab/go/rtx"

"github.com/prometheus/procfs"
)

Expand Down Expand Up @@ -68,26 +70,36 @@ func TestNewTxController(t *testing.T) {
limit uint64
want *TxController
procPath string
device string
wantErr bool
}{
{
name: "failure",
procPath: "testdata/proc-failure",
device: "eth0",
wantErr: true,
},
{
name: "failure-nodevfile",
procPath: "testdata/proc-nodevfile",
device: "eth0",
wantErr: true,
},
{
name: "failure-nodevice",
procPath: "testdata/proc-nodevice",
device: "eth0",
wantErr: true,
},
{
name: "failure-nodevice",
device: "",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
device = tt.device
procPath = tt.procPath
got, err := NewTxController(tt.limit)
if (err != nil) != tt.wantErr {
Expand Down Expand Up @@ -131,6 +143,7 @@ func TestTxController_Watch(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
device = "eth0"
procPath = tt.procPath
tx, err := NewTxController(tt.limit)
if err != nil {
Expand All @@ -154,3 +167,84 @@ func TestTxController_Watch(t *testing.T) {
})
}
}

type fakeListener struct {
conn fakeConn
err error
closed int
}

type fakeConn struct {
net.TCPConn
closed int
}

func (c *fakeConn) Close() error {
c.closed++
return nil
}

func (f *fakeListener) Accept() (net.Conn, error) {
return &f.conn, f.err
}
func (f *fakeListener) Close() error {
f.closed++
return nil
}
func (f *fakeListener) Addr() net.Addr {
return &net.TCPAddr{}
}

func TestTxController_Accept(t *testing.T) {
tests := []struct {
name string
l *fakeListener
tx *TxController
wantClosed int
wantErr bool
}{
{
name: "success-accepted",
l: &fakeListener{},
tx: &TxController{
current: 0,
limit: 1,
},
wantClosed: 0,
},
{
name: "success-rejected",
l: &fakeListener{conn: fakeConn{}},
tx: &TxController{
current: 2,
limit: 1,
},
wantClosed: 1,
wantErr: true,
},
{
name: "success-accept-with-nil-tx",
l: &fakeListener{conn: fakeConn{}},
tx: nil, // Accept should work even with a nil tx.
},
{
name: "error-accept-returns-error",
l: &fakeListener{err: errors.New("this is a fake accept error")},
tx: &TxController{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conn, err := tt.tx.Accept(tt.l)
if (err != nil) != tt.wantErr {
t.Errorf("TxController.Accept() error = %v, wantErr %v", err, tt.wantErr)
return
}
fc, ok := conn.(*fakeConn)
if conn != nil && ok && fc.closed != tt.wantClosed {
t.Errorf("TxController.Accept() failed to close conn; got %d, want %d", fc.closed, tt.wantClosed)
}
})
}
}
8 changes: 5 additions & 3 deletions ndt-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func main() {
// server if the first three bytes are "GET".
ndt5Server := plain.NewServer(*dataDir+"/ndt5", *ndt5WsAddr)
rtx.Must(
ndt5Server.ListenAndServe(ctx, *ndt5Addr),
ndt5Server.ListenAndServe(ctx, *ndt5Addr, tx),
"Could not start raw server")

// The ndt5 protocol serving Ws-based tests. Most clients are hard-coded to
Expand All @@ -141,8 +141,10 @@ func main() {
ndt5WsMux.Handle("/static/", http.StripPrefix("/static", http.FileServer(http.Dir("html"))))
ndt5WsMux.Handle("/ndt_protocol", ndt5handler.NewWS(*dataDir+"/ndt5"))
ndt5WsServer := &http.Server{
Addr: *ndt5WsAddr,
Handler: ac.Then(logging.MakeAccessLogHandler(ndt5WsMux)),
Addr: *ndt5WsAddr,
// NOTE: do not use `ac.Then()` to prevent 'double jeopardy' for
// forwarded clients when txcontroller is enabled.
Handler: logging.MakeAccessLogHandler(ndt5WsMux),
}
log.Println("About to listen for unencrypted ndt5 NDT tests on " + *ndt5WsAddr)
rtx.Must(listener.ListenAndServeAsync(ndt5WsServer), "Could not start unencrypted ndt5 NDT server")
Expand Down
12 changes: 9 additions & 3 deletions ndt5/plain/plain.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (ps *plainServer) sniffThenHandle(ctx context.Context, conn net.Conn) {

// ListenAndServe starts up the sniffing server that delegates to the
// appropriate just-TCP or WS protocol.Connection.
func (ps *plainServer) ListenAndServe(ctx context.Context, addr string) error {
func (ps *plainServer) ListenAndServe(ctx context.Context, addr string, tx Accepter) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
Expand All @@ -142,7 +142,7 @@ func (ps *plainServer) ListenAndServe(ctx context.Context, addr string) error {
// Serve requests until the context is canceled.
go func() {
for ctx.Err() == nil {
conn, err := ln.Accept()
conn, err := tx.Accept(ln)
if err != nil {
log.Println("Failed to accept connection:", err)
continue
Expand Down Expand Up @@ -199,10 +199,16 @@ func (ps *plainServer) Addr() net.Addr {
return ps.listener.Addr()
}

// Accepter defines an interface the listening server to decide whether to
// accept new connections.
type Accepter interface {
Accept(l net.Listener) (net.Conn, error)
}

// Server is the interface implemented by the non-HTTP-based NDT server.
// Because it isn't run by the http.Server machinery, it has its own interface.
type Server interface {
ListenAndServe(ctx context.Context, addr string) error
ListenAndServe(ctx context.Context, addr string, tx Accepter) error
Addr() net.Addr
}

Expand Down
15 changes: 12 additions & 3 deletions ndt5/plain/plain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"github.com/m-lab/go/rtx"
)

type fakeAccepter struct{}

func (f *fakeAccepter) Accept(l net.Listener) (net.Conn, error) {
return l.Accept()
}

func TestNewPlainServer(t *testing.T) {
d, err := ioutil.TempDir("", "TestNewPlainServer")
rtx.Must(err, "Could not create tempdir")
Expand Down Expand Up @@ -53,7 +59,8 @@ func TestNewPlainServer(t *testing.T) {
tcpS := NewServer(d, wsSrv.Addr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rtx.Must(tcpS.ListenAndServe(ctx, ":0"), "Could not start tcp server")
fa := &fakeAccepter{}
rtx.Must(tcpS.ListenAndServe(ctx, ":0", fa), "Could not start tcp server")

t.Run("Test that GET forwarding works", func(t *testing.T) {
url := "http://" + tcpS.Addr().String() + "/test_url"
Expand All @@ -76,7 +83,8 @@ func TestNewPlainServer(t *testing.T) {
})

t.Run("Test that we can't listen and run twice on the same port", func(t *testing.T) {
err := tcpS.ListenAndServe(ctx, tcpS.Addr().String())
fa := &fakeAccepter{}
err := tcpS.ListenAndServe(ctx, tcpS.Addr().String(), fa)
if err == nil {
t.Error("We should not have been able to start a second server")
}
Expand Down Expand Up @@ -106,7 +114,8 @@ func TestNewPlainServerBrokenForwarding(t *testing.T) {
tcpS := NewServer(d, "127.0.0.1:1")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rtx.Must(tcpS.ListenAndServe(ctx, ":0"), "Could not start tcp server")
fa := &fakeAccepter{}
rtx.Must(tcpS.ListenAndServe(ctx, ":0", fa), "Could not start tcp server")

client := &http.Client{
Timeout: 10 * time.Millisecond,
Expand Down

0 comments on commit 3ff6fc6

Please sign in to comment.