Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
Close responses channel when done (#58)
Browse files Browse the repository at this point in the history
closes #56
  • Loading branch information
nathany authored Jun 14, 2016
1 parent 35ad6ab commit 61a4db0
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ c.out
*.pass
*.pkpass
!testdata/*
*.sublime-project
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,10 @@ HTTP/2 can send multiple requests over a single connection, but `service.Push` w
```go
queue := push.NewQueue(service, workers)

// process responses
// process responses (responses may be received in any order)
go func() {
for {
// Response blocks until a response is available
log.Println(queue.Response())
for resp := range queue.Responses {
log.Println(resp)
}
}()

Expand All @@ -121,7 +120,7 @@ for i := 0; i < number; i++ {
queue.Push(deviceToken, nil, b)
}

// done sending notifications, wait for all responses
// done sending notifications, wait for all responses and shutdown
queue.Wait()
```

Expand Down Expand Up @@ -170,7 +169,7 @@ id, err := service.Push(deviceToken, nil, b)

#### Error responses

If `service.Push` or `queue.Response` returns an error, it could be an HTTP error, or it could be an error response from Apple. To access the Reason and HTTP Status code, you must convert the `error` to a `push.Error` as follows:
Errors from `service.Push` or `queue.Response` could be HTTP errors or an error response from Apple. To access the Reason and HTTP Status code, you must convert the `error` to a `push.Error` as follows:

```go
if e, ok := err.(*push.Error); ok {
Expand Down
12 changes: 6 additions & 6 deletions example/concurrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ func main() {
queue := push.NewQueue(service, workers)

// process responses
// NOTE: Responses may be received in any order.
go func() {
count := 1
for {
id, device, err := queue.Response()
if err != nil {
log.Printf("(%d) device: %s, error: %v", count, device, err)
for resp := range queue.Responses {
if resp.Err != nil {
log.Printf("(%d) device: %s, error: %v", count, resp.DeviceToken, resp.Err)
} else {
log.Printf("(%d) device: %s, apns-id: %s", count, device, id)
log.Printf("(%d) device: %s, apns-id: %s", count, resp.DeviceToken, resp.ID)
}
count++
}
Expand All @@ -87,7 +87,7 @@ func main() {
for i := 0; i < number; i++ {
queue.Push(deviceToken, nil, b)
}
// done sending notifications, wait for all responses:
// done sending notifications, wait for all responses and shutdown:
queue.Wait()
elapsed := time.Since(start)

Expand Down
7 changes: 3 additions & 4 deletions push/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ func BenchmarkPush(b *testing.B) {

// handle responses
go func() {
for {
_, _, err := queue.Response()
if err != nil {
b.Fatal(err)
for resp := range queue.Responses {
if resp.Err != nil {
b.Fatal(resp.Err)
}
}
}()
Expand Down
1 change: 0 additions & 1 deletion push/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
type Headers struct {
// ID for the notification. Apple generates one if omitted.
// This should be a UUID with 32 lowercase hexadecimal digits.
// TODO: use a UUID type.
ID string

// Apple will retry delivery until this time. The default behavior only tries once.
Expand Down
55 changes: 25 additions & 30 deletions push/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,31 @@ import "sync"
type Queue struct {
service *Service
notifications chan notification
responses chan response
Responses chan Response
wg sync.WaitGroup
}

// notification to send.
type notification struct {
DeviceToken string
Headers *Headers
Payload []byte
}

// Response from sending a notification.
type Response struct {
DeviceToken string
ID string
Err error
}

// NewQueue wraps a service with a queue for sending notifications asynchronously.
func NewQueue(service *Service, workers uint) *Queue {
// unbuffered channels
q := &Queue{
service: service,
notifications: make(chan notification),
responses: make(chan response),
Responses: make(chan Response),
}
// startup workers to send notifications
for i := uint(0); i < workers; i++ {
Expand All @@ -36,40 +50,21 @@ func (q *Queue) Push(deviceToken string, headers *Headers, payload []byte) {
q.notifications <- n
}

// Response blocks waiting for a response. Responses may be received in any order.
func (q *Queue) Response() (id string, deviceToken string, err error) {
resp := <-q.responses
q.wg.Done()
return resp.ApnsID, resp.DeviceToken, resp.Err
}

// Wait for all responses to be handled and shutdown workers to stop accepting notifications.
// Wait for all responses to be handled and then close channels.
func (q *Queue) Wait() {
// Stop accepting new notifications and shutdown workers after existing notifications
// are processed:
close(q.notifications)
// Wait for all responses to be handled:
q.wg.Wait()
}

// notification to send.
type notification struct {
DeviceToken string
Headers *Headers
Payload []byte
}

// response from sending a notification.
type response struct {
ApnsID string
Err error
DeviceToken string
// Close responses channel to clean up:
close(q.Responses)
}

func worker(q *Queue) {
for {
n, more := <-q.notifications
if !more {
return
}
for n := range q.notifications {
id, err := q.service.Push(n.DeviceToken, n.Headers, n.Payload)
q.responses <- response{ApnsID: id, Err: err, DeviceToken: n.DeviceToken}
q.Responses <- Response{DeviceToken: n.DeviceToken, ID: id, Err: err}
q.wg.Done()
}
}
11 changes: 5 additions & 6 deletions push/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ func TestQueuePush(t *testing.T) {
queue := push.NewQueue(service, workers)

go func() {
for i := 0; i < number; i++ {
id, deviceToken, err := queue.Response()
if err != nil {
t.Error(err)
for resp := range queue.Responses {
if resp.Err != nil {
t.Error(resp.Err)
}
if id != deviceToken {
t.Errorf("Expected %q == %q.", id, deviceToken)
if resp.ID != resp.DeviceToken {
t.Errorf("Expected %q == %q.", resp.ID, resp.DeviceToken)
}
}
}()
Expand Down

0 comments on commit 61a4db0

Please sign in to comment.