Skip to content

Commit

Permalink
Update the API for Listener.Start() to take a number of events to Pop…
Browse files Browse the repository at this point in the history
…, update tests & expected handler params to match this. Update readme with example of this
  • Loading branch information
CraigChilds94 committed Aug 15, 2018
1 parent 9c8420d commit 757922a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 29 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ listener := &Listener{
ctx := listener.Context()
listener.Start(func(event Event) bool {
fmt.Println("Received event " + event.UID)
// Define how many you want to fetch on each tick
numToFetch := 2
// Start the listener
listener.Start(func(events []Event) bool {
for _, event := range events {
fmt.Println("Received event " + event.UID)
}
return true
})
}, numToFetch)
fmt.Println("Listener started")
Expand Down
56 changes: 38 additions & 18 deletions msq/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ func (l *Listener) Context() context.Context {
return l.ctx
}

func (l *Listener) Start(handle func(Event) bool) {
func (l *Listener) Start(handle func([]Event) bool, num int) {
started := make(chan bool)

if num < 1 {
num = 1
}

go func() {
if l.Running {
panic("Cannot start the listener whilst it is already running")
Expand Down Expand Up @@ -58,29 +62,45 @@ func (l *Listener) Start(handle func(Event) bool) {

timeout := time.NewTimer(l.Config.Timeout).C

// Go off and actually pull the events
go func() {
event, err := l.Queue.Pop()

if err == nil {
var resultValue bool
result := make(chan bool)

go func(event Event, handle func(Event) bool, result chan bool) {
result <- handle(event)
}(*event, handle, result)
var resultValue bool
result := make(chan bool)
events := []Event{}

// Depending on how many we want, that's what
// we will pop off the queue
for i := 0; i < num; i++ {
event, err := l.Queue.Pop()

if err == nil {
events = append(events, *event)
continue
}
}

select {
case <-timeout:
l.Queue.ReQueue(event)
case resultValue = <-result:
// Go off and handle those events
go func(events []Event, handle func([]Event) bool, result chan bool) {
result <- handle(events)
}(events, handle, result)

// Block on either a timeout on the handle
// or a result from the handle.
select {
case <-timeout:
for _, event := range events {
l.Queue.ReQueue(&event)
}
case resultValue = <-result:
for _, event := range events {
if resultValue {
l.Queue.Done(event)
l.Queue.Done(&event)
} else {
l.Queue.ReQueue(event)
l.Queue.ReQueue(&event)
}

break
}

break
}
}()
case <-l.stop:
Expand Down
21 changes: 13 additions & 8 deletions msq/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ func TestStartStop(t *testing.T) {

ctx := listener.Context()

listener.Start(func(event Event) bool {
assert.Equal(t, queuedEvent.UID, event.UID)
listener.Start(func(events []Event) bool {
if len(events) > 0 {
assert.Equal(t, queuedEvent.UID, events[0].UID)
}

return true
})
}, 1)

go func() {
assert.True(t, listener.Running, "The listener should be running")
Expand Down Expand Up @@ -70,10 +73,12 @@ func TestHandleFail(t *testing.T) {

ctx := listener.Context()

listener.Start(func(event Event) bool {
assert.Equal(t, queuedEvent.UID, event.UID)
listener.Start(func(events []Event) bool {
if len(events) > 0 {
assert.Equal(t, queuedEvent.UID, events[0].UID)
}
return false
})
}, 1)

go func() {
assert.True(t, listener.Running, "The listener should be started")
Expand Down Expand Up @@ -118,10 +123,10 @@ func TestHandleTimeout(t *testing.T) {

ctx := listener.Context()

listener.Start(func(event Event) bool {
listener.Start(func(events []Event) bool {
time.Sleep(2 * listenerConfig.Timeout)
return false
})
}, 1)

go func() {
assert.True(t, listener.Running, "The listener should be started")
Expand Down

0 comments on commit 757922a

Please sign in to comment.