forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvalidation.cc
80 lines (65 loc) · 2.3 KB
/
validation.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/*
* Copyright (C) 2015-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#include "validation.hh"
#include "schema/schema.hh"
#include "keys.hh"
#include "data_dictionary/data_dictionary.hh"
#include "exceptions/exceptions.hh"
namespace validation {
/**
 * Checks a CQL partition key against the given schema without throwing.
 *
 * Based on org.apache.cassandra.thrift.ThriftValidation#validate_key()
 *
 * @return a message describing the first failed check, or std::nullopt
 *         when the key passes all of them.
 */
std::optional<sstring> is_cql_key_invalid(const schema& schema, partition_key_view key) {
    // C* rejects an empty thrift key at this point. In CQL terms the key can
    // only be empty when it is non-composite and its single component is empty.
    const bool single_component = schema.partition_key_size() == 1;
    if (single_component && (*key.begin(schema)).empty()) {
        return sstring("Key may not be empty");
    }

    // The key must fit what FBUtilities.writeShortByteArray can handle.
    auto raw = key.representation();
    const auto key_len = raw.size();
    if (key_len > max_key_size) {
        return format("Key length of {:d} is longer than maximum of {:d}", key_len, max_key_size);
    }

    // A marshalling failure is reported through the return value rather than
    // being allowed to propagate.
    std::optional<sstring> problem;
    try {
        key.validate(schema);
    } catch (const marshal_exception& e) {
        problem = sstring(e.what());
    }
    return problem;
}
/**
 * Throwing counterpart of is_cql_key_invalid().
 *
 * @throws exceptions::invalid_request_exception carrying the message
 *         returned by is_cql_key_invalid() when the key is rejected.
 */
void
validate_cql_key(const schema& schema, partition_key_view key) {
    // `err` is intentionally non-const: std::move() on the value of a const
    // optional yields a const rvalue, which would copy the message instead
    // of moving it into the exception.
    if (auto err = is_cql_key_invalid(schema, key); err) {
        throw exceptions::invalid_request_exception(std::move(*err));
    }
}
/**
 * Resolves a table by keyspace and table name and returns its schema.
 *
 * Based on org.apache.cassandra.thrift.ThriftValidation#validateColumnFamily(java.lang.String, java.lang.String)
 *
 * The keyspace is checked first through validate_keyspace(); whatever that
 * call throws propagates to the caller.
 *
 * @throws exceptions::invalid_request_exception if cf_name is empty or the
 *         table lookup comes back empty.
 */
schema_ptr
validate_column_family(data_dictionary::database db, const sstring& keyspace_name, const sstring& cf_name) {
    validate_keyspace(db, keyspace_name);

    if (cf_name.empty()) {
        throw exceptions::invalid_request_exception("non-empty table is required");
    }

    // Happy path first: hand back the schema as soon as the lookup succeeds.
    if (auto table = db.try_find_table(keyspace_name, cf_name)) {
        return table->schema();
    }

    throw exceptions::invalid_request_exception(format("unconfigured table {}", cf_name));
}
/**
 * Verifies that a keyspace name was supplied and that the database knows it.
 *
 * @throws exceptions::invalid_request_exception if keyspace_name is empty.
 * @throws exceptions::keyspace_not_defined_exception if the database reports
 *         no keyspace with that name.
 */
void validate_keyspace(data_dictionary::database db, const sstring& keyspace_name) {
    if (keyspace_name.empty()) {
        throw exceptions::invalid_request_exception("Keyspace not set");
    }

    const bool known = db.has_keyspace(keyspace_name);
    if (known) {
        return;
    }

    throw exceptions::keyspace_not_defined_exception(format("Keyspace {} does not exist", keyspace_name));
}
}