forked from adjust/rmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
delivery.go
79 lines (67 loc) · 1.62 KB
/
delivery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package rmq
import (
"fmt"
"github.com/go-redis/redis"
)
// Delivery describes a single delivered payload. It exposes the payload and
// its id, plus three ways to settle the delivery: Ack, Reject and Push. Each
// settle method reports its outcome as a bool; the notes on the methods
// describe what the wrapDelivery implementation in this file does.
type Delivery interface {
// Id returns the delivery's integer identifier.
Id() int
// Payload returns the delivery's payload string.
Payload() string
// Ack acknowledges the delivery. wrapDelivery returns true only when its
// "ack" script reports a count of exactly 1.
Ack() bool
// Reject moves the delivery to the rejected key. wrapDelivery returns false
// only when its "move" script fails with an error other than redis.Nil.
Reject() bool
// Push moves the delivery to the push key when one is configured; in
// wrapDelivery an empty push key makes it move to the rejected key instead.
Push() bool
}
// wrapDelivery is the Delivery implementation in this file. It settles a
// delivery by running named scripts ("ack" and "move") through redisClient.
type wrapDelivery struct {
// id is returned by Id and passed as the argument to the "ack" and "move" scripts.
id int
// payload is the content returned by Payload.
payload string
// unackedKey is the first key handed to both the "ack" and "move" scripts.
// NOTE(review): presumably the key holding the delivery until it is
// settled — confirm against the script sources.
unackedKey string
// rejectedKey is the destination key used by Reject, and by Push when pushKey is empty.
rejectedKey string
// pushKey is the destination key used by Push; the empty string means none is configured.
pushKey string
// redisClient executes the scripts via RunShaScript.
redisClient RedisClient
}
// newDelivery allocates a wrapDelivery and stores every argument in the field
// of the same name. No validation is performed and no call is made on
// redisClient; an empty pushKey is accepted as-is.
func newDelivery(id int, payload, unackedKey, rejectedKey, pushKey string, redisClient RedisClient) *wrapDelivery {
	d := new(wrapDelivery)

	d.id = id
	d.payload = payload

	d.unackedKey = unackedKey
	d.rejectedKey = rejectedKey
	d.pushKey = pushKey

	d.redisClient = redisClient
	return d
}
// String implements fmt.Stringer. The result is the payload and the unacked
// key, separated by a single space and wrapped in square brackets.
func (d *wrapDelivery) String() string {
	payload, key := d.payload, d.unackedKey
	return fmt.Sprintf("[%s %s]", payload, key)
}
// Id returns the identifier the delivery was constructed with.
func (d *wrapDelivery) Id() int {
	return d.id
}
// Payload returns the payload string the delivery was constructed with.
func (d *wrapDelivery) Payload() string {
	return d.payload
}
// Ack runs the "ack" script with the unacked key as its only key and the
// delivery id as its argument. It returns true only when the script result
// converts to an int without error and that int is exactly 1; every other
// outcome, including any script error, yields false.
func (d *wrapDelivery) Ack() bool {
	keys := []string{d.unackedKey}
	result := d.redisClient.RunShaScript("ack", keys, d.id)

	// redis.Nil is let through at this point; whether the call then counts as
	// a success is decided by the integer conversion below.
	if scriptErr := result.Err(); scriptErr != nil && scriptErr != redis.Nil {
		return false
	}

	removed, convErr := result.Int()
	return convErr == nil && removed == 1
}
// Reject moves the delivery to the rejected key and returns whatever move
// reports for that transfer.
func (d *wrapDelivery) Reject() bool {
	target := d.rejectedKey
	return d.move(target)
}
// Push moves the delivery to the push key when one is configured. With an
// empty push key it moves the delivery to the rejected key instead. The
// return value is whatever move reports for the chosen destination.
func (d *wrapDelivery) Push() bool {
	// Default to the rejected key; a non-empty push key takes precedence.
	target := d.rejectedKey
	if d.pushKey != "" {
		target = d.pushKey
	}
	return d.move(target)
}
// move runs the "move" script with the unacked key and the given destination
// key as its two keys and the delivery id as its argument. It returns true
// when the script reports no error or reports redis.Nil, and false for any
// other error. The script's result value is not inspected.
func (d *wrapDelivery) move(key string) bool {
	keys := []string{d.unackedKey, key}
	result := d.redisClient.RunShaScript("move", keys, d.id)

	err := result.Err()
	return err == nil || err == redis.Nil
}