Reuse outbound HTTP connections (#131)
Without this, the hammer can quickly run into the dreaded "connection reset by peer"
errors, as the ephemeral ports used for HTTP connections are exhausted.
mhutchinson authored May 2, 2024
1 parent d45e472 commit b0cba6c
Showing 2 changed files with 32 additions and 19 deletions.
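The fix itself is small: instead of calling the package-level http.Post (which goes through http.DefaultClient, whose transport keeps only http.DefaultMaxIdleConnsPerHost, i.e. 2, idle connections per host), the hammer builds one shared http.Client whose Transport can hold a large pool of idle keep-alive connections, and hands that client to every writer. The following is a minimal standalone sketch of that pattern, not code from this repository; the URL, postLeaf helper, and request body are placeholders.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// One client, shared by every goroutine. Its Transport keeps idle (keep-alive)
// connections around so each POST can reuse an existing TCP connection instead
// of opening a new one and burning another ephemeral port. Without overriding
// these limits, only http.DefaultMaxIdleConnsPerHost (2) idle connections per
// host are kept, which is far too few for hundreds of concurrent writers.
var sharedClient = &http.Client{
	Transport: &http.Transport{
		MaxIdleConns:        256,
		MaxIdleConnsPerHost: 256,
	},
}

// postLeaf is a placeholder for the hammer's write path: POST a body and make
// sure the response body is fully drained and closed, otherwise the connection
// cannot be returned to the idle pool for reuse.
func postLeaf(url string, leaf []byte) error {
	resp, err := sharedClient.Post(url, "application/octet-stream", bytes.NewReader(leaf))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}
	return nil
}

func main() {
	// Example only: the URL below is a placeholder, not the log's real endpoint.
	if err := postLeaf("http://localhost:8080/add", []byte("hello")); err != nil {
		fmt.Println("write failed:", err)
	}
}
```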
hammer/clients.go: 19 additions & 15 deletions
@@ -151,8 +151,9 @@ func MonotonicallyIncreasingNextLeaf() func(uint64) uint64 {
 // NewLogWriter creates a LogWriter.
 // u is the URL of the write endpoint for the log.
 // gen is a function that generates new leaves to add.
-func NewLogWriter(u *url.URL, gen func() []byte, throttle <-chan bool, errchan chan<- error) *LogWriter {
+func NewLogWriter(hc *http.Client, u *url.URL, gen func() []byte, throttle <-chan bool, errchan chan<- error) *LogWriter {
 	return &LogWriter{
+		hc:       hc,
 		u:        u,
 		gen:      gen,
 		throttle: throttle,
@@ -162,6 +163,7 @@ func NewLogWriter(u *url.URL, gen func() []byte, throttle <-chan bool, errchan c

 // LogWriter writes new leaves to the log that are generated by `gen`.
 type LogWriter struct {
+	hc       *http.Client
 	u        *url.URL
 	gen      func() []byte
 	throttle <-chan bool
@@ -170,40 +172,42 @@ type LogWriter struct {
 }

 // Run runs the log writer. This should be called in a goroutine.
-func (r *LogWriter) Run(ctx context.Context) {
-	if r.cancel != nil {
+func (w *LogWriter) Run(ctx context.Context) {
+	if w.cancel != nil {
 		panic("LogWriter was ran multiple times")
 	}
-	ctx, r.cancel = context.WithCancel(ctx)
+	ctx, w.cancel = context.WithCancel(ctx)
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-r.throttle:
+		case <-w.throttle:
 		}
-		newLeaf := r.gen()
-		resp, err := http.Post(r.u.String(), "application/octet-stream", bytes.NewReader(newLeaf))
+		newLeaf := w.gen()
+
+		resp, err := w.hc.Post(w.u.String(), "application/octet-stream", bytes.NewReader(newLeaf))
 		if err != nil {
-			r.errchan <- fmt.Errorf("failed to write leaf: %v", err)
+			w.errchan <- fmt.Errorf("failed to write leaf: %v", err)
 			continue
 		}
 		body, err := io.ReadAll(resp.Body)
 		_ = resp.Body.Close()
 		if err != nil {
-			r.errchan <- fmt.Errorf("failed to read body: %v", err)
+			w.errchan <- fmt.Errorf("failed to read body: %v", err)
 			continue
 		}
 		if resp.StatusCode != http.StatusOK {
-			r.errchan <- fmt.Errorf("write leaf was not OK. Status code: %d. Body: %q", resp.StatusCode, body)
+			w.errchan <- fmt.Errorf("write leaf was not OK. Status code: %d. Body: %q", resp.StatusCode, body)
 			continue
 		}
 		if resp.Request.Method != http.MethodPost {
-			r.errchan <- fmt.Errorf("write leaf was redirected to %s", resp.Request.URL)
+			w.errchan <- fmt.Errorf("write leaf was redirected to %s", resp.Request.URL)
 			continue
 		}
 		parts := bytes.Split(body, []byte("\n"))
 		index, err := strconv.Atoi(string(parts[0]))
 		if err != nil {
-			r.errchan <- fmt.Errorf("write leaf failed to parse response: %v", body)
+			w.errchan <- fmt.Errorf("write leaf failed to parse response: %v", body)
 			continue
 		}

@@ -213,8 +217,8 @@ func (r *LogWriter) Run(ctx context.Context) {

 // Kills this writer at the next opportune moment.
 // This function may return before the writer is dead.
-func (r *LogWriter) Kill() {
-	if r.cancel != nil {
-		r.cancel()
+func (w *LogWriter) Kill() {
+	if w.cancel != nil {
+		w.cancel()
 	}
 }
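To confirm that a client configured this way is actually reusing connections, net/http/httptrace can report, per request, whether the connection came from the idle pool. This snippet is illustrative only and not part of the commit; the URL and the request loop are placeholders.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptrace"
)

func main() {
	client := &http.Client{
		Transport: &http.Transport{MaxIdleConns: 256, MaxIdleConnsPerHost: 256},
	}

	// GotConn fires once per request and reports whether the connection was
	// taken from the idle pool (Reused) rather than freshly dialled.
	trace := &httptrace.ClientTrace{
		GotConn: func(info httptrace.GotConnInfo) {
			fmt.Printf("reused=%v wasIdle=%v idleTime=%v\n", info.Reused, info.WasIdle, info.IdleTime)
		},
	}

	for i := 0; i < 3; i++ {
		// Placeholder URL: point this at the log's add endpoint (or any server).
		req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/healthz", nil)
		if err != nil {
			panic(err)
		}
		req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
		resp, err := client.Do(req)
		if err != nil {
			fmt.Println("request failed:", err)
			continue
		}
		// Drain and close the body so the connection can go back to the pool.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
}
```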
hammer/hammer.go: 13 additions & 4 deletions
@@ -50,6 +50,14 @@ var (
 	leafBundleSize = flag.Int("leaf_bundle_size", 1, "The log-configured number of leaves in each leaf bundle")

 	showUI = flag.Bool("show_ui", true, "Set to false to disable the text-based UI")
+
+	hc = &http.Client{
+		Transport: &http.Transport{
+			MaxIdleConns:        256,
+			MaxIdleConnsPerHost: 256,
+			DisableKeepAlives:   false,
+		},
+	}
 )

 func main() {
@@ -121,7 +129,7 @@ func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, addURL *url.UR
 	}
 	gen := newLeafGenerator()
 	for i := 0; i < *numWriters; i++ {
-		writers[i] = NewLogWriter(addURL, gen, writeThrottle.tokenChan, errChan)
+		writers[i] = NewLogWriter(hc, addURL, gen, writeThrottle.tokenChan, errChan)
 	}
 	return &Hammer{
 		randomReaders: randomReaders,
@@ -257,14 +265,15 @@ func (t *Throttle) Run(ctx context.Context) {
 			tokenCount := t.opsPerSecond
 			timeout := time.After(1 * time.Second)
 		Loop:
-			for i := 0; i < tokenCount; i++ {
+			for i := 0; i < t.opsPerSecond; i++ {
 				select {
 				case t.tokenChan <- true:
+					tokenCount--
 				case <-timeout:
-					t.oversupply = tokenCount - i
 					break Loop
 				}
 			}
+			t.oversupply = tokenCount
 		}
 	}
 }
@@ -389,7 +398,7 @@ func readHTTP(ctx context.Context, u *url.URL) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
-	resp, err := http.DefaultClient.Do(req.WithContext(ctx))
+	resp, err := hc.Do(req.WithContext(ctx))
 	if err != nil {
 		return nil, err
 	}