Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add usesTablets to KeyspaceMetadata #403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class KeyspaceMetadata {
final Map<String, AggregateMetadata> aggregates =
new ConcurrentHashMap<String, AggregateMetadata>();

// Scylla feature
private boolean usesTablets = false;

@VisibleForTesting
@Deprecated
KeyspaceMetadata(String name, boolean durableWrites, Map<String, String> replication) {
Expand Down Expand Up @@ -458,4 +461,12 @@ void add(UserType type) {
ReplicationStrategy replicationStrategy() {
return strategy;
}

void setUsesTablets(boolean predicate) {
this.usesTablets = predicate;
}

public boolean usesTablets() {
return this.usesTablets;
}
}
33 changes: 19 additions & 14 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -558,26 +558,31 @@ public Set<TokenRange> getTokenRanges(String keyspace, Host host) {
public Set<Host> getReplicas(
String keyspace, String table, Token.Factory partitioner, ByteBuffer partitionKey) {
keyspace = handleId(keyspace);
table = handleId(table);
TokenMap current = tokenMap;
if (current == null) {
if (partitioner == null && current != null) {
partitioner = current.factory;
}
if (partitioner == null) {
return Collections.emptySet();
} else {
if (partitioner == null) {
partitioner = current.factory;
}
// If possible, try tablet lookup first
}
Token token = partitioner.hash(partitionKey);

// Tablets:
KeyspaceMetadata ksMetadata = getKeyspace(keyspace);
if (ksMetadata != null && ksMetadata.usesTablets()) {
if (keyspace != null && table != null) {
Token token = partitioner.hash(partitionKey);
assert (token instanceof Token.TokenLong64);
Set<Host> replicas = tabletMap.getReplicas(keyspace, table, (long) token.getValue());
if (!replicas.isEmpty()) {
return replicas;
}
return tabletMap.getReplicas(keyspace, table, (long) token.getValue());
} else {
return Collections.emptySet();
}
// Fall back to tokenMap
Set<Host> hosts = current.getReplicas(keyspace, partitioner.hash(partitionKey));
return hosts == null ? Collections.<Host>emptySet() : hosts;
}

// TokenMap:
if (current == null) return Collections.<Host>emptySet();
Set<Host> hosts = current.getReplicas(keyspace, token);
return hosts == null ? Collections.<Host>emptySet() : hosts;
}

/**
Expand Down
134 changes: 126 additions & 8 deletions driver-core/src/main/java/com/datastax/driver/core/SchemaParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,20 @@

import com.datastax.driver.core.exceptions.BusyConnectionException;
import com.datastax.driver.core.exceptions.ConnectionException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +59,9 @@ abstract class SchemaParser {
private static final SchemaParser V3_PARSER = new V3SchemaParser();
private static final SchemaParser V4_PARSER = new V4SchemaParser();

private static final String SELECT_SCYLLA_KEYSPACES =
"SELECT * FROM system_schema.scylla_keyspaces";

static SchemaParser forVersion(VersionNumber cassandraVersion) {
if (cassandraVersion.getMajor() >= 4) return V4_PARSER;
if (cassandraVersion.getMajor() >= 3) return V3_PARSER;
Expand Down Expand Up @@ -197,7 +203,6 @@ void refresh(

private Map<String, KeyspaceMetadata> buildKeyspaces(
SystemRows rows, VersionNumber cassandraVersion, Cluster cluster) {

Map<String, KeyspaceMetadata> keyspaces = new LinkedHashMap<String, KeyspaceMetadata>();
for (Row keyspaceRow : rows.keyspaces) {
KeyspaceMetadata keyspace = KeyspaceMetadata.build(keyspaceRow, cassandraVersion);
Expand Down Expand Up @@ -239,6 +244,13 @@ private Map<String, KeyspaceMetadata> buildKeyspaces(
for (MaterializedViewMetadata view : views.values()) {
keyspace.add(view);
}
Row scyllaKeyspacesRow = rows.scyllaKeyspaces.getOrDefault(keyspace.getName(), null);
if (scyllaKeyspacesRow != null) {
if (scyllaKeyspacesRow.getColumnDefinitions().contains("initial_tablets")
&& !scyllaKeyspacesRow.isNull("initial_tablets")) {
keyspace.setUsesTablets(true);
}
}
keyspaces.put(keyspace.getName(), keyspace);
}
if (rows.virtualKeyspaces != null) {
Expand Down Expand Up @@ -619,6 +631,29 @@ private void updateViews(
}
}

static Set<String> toKeyspaceSet(ResultSet rs) {
if (rs == null) return Collections.emptySet();

Set<String> result = new HashSet<>();
for (Row row : rs) {
result.add(row.getString(KeyspaceMetadata.KS_NAME));
}
return result;
}

static Map<String, Row> groupByKeyspacePk(ResultSet rs) {
// Assumes keyspace name is full primary key, therefore
// each keyspace name identifies at most one row
if (rs == null) return Collections.emptyMap();

Map<String, Row> result = new HashMap<String, Row>();
for (Row row : rs) {
String ksName = row.getString(KeyspaceMetadata.KS_NAME);
result.put(ksName, row);
}
return result;
}

static Map<String, List<Row>> groupByKeyspace(ResultSet rs) {
if (rs == null) return Collections.emptyMap();

Expand Down Expand Up @@ -696,6 +731,25 @@ private static ResultSet get(ResultSetFuture future)
return (future == null) ? null : future.get();
}

private static ResultSet getIfExists(ResultSetFuture future)
throws InterruptedException, ExecutionException {
// Some of Scylla specific tables/columns may not exist depending on version.
// This method is meant to try to get results without failing whole schema parse
// if something additional does not exist.
if (future == null) return null;
try {
ResultSet resultSet = future.get();
return resultSet;
} catch (ExecutionException ex) {
if (ex.getCause() instanceof InvalidQueryException) {
// meant to handle keyspace/table does not exist exceptions
return null;
}
// rethrow if it's something else
throw ex;
}
}

/**
* The rows from the system tables that we want to parse to metadata classes. The format of these
* rows depends on the Cassandra version, but our parsing code knows how to handle the
Expand All @@ -713,6 +767,7 @@ private static class SystemRows {
final ResultSet virtualKeyspaces;
final Map<String, List<Row>> virtualTables;
final Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns;
final Map<String, Row> scyllaKeyspaces;

public SystemRows(
ResultSet keyspaces,
Expand All @@ -725,7 +780,8 @@ public SystemRows(
Map<String, Map<String, List<Row>>> indexes,
ResultSet virtualKeyspaces,
Map<String, List<Row>> virtualTables,
Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns) {
Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns,
Map<String, Row> scyllaKeyspaces) {
this.keyspaces = keyspaces;
this.tables = tables;
this.columns = columns;
Expand All @@ -737,6 +793,7 @@ public SystemRows(
this.virtualKeyspaces = virtualKeyspaces;
this.virtualTables = virtualTables;
this.virtualColumns = virtualColumns;
this.scyllaKeyspaces = scyllaKeyspaces;
}
}

Expand Down Expand Up @@ -790,7 +847,8 @@ else if (targetType == AGGREGATE)
cfFuture = null,
colsFuture = null,
functionsFuture = null,
aggregatesFuture = null;
aggregatesFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand All @@ -812,6 +870,21 @@ else if (targetType == AGGREGATE)
if (isSchemaOrKeyspace && supportsUdfs(cassandraVersion) || targetType == AGGREGATE)
aggregatesFuture = queryAsync(SELECT_AGGREGATES + whereClause, connection, protocolVersion);

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -824,7 +897,8 @@ else if (targetType == AGGREGATE)
Collections.<String, Map<String, List<Row>>>emptyMap(),
null,
Collections.<String, List<Row>>emptyMap(),
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap());
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap(),
groupByKeyspacePk(getIfExists(scyllaKsFuture)));
}

@Override
Expand Down Expand Up @@ -1197,9 +1271,19 @@ private Map<String, KeyspaceMetadata> buildSchema(
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();

Map<String, KeyspaceMetadata> keyspaces = new LinkedHashMap<String, KeyspaceMetadata>();
ResultSetFuture scyllaKeyspacesFuture =
queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
ResultSet keyspacesData = queryAsync(SELECT_KEYSPACES, connection, protocolVersion).get();
Map<String, Row> scyllaKeyspacesData = groupByKeyspacePk(getIfExists(scyllaKeyspacesFuture));
for (Row keyspaceRow : keyspacesData) {
KeyspaceMetadata keyspace = KeyspaceMetadata.build(keyspaceRow, cassandraVersion);
Row scyllaKeyspacesRow = scyllaKeyspacesData.getOrDefault(keyspace.getName(), null);
if (scyllaKeyspacesRow != null) {
if (scyllaKeyspacesRow.getColumnDefinitions().contains("initial_tablets")
&& !scyllaKeyspacesRow.isNull("initial_tablets")) {
keyspace.setUsesTablets(true);
}
}
keyspaces.put(keyspace.getName(), keyspace);
}

Expand Down Expand Up @@ -1288,7 +1372,8 @@ SystemRows fetchSystemRows(
functionsFuture = null,
aggregatesFuture = null,
indexesFuture = null,
viewsFuture = null;
viewsFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand Down Expand Up @@ -1356,6 +1441,21 @@ SystemRows fetchSystemRows(
connection,
protocolVersion);

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -1367,7 +1467,8 @@ SystemRows fetchSystemRows(
groupByKeyspaceAndCf(get(indexesFuture), TABLE_NAME),
null,
Collections.<String, List<Row>>emptyMap(),
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap());
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap(),
groupByKeyspacePk(getIfExists(scyllaKsFuture)));
}

@Override
Expand Down Expand Up @@ -1499,7 +1600,8 @@ SystemRows fetchSystemRows(
viewsFuture = null,
virtualKeyspacesFuture = null,
virtualTableFuture = null,
virtualColumnsFuture = null;
virtualColumnsFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand Down Expand Up @@ -1589,6 +1691,21 @@ SystemRows fetchSystemRows(
protocolVersion);
}

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -1600,7 +1717,8 @@ SystemRows fetchSystemRows(
groupByKeyspaceAndCf(get(indexesFuture), TABLE_NAME),
get(virtualKeyspacesFuture),
groupByKeyspace(get(virtualTableFuture)),
groupByKeyspaceAndCf(get(virtualColumnsFuture), cassandraVersion, TABLE_NAME));
groupByKeyspaceAndCf(get(virtualColumnsFuture), cassandraVersion, TABLE_NAME),
groupByKeyspacePk(getIfExists(scyllaKsFuture)));
}
}
}
Loading