Skip to content

Commit

Permalink
Merge pull request #222 from sylwiaszunejko/decouple_schema_fetch
Browse files Browse the repository at this point in the history
Schema queries with configurable server-side timeouts
  • Loading branch information
sylwiaszunejko authored Jul 31, 2024
2 parents 8a812b2 + 0861335 commit c33cb52
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 45 deletions.
38 changes: 21 additions & 17 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ type ClusterConfig struct {
// If not specified, defaults to the global gocql.Logger.
Logger StdLogger

// The timeout for the requests to the schema tables. (default: 60s)
MetadataSchemaRequestTimeout time.Duration

// internal config for testing
disableControlConn bool
disableInit bool
Expand All @@ -275,23 +278,24 @@ type Dialer interface {
// the same host, and will not mark the node being down or up from events.
func NewCluster(hosts ...string) *ClusterConfig {
cfg := &ClusterConfig{
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
}

return cfg
Expand Down
22 changes: 17 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1769,9 +1769,13 @@ func (c *Conn) query(ctx context.Context, statement string, values ...interface{
}

func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter {
const (
peerSchema = "SELECT * FROM system.peers"
peerV2Schemas = "SELECT * FROM system.peers_v2"
usingClause := ""
if c.session.control != nil {
usingClause = c.session.usingTimeoutClause
}
var (
peerSchema = "SELECT * FROM system.peers" + usingClause
peerV2Schemas = "SELECT * FROM system.peers_v2" + usingClause
)

c.mu.Lock()
Expand Down Expand Up @@ -1804,11 +1808,19 @@ func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter
}

func (c *Conn) querySystemLocal(ctx context.Context) *Iter {
return c.query(ctx, "SELECT * FROM system.local WHERE key='local'")
usingClause := ""
if c.session.control != nil {
usingClause = c.session.usingTimeoutClause
}
return c.query(ctx, "SELECT * FROM system.local WHERE key='local'"+usingClause)
}

func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
usingClause := ""
if c.session.control != nil {
usingClause = c.session.usingTimeoutClause
}
var localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" + usingClause

var versions map[string]struct{}
var schemaVersion string
Expand Down
7 changes: 4 additions & 3 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ type controlConn struct {
}

func createControlConn(session *Session) *controlConn {

control := &controlConn{
session: session,
quit: make(chan struct{}),
retry: &SimpleRetryPolicy{NumRetries: 3},
session: session,
quit: make(chan struct{}),
retry: &SimpleRetryPolicy{NumRetries: 3},
}

control.conn.Store((*connHost)(nil))
Expand Down
2 changes: 1 addition & 1 deletion host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ type ringDescriber struct {

// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
func checkSystemSchema(control *controlConn) (bool, error) {
iter := control.query("SELECT * FROM system_schema.keyspaces")
iter := control.query("SELECT * FROM system_schema.keyspaces" + control.session.usingTimeoutClause)
if err := iter.err; err != nil {
if errf, ok := err.(*errorFrame); ok {
if errf.code == ErrCodeSyntax {
Expand Down
14 changes: 7 additions & 7 deletions metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada

var replication map[string]string

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)
if iter.NumRows() == 0 {
return nil, ErrKeyspaceDoesNotExist
}
Expand All @@ -583,7 +583,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada

var strategyOptionsJSON []byte

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)
if iter.NumRows() == 0 {
return nil, ErrKeyspaceDoesNotExist
}
Expand Down Expand Up @@ -631,7 +631,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
view_name
FROM system_schema.views
WHERE keyspace_name = ?`
iter = session.control.query(stmt, keyspaceName)
iter = session.control.query(stmt+session.usingTimeoutClause, keyspaceName)
return iter
}

Expand Down Expand Up @@ -693,7 +693,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
}
}

iter = session.control.query(stmt, keyspaceName)
iter = session.control.query(stmt+session.usingTimeoutClause, keyspaceName)

tables := []TableMetadata{}
table := TableMetadata{Keyspace: keyspaceName}
Expand Down Expand Up @@ -756,7 +756,7 @@ func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
rows := s.control.query(stmt+s.usingTimeoutClause, keyspace).Scanner()
for rows.Next() {
var (
column = ColumnMetadata{Keyspace: keyspace}
Expand Down Expand Up @@ -817,7 +817,7 @@ func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
rows := s.control.query(stmt+s.usingTimeoutClause, keyspace).Scanner()
for rows.Next() {
var (
column = ColumnMetadata{Keyspace: keyspace}
Expand Down Expand Up @@ -875,7 +875,7 @@ func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, e

var columns []ColumnMetadata

rows := s.control.query(stmt, keyspace).Scanner()
rows := s.control.query(stmt+s.usingTimeoutClause, keyspace).Scanner()
for rows.Next() {
column := ColumnMetadata{Keyspace: keyspace}

Expand Down
18 changes: 9 additions & 9 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada

var replication map[string]string

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)
if iter.NumRows() == 0 {
return nil, ErrKeyspaceDoesNotExist
}
Expand Down Expand Up @@ -550,7 +550,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
}

stmt := `SELECT * FROM system_schema.tables WHERE keyspace_name = ?`
iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)

var tables []TableMetadata
table := TableMetadata{Keyspace: keyspaceName}
Expand Down Expand Up @@ -582,7 +582,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e

stmt = `SELECT * FROM system_schema.scylla_tables WHERE keyspace_name = ? AND table_name = ?`
for i, t := range tables {
iter := session.control.query(stmt, keyspaceName, t.Name)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName, t.Name)

table := TableMetadata{}
if iter.MapScan(map[string]interface{}{
Expand Down Expand Up @@ -610,7 +610,7 @@ func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata,

var columns []ColumnMetadata

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)
column := ColumnMetadata{Keyspace: keyspaceName}

for iter.MapScan(map[string]interface{}{
Expand Down Expand Up @@ -639,7 +639,7 @@ func getTypeMetadata(session *Session, keyspaceName string) ([]TypeMetadata, err
}

stmt := `SELECT * FROM system_schema.types WHERE keyspace_name = ?`
iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)

var types []TypeMetadata
tm := TypeMetadata{Keyspace: keyspaceName}
Expand Down Expand Up @@ -670,7 +670,7 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta
var functions []FunctionMetadata
function := FunctionMetadata{Keyspace: keyspaceName}

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)
for iter.MapScan(map[string]interface{}{
"function_name": &function.Name,
"argument_types": &function.ArgumentTypes,
Expand Down Expand Up @@ -702,7 +702,7 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe
var aggregates []AggregateMetadata
aggregate := AggregateMetadata{Keyspace: keyspaceName}

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)
for iter.MapScan(map[string]interface{}{
"aggregate_name": &aggregate.Name,
"argument_types": &aggregate.ArgumentTypes,
Expand Down Expand Up @@ -734,7 +734,7 @@ func getIndexMetadata(session *Session, keyspaceName string) ([]IndexMetadata, e
var indexes []IndexMetadata
index := IndexMetadata{}

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)
for iter.MapScan(map[string]interface{}{
"index_name": &index.Name,
"keyspace_name": &index.KeyspaceName,
Expand Down Expand Up @@ -790,7 +790,7 @@ func getViewMetadata(session *Session, keyspaceName string) ([]ViewMetadata, err

stmt := `SELECT * FROM system_schema.views WHERE keyspace_name = ?`

iter := session.control.query(stmt, keyspaceName)
iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName)

var views []ViewMetadata
view := ViewMetadata{KeyspaceName: keyspaceName}
Expand Down
20 changes: 20 additions & 0 deletions schema_queries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//go:build integration && scylla
// +build integration,scylla

package gocql

import (
"testing"
)

func TestSchemaQueries(t *testing.T) {
cluster := createCluster()

fallback := RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)

session := createSessionFromCluster(cluster, t)
defer session.Close()

assertTrue(t, "keyspace present in schemaDescriber", session.schemaDescriber.cache["gocql_test"].Name == "gocql_test")
}
13 changes: 10 additions & 3 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -85,6 +86,8 @@ type Session struct {
logger StdLogger

tabletsRoutingV1 bool

usingTimeoutClause string
}

var queryPool = &sync.Pool{
Expand Down Expand Up @@ -233,9 +236,13 @@ func (s *Session) init() error {
if err := s.control.connect(hosts); err != nil {
return err
}
s.control.getConn().conn.mu.Lock()
s.tabletsRoutingV1 = s.control.getConn().conn.isTabletSupported()
s.control.getConn().conn.mu.Unlock()
conn := s.control.getConn().conn
conn.mu.Lock()
s.tabletsRoutingV1 = conn.isTabletSupported()
if s.cfg.MetadataSchemaRequestTimeout > time.Duration(0) && isScyllaConn(conn) {
s.usingTimeoutClause = " USING TIMEOUT " + strconv.FormatInt(int64(s.cfg.MetadataSchemaRequestTimeout.Milliseconds()), 10) + "ms"
}
conn.mu.Unlock()

if !s.cfg.DisableInitialHostLookup {
var partitioner string
Expand Down

0 comments on commit c33cb52

Please sign in to comment.