Skip to content

Commit

Permalink
client: pass the provided ctx to backoff.RetryNotify
Browse files Browse the repository at this point in the history
If we hit a connection error in any of the ..WithContext functions, we
end up blocked in the backoff.RetryNotify loop. This loop can use a
context, but it needs to be bound to the BackOff provided when
RetryNotify is called.

Added a test case `TestSubscribeWithContextAbortRetrier` to
demonstrate the problem and verify the fix.

Fixes r3labs#131.
  • Loading branch information
wade-arista committed Jan 24, 2023
1 parent c6d5381 commit fe8f773
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 17 deletions.
31 changes: 14 additions & 17 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,7 @@ func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handle
}
}
}

// Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method
var err error
if c.ReconnectStrategy != nil {
err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify)
} else {
err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify)
}
return err
return c.retryNotify(ctx, operation)
}

// SubscribeChan sends all events to the provided channel
Expand Down Expand Up @@ -183,14 +175,7 @@ func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch

go func() {
defer c.cleanup(ch)
// Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method
var err error
if c.ReconnectStrategy != nil {
err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify)
} else {
err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify)
}

err := c.retryNotify(ctx, operation)
// channel closed once connected
if err != nil && !connected {
errch <- err
Expand All @@ -201,6 +186,18 @@ func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch
return err
}

func (c *Client) retryNotify(ctx context.Context, operation func() error) error {
var bk backoff.BackOff
// Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method
if c.ReconnectStrategy != nil {
bk = c.ReconnectStrategy
} else {
bk = backoff.NewExponentialBackOff()
}
bk = backoff.WithContext(bk, ctx)
return backoff.RetryNotify(operation, bk, c.ReconnectNotify)
}

func (c *Client) startReadLoop(reader *EventStreamReader) (chan *Event, chan error) {
outCh := make(chan *Event)
erChan := make(chan error)
Expand Down
26 changes: 26 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,29 @@ func TestSubscribeWithContextDone(t *testing.T) {

assert.Equal(t, n1, n2)
}

func TestSubscribeWithContextAbortRetrier(t *testing.T) {
// Run a server that only responds with HTTP errors which will put the client into the
// backoff.RetryNotify loop.
const status = http.StatusBadGateway
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Log(r.Method, r.URL.String(), http.StatusText(status))
w.WriteHeader(status)
}))
defer srv.Close()

ctx, cancel := context.WithCancel(context.Background())
c := NewClient(srv.URL)
c.ReconnectNotify = backoff.Notify(func(err error, d time.Duration) {
t.Logf("ReconnectNotify err: %v, duration: %s", err, d.String())
// The client has processed the HTTP server error from above, so cancel the context
// for the SubscribeWithContext call.
cancel()
})

err := c.SubscribeWithContext(ctx, "test", func(msg *Event) {
t.Fatal("Received event when none was expected:", msg)
})
require.Error(t, err)
assert.Regexp(t, `could not connect to stream: `+http.StatusText(status), err.Error())
}

0 comments on commit fe8f773

Please sign in to comment.