-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathenqueue_test.go
54 lines (44 loc) · 1.29 KB
/
enqueue_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package sqlq_test
import (
"encoding/json"
"fmt"
. "github.com/mergestat/sqlq"
"math/rand"
"testing"
"time"
)
// NewAdditionJob returns a job description of type "addition". The two
// operands are JSON-encoded (as fields A and B) and attached through
// WithParameters; WithRetention is given a 24 hour duration.
func NewAdditionJob(a, b int) *JobDescription {
	operands := struct{ A, B int }{a, b}

	// The marshal error is deliberately discarded: encoding a struct made of
	// two plain ints is not expected to fail.
	payload, _ := json.Marshal(&operands)

	return NewJobDesc(
		"addition",
		WithParameters(payload),
		WithRetention(24*time.Hour),
	)
}
// TestEnqueue enqueues ten addition jobs with random operands, each onto a
// queue picked at random from a fixed set of four. Every enqueue runs as its
// own subtest named "enqueue-<index>".
func TestEnqueue(t *testing.T) {
	db := MustOpen(PostgresUrl)
	defer db.Close()

	candidates := []Queue{"a", "b", "c", "d"}

	for idx := 0; idx < 10; idx++ {
		// The target queue is chosen once per iteration, before the subtest starts.
		target := candidates[rand.Int()%len(candidates)]
		name := fmt.Sprintf("enqueue-%d", idx)

		t.Run(name, func(t *testing.T) {
			// Pause briefly before each enqueue so the jobs are spaced out in time.
			time.Sleep(100 * time.Millisecond)

			job, err := Enqueue(db, target, NewAdditionJob(rand.Int(), rand.Int()))
			if err != nil {
				t.Fatal(err)
			}
			t.Logf("enqueued: %#v", job)
		})
	}
}
// TestEnqueue_NoEnqueueOnRollback verifies that a job enqueued inside a
// transaction is not visible to Dequeue once that transaction has been
// rolled back.
//
// Every step that can fail is checked explicitly so that a broken database
// connection or a failed rollback surfaces as a clear test failure rather
// than a nil-pointer panic or a misleading pass/fail result.
func TestEnqueue_NoEnqueueOnRollback(t *testing.T) {
	var upstream = MustOpen(PostgresUrl)
	defer upstream.Close()

	// Without this check a failed Begin would leave tx nil and the test would
	// panic inside Enqueue instead of reporting the real cause.
	tx, err := upstream.Begin()
	if err != nil {
		t.Fatalf("failed to begin transaction: %v", err)
	}

	if _, err := Enqueue(tx, "enqueue/rollback", NewAdditionJob(1, 2)); err != nil {
		// Best-effort cleanup; the enqueue error is the one worth reporting.
		_ = tx.Rollback()
		t.Fatalf("failed to enqueue job: %v", err)
	}

	// The assertion below is only meaningful if the rollback actually happened.
	if err := tx.Rollback(); err != nil {
		t.Fatalf("failed to rollback transaction: %v", err)
	}

	job, err := Dequeue(upstream, []Queue{"enqueue/rollback"})
	if err != nil {
		t.Fatalf("failed to dequeue job: %v", err)
	}
	if job != nil {
		t.Fatalf("expected job to be nil; got=%#v", job)
	}
}