Skip to content

Commit

Permalink
Support for Cassandra 3.0 schema metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenick committed Nov 30, 2015
1 parent 2cb8360 commit 1944158
Show file tree
Hide file tree
Showing 30 changed files with 1,541 additions and 883 deletions.
12 changes: 12 additions & 0 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -4278,6 +4278,18 @@ cass_data_type_set_class_name_n(CassDataType* data_type,
const char* class_name,
size_t class_name_length);

/**
* Gets the sub-data type count of a UDT (user defined type), tuple
* or collection.
*
* <b>Note:</b> Only valid for UDT, tuple and collection data types.
*
* @param[in] data_type
* @return Returns the number of sub-data types
*/
CASS_EXPORT size_t
cass_data_sub_type_count(const CassDataType* data_type);

/**
* Gets the sub-data type of a UDT (user defined type), tuple or collection at
* the specified index.
Expand Down
4 changes: 2 additions & 2 deletions src/abstract_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class AbstractData {
protected:
virtual size_t get_indices(StringRef name,
IndexVec* indices) = 0;
virtual const SharedRefPtr<const DataType>& get_type(size_t index) const = 0;
virtual const DataType::ConstPtr& get_type(size_t index) const = 0;

private:
template <class T>
Expand All @@ -152,7 +152,7 @@ class AbstractData {
return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
}
IsValidDataType<T> is_valid_type;
SharedRefPtr<const DataType> data_type(get_type(index));
DataType::ConstPtr data_type(get_type(index));
if (data_type && !is_valid_type(value, data_type)) {
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
}
Expand Down
6 changes: 3 additions & 3 deletions src/collection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Collection : public RefCounted<Collection> {
items_.reserve(item_count);
}

Collection(const SharedRefPtr<const CollectionType>& data_type,
Collection(const CollectionType::ConstPtr& data_type,
size_t item_count)
: data_type_(data_type) {
items_.reserve(item_count);
Expand All @@ -51,7 +51,7 @@ class Collection : public RefCounted<Collection> {
return static_cast<CassCollectionType>(data_type_->value_type());
}

const SharedRefPtr<const CollectionType>& data_type() const { return data_type_; }
const CollectionType::ConstPtr& data_type() const { return data_type_; }
const BufferVec& items() const { return items_; }

#define APPEND_TYPE(Type) \
Expand Down Expand Up @@ -129,7 +129,7 @@ class Collection : public RefCounted<Collection> {
void encode_items_uint16(char* buf) const;

private:
SharedRefPtr<const CollectionType> data_type_;
CollectionType::ConstPtr data_type_;
BufferVec items_;

private:
Expand Down
2 changes: 1 addition & 1 deletion src/collection_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ char* CollectionIterator::decode_value(char* position) {
int32_t size;
char* buffer = decode_size(protocol_version, position, size);

SharedRefPtr<const DataType> data_type;
DataType::ConstPtr data_type;
if (collection_->value_type() == CASS_VALUE_TYPE_MAP) {
data_type = (index_ % 2 == 0) ? collection_->primary_data_type()
: collection_->secondary_data_type();
Expand Down
2 changes: 1 addition & 1 deletion src/collection_iterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class TupleIterator : public ValueIterator {
: ValueIterator(CASS_ITERATOR_TYPE_TUPLE)
, tuple_(tuple)
, position_(tuple->data()) {
SharedRefPtr<const CollectionType> collection_type(tuple->data_type());
CollectionType::ConstPtr collection_type(tuple->data_type());
next_ = collection_type->types().begin();
end_ = collection_type->types().end();
}
Expand Down
140 changes: 91 additions & 49 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,19 @@
#define SELECT_PEERS "SELECT peer, data_center, rack, release_version, rpc_address FROM system.peers"
#define SELECT_PEERS_TOKENS "SELECT peer, data_center, rack, release_version, rpc_address, tokens FROM system.peers"

#define SELECT_KEYSPACES "SELECT * FROM system.schema_keyspaces"
#define SELECT_COLUMN_FAMILIES "SELECT * FROM system.schema_columnfamilies"
#define SELECT_COLUMNS "SELECT * FROM system.schema_columns"
#define SELECT_USERTYPES "SELECT * FROM system.schema_usertypes"
#define SELECT_FUNCTIONS "SELECT * FROM system.schema_functions"
#define SELECT_AGGREGATES "SELECT * FROM system.schema_aggregates"
#define SELECT_KEYSPACES_20 "SELECT * FROM system.schema_keyspaces"
#define SELECT_COLUMN_FAMILIES_20 "SELECT * FROM system.schema_columnfamilies"
#define SELECT_COLUMNS_20 "SELECT * FROM system.schema_columns"
#define SELECT_USERTYPES_21 "SELECT * FROM system.schema_usertypes"
#define SELECT_FUNCTIONS_22 "SELECT * FROM system.schema_functions"
#define SELECT_AGGREGATES_22 "SELECT * FROM system.schema_aggregates"

#define SELECT_KEYSPACES_30 "SELECT * FROM system_schema.keyspaces"
#define SELECT_TABLES_30 "SELECT * FROM system_schema.tables"
#define SELECT_COLUMNS_30 "SELECT * FROM system_schema.columns"
#define SELECT_USERTYPES_30 "SELECT * FROM system_schema.types"
#define SELECT_FUNCTIONS_30 "SELECT * FROM system_schema.functions"
#define SELECT_AGGREGATES_30 "SELECT * FROM system_schema.aggregates"

namespace cass {

Expand Down Expand Up @@ -194,7 +201,7 @@ void ControlConnection::on_ready(Connection* connection) {

// The control connection has to refresh meta when there's a reconnect because
// events could have been missed while not connected.
query_meta_all();
query_meta_hosts();
}

void ControlConnection::on_close(Connection* connection) {
Expand Down Expand Up @@ -359,40 +366,23 @@ void ControlConnection::on_event(EventResponse* response) {
}
}

//TODO: query and callbacks should be in Metadata
// punting for now because of tight coupling of Session and CC state
void ControlConnection::query_meta_all() {
ScopedRefPtr<ControlMultipleRequestHandler<QueryMetadataAllData> > handler(
new ControlMultipleRequestHandler<QueryMetadataAllData>(this, ControlConnection::on_query_meta_all, QueryMetadataAllData()));
void ControlConnection::query_meta_hosts() {
ScopedRefPtr<ControlMultipleRequestHandler<UnusedData> > handler(
new ControlMultipleRequestHandler<UnusedData>(this, ControlConnection::on_query_hosts, UnusedData()));
handler->execute_query(SELECT_LOCAL_TOKENS);
handler->execute_query(SELECT_PEERS_TOKENS);

if (session_->config().use_schema()) {
handler->execute_query(SELECT_KEYSPACES);
handler->execute_query(SELECT_COLUMN_FAMILIES);
handler->execute_query(SELECT_COLUMNS);
if (protocol_version_ >= 3) {
handler->execute_query(SELECT_USERTYPES);
}
if (protocol_version_ >= 4) {
handler->execute_query(SELECT_FUNCTIONS);
handler->execute_query(SELECT_AGGREGATES);
}
}
}

void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
const QueryMetadataAllData& unused,
const MultipleRequestHandler::ResponseVec& responses) {
void ControlConnection::on_query_hosts(ControlConnection* control_connection,
const UnusedData& data,
const MultipleRequestHandler::ResponseVec& responses) {
Connection* connection = control_connection->connection_;
if (connection == NULL) {
return;
}

Session* session = control_connection->session_;

session->metadata().clear_and_update_back();

bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);

// If the 'system.local' table is empty the connection isn't used as a control
Expand Down Expand Up @@ -457,22 +447,73 @@ void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
}
}

session->purge_hosts(is_initial_connection);

if (session->config().use_schema()) {
session->metadata().update_keyspaces(static_cast<ResultResponse*>(responses[2].get()));
session->metadata().update_tables(static_cast<ResultResponse*>(responses[3].get()),
static_cast<ResultResponse*>(responses[4].get()));
if (control_connection->protocol_version_ >= 3) {
session->metadata().update_user_types(static_cast<ResultResponse*>(responses[5].get()));
control_connection->query_meta_schema();
} else {
control_connection->state_ = CONTROL_STATE_READY;
session->on_control_connection_ready();
// Create a new query plan that considers all the new hosts from the
// "system" tables.
control_connection->query_plan_.reset(session->new_query_plan());
}
}

//TODO: query and callbacks should be in Metadata
// punting for now because of tight coupling of Session and CC state
void ControlConnection::query_meta_schema() {
ScopedRefPtr<ControlMultipleRequestHandler<UnusedData> > handler(
new ControlMultipleRequestHandler<UnusedData>(this, ControlConnection::on_query_meta_schema, UnusedData()));

if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
handler->execute_query(SELECT_KEYSPACES_30);
handler->execute_query(SELECT_TABLES_30);
handler->execute_query(SELECT_COLUMNS_30);
handler->execute_query(SELECT_USERTYPES_30);
handler->execute_query(SELECT_FUNCTIONS_30);
handler->execute_query(SELECT_AGGREGATES_30);
} else {
handler->execute_query(SELECT_KEYSPACES_20);
handler->execute_query(SELECT_COLUMN_FAMILIES_20);
handler->execute_query(SELECT_COLUMNS_20);
if (session_->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
handler->execute_query(SELECT_USERTYPES_21);
}
if (control_connection->protocol_version_ >= 4) {
session->metadata().update_functions(static_cast<ResultResponse*>(responses[6].get()));
session->metadata().update_aggregates(static_cast<ResultResponse*>(responses[7].get()));
if (session_->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
handler->execute_query(SELECT_FUNCTIONS_22);
handler->execute_query(SELECT_AGGREGATES_22);
}
session->metadata().swap_to_back_and_update_front();
if (control_connection->should_query_tokens_) session->metadata().build();
}
}

void ControlConnection::on_query_meta_schema(ControlConnection* control_connection,
const UnusedData& unused,
const MultipleRequestHandler::ResponseVec& responses) {
Connection* connection = control_connection->connection_;
if (connection == NULL) {
return;
}

Session* session = control_connection->session_;

session->metadata().clear_and_update_back();

bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);

session->metadata().update_keyspaces(static_cast<ResultResponse*>(responses[0].get()));
session->metadata().update_tables(static_cast<ResultResponse*>(responses[1].get()),
static_cast<ResultResponse*>(responses[2].get()));

if (session->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
session->metadata().update_user_types(static_cast<ResultResponse*>(responses[3].get()));
}

if (session->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
session->metadata().update_functions(static_cast<ResultResponse*>(responses[4].get()));
session->metadata().update_aggregates(static_cast<ResultResponse*>(responses[5].get()));
}

session->metadata().swap_to_back_and_update_front();
if (control_connection->should_query_tokens_) session->metadata().build();

if (is_initial_connection) {
control_connection->state_ = CONTROL_STATE_READY;
Expand Down Expand Up @@ -636,8 +677,9 @@ void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row
}

if (should_query_tokens_) {
bool is_connected_host = connection_ != NULL && host->address().compare(connection_->address()) == 0;
std::string partitioner;
if (row->get_string_by_name("partitioner", &partitioner)) {
if (is_connected_host && row->get_string_by_name("partitioner", &partitioner)) {
session_->metadata().set_partitioner(partitioner);
}
v = row->get_by_name("tokens");
Expand All @@ -655,7 +697,7 @@ void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row
}

void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) {
std::string query(SELECT_KEYSPACES);
std::string query(SELECT_KEYSPACES_20);
query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("'");
Expand Down Expand Up @@ -683,11 +725,11 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio

void ControlConnection::refresh_table(const StringRef& keyspace_name,
const StringRef& table_name) {
std::string cf_query(SELECT_COLUMN_FAMILIES);
std::string cf_query(SELECT_COLUMN_FAMILIES_20);
cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");

std::string col_query(SELECT_COLUMNS);
std::string col_query(SELECT_COLUMNS_20);
col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");

Expand Down Expand Up @@ -720,7 +762,7 @@ void ControlConnection::on_refresh_table(ControlConnection* control_connection,
void ControlConnection::refresh_type(const StringRef& keyspace_name,
const StringRef& type_name) {

std::string query(SELECT_USERTYPES);
std::string query(SELECT_USERTYPES_21);
query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND type_name='").append(type_name.data(), type_name.size()).append("'");

Expand Down Expand Up @@ -753,10 +795,10 @@ void ControlConnection::refresh_function(const StringRef& keyspace_name,

std::string query;
if (is_aggregate) {
query.assign(SELECT_AGGREGATES);
query.assign(SELECT_AGGREGATES_22);
query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
} else {
query.assign(SELECT_FUNCTIONS);
query.assign(SELECT_FUNCTIONS_22);
query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
}

Expand Down
32 changes: 18 additions & 14 deletions src/control_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class ControlConnection : public Connection::Listener {
std::string table_name;
};

struct QueryMetadataAllData {};
struct UnusedData {};

template<class T>
class ControlHandler : public Handler {
Expand Down Expand Up @@ -180,28 +180,32 @@ class ControlConnection : public Connection::Listener {
virtual void on_availability_change(Connection* connection) {}
virtual void on_event(EventResponse* response);

//TODO: possibly reorder callback functions to pair with initiator
static void on_query_meta_all(ControlConnection* control_connection,
const QueryMetadataAllData& data,
const MultipleRequestHandler::ResponseVec& responses);
static void on_refresh_node_info(ControlConnection* control_connection,
const RefreshNodeData& data,
Response* response);
static void on_refresh_node_info_all(ControlConnection* control_connection,
const RefreshNodeData& data,
Response* response);
void on_local_query(ResponseMessage* response);
void on_peer_query(ResponseMessage* response);
static void on_reconnect(Timer* timer);

bool handle_query_invalid_response(Response* response);
void handle_query_failure(CassError code, const std::string& message);
void handle_query_timeout();

void query_meta_all();
void query_meta_hosts();
static void on_query_hosts(ControlConnection* control_connection,
const UnusedData& data,
const MultipleRequestHandler::ResponseVec& responses);

void query_meta_schema();
static void on_query_meta_schema(ControlConnection* control_connection,
const UnusedData& data,
const MultipleRequestHandler::ResponseVec& responses);

void refresh_node_info(SharedRefPtr<Host> host,
bool is_new_node,
bool query_tokens = false);
static void on_refresh_node_info(ControlConnection* control_connection,
const RefreshNodeData& data,
Response* response);
static void on_refresh_node_info_all(ControlConnection* control_connection,
const RefreshNodeData& data,
Response* response);

void update_node_info(SharedRefPtr<Host> host, const Row* row);

void refresh_keyspace(const StringRef& keyspace_name);
Expand Down
Loading

0 comments on commit 1944158

Please sign in to comment.