forked from vmihailenco/taskq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
azsqs_test.go
111 lines (90 loc) · 2.4 KB
/
azsqs_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package taskq_test
import (
"os"
"testing"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/vmihailenco/taskq/v3"
"github.com/vmihailenco/taskq/v3/azsqs"
)
// accountID is the AWS account ID handed to azsqs.NewFactory by the helpers
// in this file. It is filled in once, at package initialisation.
var accountID string

// init copies the AWS_ACCOUNT_ID environment variable into accountID.
func init() {
	// The "present" flag is deliberately dropped: an unset variable leaves
	// accountID as the empty string, exactly like a variable set to "".
	accountID, _ = os.LookupEnv("AWS_ACCOUNT_ID")
}
// awsSQS returns a new SQS client backed by a freshly created AWS session.
//
// The session is built with session.NewSession rather than the deprecated
// session.New, which postpones configuration errors until the first request.
// session.Must panics if the session cannot be created, so a broken AWS
// configuration fails the test run immediately instead of surfacing later as
// an unrelated request error.
func awsSQS() *sqs.SQS {
	sess := session.Must(session.NewSession())
	return sqs.New(sess)
}
// azsqsFactory builds the taskq.Factory used by the SQS tests in this file.
// It pairs a new client from awsSQS with the package-level accountID.
func azsqsFactory() taskq.Factory {
	client := awsSQS()
	return azsqs.NewFactory(client, accountID)
}
func TestSQSConsumer(t *testing.T) {
testConsumer(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-consumer"),
})
}
func TestSQSUnknownTask(t *testing.T) {
testUnknownTask(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-unknown-task"),
})
}
func TestSQSFallback(t *testing.T) {
testFallback(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-fallback"),
})
}
func TestSQSDelay(t *testing.T) {
testDelay(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-delay"),
})
}
func TestSQSRetry(t *testing.T) {
testRetry(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-retry"),
})
}
func TestSQSNamedMessage(t *testing.T) {
testNamedMessage(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-named-message"),
})
}
func TestSQSCallOnce(t *testing.T) {
testCallOnce(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-call-once"),
})
}
func TestSQSLen(t *testing.T) {
testLen(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-queue-len"),
})
}
func TestSQSRateLimit(t *testing.T) {
testRateLimit(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-rate-limit"),
})
}
func TestSQSErrorDelay(t *testing.T) {
testErrorDelay(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-delayer"),
})
}
func TestSQSWorkerLimit(t *testing.T) {
testWorkerLimit(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-worker-limit"),
})
}
func TestSQSInvalidCredentials(t *testing.T) {
man := azsqs.NewFactory(awsSQS(), "123")
testInvalidCredentials(t, man, &taskq.QueueOptions{
Name: queueName("sqs-invalid-credentials"),
})
}
func TestSQSBatchConsumerSmallMessage(t *testing.T) {
testBatchConsumer(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-batch-consumer-small-message"),
}, 100)
}
func TestSQSBatchConsumerLarge(t *testing.T) {
testBatchConsumer(t, azsqsFactory(), &taskq.QueueOptions{
Name: queueName("sqs-batch-processor-large-message"),
}, 64000)
}