diff --git a/hook.go b/hook.go index ad57617..4bad7f6 100644 --- a/hook.go +++ b/hook.go @@ -8,6 +8,8 @@ import ( "github.com/rollbar/rollbar-go" "github.com/sirupsen/logrus" + + "github.com/heroku/rollrus/internal/transport" ) var _ logrus.Hook = &Hook{} //assert that *Hook is a logrus.Hook @@ -26,8 +28,11 @@ type Hook struct { // NewHookForLevels provided by the caller. Otherwise works like NewHook. func NewHookForLevels(token string, env string, levels []logrus.Level) *Hook { + client := rollbar.NewSync(token, env, "", "", "") + client.Transport = transport.NewBuffered(client.Transport, rollbar.DefaultBuffer) + return &Hook{ - Client: rollbar.NewSync(token, env, "", "", ""), + Client: client, triggers: levels, ignoredErrors: make([]error, 0), ignoreErrorFunc: func(error) bool { return false }, diff --git a/internal/transport/buffered.go b/internal/transport/buffered.go new file mode 100644 index 0000000..f1094c8 --- /dev/null +++ b/internal/transport/buffered.go @@ -0,0 +1,104 @@ +package transport + +import ( + "context" + "errors" + "sync" + + "github.com/rollbar/rollbar-go" +) + +var ( + errBufferFull = errors.New("rollbar message buffer full") + errClosed = errors.New("rollbar transport closed") +) + +// Buffered is an alternative to rollbar's AsyncTransport, providing +// threadsafe and predictable message delivery built on top of the SyncTransport. +type Buffered struct { + queue chan op + once sync.Once + ctx context.Context + + rollbar.Transport +} + +// op represents an operation queued for transport. It is only valid +// to set a single field in the struct to represent the operation that should +// be performed. +type op struct { + send map[string]interface{} + wait chan struct{} + close bool +} + +// NewBuffered wraps the provided transport for async delivery. +func NewBuffered(inner rollbar.Transport, bufSize int) *Buffered { + ctx, cancel := context.WithCancel(context.Background()) + + t := &Buffered{ + queue: make(chan op, bufSize), + ctx: ctx, + Transport: inner, + } + + go t.run(cancel) + + return t +} + +// Send enqueues delivery of the message body to Rollbar without waiting for +// the result. If the buffer is full, it will immediately return an error. +func (t *Buffered) Send(body map[string]interface{}) error { + select { + case t.queue <- op{send: body}: + return nil + case <-t.ctx.Done(): + return errClosed + default: + return errBufferFull + } +} + +// Wait blocks until all messages buffered before calling Wait are +// delivered. +func (t *Buffered) Wait() { + done := make(chan struct{}) + select { + case t.queue <- op{wait: done}: + case <-t.ctx.Done(): + return + } + + select { + case <-done: + case <-t.ctx.Done(): + } +} + +// Close shuts down the transport and waits for queued messages to be +// delivered. +func (t *Buffered) Close() error { + t.once.Do(func() { + t.queue <- op{close: true} + }) + + <-t.ctx.Done() + return nil +} + +func (t *Buffered) run(cancel func()) { + defer cancel() + + for m := range t.queue { + switch { + case m.send != nil: + _ = t.Transport.Send(m.send) + case m.wait != nil: + close(m.wait) + case m.close: + t.Transport.Close() + return + } + } +} diff --git a/internal/transport/buffered_test.go b/internal/transport/buffered_test.go new file mode 100644 index 0000000..5e3a9f3 --- /dev/null +++ b/internal/transport/buffered_test.go @@ -0,0 +1,151 @@ +package transport + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/rollbar/rollbar-go" +) + +func TestBufferedTransportSend(t *testing.T) { + inner := &testTransport{ + sendHook: make(chan map[string]interface{}), + } + transport := NewBuffered(inner, 1) + data := map[string]interface{}{"a": "b"} + + if err := transport.Send(data); err != nil { + t.Fatal(err) + } + + // Verify data is delivered to inner transport + recv := <-inner.sendHook + if recv["a"] != "b" { + t.Errorf("transport sent %v, want %v", recv, data) + } + + var lastErr error + var sent int + for ; sent < 10; sent++ { + if err := transport.Send(data); err != nil { + lastErr = err + break + } + } + + if lastErr != errBufferFull { + t.Fatal("send did not fill buffer") + } + + // drain pending messages + for i := 0; i < sent; i++ { + <-inner.sendHook + } + + transport.Close() + + lastErr = nil + sent = 0 + for ; sent < 10; sent++ { + if err := transport.Send(data); err != nil { + lastErr = err + break + } + } + + if lastErr != errClosed { + t.Fatal("send after close did not return errClosed") + } +} + +func TestBufferedTransportWait(t *testing.T) { + inner := &testTransport{} + transport := NewBuffered(inner, 1) + data := map[string]interface{}{"a": "b"} + + // Wait returns immediately when nothing is queued + transport.Wait() + transport.Wait() + + for i := 0; i < 100; i++ { + _ = transport.Send(data) + transport.Wait() + } + + inner.sendHook = make(chan map[string]interface{}) + waitDone := make(chan struct{}) + + if err := transport.Send(data); err != nil { + t.Fatal(err) + } + go func() { + transport.Wait() + close(waitDone) + }() + + select { + case <-waitDone: + t.Fatal("wait returned before message was sent") + case <-inner.sendHook: + } + + <-waitDone + + transport.Close() + + transport.Wait() // wait returns immediately after closed +} + +// Regression test for original issue with async rollbar client: +// https://github.com/rollbar/rollbar-go/issues/68#issuecomment-540308646 +func TestBufferedTransportRace(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + sync := rollbar.NewSyncTransport("token", srv.URL) + sync.SetLogger(&rollbar.SilentClientLogger{}) + + transport := NewBuffered(sync, 1) + body := map[string]interface{}{ + "hello": "world", + } + started := make(chan struct{}) + go func() { + close(started) + for { + transport.Wait() + } + }() + iter := make([]struct{}, 100) + <-started + for range iter { + err := transport.Send(body) + if err != nil { + if err == errBufferFull { + time.Sleep(time.Millisecond) + continue + } + t.Error("Send returned an unexpected error:", err) + } + } +} + +type testTransport struct { + sendHook chan map[string]interface{} + rollbar.Transport +} + +func (t *testTransport) Send(body map[string]interface{}) error { + if t.sendHook != nil { + t.sendHook <- body + } + return nil +} + +func (t *testTransport) Close() error { + return nil +}