This repository has been archived by the owner on Jan 30, 2023. It is now read-only.
forked from bsm/sarama-cluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
balancer_test.go
124 lines (107 loc) · 3.84 KB
/
balancer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package cluster
import (
"github.com/Shopify/sarama"
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
)
// Spec for Notification: build one from an initial topic/partition map,
// apply claim with a new map, and compare the resulting Claimed, Released
// and Current fields against fixed expectations.
var _ = Describe("Notification", func() {

	It("should init and update", func() {
		initial := map[string][]int32{
			"a": {1, 2, 3},
			"b": {4, 5},
			"c": {1, 2},
		}
		updated := map[string][]int32{
			"a": {3, 4},
			"b": {1, 2, 3, 4},
			"d": {3, 4},
		}

		n := newNotification(initial)
		n.claim(updated)

		want := &Notification{
			Claimed: map[string][]int32{
				"a": {4},
				"b": {1, 2, 3},
				"d": {3, 4},
			},
			Released: map[string][]int32{
				"a": {1, 2},
				"b": {5},
				"c": {1, 2},
			},
			Current: map[string][]int32{
				"a": {3, 4},
				"b": {1, 2, 3, 4},
				"d": {3, 4},
			},
		}
		Expect(n).To(Equal(want))
	})

})
// Specs for balancer: a fresh subject is built before every spec from a
// mock client exposing three topics and from metadata for two group
// members, "a" and "b".
var _ = Describe("balancer", func() {
	var subject *balancer

	BeforeEach(func() {
		topics := map[string][]int32{
			"one":   {0, 1, 2, 3},
			"two":   {0, 1, 2},
			"three": {0, 1},
		}
		members := map[string]sarama.ConsumerGroupMemberMetadata{
			"a": {Topics: []string{"one", "two"}},
			"b": {Topics: []string{"three", "one"}},
		}

		// err is declared separately so the assignment below writes to the
		// outer subject instead of shadowing it.
		var err error
		subject, err = newBalancerFromMeta(&mockClient{topics: topics}, members)
		Expect(err).NotTo(HaveOccurred())
	})

	It("should parse from meta data", func() {
		Expect(subject.topics).To(HaveLen(3))
	})

	It("should perform", func() {
		wantRange := map[string]map[string][]int32{
			"a": {"one": {0, 1}, "two": {0, 1, 2}},
			"b": {"one": {2, 3}, "three": {0, 1}},
		}
		Expect(subject.Perform(StrategyRange)).To(Equal(wantRange))

		wantRoundRobin := map[string]map[string][]int32{
			"a": {"one": {0, 2}, "two": {0, 1, 2}},
			"b": {"one": {1, 3}, "three": {0, 1}},
		}
		Expect(subject.Perform(StrategyRoundRobin)).To(Equal(wantRoundRobin))
	})

})
// Table-driven specs for topicInfo's Ranges and RoundRobin methods. Every
// entry supplies member IDs, partitions, and the expected mapping from
// member ID to partitions.
var _ = Describe("topicInfo", func() {

	// expectRanges builds a topicInfo from one entry and checks Ranges().
	expectRanges := func(members []string, parts []int32, want map[string][]int32) {
		ti := topicInfo{MemberIDs: members, Partitions: parts}
		Expect(ti.Ranges()).To(Equal(want))
	}

	// expectRoundRobin builds a topicInfo from one entry and checks RoundRobin().
	expectRoundRobin := func(members []string, parts []int32, want map[string][]int32) {
		ti := topicInfo{MemberIDs: members, Partitions: parts}
		Expect(ti.RoundRobin()).To(Equal(want))
	}

	// Each entry keeps its own slice literals; nothing is shared between
	// entries, so one entry can never observe changes made for another.
	DescribeTable("Ranges", expectRanges,
		Entry("three members, three partitions",
			[]string{"M1", "M2", "M3"},
			[]int32{0, 1, 2},
			map[string][]int32{"M1": {0}, "M2": {1}, "M3": {2}},
		),
		Entry("member ID order",
			[]string{"M3", "M1", "M2"},
			[]int32{0, 1, 2},
			map[string][]int32{"M1": {0}, "M2": {1}, "M3": {2}},
		),
		Entry("more members than partitions",
			[]string{"M1", "M2", "M3"},
			[]int32{0, 1},
			map[string][]int32{"M1": {0}, "M3": {1}},
		),
		Entry("far more members than partitions",
			[]string{"M1", "M2", "M3"},
			[]int32{0},
			map[string][]int32{"M2": {0}},
		),
		Entry("fewer members than partitions",
			[]string{"M1", "M2", "M3"},
			[]int32{0, 1, 2, 3},
			map[string][]int32{"M1": {0}, "M2": {1, 2}, "M3": {3}},
		),
		Entry("uneven members/partitions ratio",
			[]string{"M1", "M2", "M3"},
			[]int32{0, 2, 4, 6, 8},
			map[string][]int32{"M1": {0, 2}, "M2": {4}, "M3": {6, 8}},
		),
	)

	DescribeTable("RoundRobin", expectRoundRobin,
		Entry("three members, three partitions",
			[]string{"M1", "M2", "M3"},
			[]int32{0, 1, 2},
			map[string][]int32{"M1": {0}, "M2": {1}, "M3": {2}},
		),
		Entry("member ID order",
			[]string{"M3", "M1", "M2"},
			[]int32{0, 1, 2},
			map[string][]int32{"M1": {0}, "M2": {1}, "M3": {2}},
		),
		Entry("more members than partitions",
			[]string{"M1", "M2", "M3"},
			[]int32{0, 1},
			map[string][]int32{"M1": {0}, "M2": {1}},
		),
		Entry("far more members than partitions",
			[]string{"M1", "M2", "M3"},
			[]int32{0},
			map[string][]int32{"M1": {0}},
		),
		Entry("fewer members than partitions",
			[]string{"M1", "M2", "M3"},
			[]int32{0, 1, 2, 3},
			map[string][]int32{"M1": {0, 3}, "M2": {1}, "M3": {2}},
		),
		Entry("uneven members/partitions ratio",
			[]string{"M1", "M2", "M3"},
			[]int32{0, 2, 4, 6, 8},
			map[string][]int32{"M1": {0, 6}, "M2": {2, 8}, "M3": {4}},
		),
	)

})