From 73c7ec8eb8ee6beaf7f9ee5a4474debb48625d03 Mon Sep 17 00:00:00 2001 From: Jackson Fleming Date: Tue, 6 Jun 2023 16:52:24 +1000 Subject: [PATCH 01/10] Add InstaclustrPasswordAuthenticator to defaultApprovedAuthenticators --- conn.go | 1 + 1 file changed, 1 insertion(+) diff --git a/conn.go b/conn.go index 9fb40c488..2b535ad81 100644 --- a/conn.go +++ b/conn.go @@ -32,6 +32,7 @@ var ( "com.ericsson.bss.cassandra.ecaudit.auth.AuditPasswordAuthenticator", "com.amazon.helenus.auth.HelenusAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator", + "com.instaclustr.cassandra.auth.InstaclustrPasswordAuthenticator", } ) From e19ee32ba06339a0bf34481a8084a44a0b1d6727 Mon Sep 17 00:00:00 2001 From: Jackson Fleming Date: Tue, 6 Jun 2023 16:55:29 +1000 Subject: [PATCH 02/10] Add unit test for conn.go update --- conn_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/conn_test.go b/conn_test.go index 916a3d1e0..bd8737f4e 100644 --- a/conn_test.go +++ b/conn_test.go @@ -39,6 +39,7 @@ func TestApprove(t *testing.T) { approve("com.datastax.bdp.cassandra.auth.DseAuthenticator", []string{}): true, approve("io.aiven.cassandra.auth.AivenAuthenticator", []string{}): true, approve("com.amazon.helenus.auth.HelenusAuthenticator", []string{}): true, + approve("com.instaclustr.cassandra.auth.InstaclustrPasswordAuthenticator", []string{}): true, approve("com.apache.cassandra.auth.FakeAuthenticator", []string{}): false, approve("com.apache.cassandra.auth.FakeAuthenticator", nil): false, approve("com.apache.cassandra.auth.FakeAuthenticator", []string{"com.apache.cassandra.auth.FakeAuthenticator"}): true, From 998c1d28c8a50ec77c0ab0828df223ad497a4183 Mon Sep 17 00:00:00 2001 From: Jackson Fleming Date: Tue, 6 Jun 2023 16:59:59 +1000 Subject: [PATCH 03/10] Add missing test for another authenticatr, add my name to AUTHORS --- AUTHORS | 1 + conn_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/AUTHORS b/AUTHORS index 42591736a..5f2d36134 100644 --- a/AUTHORS +++ b/AUTHORS @@ -139,3 +139,4 @@ João Reis Lauro Ramos Venancio Dmitry Kropachev Oliver Boyle +Jackson Fleming diff --git a/conn_test.go b/conn_test.go index bd8737f4e..e67ea4056 100644 --- a/conn_test.go +++ b/conn_test.go @@ -39,6 +39,7 @@ func TestApprove(t *testing.T) { approve("com.datastax.bdp.cassandra.auth.DseAuthenticator", []string{}): true, approve("io.aiven.cassandra.auth.AivenAuthenticator", []string{}): true, approve("com.amazon.helenus.auth.HelenusAuthenticator", []string{}): true, + approve("com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator", []string{}): true, approve("com.instaclustr.cassandra.auth.InstaclustrPasswordAuthenticator", []string{}): true, approve("com.apache.cassandra.auth.FakeAuthenticator", []string{}): false, approve("com.apache.cassandra.auth.FakeAuthenticator", nil): false, From b9737ddcadbbe8092b27df4f6ab2e6e9f3cf4c72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 22 Jun 2023 15:59:19 +0200 Subject: [PATCH 04/10] Reresolve DNS as fallback when all hosts are unreachable If all nodes in the cluster change their IPs at one time, driver used to no longer be able to ever contact the cluster; the only solution was to restart the driver. A fallback is added to the control connection `reconnect()` logic so that when no known host is reachable, all hostnames provided in ClusterConfig (initial contact points) are reresolved and control connection is attempted to be opened to any of them. If this succeeds, a metadata fetch is issued normally and the whole cluster is discovered with its new IPs. For the cluster to correctly learn new IPs in case that nodes are accessible indirectly (e.g. through a proxy), that is, by translated address and not `rpc_address` or `broadcast_address`, the code introduced in #1682 was extended to remove and re-add a host also when its translated address changed (even when its internal address stays the same). As a bonus, a misnamed variable `hostport` is renamed to a suitable `hostaddr`. --- control.go | 45 +++++++++++++++++++++++++++++++++++---------- host_source.go | 2 +- session.go | 4 ++-- 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/control.go b/control.go index bda84ebce..47ec7abaf 100644 --- a/control.go +++ b/control.go @@ -282,7 +282,7 @@ func (c *controlConn) setupConn(conn *Conn) error { } if err := c.registerEvents(conn); err != nil { - return err + return fmt.Errorf("register events: %v", err) } ch := &connHost{ @@ -347,6 +347,20 @@ func (c *controlConn) reconnect() { } defer atomic.StoreInt32(&c.reconnecting, 0) + conn, err := c.attemptReconnect() + + if conn == nil { + c.session.logger.Printf("gocql: unable to reconnect control connection: %v\n", err) + return + } + + err = c.session.refreshRing() + if err != nil { + c.session.logger.Printf("gocql: unable to refresh ring: %v\n", err) + } +} + +func (c *controlConn) attemptReconnect() (*Conn, error) { hosts := c.session.ring.allHosts() hosts = shuffleHosts(hosts) @@ -363,6 +377,25 @@ func (c *controlConn) reconnect() { ch.conn.Close() } + conn, err := c.attemptReconnectToAnyOfHosts(hosts) + + if conn != nil { + return conn, err + } + + c.session.logger.Printf("gocql: unable to connect to any ring node: %v\n", err) + c.session.logger.Printf("gocql: control falling back to initial contact points.\n") + // Fallback to initial contact points, as it may be the case that all known initialHosts + // changed their IPs while keeping the same hostname(s). + initialHosts, resolvErr := addrsToHosts(c.session.cfg.Hosts, c.session.cfg.Port, c.session.logger) + if resolvErr != nil { + return nil, fmt.Errorf("resolve contact points' hostnames: %v", resolvErr) + } + + return c.attemptReconnectToAnyOfHosts(initialHosts) +} + +func (c *controlConn) attemptReconnectToAnyOfHosts(hosts []*HostInfo) (*Conn, error) { var conn *Conn var err error for _, host := range hosts { @@ -379,15 +412,7 @@ func (c *controlConn) reconnect() { conn.Close() conn = nil } - if conn == nil { - c.session.logger.Printf("gocql: control unable to register events: %v\n", err) - return - } - - err = c.session.refreshRing() - if err != nil { - c.session.logger.Printf("gocql: unable to refresh ring: %v\n", err) - } + return conn, err } func (c *controlConn) HandleError(conn *Conn, err error, closed bool) { diff --git a/host_source.go b/host_source.go index ac30bfc55..a0b7058d7 100644 --- a/host_source.go +++ b/host_source.go @@ -714,7 +714,7 @@ func refreshRing(r *ringDescriber) error { if !ok { return fmt.Errorf("get existing host=%s from prevHosts: %w", h, ErrCannotFindHost) } - if h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) { + if h.connectAddress.Equal(existing.connectAddress) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) { // no host IP change host.update(h) } else { diff --git a/session.go b/session.go index 0263d564d..d5ff9ecae 100644 --- a/session.go +++ b/session.go @@ -93,8 +93,8 @@ var queryPool = &sync.Pool{ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInfo, error) { var hosts []*HostInfo - for _, hostport := range addrs { - resolvedHosts, err := hostInfo(hostport, defaultPort) + for _, hostaddr := range addrs { + resolvedHosts, err := hostInfo(hostaddr, defaultPort) if err != nil { // Try other hosts if unable to resolve DNS name if _, ok := err.(*net.DNSError); ok { From f0a05baf1a30d182a751b51b4b0fcdccc8b2af06 Mon Sep 17 00:00:00 2001 From: Jackson Fleming Date: Tue, 4 Jul 2023 20:26:24 +1000 Subject: [PATCH 05/10] Update CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cd62029e..16fd89329 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added - +- Added the InstaclustrPasswordAuthenticator to the list of default approved authenticators. ### Changed ### Fixed From 64175cf2bdc63dfaeee16b3604cc76afc2ddbe50 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Mon, 10 Jul 2023 12:14:42 +0200 Subject: [PATCH 06/10] Add com.scylladb.auth.{SaslauthdAuthenticator,TransitionalAuthenticator} authenticators These are already allowed in github.com/scylladb/gocql. Closes https://github.com/gocql/gocql/issues/1703 --- CHANGELOG.md | 2 ++ conn.go | 2 ++ conn_test.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 16fd89329..94bc1de43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Added the InstaclustrPasswordAuthenticator to the list of default approved authenticators. +- Added the `com.scylladb.auth.SaslauthdAuthenticator` and `com.scylladb.auth.TransitionalAuthenticator` + to the list of default approved authenticators. ### Changed ### Fixed diff --git a/conn.go b/conn.go index b0e15851d..06a969e85 100644 --- a/conn.go +++ b/conn.go @@ -32,6 +32,8 @@ var ( "com.ericsson.bss.cassandra.ecaudit.auth.AuditPasswordAuthenticator", "com.amazon.helenus.auth.HelenusAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator", + "com.scylladb.auth.SaslauthdAuthenticator", + "com.scylladb.auth.TransitionalAuthenticator", "com.instaclustr.cassandra.auth.InstaclustrPasswordAuthenticator", } ) diff --git a/conn_test.go b/conn_test.go index e67ea4056..42bdbb637 100644 --- a/conn_test.go +++ b/conn_test.go @@ -40,6 +40,8 @@ func TestApprove(t *testing.T) { approve("io.aiven.cassandra.auth.AivenAuthenticator", []string{}): true, approve("com.amazon.helenus.auth.HelenusAuthenticator", []string{}): true, approve("com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator", []string{}): true, + approve("com.scylladb.auth.SaslauthdAuthenticator", []string{}): true, + approve("com.scylladb.auth.TransitionalAuthenticator", []string{}): true, approve("com.instaclustr.cassandra.auth.InstaclustrPasswordAuthenticator", []string{}): true, approve("com.apache.cassandra.auth.FakeAuthenticator", []string{}): false, approve("com.apache.cassandra.auth.FakeAuthenticator", nil): false, From b51e1de8105362e9096f4f89287e46fb85a37424 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 19 Jul 2023 13:19:29 +0200 Subject: [PATCH 07/10] Add shard/thread information to the tracing This information is needed for testing shard awareness. Also it is more in line with cqlsh tracing. --- session.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/session.go b/session.go index d5ff9ecae..dd222940c 100644 --- a/session.go +++ b/session.go @@ -2088,6 +2088,7 @@ func (t *traceWriter) Trace(traceId []byte) { activity string source string elapsed int + thread string ) t.mu.Lock() @@ -2096,13 +2097,13 @@ func (t *traceWriter) Trace(traceId []byte) { fmt.Fprintf(t.w, "Tracing session %016x (coordinator: %s, duration: %v):\n", traceId, coordinator, time.Duration(duration)*time.Microsecond) - iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed + iter = t.session.control.query(`SELECT event_id, activity, source, source_elapsed, thread FROM system_traces.events WHERE session_id = ?`, traceId) - for iter.Scan(×tamp, &activity, &source, &elapsed) { - fmt.Fprintf(t.w, "%s: %s (source: %s, elapsed: %d)\n", - timestamp.Format("2006/01/02 15:04:05.999999"), activity, source, elapsed) + for iter.Scan(×tamp, &activity, &source, &elapsed, &thread) { + fmt.Fprintf(t.w, "%s: %s [%s] (source: %s, elapsed: %d)\n", + timestamp.Format("2006/01/02 15:04:05.999999"), activity, thread, source, elapsed) } if err := iter.Close(); err != nil { From f6b159c3488ad1e286ec5a2e11785c738ddd07ea Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 7 Jul 2023 11:36:12 +0200 Subject: [PATCH 08/10] Update Keyspace/Table name in prepared Query statement Previously TokenAwarePolicy always used Keyspace explicitly set in cluster.Keyspace regardless of the keyspace in the Query. Now after preparing statement Keyspace and Table names are transferred to the Query and it can make use of that. Fixes: #1621 --- cass1batch_test.go | 2 +- conn.go | 6 ++++ frame.go | 13 ++++--- keyspace_table_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++ policies_test.go | 14 ++++---- query_executor.go | 1 + session.go | 71 ++++++++++++++++++++++++++++-------- session_test.go | 4 +-- 8 files changed, 163 insertions(+), 29 deletions(-) create mode 100644 keyspace_table_test.go diff --git a/cass1batch_test.go b/cass1batch_test.go index 43d73ae42..f8a796b62 100644 --- a/cass1batch_test.go +++ b/cass1batch_test.go @@ -53,7 +53,7 @@ func TestShouldPrepareFunction(t *testing.T) { } for _, test := range shouldPrepareTests { - q := &Query{stmt: test.Stmt} + q := &Query{stmt: test.Stmt, routingInfo: &queryRoutingInfo{}} if got := q.shouldPrepare(); got != test.Result { t.Fatalf("%q: got %v, expected %v\n", test.Stmt, got, test.Result) } diff --git a/conn.go b/conn.go index 06a969e85..d780bcd97 100644 --- a/conn.go +++ b/conn.go @@ -1378,6 +1378,12 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { params: params, customPayload: qry.customPayload, } + + // Set "keyspace" and "table" property in the query if it is present in preparedMetadata + qry.routingInfo.mu.Lock() + qry.routingInfo.keyspace = info.request.keyspace + qry.routingInfo.table = info.request.table + qry.routingInfo.mu.Unlock() } else { frame = &writeQueryFrame{ statement: qry.stmt, diff --git a/frame.go b/frame.go index fff55a41a..44be7879d 100644 --- a/frame.go +++ b/frame.go @@ -918,6 +918,10 @@ type preparedMetadata struct { // proto v4+ pkeyColumns []int + + keyspace string + + table string } func (r preparedMetadata) String() string { @@ -952,11 +956,10 @@ func (f *framer) parsePreparedMetadata() preparedMetadata { return meta } - var keyspace, table string globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec if globalSpec { - keyspace = f.readString() - table = f.readString() + meta.keyspace = f.readString() + meta.table = f.readString() } var cols []ColumnInfo @@ -964,14 +967,14 @@ func (f *framer) parsePreparedMetadata() preparedMetadata { // preallocate columninfo to avoid excess copying cols = make([]ColumnInfo, meta.colCount) for i := 0; i < meta.colCount; i++ { - f.readCol(&cols[i], &meta.resultMetadata, globalSpec, keyspace, table) + f.readCol(&cols[i], &meta.resultMetadata, globalSpec, meta.keyspace, meta.table) } } else { // use append, huge number of columns usually indicates a corrupt frame or // just a huge row. for i := 0; i < meta.colCount; i++ { var col ColumnInfo - f.readCol(&col, &meta.resultMetadata, globalSpec, keyspace, table) + f.readCol(&col, &meta.resultMetadata, globalSpec, meta.keyspace, meta.table) cols = append(cols, col) } } diff --git a/keyspace_table_test.go b/keyspace_table_test.go new file mode 100644 index 000000000..2040d0a83 --- /dev/null +++ b/keyspace_table_test.go @@ -0,0 +1,81 @@ +//go:build all || integration +// +build all integration + +package gocql + +import ( + "context" + "fmt" + "testing" +) + +// Keyspace_table checks if Query.Keyspace() is updated based on prepared statement +func TestKeyspaceTable(t *testing.T) { + cluster := createCluster() + + fallback := RoundRobinHostPolicy() + cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback) + + session, err := cluster.CreateSession() + if err != nil { + t.Fatal("createSession:", err) + } + + cluster.Keyspace = "wrong_keyspace" + + keyspace := "test1" + table := "table1" + + err = createTable(session, `DROP KEYSPACE IF EXISTS `+keyspace) + if err != nil { + t.Fatal("unable to drop keyspace:", err) + } + + err = createTable(session, fmt.Sprintf(`CREATE KEYSPACE %s + WITH replication = { + 'class' : 'SimpleStrategy', + 'replication_factor' : 1 + }`, keyspace)) + + if err != nil { + t.Fatal("unable to create keyspace:", err) + } + + if err := session.control.awaitSchemaAgreement(); err != nil { + t.Fatal(err) + } + + err = createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck)); + `, keyspace, table)) + + if err != nil { + t.Fatal("unable to create table:", err) + } + + if err := session.control.awaitSchemaAgreement(); err != nil { + t.Fatal(err) + } + + ctx := context.Background() + + // insert a row + if err := session.Query(`INSERT INTO test1.table1(pk, ck, v) VALUES (?, ?, ?)`, + 1, 2, 3).WithContext(ctx).Consistency(One).Exec(); err != nil { + t.Fatal(err) + } + + var pk int + + /* Search for a specific set of records whose 'pk' column matches + * the value of inserted row. */ + qry := session.Query(`SELECT pk FROM test1.table1 WHERE pk = ? LIMIT 1`, + 1).WithContext(ctx).Consistency(One) + if err := qry.Scan(&pk); err != nil { + t.Fatal(err) + } + + // cluster.Keyspace was set to "wrong_keyspace", but during prepering statement + // Keyspace in Query should be changed to "test" and Table should be changed to table1 + assertEqual(t, "qry.Keyspace()", "test1", qry.Keyspace()) + assertEqual(t, "qry.Table()", "table1", qry.Table()) +} diff --git a/policies_test.go b/policies_test.go index 19c38ed19..170e9e761 100644 --- a/policies_test.go +++ b/policies_test.go @@ -54,7 +54,7 @@ func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) { return nil, errors.New("not initalized") } - query := &Query{} + query := &Query{routingInfo: &queryRoutingInfo{}} query.getKeyspace = func() string { return keyspace } iter := policy.Pick(nil) @@ -201,7 +201,7 @@ func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) { } policy.SetPartitioner("OrderedPartitioner") - query := &Query{} + query := &Query{routingInfo: &queryRoutingInfo{}} query.getKeyspace = func() string { return "myKeyspace" } query.RoutingKey([]byte("20")) @@ -259,7 +259,7 @@ func TestCOWList_Add(t *testing.T) { // TestSimpleRetryPolicy makes sure that we only allow 1 + numRetries attempts func TestSimpleRetryPolicy(t *testing.T) { - q := &Query{} + q := &Query{routingInfo: &queryRoutingInfo{}} // this should allow a total of 3 tries. rt := &SimpleRetryPolicy{NumRetries: 2} @@ -317,7 +317,7 @@ func TestExponentialBackoffPolicy(t *testing.T) { func TestDowngradingConsistencyRetryPolicy(t *testing.T) { - q := &Query{cons: LocalQuorum} + q := &Query{cons: LocalQuorum, routingInfo: &queryRoutingInfo{}} rewt0 := &RequestErrWriteTimeout{ Received: 0, @@ -478,7 +478,7 @@ func TestHostPolicy_TokenAware(t *testing.T) { return nil, errors.New("not initialized") } - query := &Query{} + query := &Query{routingInfo: &queryRoutingInfo{}} query.getKeyspace = func() string { return keyspace } iter := policy.Pick(nil) @@ -580,7 +580,7 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) { return nil, errors.New("not initialized") } - query := &Query{} + query := &Query{routingInfo: &queryRoutingInfo{}} query.getKeyspace = func() string { return keyspace } iter := policy.Pick(nil) @@ -707,7 +707,7 @@ func TestHostPolicy_TokenAware_RackAware(t *testing.T) { policyWithFallbackInternal.getKeyspaceName = policyInternal.getKeyspaceName policyWithFallbackInternal.getKeyspaceMetadata = policyInternal.getKeyspaceMetadata - query := &Query{} + query := &Query{routingInfo: &queryRoutingInfo{}} query.getKeyspace = func() string { return keyspace } iter := policy.Pick(nil) diff --git a/query_executor.go b/query_executor.go index 58976a7e7..d5b53b0c8 100644 --- a/query_executor.go +++ b/query_executor.go @@ -15,6 +15,7 @@ type ExecutableQuery interface { speculativeExecutionPolicy() SpeculativeExecutionPolicy GetRoutingKey() ([]byte, error) Keyspace() string + Table() string IsIdempotent() bool withContext(context.Context) ExecutableQuery diff --git a/session.go b/session.go index d5ff9ecae..fe903f4e7 100644 --- a/session.go +++ b/session.go @@ -87,7 +87,7 @@ type Session struct { var queryPool = &sync.Pool{ New: func() interface{} { - return &Query{refCount: 1} + return &Query{routingInfo: &queryRoutingInfo{}, refCount: 1} }, } @@ -630,6 +630,9 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyI return nil, nil } + table := info.request.table + keyspace := info.request.keyspace + if len(info.request.pkeyColumns) > 0 { // proto v4 dont need to calculate primary key columns types := make([]TypeInfo, len(info.request.pkeyColumns)) @@ -638,17 +641,16 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyI } routingKeyInfo := &routingKeyInfo{ - indexes: info.request.pkeyColumns, - types: types, + indexes: info.request.pkeyColumns, + types: types, + keyspace: keyspace, + table: table, } inflight.value = routingKeyInfo return routingKeyInfo, nil } - // get the table metadata - table := info.request.columns[0].Table - var keyspaceMetadata *KeyspaceMetadata keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace) if inflight.err != nil { @@ -672,8 +674,10 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyI size := len(partitionKey) routingKeyInfo := &routingKeyInfo{ - indexes: make([]int, size), - types: make([]TypeInfo, size), + indexes: make([]int, size), + types: make([]TypeInfo, size), + keyspace: keyspace, + table: table, } for keyIndex, keyColumn := range partitionKey { @@ -909,6 +913,18 @@ type Query struct { // used by control conn queries to prevent triggering a write to systems // tables in AWS MCS see skipPrepare bool + + // routingInfo is a pointer because Query can be copied and copyable struct can't hold a mutex. + routingInfo *queryRoutingInfo +} + +type queryRoutingInfo struct { + // mu protects contents of queryRoutingInfo. + mu sync.RWMutex + + keyspace string + + table string } func (q *Query) defaultsFromSession() { @@ -1104,6 +1120,10 @@ func (q *Query) Keyspace() string { if q.getKeyspace != nil { return q.getKeyspace() } + if q.routingInfo.keyspace != "" { + return q.routingInfo.keyspace + } + if q.session == nil { return "" } @@ -1112,6 +1132,11 @@ func (q *Query) Keyspace() string { return q.session.cfg.Keyspace } +// Table returns name of the table the query will be executed against. +func (q *Query) Table() string { + return q.routingInfo.table +} + // GetRoutingKey gets the routing key to use for routing this query. If // a routing key has not been explicitly set, then the routing key will // be constructed if possible using the keyspace's schema and the query @@ -1134,6 +1159,12 @@ func (q *Query) GetRoutingKey() ([]byte, error) { return nil, err } + if routingKeyInfo != nil { + q.routingInfo.mu.Lock() + q.routingInfo.keyspace = routingKeyInfo.keyspace + q.routingInfo.table = routingKeyInfo.table + q.routingInfo.mu.Unlock() + } return createRoutingKey(routingKeyInfo, q.values) } @@ -1349,7 +1380,7 @@ func (q *Query) Release() { // reset zeroes out all fields of a query so that it can be safely pooled. func (q *Query) reset() { - *q = Query{refCount: 1} + *q = Query{routingInfo: &queryRoutingInfo{}, refCount: 1} } func (q *Query) incRefCount() { @@ -1691,6 +1722,9 @@ type Batch struct { cancelBatch func() keyspace string metrics *queryMetrics + + // routingInfo is a pointer because Query can be copied and copyable struct can't hold a mutex. + routingInfo *queryRoutingInfo } // NewBatch creates a new batch operation without defaults from the cluster @@ -1698,9 +1732,10 @@ type Batch struct { // Deprecated: use session.NewBatch instead func NewBatch(typ BatchType) *Batch { return &Batch{ - Type: typ, - metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, - spec: &NonSpeculativeExecution{}, + Type: typ, + metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, + spec: &NonSpeculativeExecution{}, + routingInfo: &queryRoutingInfo{}, } } @@ -1719,6 +1754,7 @@ func (s *Session) NewBatch(typ BatchType) *Batch { keyspace: s.cfg.Keyspace, metrics: &queryMetrics{m: make(map[string]*hostMetrics)}, spec: &NonSpeculativeExecution{}, + routingInfo: &queryRoutingInfo{}, } s.mu.RUnlock() @@ -1743,6 +1779,11 @@ func (b *Batch) Keyspace() string { return b.keyspace } +// Batch has no reasonable eqivalent of Query.Table(). +func (b *Batch) Table() string { + return b.routingInfo.table +} + // Attempts returns the number of attempts made to execute the batch. func (b *Batch) Attempts() int { return b.metrics.attempts() @@ -2014,8 +2055,10 @@ type routingKeyInfoLRU struct { } type routingKeyInfo struct { - indexes []int - types []TypeInfo + indexes []int + types []TypeInfo + keyspace string + table string } func (r *routingKeyInfo) String() string { diff --git a/session_test.go b/session_test.go index a7c8b808e..cbe3f6731 100644 --- a/session_test.go +++ b/session_test.go @@ -100,7 +100,7 @@ func (f funcQueryObserver) ObserveQuery(ctx context.Context, o ObservedQuery) { } func TestQueryBasicAPI(t *testing.T) { - qry := &Query{} + qry := &Query{routingInfo: &queryRoutingInfo{}} // Initiate host ip := "127.0.0.1" @@ -164,7 +164,7 @@ func TestQueryBasicAPI(t *testing.T) { func TestQueryShouldPrepare(t *testing.T) { toPrepare := []string{"select * ", "INSERT INTO", "update table", "delete from", "begin batch"} cantPrepare := []string{"create table", "USE table", "LIST keyspaces", "alter table", "drop table", "grant user", "revoke user"} - q := &Query{} + q := &Query{routingInfo: &queryRoutingInfo{}} for i := 0; i < len(toPrepare); i++ { q.stmt = toPrepare[i] From 65f29c88c8e31d9d7a6e2332d0bccca64b477e49 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 21 Jul 2023 09:41:53 +0200 Subject: [PATCH 09/10] Update AUTHORS and CHANGELOG.md --- AUTHORS | 1 + CHANGELOG.md | 2 ++ 2 files changed, 3 insertions(+) diff --git a/AUTHORS b/AUTHORS index 5f2d36134..327570751 100644 --- a/AUTHORS +++ b/AUTHORS @@ -140,3 +140,4 @@ Lauro Ramos Venancio Dmitry Kropachev Oliver Boyle Jackson Fleming +Sylwia Szunejko diff --git a/CHANGELOG.md b/CHANGELOG.md index 94bc1de43..4674c4854 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added the InstaclustrPasswordAuthenticator to the list of default approved authenticators. - Added the `com.scylladb.auth.SaslauthdAuthenticator` and `com.scylladb.auth.TransitionalAuthenticator` to the list of default approved authenticators. +- Added transferring Keyspace and Table names to the Query from the prepared response and updating + information about that every time this information is received ### Changed ### Fixed From 7a686db6d366bb536c0abce24749af6b33cdd574 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Fri, 21 Jul 2023 10:37:55 +0200 Subject: [PATCH 10/10] Update changelog --- CHANGELOG.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4674c4854..685f9931f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,12 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added -- Added the InstaclustrPasswordAuthenticator to the list of default approved authenticators. +- Added the InstaclustrPasswordAuthenticator to the list of default approved authenticators. (#1711) - Added the `com.scylladb.auth.SaslauthdAuthenticator` and `com.scylladb.auth.TransitionalAuthenticator` - to the list of default approved authenticators. + to the list of default approved authenticators. (#1712) - Added transferring Keyspace and Table names to the Query from the prepared response and updating - information about that every time this information is received + information about that every time this information is received. (#1714) ### Changed +- Tracer created with NewTraceWriter now includes the thread information from trace events in the output. (#1716) ### Fixed