-
Notifications
You must be signed in to change notification settings - Fork 0
/
config.go
146 lines (126 loc) · 5 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package cluster
import (
"regexp"
"time"
"github.com/IBM/sarama"
)
var minVersion = sarama.V0_9_0_0
type ConsumerMode uint8
const (
ConsumerModeMultiplex ConsumerMode = iota
ConsumerModePartitions
)
// Config extends sarama.Config with Group specific namespace
type Config struct {
sarama.Config
// Group is the namespace for group management properties
Group struct {
// The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
PartitionStrategy Strategy
// By default, messages and errors from the subscribed topics and partitions are all multiplexed and
// made available through the consumer's Messages() and Errors() channels.
//
// Users who require low-level access can enable ConsumerModePartitions where individual partitions
// are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
// themselves.
Mode ConsumerMode
Offsets struct {
Retry struct {
// The numer retries when committing offsets (defaults to 3).
Max int
}
Synchronization struct {
// The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance
// NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration
DwellTime time.Duration
}
}
Session struct {
// The allowed session timeout for registered consumers (defaults to 30s).
// Must be within the allowed server range.
Timeout time.Duration
}
Heartbeat struct {
// Interval between each heartbeat (defaults to 3s). It should be no more
// than 1/3rd of the Group.Session.Timout setting
Interval time.Duration
}
// Return specifies which group channels will be populated. If they are set to true,
// you must read from the respective channels to prevent deadlock.
Return struct {
// If enabled, rebalance notification will be returned on the
// Notifications channel (default disabled).
Notifications bool
}
Topics struct {
// An additional whitelist of topics to subscribe to.
Whitelist *regexp.Regexp
// An additional blacklist of topics to avoid. If set, this will precede over
// the Whitelist setting.
Blacklist *regexp.Regexp
}
Member struct {
// Custom metadata to include when joining the group. The user data for all joined members
// can be retrieved by sending a DescribeGroupRequest to the broker that is the
// coordinator for the group.
UserData []byte
}
}
}
// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
c := &Config{
Config: *sarama.NewConfig(),
}
c.Group.PartitionStrategy = StrategyRange
c.Group.Offsets.Retry.Max = 3
c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime
c.Group.Session.Timeout = 30 * time.Second
c.Group.Heartbeat.Interval = 3 * time.Second
c.Config.Version = minVersion
return c
}
// Validate checks a Config instance. It will return a
// sarama.ConfigurationError if the specified values don't make sense.
func (c *Config) Validate() error {
if c.Group.Heartbeat.Interval%time.Millisecond != 0 {
sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Group.Session.Timeout%time.Millisecond != 0 {
sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin {
sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.")
}
if !c.Version.IsAtLeast(minVersion) {
sarama.Logger.Println("Version is not supported; 0.9. will be assumed.")
c.Version = minVersion
}
if err := c.Config.Validate(); err != nil {
return err
}
// validate the Group values
switch {
case c.Group.Offsets.Retry.Max < 0:
return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0")
case c.Group.Offsets.Synchronization.DwellTime <= 0:
return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0")
case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute:
return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m")
case c.Group.Heartbeat.Interval <= 0:
return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0")
case c.Group.Session.Timeout <= 0:
return sarama.ConfigurationError("Group.Session.Timeout must be > 0")
case !c.Metadata.Full && c.Group.Topics.Whitelist != nil:
return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used")
case !c.Metadata.Full && c.Group.Topics.Blacklist != nil:
return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used")
}
// ensure offset is correct
switch c.Consumer.Offsets.Initial {
case sarama.OffsetOldest, sarama.OffsetNewest:
default:
return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest")
}
return nil
}