Skip to content

Commit

Permalink
Merge branch '295'
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenick committed Oct 22, 2015
2 parents 4198255 + 0fc4ece commit 987feaa
Show file tree
Hide file tree
Showing 18 changed files with 1,321 additions and 142 deletions.
431 changes: 429 additions & 2 deletions include/cassandra.h

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/collection_iterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ class TupleIterator : public ValueIterator {
const Value* tuple_;

char* position_;
DataTypeVec::const_iterator next_;
DataTypeVec::const_iterator current_;
DataTypeVec::const_iterator end_;
DataType::Vec::const_iterator next_;
DataType::Vec::const_iterator current_;
DataType::Vec::const_iterator end_;
};

} // namespace cass
Expand Down
82 changes: 78 additions & 4 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#define SELECT_COLUMN_FAMILIES "SELECT * FROM system.schema_columnfamilies"
#define SELECT_COLUMNS "SELECT * FROM system.schema_columns"
#define SELECT_USERTYPES "SELECT * FROM system.schema_usertypes"
#define SELECT_FUNCTIONS "SELECT * FROM system.schema_functions"
#define SELECT_AGGREGATES "SELECT * FROM system.schema_aggregates"

namespace cass {

Expand Down Expand Up @@ -312,6 +314,13 @@ void ControlConnection::on_event(EventResponse* response) {
case EventResponse::TYPE:
refresh_type(response->keyspace(), response->target());
break;
case EventResponse::FUNCTION:
case EventResponse::AGGREGATE:
refresh_function(response->keyspace(),
response->target(),
response->arg_types(),
response->schema_change() == EventResponse::AGGREGATE);
break;
}
break;

Expand All @@ -326,7 +335,7 @@ void ControlConnection::on_event(EventResponse* response) {
break;
case EventResponse::TYPE:
session_->metadata().drop_user_type(response->keyspace().to_string(),
response->target().to_string());
response->target().to_string());
break;
}
break;
Expand Down Expand Up @@ -355,6 +364,10 @@ void ControlConnection::query_meta_all() {
if (protocol_version_ >= 3) {
handler->execute_query(SELECT_USERTYPES);
}
if (protocol_version_ >= 4) {
handler->execute_query(SELECT_FUNCTIONS);
handler->execute_query(SELECT_AGGREGATES);
}
}
}

Expand Down Expand Up @@ -442,7 +455,10 @@ void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
if (control_connection->protocol_version_ >= 3) {
session->metadata().update_user_types(static_cast<ResultResponse*>(responses[5].get()));
}

if (control_connection->protocol_version_ >= 4) {
session->metadata().update_functions(static_cast<ResultResponse*>(responses[6].get()));
session->metadata().update_aggregates(static_cast<ResultResponse*>(responses[7].get()));
}
session->metadata().swap_to_back_and_update_front();
}

Expand Down Expand Up @@ -698,14 +714,72 @@ void ControlConnection::on_refresh_type(ControlConnection* control_connection,
Response* response) {
ResultResponse* result = static_cast<ResultResponse*>(response);
if (result->row_count() == 0) {
LOG_ERROR("No row found for keyspace %s and type %s in system schema table.",
LOG_ERROR("No row found for keyspace %s and type %s in system schema.",
keyspace_and_type_names.first.c_str(),
keyspace_and_type_names.first.c_str());
keyspace_and_type_names.second.c_str());
return;
}
control_connection->session_->metadata().update_user_types(result);
}

void ControlConnection::refresh_function(const StringRef& keyspace_name,
const StringRef& function_name,
const StringRefVec& arg_types,
bool is_aggregate) {

std::string query;
if (is_aggregate) {
query.assign(SELECT_AGGREGATES);
query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
} else {
query.assign(SELECT_FUNCTIONS);
query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
}

LOG_DEBUG("Refreshing %s %s in keyspace %s",
is_aggregate ? "aggregate" : "function",
Metadata::full_function_name(function_name, arg_types).c_str(),
std::string(keyspace_name.data(), keyspace_name.length()).c_str());

SharedRefPtr<QueryRequest> request(new QueryRequest(query, 3));
SharedRefPtr<Collection> signature(new Collection(CASS_COLLECTION_TYPE_LIST, arg_types.size()));

for (StringRefVec::const_iterator i = arg_types.begin(),
end = arg_types.end();
i != end;
++i) {
signature->append(CassString(i->data(), i->size()));
}

request->set(0, CassString(keyspace_name.data(), keyspace_name.size()));
request->set(1, CassString(function_name.data(), function_name.size()));
request->set(2, signature.get());

connection_->write(
new ControlHandler<RefreshFunctionData>(request.get(),
this,
ControlConnection::on_refresh_function,
RefreshFunctionData(keyspace_name, function_name, arg_types, is_aggregate)));
}

void ControlConnection::on_refresh_function(ControlConnection* control_connection,
const RefreshFunctionData& data,
Response* response) {
ResultResponse* result = static_cast<ResultResponse*>(response);
if (result->row_count() == 0) {
LOG_ERROR("No row found for keyspace %s and %s %s",
data.keyspace.c_str(),
data.is_aggregate ? "aggregate" : "function",
Metadata::full_function_name(data.function, data.arg_types_as_string_refs()).c_str());
return;
}
if (data.is_aggregate) {
control_connection->session_->metadata().update_aggregates(result);
} else {
control_connection->session_->metadata().update_functions(result);
}
}

bool ControlConnection::handle_query_invalid_response(Response* response) {
if (check_error_or_invalid_response("ControlConnection", CQL_OPCODE_RESULT,
response)) {
Expand Down
44 changes: 43 additions & 1 deletion src/control_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,40 @@ class ControlConnection : public Connection::Listener {
bool is_new_node;
};

struct RefreshFunctionData {
typedef std::vector<std::string> StringVec;

RefreshFunctionData(StringRef keyspace,
StringRef function,
const StringRefVec& arg_types,
bool is_aggregate)
: keyspace(keyspace.to_string())
, function(function.to_string())
, is_aggregate(is_aggregate) {
this->arg_types.reserve(arg_types.size());
for (StringRefVec::const_iterator i = arg_types.begin(),
end = arg_types.end();
i != end; ++i) {
this->arg_types.push_back(i->to_string());
}
}

StringRefVec arg_types_as_string_refs() const {
StringRefVec string_refs;
for (StringVec::const_iterator i = arg_types.begin(),
end = arg_types.end();
i != end; ++i) {
string_refs.push_back(StringRef(*i));
}
return string_refs;
}

std::string keyspace;
std::string function;
StringVec arg_types;
bool is_aggregate;
};

void schedule_reconnect(uint64_t ms = 0);
void reconnect(bool retry_current_host);

Expand Down Expand Up @@ -196,11 +230,19 @@ class ControlConnection : public Connection::Listener {
const MultipleRequestHandler::ResponseVec& responses);

void refresh_type(const StringRef& keyspace_name,
const StringRef& type_name);
const StringRef& type_name);
static void on_refresh_type(ControlConnection* control_connection,
const std::pair<std::string, std::string>& keyspace_and_type_names,
Response* response);

void refresh_function(const StringRef& keyspace_name,
const StringRef& function_name,
const StringRefVec& arg_types,
bool is_aggregate);
static void on_refresh_function(ControlConnection* control_connection,
const RefreshFunctionData& data,
Response* response);

private:
State state_;
Session* session_;
Expand Down
88 changes: 71 additions & 17 deletions src/data_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ inline bool equals_both_not_empty(const std::string& s1,

class DataType : public RefCounted<DataType> {
public:
typedef SharedRefPtr<const DataType> Ptr;
typedef std::vector<Ptr> Vec;

static const SharedRefPtr<const DataType> NIL;

DataType(CassValueType value_type)
Expand Down Expand Up @@ -112,6 +115,36 @@ class DataType : public RefCounted<DataType> {
return new DataType(value_type_);
}

virtual std::string to_string() const {
switch (value_type_) {
case CASS_VALUE_TYPE_ASCII: return "ascii";
case CASS_VALUE_TYPE_BIGINT: return "bigint";
case CASS_VALUE_TYPE_BLOB: return "blob";
case CASS_VALUE_TYPE_BOOLEAN: return "boolean";
case CASS_VALUE_TYPE_COUNTER: return "counter";
case CASS_VALUE_TYPE_DECIMAL: return "decimal";
case CASS_VALUE_TYPE_DOUBLE: return "double";
case CASS_VALUE_TYPE_FLOAT: return "float";
case CASS_VALUE_TYPE_INT: return "int";
case CASS_VALUE_TYPE_TEXT: return "text";
case CASS_VALUE_TYPE_TIMESTAMP: return "timestamp";
case CASS_VALUE_TYPE_UUID: return "uuid";
case CASS_VALUE_TYPE_VARCHAR: return "varchar";
case CASS_VALUE_TYPE_VARINT: return "varint";
case CASS_VALUE_TYPE_TIMEUUID: return "timeuuid";
case CASS_VALUE_TYPE_INET: return "inet";
case CASS_VALUE_TYPE_DATE: return "date";
case CASS_VALUE_TYPE_TIME: return "time";
case CASS_VALUE_TYPE_SMALL_INT: return "smallint";
case CASS_VALUE_TYPE_TINY_INT: return "tinyint";
case CASS_VALUE_TYPE_LIST: return "list";
case CASS_VALUE_TYPE_MAP: return "map";
case CASS_VALUE_TYPE_SET: return "set";
case CASS_VALUE_TYPE_TUPLE: return "tuple";
default: return "";
}
}

private:
int protocol_version_;
CassValueType value_type_;
Expand All @@ -120,8 +153,6 @@ class DataType : public RefCounted<DataType> {
DISALLOW_COPY_AND_ASSIGN(DataType);
};

typedef std::vector<SharedRefPtr<const DataType> > DataTypeVec;

class CustomType : public DataType {
public:
CustomType()
Expand Down Expand Up @@ -150,6 +181,10 @@ class CustomType : public DataType {
return new CustomType(class_name_);
}

virtual std::string to_string() const {
return class_name_;
}

private:
std::string class_name_;
};
Expand All @@ -159,15 +194,29 @@ class SubTypesDataType : public DataType {
SubTypesDataType(CassValueType type)
: DataType(type) { }

SubTypesDataType(CassValueType type, const DataTypeVec& types)
SubTypesDataType(CassValueType type, const DataType::Vec& types)
: DataType(type)
, types_(types) { }

DataTypeVec& types() { return types_; }
const DataTypeVec& types() const { return types_; }
DataType::Vec& types() { return types_; }
const DataType::Vec& types() const { return types_; }

virtual std::string to_string() const {
std::string str(DataType::to_string());
str.push_back('<');
bool first = true;
for (DataType::Vec::const_iterator i = types_.begin(),
end = types_.end();
i != end; ++i) {
if (!first) str.append(", ");
str.append((*i)->to_string());
}
str.push_back('>');
return str;
}

protected:
DataTypeVec types_;
DataType::Vec types_;
};

class CollectionType : public SubTypesDataType {
Expand All @@ -181,7 +230,7 @@ class CollectionType : public SubTypesDataType {
types_.reserve(types_count);
}

CollectionType(CassValueType collection_type, const DataTypeVec& types)
CollectionType(CassValueType collection_type, const DataType::Vec& types)
: SubTypesDataType(collection_type, types) { }

virtual bool equals(const SharedRefPtr<const DataType>& data_type) const {
Expand Down Expand Up @@ -215,23 +264,24 @@ class CollectionType : public SubTypesDataType {
}

public:
static SharedRefPtr<DataType> list(SharedRefPtr<DataType> element_type) {
DataTypeVec types;
static SharedRefPtr<const DataType> list(SharedRefPtr<const DataType> element_type) {
DataType::Vec types;
types.push_back(element_type);
return SharedRefPtr<DataType>(new CollectionType(CASS_VALUE_TYPE_LIST, types));
return SharedRefPtr<const DataType>(new CollectionType(CASS_VALUE_TYPE_LIST, types));
}

static SharedRefPtr<DataType> set(SharedRefPtr<DataType> element_type) {
DataTypeVec types;
static SharedRefPtr<const DataType> set(SharedRefPtr<const DataType> element_type) {
DataType::Vec types;
types.push_back(element_type);
return SharedRefPtr<DataType>(new CollectionType(CASS_VALUE_TYPE_SET, types));
return SharedRefPtr<const DataType>(new CollectionType(CASS_VALUE_TYPE_SET, types));
}

static SharedRefPtr<DataType> map(SharedRefPtr<DataType> key_type, SharedRefPtr<DataType> value_type) {
DataTypeVec types;
static SharedRefPtr<const DataType> map(SharedRefPtr<const DataType> key_type,
SharedRefPtr<const DataType> value_type) {
DataType::Vec types;
types.push_back(key_type);
types.push_back(value_type);
return SharedRefPtr<DataType>(new CollectionType(CASS_VALUE_TYPE_MAP, types));
return SharedRefPtr<const DataType>(new CollectionType(CASS_VALUE_TYPE_MAP, types));
}
};

Expand All @@ -240,7 +290,7 @@ class TupleType : public SubTypesDataType {
TupleType()
: SubTypesDataType(CASS_VALUE_TYPE_TUPLE) { }

TupleType(const DataTypeVec& types)
TupleType(const DataType::Vec& types)
: SubTypesDataType(CASS_VALUE_TYPE_TUPLE, types) { }

virtual bool equals(const SharedRefPtr<const DataType>& data_type) const {
Expand Down Expand Up @@ -359,6 +409,10 @@ class UserType : public DataType {
return new UserType(keyspace_, type_name_, fields_.entries());
}

virtual std::string to_string() const {
return type_name_;
}

private:
std::string keyspace_;
std::string type_name_;
Expand Down
Loading

0 comments on commit 987feaa

Please sign in to comment.