diff --git a/cluster.go b/cluster.go index e223c0ab0..87d20c3a3 100644 --- a/cluster.go +++ b/cluster.go @@ -7,6 +7,7 @@ package gocql import ( "context" "errors" + "fmt" "net" "time" ) @@ -91,6 +92,24 @@ type ClusterConfig struct { // Default: 128 for older CQL versions MaxRequestsPerConn int + // Threshold for the number of inflight requests per connection + // after which the connection is considered as heavy loaded + // Default: 512 + HeavyLoadedConnectionThreshold int + + // When a connection is considered as heavy loaded, the driver + // could switch to the least loaded connection for the same node. + // The switch will happen if the other connection is at least + // HeavyLoadedSwitchConnectionPercentage percentage less busy + // (in terms of inflight requests). + // + // For the default value of 20%, if the heavy loaded connection + // has 100 inflight requests, the switch will happen only if the + // least busy connection has less than 80 inflight requests. + // + // Default: 20% + HeavyLoadedSwitchConnectionPercentage int + // Default consistency level. // Default: Quorum Consistency Consistency @@ -275,23 +294,25 @@ type Dialer interface { // the same host, and will not mark the node being down or up from events. func NewCluster(hosts ...string) *ClusterConfig { cfg := &ClusterConfig{ - Hosts: hosts, - CQLVersion: "3.0.0", - Timeout: 11 * time.Second, - ConnectTimeout: 11 * time.Second, - Port: 9042, - NumConns: 2, - Consistency: Quorum, - MaxPreparedStmts: defaultMaxPreparedStmts, - MaxRoutingKeyInfo: 1000, - PageSize: 5000, - DefaultTimestamp: true, - MaxWaitSchemaAgreement: 60 * time.Second, - ReconnectInterval: 60 * time.Second, - ConvictionPolicy: &SimpleConvictionPolicy{}, - ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, - SocketKeepalive: 15 * time.Second, - WriteCoalesceWaitTime: 200 * time.Microsecond, + Hosts: hosts, + CQLVersion: "3.0.0", + Timeout: 11 * time.Second, + ConnectTimeout: 11 * time.Second, + Port: 9042, + NumConns: 2, + Consistency: Quorum, + MaxPreparedStmts: defaultMaxPreparedStmts, + MaxRoutingKeyInfo: 1000, + PageSize: 5000, + DefaultTimestamp: true, + MaxWaitSchemaAgreement: 60 * time.Second, + ReconnectInterval: 60 * time.Second, + ConvictionPolicy: &SimpleConvictionPolicy{}, + ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, + SocketKeepalive: 15 * time.Second, + WriteCoalesceWaitTime: 200 * time.Microsecond, + HeavyLoadedConnectionThreshold: 512, + HeavyLoadedSwitchConnectionPercentage: 20, } return cfg @@ -329,6 +350,26 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool { return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host)) } +func (cfg *ClusterConfig) Validate() error { + if len(cfg.Hosts) == 0 { + return ErrNoHosts + } + + if cfg.Authenticator != nil && cfg.AuthProvider != nil { + return errors.New("Can't use both Authenticator and AuthProvider in cluster config.") + } + + if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 { + return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage) + } + + if cfg.HeavyLoadedConnectionThreshold < 0 { + return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold) + } + + return nil +} + var ( ErrNoHosts = errors.New("no hosts provided") ErrNoConnectionsStarted = errors.New("no connections were made when creating the session") diff --git a/conn.go b/conn.go index 49f637e08..e392c1ebd 100644 --- a/conn.go +++ b/conn.go @@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int { return c.streams.Available() } +func (c *Conn) StreamsInUse() int { + return c.streams.InUse() +} + func (c *Conn) UseKeyspace(keyspace string) error { q := &writeQueryFrame{statement: `USE "` + keyspace + `"`} q.params.consistency = c.session.cons diff --git a/scylla.go b/scylla.go index da9d98390..fa5459ed3 100644 --- a/scylla.go +++ b/scylla.go @@ -413,15 +413,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn { return c } alternative := p.leastBusyConn() - if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 { - return c - } else { + if alternative != nil && alternative.StreamsInUse()*100 >= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) { return alternative } + return c } func isHeavyLoaded(c *Conn) bool { - return c.streams.NumStreams/2 > c.AvailableStreams() + return c.StreamsInUse() > c.session.cfg.HeavyLoadedConnectionThreshold } func (p *scyllaConnPicker) leastBusyConn() *Conn { diff --git a/session.go b/session.go index b553d154d..af846557d 100644 --- a/session.go +++ b/session.go @@ -116,16 +116,9 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf // NewSession wraps an existing Node. func NewSession(cfg ClusterConfig) (*Session, error) { - // Check that hosts in the ClusterConfig is not empty - if len(cfg.Hosts) < 1 { - return nil, ErrNoHosts + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err) } - - // Check that either Authenticator is set or AuthProvider, not both - if cfg.Authenticator != nil && cfg.AuthProvider != nil { - return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.") - } - // TODO: we should take a context in here at some point ctx, cancel := context.WithCancel(context.TODO())