Skip to content

Commit

Permalink
bufferedTransport -> transport.Buffered
Browse files Browse the repository at this point in the history
  • Loading branch information
bernerdschaefer committed Jun 12, 2020
1 parent 0228ff5 commit 1842e48
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
4 changes: 3 additions & 1 deletion hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/rollbar/rollbar-go"
"github.com/sirupsen/logrus"

"github.com/heroku/rollrus/internal/transport"
)

var _ logrus.Hook = &Hook{} //assert that *Hook is a logrus.Hook
Expand All @@ -27,7 +29,7 @@ type Hook struct {
// NewHookForLevels provided by the caller. Otherwise works like NewHook.
func NewHookForLevels(token string, env string, levels []logrus.Level) *Hook {
client := rollbar.NewSync(token, env, "", "", "")
client.Transport = newBufferTransport(client.Transport, rollbar.DefaultBuffer)
client.Transport = transport.NewBuffered(client.Transport, rollbar.DefaultBuffer)

return &Hook{
Client: client,
Expand Down
35 changes: 18 additions & 17 deletions transport.go → internal/transport/buffered.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rollrus
package transport

import (
"context"
Expand All @@ -13,30 +13,31 @@ var (
errClosed = errors.New("rollbar transport closed")
)

// bufferedTransport is an alternative to rollbar's AsyncTransport, providing
// threadsafe and predictable built on top of the SyncTransport.
type bufferedTransport struct {
queue chan transportOp
// Buffered is an alternative to rollbar's AsyncTransport, providing
// threadsafe and predictable message delivery built on top of the SyncTransport.
type Buffered struct {
queue chan op
once sync.Once
ctx context.Context

rollbar.Transport
}

// transportOp represents an operation queued for transport. It is only valid
// op represents an operation queued for transport. It is only valid
// to set a single field in the struct to represent the operation that should
// be performed.
type transportOp struct {
type op struct {
send map[string]interface{}
wait chan struct{}
close bool
}

func newBufferTransport(inner rollbar.Transport, bufSize int) *bufferedTransport {
// NewBuffered wraps the provided transport for async delivery.
func NewBuffered(inner rollbar.Transport, bufSize int) *Buffered {
ctx, cancel := context.WithCancel(context.Background())

t := &bufferedTransport{
queue: make(chan transportOp, bufSize),
t := &Buffered{
queue: make(chan op, bufSize),
ctx: ctx,
Transport: inner,
}
Expand All @@ -48,9 +49,9 @@ func newBufferTransport(inner rollbar.Transport, bufSize int) *bufferedTransport

// Send enqueues delivery of the message body to Rollbar without waiting for
// the result. If the buffer is full, it will immediately return an error.
func (t *bufferedTransport) Send(body map[string]interface{}) error {
func (t *Buffered) Send(body map[string]interface{}) error {
select {
case t.queue <- transportOp{send: body}:
case t.queue <- op{send: body}:
return nil
case <-t.ctx.Done():
return errClosed
Expand All @@ -61,10 +62,10 @@ func (t *bufferedTransport) Send(body map[string]interface{}) error {

// Wait blocks until all messages buffered before calling Wait are
// delivered.
func (t *bufferedTransport) Wait() {
func (t *Buffered) Wait() {
done := make(chan struct{})
select {
case t.queue <- transportOp{wait: done}:
case t.queue <- op{wait: done}:
case <-t.ctx.Done():
return
}
Expand All @@ -77,16 +78,16 @@ func (t *bufferedTransport) Wait() {

// Close shuts down the transport and waits for queued messages to be
// delivered.
func (t *bufferedTransport) Close() error {
func (t *Buffered) Close() error {
t.once.Do(func() {
t.queue <- transportOp{close: true}
t.queue <- op{close: true}
})

<-t.ctx.Done()
return nil
}

func (t *bufferedTransport) run(cancel func()) {
func (t *Buffered) run(cancel func()) {
defer cancel()

for m := range t.queue {
Expand Down
8 changes: 4 additions & 4 deletions transport_test.go → internal/transport/buffered_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rollrus
package transport

import (
"net/http"
Expand All @@ -13,7 +13,7 @@ func TestBufferedTransportSend(t *testing.T) {
inner := &testTransport{
sendHook: make(chan map[string]interface{}),
}
transport := newBufferTransport(inner, 1)
transport := NewBuffered(inner, 1)
data := map[string]interface{}{"a": "b"}

if err := transport.Send(data); err != nil {
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestBufferedTransportSend(t *testing.T) {

func TestBufferedTransportWait(t *testing.T) {
inner := &testTransport{}
transport := newBufferTransport(inner, 1)
transport := NewBuffered(inner, 1)
data := map[string]interface{}{"a": "b"}

// Wait returns immediately when nothing is queued
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestBufferedTransportRace(t *testing.T) {
sync := rollbar.NewSyncTransport("token", srv.URL)
sync.SetLogger(&rollbar.SilentClientLogger{})

transport := newBufferTransport(sync, 1)
transport := NewBuffered(sync, 1)
body := map[string]interface{}{
"hello": "world",
}
Expand Down

0 comments on commit 1842e48

Please sign in to comment.