diff --git a/msq/global_test.go b/msq/global_test.go index 1875b21..0dec88b 100644 --- a/msq/global_test.go +++ b/msq/global_test.go @@ -26,13 +26,13 @@ func setup() { queueConfig = &QueueConfig{ Name: "testing", - MaxRetries: 3, + MaxRetries: 4, MessageTTL: 5 * time.Minute, } listenerConfig = ListenerConfig{ - Interval: time.Second, - Timeout: 500 * time.Millisecond, + Interval: 100 * time.Millisecond, + Timeout: 250 * time.Millisecond, } } diff --git a/msq/listener.go b/msq/listener.go index cb053f4..a25d93a 100644 --- a/msq/listener.go +++ b/msq/listener.go @@ -2,7 +2,6 @@ package msq import ( "context" - "sync" "time" ) @@ -30,8 +29,7 @@ func (l *Listener) Context() context.Context { } func (l *Listener) Start(handle func(Event) bool) { - var wg sync.WaitGroup - wg.Add(1) + started := make(chan bool) go func() { if l.Running { @@ -40,25 +38,30 @@ func (l *Listener) Start(handle func(Event) bool) { defer l.cancel() - l.Running = true + firstTick := true + l.interval = time.NewTicker(l.Config.Interval).C l.stop = make(chan bool) - wg.Done() - for { select { case <-l.interval: - if !l.Running { + if !firstTick && !l.Running { return } + if firstTick { + l.Running = true + started <- true + firstTick = false + } + + timeout := time.NewTimer(l.Config.Timeout).C + go func() { event, err := l.Queue.Pop() if err == nil { - timeout := time.NewTimer(l.Config.Timeout).C - var resultValue bool result := make(chan bool) @@ -75,6 +78,8 @@ func (l *Listener) Start(handle func(Event) bool) { } else { l.Queue.ReQueue(event) } + + break } } }() @@ -85,7 +90,7 @@ func (l *Listener) Start(handle func(Event) bool) { } }() - wg.Wait() + <-started } func (l *Listener) Stop() { diff --git a/msq/listener_test.go b/msq/listener_test.go index c0fb3c5..b9a2453 100644 --- a/msq/listener_test.go +++ b/msq/listener_test.go @@ -72,22 +72,20 @@ func TestHandleFail(t *testing.T) { listener.Start(func(event Event) bool { assert.Equal(t, queuedEvent.UID, event.UID) - return true + return false }) go func() { assert.True(t, listener.Running, "The listener should be started") - time.Sleep(time.Second) + time.Sleep(2 * listenerConfig.Interval) - poppedEvent, err := queue.Pop() + failedEvents, err := queue.Failed() - if assert.Nil(t, err, "We should get an event back as it should've been re-queued") { - assert.Equal(t, poppedEvent.UID, queuedEvent.UID) - queue.Done(poppedEvent) + if assert.Nil(t, err, "We should get a list of failed events back") { + assert.Equal(t, queuedEvent.UID, failedEvents[0].UID) + queue.Done(failedEvents[0]) } - queue.Done(poppedEvent) - listener.Stop() }() @@ -121,20 +119,19 @@ func TestHandleTimeout(t *testing.T) { ctx := listener.Context() listener.Start(func(event Event) bool { - time.Sleep(time.Second) - assert.Equal(t, queuedEvent.UID, event.UID) - return true + time.Sleep(2 * listenerConfig.Timeout) + return false }) go func() { assert.True(t, listener.Running, "The listener should be started") - time.Sleep(time.Second) + time.Sleep(2 * listenerConfig.Interval) - poppedEvent, err := queue.Pop() + failedEvents, err := queue.Failed() - if assert.Nil(t, err, "We should get an event back as it should've been re-queued") { - assert.Equal(t, poppedEvent.UID, queuedEvent.UID) - queue.Done(poppedEvent) + if assert.Nil(t, err, "We should get a list of failed events back") { + assert.Equal(t, queuedEvent.UID, failedEvents[0].UID) + queue.Done(failedEvents[0]) } listener.Stop() diff --git a/msq/queue.go b/msq/queue.go index 1267cff..2d4ed39 100644 --- a/msq/queue.go +++ b/msq/queue.go @@ -24,11 +24,20 @@ func (q *Queue) Done(event *Event) error { } func (q *Queue) ReQueue(event *Event) error { + + now := time.Now() + pushback := time.Now().Add(time.Millisecond * (time.Duration(event.Retries) * 100)) + retries := event.Retries + 1 + return q.Connection.Database(). Unscoped(). Model(event). - Update("deleted_at", nil). - Update("retries", event.Retries+1). + Updates(map[string]interface{}{ + "deleted_at": nil, + "created_at": pushback, + "updated_at": now, + "retries": retries, + }). Error } @@ -38,6 +47,7 @@ func (q *Queue) Pop() (*Event, error) { db := q.Connection.Database() err := db.Order("created_at desc"). + Where("created_at <= ?", time.Now()). Where("retries <= ?", q.Config.MaxRetries). Where("namespace = ?", q.Config.Name). First(event).Error @@ -51,6 +61,23 @@ func (q *Queue) Pop() (*Event, error) { return event, nil } +func (q *Queue) Failed() ([]*Event, error) { + events := []*Event{} + + db := q.Connection.Database() + + err := db.Unscoped().Order("created_at desc"). + Where("namespace = ?", q.Config.Name). + Find(&events). + Error + + if err != nil { + return events, err + } + + return events, nil +} + func (q *Queue) Push(payload Payload) (*Event, error) { encodedPayload, err := payload.Marshal() diff --git a/msq/queue_test.go b/msq/queue_test.go index fb5788f..deec857 100644 --- a/msq/queue_test.go +++ b/msq/queue_test.go @@ -2,6 +2,7 @@ package msq import ( "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -87,20 +88,64 @@ func TestReQueue(t *testing.T) { queue.Configure(queueConfig) - _, err = queue.Push(payload) + originalEvent, err := queue.Push(payload) if assert.Nil(t, err) { event, err := queue.Pop() if assert.Nil(t, err, "There should be an event in the queue") { - err := queue.ReQueue(event) + // Simulate waiting + time.Sleep(listenerConfig.Interval) + err := queue.ReQueue(event) assert.Nil(t, err, "We should have no problem re-queuing the event") + // Simulate waiting + time.Sleep(listenerConfig.Interval) + newEvent, err := queue.Pop() if assert.Nil(t, err, "We should find a requeued event in the queue") { assert.Equal(t, event.UID, newEvent.UID, "We should get back the same event") + assert.NotEqual(t, originalEvent.Retries, newEvent.Retries, "The retries should have increased") + assert.NotEqual(t, originalEvent.UpdatedAt.UnixNano(), newEvent.UpdatedAt.UnixNano(), "The updated at timestamps should be different") + assert.NotEqual(t, originalEvent.CreatedAt.UnixNano(), newEvent.CreatedAt.UnixNano(), "The created at timestamps should be different") + } + } + } +} + +func TestFailed(t *testing.T) { + setup() + defer teardown() + + config := *connectionConfig + queue, err := Connect(config) + + queue.Configure(queueConfig) + + originalEvent, err := queue.Push(payload) + + if assert.Nil(t, err) { + event, err := queue.Pop() + + if assert.Nil(t, err, "There should be an event in the queue") { + // Simulate waiting + time.Sleep(listenerConfig.Interval) + + event.Retries = 3 + + err := queue.ReQueue(event) + assert.Nil(t, err, "We should have no problem re-queuing the event") + + // Simulate waiting + time.Sleep(listenerConfig.Interval) + + failedEvents, err := queue.Failed() + + if assert.Nil(t, err, "We should get a list of a failed events") { + assert.Equal(t, originalEvent.UID, failedEvents[0].UID) + queue.Done(failedEvents[0]) } } }