From 26c61efc92aa03c09d472befd06ed8ed86601878 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 17 Apr 2024 12:13:32 +0200 Subject: [PATCH] Add LOAD_BALANCING_POLICY_SLOW_AVOIDANCE funtionality The java driver has the feature to automatically avoid slow replicas by doing simple heuristics. This is one of the key feature to have a stable latency. This commit adds additional field in tokenAwareHostPolicy to control if the feature is enabled and what is the maximum in flight threshold. If feature is enabled driver sorts the replicas to first try those with less than specified maximum in flight connections. Fixes: #154 --- connectionpool.go | 20 ++++++++++++++++++++ connpicker.go | 10 ++++++++++ host_source.go | 5 +++++ internal/streams/streams.go | 4 ++++ policies.go | 30 ++++++++++++++++++++++++++++++ scylla.go | 10 ++++++++++ 6 files changed, 79 insertions(+) diff --git a/connectionpool.go b/connectionpool.go index 4e61f3062..c34448326 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -211,6 +211,17 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) { } } +func (p *policyConnPool) InFlight() int { + p.mu.RLock() + count := 0 + for _, pool := range p.hostConnPools { + count += pool.InFlight() + } + p.mu.RUnlock() + + return count +} + func (p *policyConnPool) Size() int { p.mu.RLock() count := 0 @@ -348,6 +359,15 @@ func (pool *hostConnPool) Size() int { return size } +// Size returns the number of connections currently active in the pool +func (pool *hostConnPool) InFlight() int { + pool.mu.RLock() + defer pool.mu.RUnlock() + + size := pool.connPicker.InFlight() + return size +} + // Close the connection pool func (pool *hostConnPool) Close() { pool.mu.Lock() diff --git a/connpicker.go b/connpicker.go index af43d35c0..c6c65f7d3 100644 --- a/connpicker.go +++ b/connpicker.go @@ -10,6 +10,7 @@ type ConnPicker interface { Pick(token, string, string) *Conn Put(*Conn) Remove(conn *Conn) + InFlight() int Size() (int, int) Close() @@ -60,6 +61,11 @@ func (p *defaultConnPicker) Close() { } } +func (p *defaultConnPicker) InFlight() int { + size := len(p.conns) + return size +} + func (p *defaultConnPicker) Size() (int, int) { size := len(p.conns) return size, p.size - size @@ -114,6 +120,10 @@ func (nopConnPicker) Put(*Conn) { func (nopConnPicker) Remove(conn *Conn) { } +func (nopConnPicker) InFlight() int { + return 0 +} + func (nopConnPicker) Size() (int, int) { // Return 1 to make hostConnPool to try to establish a connection. // When first connection is established hostConnPool replaces nopConnPicker diff --git a/host_source.go b/host_source.go index 31132e38f..8dcf371ae 100644 --- a/host_source.go +++ b/host_source.go @@ -409,6 +409,11 @@ func (h *HostInfo) IsUp() bool { return h != nil && h.State() == NodeUp } +func (h *HostInfo) IsBusy(s *Session) bool { + pool, ok := s.pool.getPool(h) + return ok && h != nil && pool.InFlight() >= MAX_IN_FLIGHT_THRESHOLD +} + func (h *HostInfo) HostnameAndPort() string { h.mu.Lock() defer h.mu.Unlock() diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 05bcd7d6a..7e502c2cc 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -145,3 +145,7 @@ func (s *IDGenerator) Clear(stream int) (inuse bool) { func (s *IDGenerator) Available() int { return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1 } + +func (s *IDGenerator) InUse() int { + return int(atomic.LoadInt32(&s.inuseStreams)) +} diff --git a/policies.go b/policies.go index 70ea00164..98867d2fa 100644 --- a/policies.go +++ b/policies.go @@ -391,6 +391,17 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) { } } +// AvoidSlowReplicas enabled avoiding slow replicas +// +// TokenAwareHostPolicy normally does not check how busy replica is, with avoidSlowReplicas enabled it avoids replicas +// if they have equal or more than MAX_IN_FLIGHT_THRESHOLD requests in flight +func AvoidSlowReplicas(max_in_flight_threshold int) func(policy *tokenAwareHostPolicy) { + return func(t *tokenAwareHostPolicy) { + t.avoidSlowReplicas = true + MAX_IN_FLIGHT_THRESHOLD = max_in_flight_threshold + } +} + // NonLocalReplicasFallback enables fallback to replicas that are not considered local. // // TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then @@ -424,6 +435,8 @@ type clusterMeta struct { tokenRing *tokenRing } +var MAX_IN_FLIGHT_THRESHOLD int = 10 + type tokenAwareHostPolicy struct { fallback HostSelectionPolicy getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error) @@ -443,6 +456,8 @@ type tokenAwareHostPolicy struct { // Experimental, this interface and use may change tablets cowTabletList + + avoidSlowReplicas bool } func (t *tokenAwareHostPolicy) Init(s *Session) { @@ -687,6 +702,21 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { } } + if s := qry.GetSession(); s != nil && t.avoidSlowReplicas { + healthyReplicas := make([]*HostInfo, 0, len(replicas)) + unhealthyReplicas := make([]*HostInfo, 0, len(replicas)) + + for _, h := range replicas { + if h.IsBusy(s) { + unhealthyReplicas = append(unhealthyReplicas, h) + } else { + healthyReplicas = append(healthyReplicas, h) + } + } + + replicas = append(healthyReplicas, unhealthyReplicas...) + } + var ( fallbackIter NextHost i, j, k int diff --git a/scylla.go b/scylla.go index 7dece242a..49d14d55a 100644 --- a/scylla.go +++ b/scylla.go @@ -544,6 +544,16 @@ func (p *scyllaConnPicker) Remove(conn *Conn) { } } +func (p *scyllaConnPicker) InFlight() int { + result := 0 + for _, conn := range p.conns { + if conn != nil { + result = result + (conn.streams.InUse()) + } + } + return result +} + func (p *scyllaConnPicker) Size() (int, int) { return p.nrConns, p.nrShards - p.nrConns }