From 43504ff9302451acc4783d04d27f030c41becf44 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Fri, 5 Jul 2024 10:51:21 -0400 Subject: [PATCH] Make heavy loaded optimization configurable Scylla Go Driver has a capability to avoid sending requests to an overloaded shard, instead sending the request on a different connection (at the same node). This change makes it possible to customize the parameters used to determine when this behavior would kick in. --- cluster.go | 77 ++++++++++++++++++++++++++++--------- conn.go | 4 ++ internal/streams/streams.go | 5 +++ scylla.go | 7 ++-- scylla_test.go | 71 ++++++++++++++++++++++++++++++++++ session.go | 11 +----- 6 files changed, 144 insertions(+), 31 deletions(-) diff --git a/cluster.go b/cluster.go index 9afd354a7..62f8735d7 100644 --- a/cluster.go +++ b/cluster.go @@ -7,6 +7,7 @@ package gocql import ( "context" "errors" + "fmt" "net" "time" ) @@ -91,6 +92,24 @@ type ClusterConfig struct { // Default: 128 for older CQL versions MaxRequestsPerConn int + // Threshold for the number of inflight requests per connection + // after which the connection is considered as heavy loaded + // Default: 512 + HeavyLoadedConnectionThreshold int + + // When a connection is considered as heavy loaded, the driver + // could switch to the least loaded connection for the same node. + // The switch will happen if the other connection is at least + // HeavyLoadedSwitchConnectionPercentage percentage less busy + // (in terms of inflight requests). + // + // For the default value of 20%, if the heavy loaded connection + // has 100 inflight requests, the switch will happen only if the + // least busy connection has 80 or fewer inflight requests. + // + // Default: 20% + HeavyLoadedSwitchConnectionPercentage int + // Default consistency level. // Default: Quorum Consistency Consistency @@ -278,24 +297,26 @@ type Dialer interface { // the same host, and will not mark the node being down or up from events.
func NewCluster(hosts ...string) *ClusterConfig { cfg := &ClusterConfig{ - Hosts: hosts, - CQLVersion: "3.0.0", - Timeout: 11 * time.Second, - ConnectTimeout: 11 * time.Second, - Port: 9042, - NumConns: 2, - Consistency: Quorum, - MaxPreparedStmts: defaultMaxPreparedStmts, - MaxRoutingKeyInfo: 1000, - PageSize: 5000, - DefaultTimestamp: true, - MaxWaitSchemaAgreement: 60 * time.Second, - ReconnectInterval: 60 * time.Second, - ConvictionPolicy: &SimpleConvictionPolicy{}, - ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, - SocketKeepalive: 15 * time.Second, - WriteCoalesceWaitTime: 200 * time.Microsecond, - MetadataSchemaRequestTimeout: 60 * time.Second, + Hosts: hosts, + CQLVersion: "3.0.0", + Timeout: 11 * time.Second, + ConnectTimeout: 11 * time.Second, + Port: 9042, + NumConns: 2, + Consistency: Quorum, + MaxPreparedStmts: defaultMaxPreparedStmts, + MaxRoutingKeyInfo: 1000, + PageSize: 5000, + DefaultTimestamp: true, + MaxWaitSchemaAgreement: 60 * time.Second, + ReconnectInterval: 60 * time.Second, + ConvictionPolicy: &SimpleConvictionPolicy{}, + ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, + SocketKeepalive: 15 * time.Second, + WriteCoalesceWaitTime: 200 * time.Microsecond, + MetadataSchemaRequestTimeout: 60 * time.Second, + HeavyLoadedConnectionThreshold: 512, + HeavyLoadedSwitchConnectionPercentage: 20, } return cfg @@ -333,6 +354,26 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool { return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host)) } +func (cfg *ClusterConfig) Validate() error { + if len(cfg.Hosts) == 0 { + return ErrNoHosts + } + + if cfg.Authenticator != nil && cfg.AuthProvider != nil { + return errors.New("Can't use both Authenticator and AuthProvider in cluster config.") + } + + if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 { + return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must 
be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage) + } + + if cfg.HeavyLoadedConnectionThreshold < 0 { + return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold) + } + + return nil +} + var ( ErrNoHosts = errors.New("no hosts provided") ErrNoConnectionsStarted = errors.New("no connections were made when creating the session") diff --git a/conn.go b/conn.go index b8c776b8b..f293ddf5f 100644 --- a/conn.go +++ b/conn.go @@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int { return c.streams.Available() } +func (c *Conn) StreamsInUse() int { + return c.streams.InUse() +} + func (c *Conn) UseKeyspace(keyspace string) error { q := &writeQueryFrame{statement: `USE "` + keyspace + `"`} q.params.consistency = c.session.cons diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 7e502c2cc..7d7648ef7 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -149,3 +149,8 @@ func (s *IDGenerator) Available() int { func (s *IDGenerator) InUse() int { return int(atomic.LoadInt32(&s.inuseStreams)) } + +// SetStreamsInUse sets streams in use counter, to be used for testing only +func SetStreamsInUse(s *IDGenerator, val int32) { + atomic.StoreInt32(&s.inuseStreams, val) +} diff --git a/scylla.go b/scylla.go index 03fc8737b..02d16e593 100644 --- a/scylla.go +++ b/scylla.go @@ -416,15 +416,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn { return c } alternative := p.leastBusyConn() - if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 { - return c - } else { + if alternative != nil && alternative.StreamsInUse()*100 <= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) { return alternative } + return c } func isHeavyLoaded(c *Conn) bool { - return c.streams.NumStreams/2 > c.AvailableStreams() + return c.StreamsInUse() > 
c.session.cfg.HeavyLoadedConnectionThreshold } func (p *scyllaConnPicker) leastBusyConn() *Conn { diff --git a/scylla_test.go b/scylla_test.go index 4241df350..0c856d58c 100644 --- a/scylla_test.go +++ b/scylla_test.go @@ -99,6 +99,76 @@ func TestScyllaConnPickerHammerPickNilToken(t *testing.T) { wg.Wait() } +func TestScyllaConnPicker(t *testing.T) { + t.Parallel() + + t.Run("maybeReplaceWithLessBusyConnection", func(t *testing.T) { + + cfg := ClusterConfig{ + HeavyLoadedSwitchConnectionPercentage: 30, + HeavyLoadedConnectionThreshold: 100, + } + + tcases := []struct { + name string + streamsInUse [3]int32 + expected int + }{ + { + name: "all connections below threshold", + streamsInUse: [3]int32{99, 98, 97}, + expected: 0, + }, + { + name: "all connections in threshold, but none is switchable", + streamsInUse: [3]int32{110, 109, 108}, + expected: 0, + }, + { + name: "all connections in threshold, one is below threshold", + streamsInUse: [3]int32{110, 109, 70}, + expected: 2, + }, + { + name: "all connections in threshold, one is above threshold, but below switchable percentage", + streamsInUse: [3]int32{210, 130, 209}, + expected: 1, + }, + } + + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + s := scyllaConnPicker{ + nrShards: 4, + msbIgnore: 12, + } + + conns := [3]*Conn{ + mockConn(0), + mockConn(1), + mockConn(2), + } + + for _, conn := range conns { + conn.session.cfg = cfg + s.Put(conn) + } + + for id, inUse := range tcase.streamsInUse { + streams.SetStreamsInUse(conns[id].streams, inUse) + } + + expectedConn := conns[tcase.expected] + + c := s.maybeReplaceWithLessBusyConnection(conns[0]) + if c != expectedConn { + t.Errorf("expected connection from shard %d, got %d", expectedConn.scyllaSupported.shard, c.scyllaSupported.shard) + } + }) + } + }) +} + func TestScyllaConnPickerRemove(t *testing.T) { t.Parallel() @@ -135,6 +205,7 @@ func mockConn(shard int) *Conn { partitioner: "org.apache.cassandra.dht.Murmur3Partitioner", 
shardingAlgorithm: "biased-token-round-robin", }, + session: &Session{}, } } diff --git a/session.go b/session.go index 3e967aaad..0cf4ed5ed 100644 --- a/session.go +++ b/session.go @@ -119,16 +119,9 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf // NewSession wraps an existing Node. func NewSession(cfg ClusterConfig) (*Session, error) { - // Check that hosts in the ClusterConfig is not empty - if len(cfg.Hosts) < 1 { - return nil, ErrNoHosts + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err) } - - // Check that either Authenticator is set or AuthProvider, not both - if cfg.Authenticator != nil && cfg.AuthProvider != nil { - return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.") - } - // TODO: we should take a context in here at some point ctx, cancel := context.WithCancel(context.TODO())