Skip to content

Commit

Permalink
Added integration tests for parition/clustering key columns
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenick committed Oct 27, 2015
1 parent 38a3174 commit a8f39a7
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 27 deletions.
119 changes: 98 additions & 21 deletions src/metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,23 @@ namespace cass {
template <class T>
const T& as_const(const T& x) { return x; }

struct ColumnCompare {
typedef CassColumnMeta::Ptr argument_type;
bool operator()(const CassColumnMeta::Ptr& a, const CassColumnMeta::Ptr& b) const {
bool result;
if (a->type() == b->type()) {
result = (a->type() == CASS_COLUMN_TYPE_PARTITION_KEY ||
a->type() == CASS_COLUMN_TYPE_CLUSTERING_KEY) &&
a->position() < b->position();
} else {
result = a->type() == CASS_COLUMN_TYPE_PARTITION_KEY ||
(a->type() == CASS_COLUMN_TYPE_CLUSTERING_KEY &&
b->type() != CASS_COLUMN_TYPE_PARTITION_KEY);
}
return result;
}
};

const KeyspaceMetadata* Metadata::SchemaSnapshot::get_keyspace(const std::string& name) const {
KeyspaceMetadata::Map::const_iterator i = keyspaces_->find(name);
if (i == keyspaces_->end()) return NULL;
Expand Down Expand Up @@ -957,7 +974,12 @@ size_t get_column_count(const ColumnMetadata::Vec& columns, CassColumnType type)
return count;
}

void TableMetadata::build_keys(const VersionNumber& cassandra_version) {
void TableMetadata::build_keys_and_sort(const VersionNumber& cassandra_version) {
// Also, Reorders columns so that the order is:
// 1) Parition key
// 2) Clustering keys
// 3) Other columns

if (cassandra_version.major() >= 2) {
partition_key_.resize(get_column_count(columns_, CASS_COLUMN_TYPE_PARTITION_KEY));
clustering_key_.resize(get_column_count(columns_, CASS_COLUMN_TYPE_CLUSTERING_KEY));
Expand All @@ -974,7 +996,12 @@ void TableMetadata::build_keys(const VersionNumber& cassandra_version) {
clustering_key_[column->position()] = column;
}
}

std::stable_sort(columns_.begin(), columns_.end(), ColumnCompare());
} else {
// Cassandra 1.2 requires a lot more work because "system.schema_columns" only
// contains regular columns.

// Partition key
{
StringRefVec key_aliases;
Expand All @@ -1001,6 +1028,7 @@ void TableMetadata::build_keys(const VersionNumber& cassandra_version) {
key_alias = ss.str();
}
partition_key_.push_back(ColumnMetadata::Ptr(new ColumnMetadata(key_alias,
partition_key_.size(),
CASS_COLUMN_TYPE_PARTITION_KEY,
key_validator->types()[i])));
}
Expand Down Expand Up @@ -1041,10 +1069,22 @@ void TableMetadata::build_keys(const VersionNumber& cassandra_version) {
column_alias = ss.str();
}
clustering_key_.push_back(ColumnMetadata::Ptr(new ColumnMetadata(column_alias,
clustering_key_.size(),
CASS_COLUMN_TYPE_CLUSTERING_KEY,
comparator->types()[i])));
}
}

// TODO: Handle value alias column

ColumnMetadata::Vec columns(partition_key_.size() + clustering_key_.size() + columns_.size());

ColumnMetadata::Vec::iterator pos = columns.begin();
pos = std::copy(partition_key_.begin(), partition_key_.end(), pos);
pos = std::copy(clustering_key_.begin(), clustering_key_.end(), pos);
std::copy(columns_.begin(), columns_.end(), pos);

columns_.swap(columns);
}
}

Expand Down Expand Up @@ -1165,8 +1205,8 @@ AggregateMetadata::AggregateMetadata(const std::string& name, const Value* signa
value = add_field(buffer, row, "final_func");
if (value != NULL &&
value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
std::string state_type_string = state_type_->to_string();
StringVec final_func_signature(1, state_type_string);
StringVec final_func_signature;
final_func_signature.push_back(state_type_->to_string());
std::string full_final_func_name(Metadata::full_function_name(value->to_string(), final_func_signature));
FunctionMetadata::Map::const_iterator i = functions.find(full_final_func_name);
if (i != functions.end()) final_func_ = i->second;
Expand All @@ -1176,8 +1216,7 @@ AggregateMetadata::AggregateMetadata(const std::string& name, const Value* signa
if (value != NULL &&
value->value_type() == CASS_VALUE_TYPE_VARCHAR) {
StringVec state_func_signature;
std::string state_type_string = state_type_->to_string();
state_func_signature.push_back(state_type_string);
state_func_signature.push_back(state_type_->to_string());
CollectionIterator iterator(signature);
while (iterator.next()) {
state_func_signature.push_back(iterator.value()->to_string());
Expand Down Expand Up @@ -1283,6 +1322,7 @@ void Metadata::InternalData::update_tables(int version, const VersionNumber& cas

std::string keyspace_name;
std::string columnfamily_name;
KeyspaceMetadata* keyspace;

while (rows.next()) {
std::string temp_keyspace_name;
Expand All @@ -1293,10 +1333,12 @@ void Metadata::InternalData::update_tables(int version, const VersionNumber& cas
LOG_ERROR("Unable to get column value for 'keyspace_name' or 'columnfamily_name'");
continue;
}

if (keyspace_name != temp_keyspace_name) {
keyspace_name = temp_keyspace_name;
keyspace = get_or_create_keyspace(keyspace_name);
}
KeyspaceMetadata* keyspace = get_or_create_keyspace(keyspace_name);

keyspace->add_table(TableMetadata::Ptr(new TableMetadata(columnfamily_name, version, buffer, row)));
}

Expand All @@ -1307,17 +1349,25 @@ void Metadata::InternalData::update_user_types(ResultResponse* result) {
result->decode_first_row();
ResultIterator rows(result);

std::string keyspace_name;
KeyspaceMetadata* keyspace;

while (rows.next()) {
std::string keyspace_name;
std::string temp_keyspace_name;
std::string type_name;
const Row* row = rows.row();

if (!row->get_string_by_name("keyspace_name", &keyspace_name) ||
if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
!row->get_string_by_name("type_name", &type_name)) {
LOG_ERROR("Unable to get column value for 'keyspace_name' or 'type_name'");
continue;
}

if (keyspace_name != temp_keyspace_name) {
keyspace_name = temp_keyspace_name;
keyspace = get_or_create_keyspace(keyspace_name);
}

const Value* names_value = row->get_by_name("field_names");
if (names_value == NULL || names_value->is_null()) {
LOG_ERROR("'field_name's column for keyspace \"%s\" and type \"%s\" is null",
Expand Down Expand Up @@ -1367,7 +1417,7 @@ void Metadata::InternalData::update_user_types(ResultResponse* result) {
fields.push_back(UserType::Field(field_name, data_type));
}

get_or_create_keyspace(keyspace_name)->add_user_type(
keyspace->add_user_type(
SharedRefPtr<UserType>(new UserType(keyspace_name, type_name, fields)));
}
}
Expand All @@ -1378,21 +1428,29 @@ void Metadata::InternalData::update_functions(ResultResponse* result) {
result->decode_first_row();
ResultIterator rows(result);

std::string keyspace_name;
KeyspaceMetadata* keyspace;

while (rows.next()) {
std::string keyspace_name;
std::string temp_keyspace_name;
std::string function_name;
const Row* row = rows.row();

const Value* signature = row->get_by_name("signature");
if (!row->get_string_by_name("keyspace_name", &keyspace_name) ||
if (!row->get_string_by_name("keyspace_name", &temp_keyspace_name) ||
!row->get_string_by_name("function_name", &function_name) ||
signature == NULL) {
LOG_ERROR("Unable to get column value for 'keyspace_name', 'function_name' or 'signature'");
continue;
}

get_or_create_keyspace(keyspace_name)->add_function(FunctionMetadata::Ptr(new FunctionMetadata(function_name,
signature, buffer, row)));
if (keyspace_name != temp_keyspace_name) {
keyspace_name = temp_keyspace_name;
keyspace = get_or_create_keyspace(keyspace_name);
}

keyspace->add_function(FunctionMetadata::Ptr(new FunctionMetadata(function_name,
signature, buffer, row)));

}
}
Expand All @@ -1403,8 +1461,11 @@ void Metadata::InternalData::update_aggregates(int version, ResultResponse* resu
result->decode_first_row();
ResultIterator rows(result);

std::string keyspace_name;
KeyspaceMetadata* keyspace;

while (rows.next()) {
std::string keyspace_name;
std::string temp_keyspace_name;
std::string aggregate_name;
const Row* row = rows.row();

Expand All @@ -1416,7 +1477,11 @@ void Metadata::InternalData::update_aggregates(int version, ResultResponse* resu
continue;
}

KeyspaceMetadata* keyspace = get_or_create_keyspace(keyspace_name);
if (keyspace_name != temp_keyspace_name) {
keyspace_name = temp_keyspace_name;
keyspace = get_or_create_keyspace(keyspace_name);
}

keyspace->add_aggregate(AggregateMetadata::Ptr(new AggregateMetadata(aggregate_name, signature,
keyspace->functions(),
version, buffer, row)));
Expand Down Expand Up @@ -1460,7 +1525,10 @@ void Metadata::InternalData::update_columns(int version, const VersionNumber& ca
std::string keyspace_name;
std::string columnfamily_name;
std::string column_name;

KeyspaceMetadata* keyspace;
TableMetadata::Ptr table;

while (rows.next()) {
std::string temp_keyspace_name;
std::string temp_columnfamily_name;
Expand All @@ -1473,19 +1541,28 @@ void Metadata::InternalData::update_columns(int version, const VersionNumber& ca
continue;
}

if (keyspace_name != temp_keyspace_name ||
columnfamily_name != temp_columnfamily_name) {
if (table) table->build_keys(cassandra_version);

if (keyspace_name != temp_keyspace_name) {
keyspace_name = temp_keyspace_name;
columnfamily_name = temp_columnfamily_name;
keyspace = get_or_create_keyspace(keyspace_name);
}

table = get_or_create_keyspace(keyspace_name)->get_or_create_table(columnfamily_name);
if (columnfamily_name != temp_columnfamily_name) {
// Build keys for the previous table
if (table) {
table->build_keys_and_sort(cassandra_version);
}
columnfamily_name = temp_columnfamily_name;
table = keyspace->get_or_create_table(columnfamily_name);
table->clear_columns();
}

table->add_column(ColumnMetadata::Ptr(new ColumnMetadata(column_name, version, buffer, row)));
}

// Build keys for the last table
if (table) {
table->build_keys_and_sort(cassandra_version);
}
}

KeyspaceMetadata* Metadata::InternalData::get_or_create_keyspace(const std::string& name) {
Expand Down
10 changes: 8 additions & 2 deletions src/metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ class MetadataBase {
std::string get_string_field(const std::string& name) const;
Iterator* iterator_fields() const { return new MetadataFieldIterator(fields_); }

void swap_fields(MetadataBase& meta) {
fields_.swap(meta.fields_);
}

protected:
const Value* add_field(const SharedRefPtr<RefBuffer>& buffer, const Row* row, const std::string& name);
void add_json_list_field(int version, const Row* row, const std::string& name);
Expand Down Expand Up @@ -254,11 +258,12 @@ class ColumnMetadata : public MetadataBase, public RefCounted<ColumnMetadata> {
, is_reversed_(false) { }

ColumnMetadata(const std::string& name,
int32_t position,
CassColumnType type,
const SharedRefPtr<const DataType>& data_type)
: MetadataBase(name)
, type_(type)
, position_(0)
, position_(position)
, data_type_(data_type)
, is_reversed_(false) { }

Expand All @@ -270,6 +275,7 @@ class ColumnMetadata : public MetadataBase, public RefCounted<ColumnMetadata> {
const SharedRefPtr<const DataType>& data_type() const { return data_type_; }
bool is_reversed() const { return is_reversed_; }


private:
CassColumnType type_;
int32_t position_;
Expand Down Expand Up @@ -308,7 +314,7 @@ class TableMetadata : public MetadataBase, public RefCounted<TableMetadata> {
const ColumnMetadata::Ptr& get_or_create_column(const std::string& name);
void add_column(const ColumnMetadata::Ptr& column);
void clear_columns();
void build_keys(const VersionNumber& cassandra_version);
void build_keys_and_sort(const VersionNumber& cassandra_version);
void key_aliases(KeyAliases* output) const;

private:
Expand Down
Loading

0 comments on commit a8f39a7

Please sign in to comment.