From 7a8b873d282ac42841096b2da7f8be616eaf3b1c Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 24 Jul 2024 15:38:49 +0200 Subject: [PATCH 1/2] Fix locking when retriving connection from controlConn --- session.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/session.go b/session.go index bbfc8d862..ea095622b 100644 --- a/session.go +++ b/session.go @@ -233,9 +233,10 @@ func (s *Session) init() error { if err := s.control.connect(hosts); err != nil { return err } - s.control.getConn().conn.mu.Lock() - s.tabletsRoutingV1 = s.control.getConn().conn.isTabletSupported() - s.control.getConn().conn.mu.Unlock() + conn := s.control.getConn().conn + conn.mu.Lock() + s.tabletsRoutingV1 = conn.isTabletSupported() + conn.mu.Unlock() if !s.cfg.DisableInitialHostLookup { var partitioner string From 0861335d5d44d528831058ae51f6029ef1b9bb3f Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 24 Jul 2024 08:50:03 +0200 Subject: [PATCH 2/2] Add USING TIMEOUT to schema queries --- cluster.go | 38 +++++++++++++++++++++----------------- conn.go | 22 +++++++++++++++++----- control.go | 7 ++++--- host_source.go | 2 +- metadata_cassandra.go | 14 +++++++------- metadata_scylla.go | 18 +++++++++--------- schema_queries_test.go | 20 ++++++++++++++++++++ session.go | 6 ++++++ 8 files changed, 85 insertions(+), 42 deletions(-) create mode 100644 schema_queries_test.go diff --git a/cluster.go b/cluster.go index e223c0ab0..9afd354a7 100644 --- a/cluster.go +++ b/cluster.go @@ -255,6 +255,9 @@ type ClusterConfig struct { // If not specified, defaults to the global gocql.Logger. Logger StdLogger + // The timeout for the requests to the schema tables. (default: 60s) + MetadataSchemaRequestTimeout time.Duration + // internal config for testing disableControlConn bool disableInit bool @@ -275,23 +278,24 @@ type Dialer interface { // the same host, and will not mark the node being down or up from events. func NewCluster(hosts ...string) *ClusterConfig { cfg := &ClusterConfig{ - Hosts: hosts, - CQLVersion: "3.0.0", - Timeout: 11 * time.Second, - ConnectTimeout: 11 * time.Second, - Port: 9042, - NumConns: 2, - Consistency: Quorum, - MaxPreparedStmts: defaultMaxPreparedStmts, - MaxRoutingKeyInfo: 1000, - PageSize: 5000, - DefaultTimestamp: true, - MaxWaitSchemaAgreement: 60 * time.Second, - ReconnectInterval: 60 * time.Second, - ConvictionPolicy: &SimpleConvictionPolicy{}, - ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, - SocketKeepalive: 15 * time.Second, - WriteCoalesceWaitTime: 200 * time.Microsecond, + Hosts: hosts, + CQLVersion: "3.0.0", + Timeout: 11 * time.Second, + ConnectTimeout: 11 * time.Second, + Port: 9042, + NumConns: 2, + Consistency: Quorum, + MaxPreparedStmts: defaultMaxPreparedStmts, + MaxRoutingKeyInfo: 1000, + PageSize: 5000, + DefaultTimestamp: true, + MaxWaitSchemaAgreement: 60 * time.Second, + ReconnectInterval: 60 * time.Second, + ConvictionPolicy: &SimpleConvictionPolicy{}, + ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, + SocketKeepalive: 15 * time.Second, + WriteCoalesceWaitTime: 200 * time.Microsecond, + MetadataSchemaRequestTimeout: 60 * time.Second, } return cfg diff --git a/conn.go b/conn.go index 49f637e08..b8c776b8b 100644 --- a/conn.go +++ b/conn.go @@ -1769,9 +1769,13 @@ func (c *Conn) query(ctx context.Context, statement string, values ...interface{ } func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter { - const ( - peerSchema = "SELECT * FROM system.peers" - peerV2Schemas = "SELECT * FROM system.peers_v2" + usingClause := "" + if c.session.control != nil { + usingClause = c.session.usingTimeoutClause + } + var ( + peerSchema = "SELECT * FROM system.peers" + usingClause + peerV2Schemas = "SELECT * FROM system.peers_v2" + usingClause ) c.mu.Lock() @@ -1804,11 +1808,19 @@ func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter } func (c *Conn) querySystemLocal(ctx context.Context) *Iter { - return c.query(ctx, "SELECT * FROM system.local WHERE key='local'") + usingClause := "" + if c.session.control != nil { + usingClause = c.session.usingTimeoutClause + } + return c.query(ctx, "SELECT * FROM system.local WHERE key='local'"+usingClause) } func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { - const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" + usingClause := "" + if c.session.control != nil { + usingClause = c.session.usingTimeoutClause + } + var localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" + usingClause var versions map[string]struct{} var schemaVersion string diff --git a/control.go b/control.go index 5a21baf26..bc610e4ac 100644 --- a/control.go +++ b/control.go @@ -50,10 +50,11 @@ type controlConn struct { } func createControlConn(session *Session) *controlConn { + control := &controlConn{ - session: session, - quit: make(chan struct{}), - retry: &SimpleRetryPolicy{NumRetries: 3}, + session: session, + quit: make(chan struct{}), + retry: &SimpleRetryPolicy{NumRetries: 3}, } control.conn.Store((*connHost)(nil)) diff --git a/host_source.go b/host_source.go index b2d6e199d..23c3dfb2c 100644 --- a/host_source.go +++ b/host_source.go @@ -642,7 +642,7 @@ type ringDescriber struct { // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces func checkSystemSchema(control *controlConn) (bool, error) { - iter := control.query("SELECT * FROM system_schema.keyspaces") + iter := control.query("SELECT * FROM system_schema.keyspaces" + control.session.usingTimeoutClause) if err := iter.err; err != nil { if errf, ok := err.(*errorFrame); ok { if errf.code == ErrCodeSyntax { diff --git a/metadata_cassandra.go b/metadata_cassandra.go index 4886fdab5..96f794bc8 100644 --- a/metadata_cassandra.go +++ b/metadata_cassandra.go @@ -557,7 +557,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada var replication map[string]string - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) if iter.NumRows() == 0 { return nil, ErrKeyspaceDoesNotExist } @@ -583,7 +583,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada var strategyOptionsJSON []byte - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) if iter.NumRows() == 0 { return nil, ErrKeyspaceDoesNotExist } @@ -631,7 +631,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e view_name FROM system_schema.views WHERE keyspace_name = ?` - iter = session.control.query(stmt, keyspaceName) + iter = session.control.query(stmt+session.usingTimeoutClause, keyspaceName) return iter } @@ -693,7 +693,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e } } - iter = session.control.query(stmt, keyspaceName) + iter = session.control.query(stmt+session.usingTimeoutClause, keyspaceName) tables := []TableMetadata{} table := TableMetadata{Keyspace: keyspaceName} @@ -756,7 +756,7 @@ func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error var columns []ColumnMetadata - rows := s.control.query(stmt, keyspace).Scanner() + rows := s.control.query(stmt+s.usingTimeoutClause, keyspace).Scanner() for rows.Next() { var ( column = ColumnMetadata{Keyspace: keyspace} @@ -817,7 +817,7 @@ func (s *Session) scanColumnMetadataV2(keyspace string) ([]ColumnMetadata, error var columns []ColumnMetadata - rows := s.control.query(stmt, keyspace).Scanner() + rows := s.control.query(stmt+s.usingTimeoutClause, keyspace).Scanner() for rows.Next() { var ( column = ColumnMetadata{Keyspace: keyspace} @@ -875,7 +875,7 @@ func (s *Session) scanColumnMetadataSystem(keyspace string) ([]ColumnMetadata, e var columns []ColumnMetadata - rows := s.control.query(stmt, keyspace).Scanner() + rows := s.control.query(stmt+s.usingTimeoutClause, keyspace).Scanner() for rows.Next() { column := ColumnMetadata{Keyspace: keyspace} diff --git a/metadata_scylla.go b/metadata_scylla.go index 7996a254e..779274d50 100644 --- a/metadata_scylla.go +++ b/metadata_scylla.go @@ -513,7 +513,7 @@ func getKeyspaceMetadata(session *Session, keyspaceName string) (*KeyspaceMetada var replication map[string]string - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) if iter.NumRows() == 0 { return nil, ErrKeyspaceDoesNotExist } @@ -541,7 +541,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e } stmt := `SELECT * FROM system_schema.tables WHERE keyspace_name = ?` - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) var tables []TableMetadata table := TableMetadata{Keyspace: keyspaceName} @@ -573,7 +573,7 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e stmt = `SELECT * FROM system_schema.scylla_tables WHERE keyspace_name = ? AND table_name = ?` for i, t := range tables { - iter := session.control.query(stmt, keyspaceName, t.Name) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName, t.Name) table := TableMetadata{} if iter.MapScan(map[string]interface{}{ @@ -601,7 +601,7 @@ func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, var columns []ColumnMetadata - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) column := ColumnMetadata{Keyspace: keyspaceName} for iter.MapScan(map[string]interface{}{ @@ -630,7 +630,7 @@ func getTypeMetadata(session *Session, keyspaceName string) ([]TypeMetadata, err } stmt := `SELECT * FROM system_schema.types WHERE keyspace_name = ?` - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) var types []TypeMetadata tm := TypeMetadata{Keyspace: keyspaceName} @@ -661,7 +661,7 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta var functions []FunctionMetadata function := FunctionMetadata{Keyspace: keyspaceName} - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) for iter.MapScan(map[string]interface{}{ "function_name": &function.Name, "argument_types": &function.ArgumentTypes, @@ -693,7 +693,7 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe var aggregates []AggregateMetadata aggregate := AggregateMetadata{Keyspace: keyspaceName} - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) for iter.MapScan(map[string]interface{}{ "aggregate_name": &aggregate.Name, "argument_types": &aggregate.ArgumentTypes, @@ -725,7 +725,7 @@ func getIndexMetadata(session *Session, keyspaceName string) ([]IndexMetadata, e var indexes []IndexMetadata index := IndexMetadata{} - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) for iter.MapScan(map[string]interface{}{ "index_name": &index.Name, "keyspace_name": &index.KeyspaceName, @@ -752,7 +752,7 @@ func getViewMetadata(session *Session, keyspaceName string) ([]ViewMetadata, err stmt := `SELECT * FROM system_schema.views WHERE keyspace_name = ?` - iter := session.control.query(stmt, keyspaceName) + iter := session.control.query(stmt+session.usingTimeoutClause, keyspaceName) var views []ViewMetadata view := ViewMetadata{KeyspaceName: keyspaceName} diff --git a/schema_queries_test.go b/schema_queries_test.go new file mode 100644 index 000000000..2b0c88171 --- /dev/null +++ b/schema_queries_test.go @@ -0,0 +1,20 @@ +//go:build integration && scylla +// +build integration,scylla + +package gocql + +import ( + "testing" +) + +func TestSchemaQueries(t *testing.T) { + cluster := createCluster() + + fallback := RoundRobinHostPolicy() + cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback) + + session := createSessionFromCluster(cluster, t) + defer session.Close() + + assertTrue(t, "keyspace present in schemaDescriber", session.schemaDescriber.cache["gocql_test"].Name == "gocql_test") +} diff --git a/session.go b/session.go index ea095622b..3e967aaad 100644 --- a/session.go +++ b/session.go @@ -12,6 +12,7 @@ import ( "fmt" "io" "net" + "strconv" "strings" "sync" "sync/atomic" @@ -85,6 +86,8 @@ type Session struct { logger StdLogger tabletsRoutingV1 bool + + usingTimeoutClause string } var queryPool = &sync.Pool{ @@ -236,6 +239,9 @@ func (s *Session) init() error { conn := s.control.getConn().conn conn.mu.Lock() s.tabletsRoutingV1 = conn.isTabletSupported() + if s.cfg.MetadataSchemaRequestTimeout > time.Duration(0) && isScyllaConn(conn) { + s.usingTimeoutClause = " USING TIMEOUT " + strconv.FormatInt(int64(s.cfg.MetadataSchemaRequestTimeout.Milliseconds()), 10) + "ms" + } conn.mu.Unlock() if !s.cfg.DisableInitialHostLookup {