diff --git a/rhp/listener.go b/rhp/listener.go index b2784a02..f16b6540 100644 --- a/rhp/listener.go +++ b/rhp/listener.go @@ -1,6 +1,7 @@ package rhp import ( + "context" "net" "golang.org/x/time/rate" @@ -24,7 +25,7 @@ type ( rhpConn struct { net.Conn - + rl, wl *rate.Limiter monitor DataMonitor } @@ -66,6 +67,10 @@ func WithDataMonitor(m DataMonitor) Option { func (c *rhpConn) Read(b []byte) (int, error) { n, err := c.Conn.Read(b) c.monitor.ReadBytes(n) + if err != nil { + return n, err + } + c.rl.WaitN(context.Background(), len(b)) // error can be ignored since context will never be cancelled and len(b) should never exceed burst size return n, err } @@ -74,6 +79,10 @@ func (c *rhpConn) Read(b []byte) (int, error) { func (c *rhpConn) Write(b []byte) (int, error) { n, err := c.Conn.Write(b) c.monitor.WriteBytes(n) + if err != nil { + return n, err + } + c.wl.WaitN(context.Background(), len(b)) // error can be ignored since context will never be cancelled and len(b) should never exceed burst size return n, err } @@ -84,6 +93,8 @@ func (l *rhpListener) Accept() (net.Conn, error) { } return &rhpConn{ Conn: c, + rl: l.readLimiter, + wl: l.writeLimiter, monitor: l.monitor, }, nil }