Skip to content

Commit

Permalink
Fix race conditions with listener again; implement a way of getting f…
Browse files Browse the repository at this point in the history
…ailed messsages. Add some pushback so we delay the receival of messages
  • Loading branch information
CraigChilds94 committed Aug 6, 2018
1 parent c67ec27 commit f8df289
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 33 deletions.
6 changes: 3 additions & 3 deletions msq/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ func setup() {

queueConfig = &QueueConfig{
Name: "testing",
MaxRetries: 3,
MaxRetries: 4,
MessageTTL: 5 * time.Minute,
}

listenerConfig = ListenerConfig{
Interval: time.Second,
Timeout: 500 * time.Millisecond,
Interval: 100 * time.Millisecond,
Timeout: 250 * time.Millisecond,
}
}

Expand Down
25 changes: 15 additions & 10 deletions msq/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package msq

import (
"context"
"sync"
"time"
)

Expand Down Expand Up @@ -30,8 +29,7 @@ func (l *Listener) Context() context.Context {
}

func (l *Listener) Start(handle func(Event) bool) {
var wg sync.WaitGroup
wg.Add(1)
started := make(chan bool)

go func() {
if l.Running {
Expand All @@ -40,25 +38,30 @@ func (l *Listener) Start(handle func(Event) bool) {

defer l.cancel()

l.Running = true
firstTick := true

l.interval = time.NewTicker(l.Config.Interval).C
l.stop = make(chan bool)

wg.Done()

for {
select {
case <-l.interval:
if !l.Running {
if !firstTick && !l.Running {
return
}

if firstTick {
l.Running = true
started <- true
firstTick = false
}

timeout := time.NewTimer(l.Config.Timeout).C

go func() {
event, err := l.Queue.Pop()

if err == nil {
timeout := time.NewTimer(l.Config.Timeout).C

var resultValue bool
result := make(chan bool)

Expand All @@ -75,6 +78,8 @@ func (l *Listener) Start(handle func(Event) bool) {
} else {
l.Queue.ReQueue(event)
}

break
}
}
}()
Expand All @@ -85,7 +90,7 @@ func (l *Listener) Start(handle func(Event) bool) {
}
}()

wg.Wait()
<-started
}

func (l *Listener) Stop() {
Expand Down
29 changes: 13 additions & 16 deletions msq/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,20 @@ func TestHandleFail(t *testing.T) {

listener.Start(func(event Event) bool {
assert.Equal(t, queuedEvent.UID, event.UID)
return true
return false
})

go func() {
assert.True(t, listener.Running, "The listener should be started")
time.Sleep(time.Second)
time.Sleep(2 * listenerConfig.Interval)

poppedEvent, err := queue.Pop()
failedEvents, err := queue.Failed()

if assert.Nil(t, err, "We should get an event back as it should've been re-queued") {
assert.Equal(t, poppedEvent.UID, queuedEvent.UID)
queue.Done(poppedEvent)
if assert.Nil(t, err, "We should get a list of failed events back") {
assert.Equal(t, queuedEvent.UID, failedEvents[0].UID)
queue.Done(failedEvents[0])
}

queue.Done(poppedEvent)

listener.Stop()
}()

Expand Down Expand Up @@ -121,20 +119,19 @@ func TestHandleTimeout(t *testing.T) {
ctx := listener.Context()

listener.Start(func(event Event) bool {
time.Sleep(time.Second)
assert.Equal(t, queuedEvent.UID, event.UID)
return true
time.Sleep(2 * listenerConfig.Timeout)
return false
})

go func() {
assert.True(t, listener.Running, "The listener should be started")
time.Sleep(time.Second)
time.Sleep(2 * listenerConfig.Interval)

poppedEvent, err := queue.Pop()
failedEvents, err := queue.Failed()

if assert.Nil(t, err, "We should get an event back as it should've been re-queued") {
assert.Equal(t, poppedEvent.UID, queuedEvent.UID)
queue.Done(poppedEvent)
if assert.Nil(t, err, "We should get a list of failed events back") {
assert.Equal(t, queuedEvent.UID, failedEvents[0].UID)
queue.Done(failedEvents[0])
}

listener.Stop()
Expand Down
31 changes: 29 additions & 2 deletions msq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,20 @@ func (q *Queue) Done(event *Event) error {
}

func (q *Queue) ReQueue(event *Event) error {

now := time.Now()
pushback := time.Now().Add(time.Millisecond * (time.Duration(event.Retries) * 100))
retries := event.Retries + 1

return q.Connection.Database().
Unscoped().
Model(event).
Update("deleted_at", nil).
Update("retries", event.Retries+1).
Updates(map[string]interface{}{
"deleted_at": nil,
"created_at": pushback,
"updated_at": now,
"retries": retries,
}).
Error
}

Expand All @@ -38,6 +47,7 @@ func (q *Queue) Pop() (*Event, error) {
db := q.Connection.Database()

err := db.Order("created_at desc").
Where("created_at <= ?", time.Now()).
Where("retries <= ?", q.Config.MaxRetries).
Where("namespace = ?", q.Config.Name).
First(event).Error
Expand All @@ -51,6 +61,23 @@ func (q *Queue) Pop() (*Event, error) {
return event, nil
}

func (q *Queue) Failed() ([]*Event, error) {
events := []*Event{}

db := q.Connection.Database()

err := db.Unscoped().Order("created_at desc").
Where("namespace = ?", q.Config.Name).
Find(&events).
Error

if err != nil {
return events, err
}

return events, nil
}

func (q *Queue) Push(payload Payload) (*Event, error) {
encodedPayload, err := payload.Marshal()

Expand Down
49 changes: 47 additions & 2 deletions msq/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package msq

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -87,20 +88,64 @@ func TestReQueue(t *testing.T) {

queue.Configure(queueConfig)

_, err = queue.Push(payload)
originalEvent, err := queue.Push(payload)

if assert.Nil(t, err) {
event, err := queue.Pop()

if assert.Nil(t, err, "There should be an event in the queue") {
err := queue.ReQueue(event)
// Simulate waiting
time.Sleep(listenerConfig.Interval)

err := queue.ReQueue(event)
assert.Nil(t, err, "We should have no problem re-queuing the event")

// Simulate waiting
time.Sleep(listenerConfig.Interval)

newEvent, err := queue.Pop()

if assert.Nil(t, err, "We should find a requeued event in the queue") {
assert.Equal(t, event.UID, newEvent.UID, "We should get back the same event")
assert.NotEqual(t, originalEvent.Retries, newEvent.Retries, "The retries should have increased")
assert.NotEqual(t, originalEvent.UpdatedAt.UnixNano(), newEvent.UpdatedAt.UnixNano(), "The updated at timestamps should be different")
assert.NotEqual(t, originalEvent.CreatedAt.UnixNano(), newEvent.CreatedAt.UnixNano(), "The created at timestamps should be different")
}
}
}
}

func TestFailed(t *testing.T) {
setup()
defer teardown()

config := *connectionConfig
queue, err := Connect(config)

queue.Configure(queueConfig)

originalEvent, err := queue.Push(payload)

if assert.Nil(t, err) {
event, err := queue.Pop()

if assert.Nil(t, err, "There should be an event in the queue") {
// Simulate waiting
time.Sleep(listenerConfig.Interval)

event.Retries = 3

err := queue.ReQueue(event)
assert.Nil(t, err, "We should have no problem re-queuing the event")

// Simulate waiting
time.Sleep(listenerConfig.Interval)

failedEvents, err := queue.Failed()

if assert.Nil(t, err, "We should get a list of a failed events") {
assert.Equal(t, originalEvent.UID, failedEvents[0].UID)
queue.Done(failedEvents[0])
}
}
}
Expand Down

0 comments on commit f8df289

Please sign in to comment.