Skip to content

Commit

Permalink
kgo: reintroduce random broker iteration
Browse files Browse the repository at this point in the history
Random iteration was removed with 1e5c11d
We can reintroduce random iteration easily enough, while still keeping
the behavior of try-a-seed-occasionally.

Closes #579.
  • Loading branch information
twmb committed Oct 22, 2023
1 parent 6a961da commit b2ccc2f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 22 deletions.
5 changes: 4 additions & 1 deletion pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -948,7 +949,9 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
if latencyMillis > minPessimismMillis {
minPessimismMillis = latencyMillis
}
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%)
var random float64
cxn.b.cl.rng(func(r *rand.Rand) { random = r.Float64() })
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*random) // 95 to 98% of lifetime (pessimism 2% to 5%)

// Our minimum lifetime is always 1s (or latency, if larger).
// When our max pessimism becomes more than min pessimism,
Expand Down
55 changes: 34 additions & 21 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type Client struct {
ctx context.Context
ctxCancel func()

rng func() float64
rng func(func(*rand.Rand))

brokersMu sync.RWMutex
brokers []*broker // ordered by broker ID
seeds atomic.Value // []*broker, seed brokers, also ordered by ID
anyBrokerIdx int32
anyBrokerOrd []int32 // shuffled brokers, for random ordering
anySeedIdx int32
stopBrokers bool // set to true on close to stop updateBrokers

Expand Down Expand Up @@ -462,13 +462,13 @@ func NewClient(opts ...Opt) (*Client, error) {
ctx: ctx,
ctxCancel: cancel,

rng: func() func() float64 {
rng: func() func(func(*rand.Rand)) {
var mu sync.Mutex
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
return func() float64 {
return func(fn func(*rand.Rand)) {
mu.Lock()
defer mu.Unlock()
return rng.Float64()
fn(rng)
}
}(),

Expand Down Expand Up @@ -733,33 +733,45 @@ func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) {
}
}

func (cl *Client) reinitAnyBrokerOrd() {
cl.anyBrokerOrd = append(cl.anyBrokerOrd[:0], make([]int32, len(cl.brokers))...)
for i := range cl.anyBrokerOrd {
cl.anyBrokerOrd[i] = int32(i)
}
cl.rng(func(r *rand.Rand) {
r.Shuffle(len(cl.anyBrokerOrd), func(i, j int) {
cl.anyBrokerOrd[i], cl.anyBrokerOrd[j] = cl.anyBrokerOrd[j], cl.anyBrokerOrd[i]
})
})
}

// broker returns a random broker from all brokers ever known.
func (cl *Client) broker() *broker {
cl.brokersMu.Lock() // full lock needed for anyBrokerIdx below
cl.brokersMu.Lock()
defer cl.brokersMu.Unlock()

// Every time we loop through all discovered brokers, we issue one
// request to the next seed. This ensures that if all discovered
// brokers are down, we will *eventually* loop through seeds and
// hopefully have a reachable seed.
var b *broker
if len(cl.brokers) > 0 && int(cl.anyBrokerIdx) < len(cl.brokers) {
cl.anyBrokerIdx %= int32(len(cl.brokers))
b = cl.brokers[cl.anyBrokerIdx]
cl.anyBrokerIdx++
} else {
seeds := cl.loadSeeds()
cl.anySeedIdx %= int32(len(seeds))
b = seeds[cl.anySeedIdx]
cl.anySeedIdx++

// If we have brokers, we ranged past discovered brokers.
// We now reset the anyBrokerIdx to begin ranging through
// discovered brokers again.
if len(cl.brokers) > 0 {
cl.anyBrokerIdx = 0
}
if len(cl.anyBrokerOrd) > 0 {
b = cl.brokers[cl.anyBrokerOrd[0]]
cl.anyBrokerOrd = cl.anyBrokerOrd[1:]
return b
}

seeds := cl.loadSeeds()
cl.anySeedIdx %= int32(len(seeds))
b = seeds[cl.anySeedIdx]
cl.anySeedIdx++

// If we have brokers, we ranged past discovered brokers.
// We now reset the anyBrokerOrd to begin ranging through
// discovered brokers again. If there are still no brokers,
// this reinit will do nothing and we will keep looping seeds.
cl.reinitAnyBrokerOrd()
return b
}

Expand Down Expand Up @@ -946,6 +958,7 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
}

cl.brokers = newBrokers
cl.reinitAnyBrokerOrd()
}

// CloseAllowingRebalance allows rebalances, leaves any group, and closes all
Expand Down

0 comments on commit b2ccc2f

Please sign in to comment.