Skip to content

Commit

Permalink
Merge pull request #606 from twmb/579
Browse files Browse the repository at this point in the history
kgo: reintroduce random broker iteration
  • Loading branch information
twmb authored Oct 22, 2023
2 parents ec02fac + b2ccc2f commit 913b4b0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 22 deletions.
5 changes: 4 additions & 1 deletion pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -948,7 +949,9 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
if latencyMillis > minPessimismMillis {
minPessimismMillis = latencyMillis
}
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%)
var random float64
cxn.b.cl.rng(func(r *rand.Rand) { random = r.Float64() })
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*random) // 95 to 98% of lifetime (pessimism 2% to 5%)

// Our minimum lifetime is always 1s (or latency, if larger).
// When our max pessimism becomes more than min pessimism,
Expand Down
55 changes: 34 additions & 21 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type Client struct {
ctx context.Context
ctxCancel func()

rng func() float64
rng func(func(*rand.Rand))

brokersMu sync.RWMutex
brokers []*broker // ordered by broker ID
seeds atomic.Value // []*broker, seed brokers, also ordered by ID
anyBrokerIdx int32
anyBrokerOrd []int32 // shuffled brokers, for random ordering
anySeedIdx int32
stopBrokers bool // set to true on close to stop updateBrokers

Expand Down Expand Up @@ -462,13 +462,13 @@ func NewClient(opts ...Opt) (*Client, error) {
ctx: ctx,
ctxCancel: cancel,

rng: func() func() float64 {
rng: func() func(func(*rand.Rand)) {
var mu sync.Mutex
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
return func() float64 {
return func(fn func(*rand.Rand)) {
mu.Lock()
defer mu.Unlock()
return rng.Float64()
fn(rng)
}
}(),

Expand Down Expand Up @@ -733,33 +733,45 @@ func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) {
}
}

func (cl *Client) reinitAnyBrokerOrd() {
cl.anyBrokerOrd = append(cl.anyBrokerOrd[:0], make([]int32, len(cl.brokers))...)
for i := range cl.anyBrokerOrd {
cl.anyBrokerOrd[i] = int32(i)
}
cl.rng(func(r *rand.Rand) {
r.Shuffle(len(cl.anyBrokerOrd), func(i, j int) {
cl.anyBrokerOrd[i], cl.anyBrokerOrd[j] = cl.anyBrokerOrd[j], cl.anyBrokerOrd[i]
})
})
}

// broker returns a random broker from all brokers ever known.
func (cl *Client) broker() *broker {
cl.brokersMu.Lock() // full lock needed for anyBrokerIdx below
cl.brokersMu.Lock()
defer cl.brokersMu.Unlock()

// Every time we loop through all discovered brokers, we issue one
// request to the next seed. This ensures that if all discovered
// brokers are down, we will *eventually* loop through seeds and
// hopefully have a reachable seed.
var b *broker
if len(cl.brokers) > 0 && int(cl.anyBrokerIdx) < len(cl.brokers) {
cl.anyBrokerIdx %= int32(len(cl.brokers))
b = cl.brokers[cl.anyBrokerIdx]
cl.anyBrokerIdx++
} else {
seeds := cl.loadSeeds()
cl.anySeedIdx %= int32(len(seeds))
b = seeds[cl.anySeedIdx]
cl.anySeedIdx++

// If we have brokers, we ranged past discovered brokers.
// We now reset the anyBrokerIdx to begin ranging through
// discovered brokers again.
if len(cl.brokers) > 0 {
cl.anyBrokerIdx = 0
}
if len(cl.anyBrokerOrd) > 0 {
b = cl.brokers[cl.anyBrokerOrd[0]]
cl.anyBrokerOrd = cl.anyBrokerOrd[1:]
return b
}

seeds := cl.loadSeeds()
cl.anySeedIdx %= int32(len(seeds))
b = seeds[cl.anySeedIdx]
cl.anySeedIdx++

// If we have brokers, we ranged past discovered brokers.
// We now reset the anyBrokerOrd to begin ranging through
// discovered brokers again. If there are still no brokers,
// this reinit will do nothing and we will keep looping seeds.
cl.reinitAnyBrokerOrd()
return b
}

Expand Down Expand Up @@ -946,6 +958,7 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
}

cl.brokers = newBrokers
cl.reinitAnyBrokerOrd()
}

// CloseAllowingRebalance allows rebalances, leaves any group, and closes all
Expand Down

0 comments on commit 913b4b0

Please sign in to comment.