Skip to content

Commit

Permalink
Merge pull request #44 from heroku/bs-async-transport
Browse files Browse the repository at this point in the history
Deliver non-terminal errors to rollbar asynchronously
  • Loading branch information
bernerdschaefer authored Jun 25, 2020
2 parents 0dcf481 + 1842e48 commit 0274a97
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 1 deletion.
7 changes: 6 additions & 1 deletion hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/rollbar/rollbar-go"
"github.com/sirupsen/logrus"

"github.com/heroku/rollrus/internal/transport"
)

var _ logrus.Hook = &Hook{} //assert that *Hook is a logrus.Hook
Expand All @@ -26,8 +28,11 @@ type Hook struct {

// NewHookForLevels provided by the caller. Otherwise works like NewHook.
func NewHookForLevels(token string, env string, levels []logrus.Level) *Hook {
client := rollbar.NewSync(token, env, "", "", "")
client.Transport = transport.NewBuffered(client.Transport, rollbar.DefaultBuffer)

return &Hook{
Client: rollbar.NewSync(token, env, "", "", ""),
Client: client,
triggers: levels,
ignoredErrors: make([]error, 0),
ignoreErrorFunc: func(error) bool { return false },
Expand Down
104 changes: 104 additions & 0 deletions internal/transport/buffered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package transport

import (
"context"
"errors"
"sync"

"github.com/rollbar/rollbar-go"
)

var (
errBufferFull = errors.New("rollbar message buffer full")
errClosed = errors.New("rollbar transport closed")
)

// Buffered is an alternative to rollbar's AsyncTransport, providing
// threadsafe and predictable message delivery built on top of the SyncTransport.
type Buffered struct {
queue chan op
once sync.Once
ctx context.Context

rollbar.Transport
}

// op represents an operation queued for transport. It is only valid
// to set a single field in the struct to represent the operation that should
// be performed.
type op struct {
send map[string]interface{}
wait chan struct{}
close bool
}

// NewBuffered wraps the provided transport for async delivery.
func NewBuffered(inner rollbar.Transport, bufSize int) *Buffered {
ctx, cancel := context.WithCancel(context.Background())

t := &Buffered{
queue: make(chan op, bufSize),
ctx: ctx,
Transport: inner,
}

go t.run(cancel)

return t
}

// Send enqueues delivery of the message body to Rollbar without waiting for
// the result. If the buffer is full, it will immediately return an error.
func (t *Buffered) Send(body map[string]interface{}) error {
select {
case t.queue <- op{send: body}:
return nil
case <-t.ctx.Done():
return errClosed
default:
return errBufferFull
}
}

// Wait blocks until all messages buffered before calling Wait are
// delivered.
func (t *Buffered) Wait() {
done := make(chan struct{})
select {
case t.queue <- op{wait: done}:
case <-t.ctx.Done():
return
}

select {
case <-done:
case <-t.ctx.Done():
}
}

// Close shuts down the transport and waits for queued messages to be
// delivered.
func (t *Buffered) Close() error {
t.once.Do(func() {
t.queue <- op{close: true}
})

<-t.ctx.Done()
return nil
}

func (t *Buffered) run(cancel func()) {
defer cancel()

for m := range t.queue {
switch {
case m.send != nil:
_ = t.Transport.Send(m.send)
case m.wait != nil:
close(m.wait)
case m.close:
t.Transport.Close()
return
}
}
}
151 changes: 151 additions & 0 deletions internal/transport/buffered_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package transport

import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/rollbar/rollbar-go"
)

func TestBufferedTransportSend(t *testing.T) {
inner := &testTransport{
sendHook: make(chan map[string]interface{}),
}
transport := NewBuffered(inner, 1)
data := map[string]interface{}{"a": "b"}

if err := transport.Send(data); err != nil {
t.Fatal(err)
}

// Verify data is delivered to inner transport
recv := <-inner.sendHook
if recv["a"] != "b" {
t.Errorf("transport sent %v, want %v", recv, data)
}

var lastErr error
var sent int
for ; sent < 10; sent++ {
if err := transport.Send(data); err != nil {
lastErr = err
break
}
}

if lastErr != errBufferFull {
t.Fatal("send did not fill buffer")
}

// drain pending messages
for i := 0; i < sent; i++ {
<-inner.sendHook
}

transport.Close()

lastErr = nil
sent = 0
for ; sent < 10; sent++ {
if err := transport.Send(data); err != nil {
lastErr = err
break
}
}

if lastErr != errClosed {
t.Fatal("send after close did not return errClosed")
}
}

func TestBufferedTransportWait(t *testing.T) {
inner := &testTransport{}
transport := NewBuffered(inner, 1)
data := map[string]interface{}{"a": "b"}

// Wait returns immediately when nothing is queued
transport.Wait()
transport.Wait()

for i := 0; i < 100; i++ {
_ = transport.Send(data)
transport.Wait()
}

inner.sendHook = make(chan map[string]interface{})
waitDone := make(chan struct{})

if err := transport.Send(data); err != nil {
t.Fatal(err)
}
go func() {
transport.Wait()
close(waitDone)
}()

select {
case <-waitDone:
t.Fatal("wait returned before message was sent")
case <-inner.sendHook:
}

<-waitDone

transport.Close()

transport.Wait() // wait returns immediately after closed
}

// Regression test for original issue with async rollbar client:
// https://github.com/rollbar/rollbar-go/issues/68#issuecomment-540308646
func TestBufferedTransportRace(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

sync := rollbar.NewSyncTransport("token", srv.URL)
sync.SetLogger(&rollbar.SilentClientLogger{})

transport := NewBuffered(sync, 1)
body := map[string]interface{}{
"hello": "world",
}
started := make(chan struct{})
go func() {
close(started)
for {
transport.Wait()
}
}()
iter := make([]struct{}, 100)
<-started
for range iter {
err := transport.Send(body)
if err != nil {
if err == errBufferFull {
time.Sleep(time.Millisecond)
continue
}
t.Error("Send returned an unexpected error:", err)
}
}
}

type testTransport struct {
sendHook chan map[string]interface{}
rollbar.Transport
}

func (t *testTransport) Send(body map[string]interface{}) error {
if t.sendHook != nil {
t.sendHook <- body
}
return nil
}

func (t *testTransport) Close() error {
return nil
}

0 comments on commit 0274a97

Please sign in to comment.