diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 5579dc83..3199eaef 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "math" + "math/rand" "net" "os" "strconv" @@ -948,7 +949,9 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { if latencyMillis > minPessimismMillis { minPessimismMillis = latencyMillis } - maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%) + var random float64 + cxn.b.cl.rng(func(r *rand.Rand) { random = r.Float64() }) + maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*random) // 95 to 98% of lifetime (pessimism 2% to 5%) // Our minimum lifetime is always 1s (or latency, if larger). // When our max pessimism becomes more than min pessimism, diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index d1368f7e..0a02b01a 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -39,12 +39,12 @@ type Client struct { ctx context.Context ctxCancel func() - rng func() float64 + rng func(func(*rand.Rand)) brokersMu sync.RWMutex brokers []*broker // ordered by broker ID seeds atomic.Value // []*broker, seed brokers, also ordered by ID - anyBrokerIdx int32 + anyBrokerOrd []int32 // shuffled brokers, for random ordering anySeedIdx int32 stopBrokers bool // set to true on close to stop updateBrokers @@ -462,13 +462,13 @@ func NewClient(opts ...Opt) (*Client, error) { ctx: ctx, ctxCancel: cancel, - rng: func() func() float64 { + rng: func() func(func(*rand.Rand)) { var mu sync.Mutex rng := rand.New(rand.NewSource(time.Now().UnixNano())) - return func() float64 { + return func(fn func(*rand.Rand)) { mu.Lock() defer mu.Unlock() - return rng.Float64() + fn(rng) } }(), @@ -733,9 +733,21 @@ func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) { } } +func (cl *Client) reinitAnyBrokerOrd() { + cl.anyBrokerOrd = append(cl.anyBrokerOrd[:0], make([]int32, len(cl.brokers))...) + for i := range cl.anyBrokerOrd { + cl.anyBrokerOrd[i] = int32(i) + } + cl.rng(func(r *rand.Rand) { + r.Shuffle(len(cl.anyBrokerOrd), func(i, j int) { + cl.anyBrokerOrd[i], cl.anyBrokerOrd[j] = cl.anyBrokerOrd[j], cl.anyBrokerOrd[i] + }) + }) +} + // broker returns a random broker from all brokers ever known. func (cl *Client) broker() *broker { - cl.brokersMu.Lock() // full lock needed for anyBrokerIdx below + cl.brokersMu.Lock() defer cl.brokersMu.Unlock() // Every time we loop through all discovered brokers, we issue one @@ -743,23 +755,23 @@ func (cl *Client) broker() *broker { // brokers are down, we will *eventually* loop through seeds and // hopefully have a reachable seed. var b *broker - if len(cl.brokers) > 0 && int(cl.anyBrokerIdx) < len(cl.brokers) { - cl.anyBrokerIdx %= int32(len(cl.brokers)) - b = cl.brokers[cl.anyBrokerIdx] - cl.anyBrokerIdx++ - } else { - seeds := cl.loadSeeds() - cl.anySeedIdx %= int32(len(seeds)) - b = seeds[cl.anySeedIdx] - cl.anySeedIdx++ - // If we have brokers, we ranged past discovered brokers. - // We now reset the anyBrokerIdx to begin ranging through - // discovered brokers again. - if len(cl.brokers) > 0 { - cl.anyBrokerIdx = 0 - } + if len(cl.anyBrokerOrd) > 0 { + b = cl.brokers[cl.anyBrokerOrd[0]] + cl.anyBrokerOrd = cl.anyBrokerOrd[1:] + return b } + + seeds := cl.loadSeeds() + cl.anySeedIdx %= int32(len(seeds)) + b = seeds[cl.anySeedIdx] + cl.anySeedIdx++ + + // If we have brokers, we ranged past discovered brokers. + // We now reset the anyBrokerOrd to begin ranging through + // discovered brokers again. If there are still no brokers, + // this reinit will do nothing and we will keep looping seeds. + cl.reinitAnyBrokerOrd() return b } @@ -946,6 +958,7 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) { } cl.brokers = newBrokers + cl.reinitAnyBrokerOrd() } // CloseAllowingRebalance allows rebalances, leaves any group, and closes all