Skip to content

Commit

Permalink
Complete the multi-key judgment of a pair of simple commands
Browse files Browse the repository at this point in the history
  • Loading branch information
Mixficsol committed Mar 27, 2024
1 parent e8b0629 commit 79a34f8
Show file tree
Hide file tree
Showing 14 changed files with 283 additions and 260 deletions.
3 changes: 3 additions & 0 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ void CmdRes::SetRes(CmdRes::CmdRet _ret, const std::string& content) {
case kInvalidCursor:
AppendStringRaw("-ERR invalid cursor");
break;
case kmultikey:
AppendStringRaw("-WRONGTYPE Operation against a key holding the wrong kind of value");
break;
default:
break;
}
Expand Down
1 change: 1 addition & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class CmdRes {
kErrOther,
KIncrByOverFlow,
kInvalidCursor,
kmultikey,
};

CmdRes() = default;
Expand Down
3 changes: 2 additions & 1 deletion src/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ void HSetCmd::DoCmd(PClient* client) {
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HSet(client->Key(), field, value, &temp);
if (s.ok()) {
ret += temp;
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kmultikey);
} else {
// FIXME(century): need txn, if bw crashes, it should rollback
client->SetRes(CmdRes::kErrOther);
return;
}
}

client->AppendInteger(ret);
}

Expand Down
15 changes: 6 additions & 9 deletions src/storage/include/storage/storage_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,12 @@ const int kTimestampLength = 8;

enum ColumnFamilyIndex {
kStringsCF = 0,
kHashesMetaCF = 1,
kHashesDataCF = 2,
kSetsMetaCF = 3,
kSetsDataCF = 4,
kListsMetaCF = 5,
kListsDataCF = 6,
kZsetsMetaCF = 7,
kZsetsDataCF = 8,
kZsetsScoreCF = 9,
kHashesDataCF = 1,
kSetsDataCF = 2,
kListsDataCF = 3,
kZsetsDataCF = 4,
kZsetsScoreCF = 5,
kMetaCF = 6,
};

const static char kNeedTransformCharacter = '\u0000';
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,7 @@ using ZSetsMetaFilterFactory = BaseMetaFilterFactory;
using ZSetsDataFilter = BaseDataFilter;
using ZSetsDataFilterFactory = BaseDataFilterFactory;

using MetaFilter = BaseMetaFilter;
using MetaFilterFactory = BaseMetaFilterFactory;
} // namespace storage
#endif // SRC_BASE_FILTER_H_
19 changes: 15 additions & 4 deletions src/storage/src/base_meta_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,26 @@ class ParsedBaseMetaValue : public ParsedInternalValue {
explicit ParsedBaseMetaValue(std::string* internal_value_str) : ParsedInternalValue(internal_value_str) {
if (internal_value_str->size() >= kBaseMetaValueSuffixLength) {
int offset = 0;
user_value_ = Slice(internal_value_str->data(), internal_value_str->size() - kBaseMetaValueSuffixLength);
type_ = Slice(internal_value_str->data(), 1);
offset += 1;
// std::cout << "type: " << type_.ToStringView() << std::endl;
user_value_ = Slice(internal_value_str->data() + 1, internal_value_str->size() - kBaseMetaValueSuffixLength - 1);
// std::cout << "user_value: " << user_value_.ToStringView() << std::endl;
// std::cout << "user_value_size: " << user_value_.size() << std::endl;
offset += user_value_.size();
version_ = DecodeFixed64(internal_value_str->data() + offset);
// std::cout << "version:" << version_ << std::endl;
offset += sizeof(version_);
memcpy(reserve_, internal_value_str->data() + offset, sizeof(reserve_));
offset += sizeof(reserve_);
ctime_ = DecodeFixed64(internal_value_str->data() + offset);
// std::cout << "ctime: " << ctime_ << std::endl;
offset += sizeof(ctime_);
etime_ = DecodeFixed64(internal_value_str->data() + offset);
// std::cout << "etime: " << etime_ << std::endl;
}
count_ = DecodeFixed32(internal_value_str->data());
count_ = DecodeFixed32(internal_value_str->data() + 1);
// std::cout << "count: " << count_ << std::endl;
}

// Use this constructor in rocksdb::CompactionFilter::Filter();
Expand Down Expand Up @@ -134,11 +143,13 @@ class ParsedBaseMetaValue : public ParsedInternalValue {

int32_t Count() { return count_; }

bool IsType(Slice c) { return type_.ToStringView() == c.ToStringView(); }

void SetCount(int32_t count) {
count_ = count;
if (value_) {
char* dst = const_cast<char*>(value_->data());
EncodeFixed32(dst, count_);
EncodeFixed32(dst + 1, count_);
}
}

Expand All @@ -155,7 +166,7 @@ class ParsedBaseMetaValue : public ParsedInternalValue {
count_ += delta;
if (value_) {
char* dst = const_cast<char*>(value_->data());
EncodeFixed32(dst, count_);
EncodeFixed32(dst + 1, count_);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/base_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class ParsedInternalValue {
virtual void SetEtimeToValue() = 0;
virtual void SetCtimeToValue() = 0;
std::string* value_ = nullptr;
Slice type_;
Slice user_value_;
uint64_t version_ = 0;
uint64_t ctime_ = 0;
Expand Down
9 changes: 9 additions & 0 deletions src/storage/src/coding.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ namespace storage {
static const bool kLittleEndian = STORAGE_PLATFORM_IS_LITTLE_ENDIAN;
#undef STORAGE_PLATFORM_IS_LITTLE_ENDIAN

inline void EncodeFixed8(char* buf, char type) {
if (kLittleEndian) {
memcpy(buf, &type, sizeof(type));
} else {
uint8_t converted_value = static_cast<uint8_t>(type);
buf[0] = converted_value;
}
}

inline void EncodeFixed32(char* buf, uint32_t value) {
if (kLittleEndian) {
memcpy(buf, &value, sizeof(value));
Expand Down
55 changes: 19 additions & 36 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_
rocksdb::DBOptions db_ops(storage_options.options);
db_ops.create_missing_column_families = true;

// Meta column-famil options
rocksdb::ColumnFamilyOptions meta_cf_ops(storage_options.options);
rocksdb::BlockBasedTableOptions meta_cf_table_ops(table_ops);
meta_cf_ops.compaction_filter_factory = std::make_shared<MetaFilterFactory>();
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(meta_cf_table_ops));
// string column-family options
rocksdb::ColumnFamilyOptions string_cf_ops(storage_options.options);
string_cf_ops.compaction_filter_factory = std::make_shared<StringsFilterFactory>();
Expand All @@ -71,86 +79,61 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_
string_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(string_table_ops));

// hash column-family options
rocksdb::ColumnFamilyOptions hash_meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions hash_data_cf_ops(storage_options.options);
hash_meta_cf_ops.compaction_filter_factory = std::make_shared<HashesMetaFilterFactory>();
hash_data_cf_ops.compaction_filter_factory =
std::make_shared<HashesDataFilterFactory>(&db_, &handles_, kHashesMetaCF);
rocksdb::BlockBasedTableOptions hash_meta_cf_table_ops(table_ops);
hash_data_cf_ops.compaction_filter_factory = std::make_shared<HashesDataFilterFactory>(&db_, &handles_, kMetaCF);
rocksdb::BlockBasedTableOptions hash_data_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
hash_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
hash_data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
hash_meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(hash_meta_cf_table_ops));
hash_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(hash_data_cf_table_ops));

// list column-family options
rocksdb::ColumnFamilyOptions list_meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions list_data_cf_ops(storage_options.options);
list_meta_cf_ops.compaction_filter_factory = std::make_shared<ListsMetaFilterFactory>();
list_data_cf_ops.compaction_filter_factory = std::make_shared<ListsDataFilterFactory>(&db_, &handles_, kListsMetaCF);
list_data_cf_ops.compaction_filter_factory = std::make_shared<ListsDataFilterFactory>(&db_, &handles_, kMetaCF);
list_data_cf_ops.comparator = ListsDataKeyComparator();
rocksdb::BlockBasedTableOptions list_meta_cf_table_ops(table_ops);
rocksdb::BlockBasedTableOptions list_data_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
list_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
list_data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
list_meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(list_meta_cf_table_ops));
list_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(list_data_cf_table_ops));

// set column-family options
rocksdb::ColumnFamilyOptions set_meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions set_data_cf_ops(storage_options.options);
set_meta_cf_ops.compaction_filter_factory = std::make_shared<SetsMetaFilterFactory>();
set_data_cf_ops.compaction_filter_factory = std::make_shared<SetsMemberFilterFactory>(&db_, &handles_, kSetsMetaCF);
rocksdb::BlockBasedTableOptions set_meta_cf_table_ops(table_ops);
set_data_cf_ops.compaction_filter_factory = std::make_shared<SetsMemberFilterFactory>(&db_, &handles_, kMetaCF);
rocksdb::BlockBasedTableOptions set_data_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
set_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
set_data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
set_meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(set_meta_cf_table_ops));
set_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(set_data_cf_table_ops));

// zset column-family options
rocksdb::ColumnFamilyOptions zset_meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions zset_data_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions zset_score_cf_ops(storage_options.options);
zset_meta_cf_ops.compaction_filter_factory = std::make_shared<ZSetsMetaFilterFactory>();
zset_data_cf_ops.compaction_filter_factory = std::make_shared<ZSetsDataFilterFactory>(&db_, &handles_, kZsetsMetaCF);
zset_score_cf_ops.compaction_filter_factory =
std::make_shared<ZSetsScoreFilterFactory>(&db_, &handles_, kZsetsMetaCF);
zset_data_cf_ops.compaction_filter_factory = std::make_shared<ZSetsDataFilterFactory>(&db_, &handles_, kMetaCF);
zset_score_cf_ops.compaction_filter_factory = std::make_shared<ZSetsScoreFilterFactory>(&db_, &handles_, kMetaCF);
zset_score_cf_ops.comparator = ZSetsScoreKeyComparator();

rocksdb::BlockBasedTableOptions zset_meta_cf_table_ops(table_ops);
rocksdb::BlockBasedTableOptions zset_data_cf_table_ops(table_ops);
rocksdb::BlockBasedTableOptions zset_score_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
zset_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
zset_data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
zset_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
zset_meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_meta_cf_table_ops));
zset_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_data_cf_table_ops));
zset_score_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_score_cf_table_ops));

std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, string_cf_ops);
// hash CF
column_families.emplace_back("hash_meta_cf", hash_meta_cf_ops);
column_families.emplace_back("hash_data_cf", hash_data_cf_ops);
// set CF
column_families.emplace_back("set_meta_cf", set_meta_cf_ops);
column_families.emplace_back("set_data_cf", set_data_cf_ops);
// list CF
column_families.emplace_back("list_meta_cf", list_meta_cf_ops);
column_families.emplace_back("list_data_cf", list_data_cf_ops);
// zset CF
column_families.emplace_back("zset_meta_cf", zset_meta_cf_ops);
column_families.emplace_back("zset_data_cf", zset_data_cf_ops);
column_families.emplace_back("zset_score_cf", zset_score_cf_ops);
// meta CF
column_families.emplace_back("meta_cf", meta_cf_ops);
return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
}

Expand Down Expand Up @@ -194,31 +177,31 @@ Status Redis::CompactRange(const DataType& dtype, const rocksdb::Slice* begin, c
break;
case DataType::kHashes:
if (type == kMeta || type == kMetaAndData) {
s = db_->CompactRange(default_compact_range_options_, handles_[kHashesMetaCF], begin, end);
s = db_->CompactRange(default_compact_range_options_, handles_[kMetaCF], begin, end);
}
if (s.ok() && (type == kData || type == kMetaAndData)) {
s = db_->CompactRange(default_compact_range_options_, handles_[kHashesDataCF], begin, end);
}
break;
case DataType::kSets:
if (type == kMeta || type == kMetaAndData) {
db_->CompactRange(default_compact_range_options_, handles_[kSetsMetaCF], begin, end);
db_->CompactRange(default_compact_range_options_, handles_[kMetaCF], begin, end);
}
if (s.ok() && (type == kData || type == kMetaAndData)) {
db_->CompactRange(default_compact_range_options_, handles_[kSetsDataCF], begin, end);
}
break;
case DataType::kLists:
if (type == kMeta || type == kMetaAndData) {
s = db_->CompactRange(default_compact_range_options_, handles_[kListsMetaCF], begin, end);
s = db_->CompactRange(default_compact_range_options_, handles_[kMetaCF], begin, end);
}
if (s.ok() && (type == kData || type == kMetaAndData)) {
s = db_->CompactRange(default_compact_range_options_, handles_[kListsDataCF], begin, end);
}
break;
case DataType::kZSets:
if (type == kMeta || type == kMetaAndData) {
db_->CompactRange(default_compact_range_options_, handles_[kZsetsMetaCF], begin, end);
db_->CompactRange(default_compact_range_options_, handles_[kMetaCF], begin, end);
}
if (s.ok() && (type == kData || type == kMetaAndData)) {
db_->CompactRange(default_compact_range_options_, handles_[kZsetsDataCF], begin, end);
Expand Down
10 changes: 5 additions & 5 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,19 +308,19 @@ class Redis {
options.iterate_upper_bound = upper_bound;
switch (type) {
case 'k':
return new StringsIterator(options, db_, handles_[kStringsCF], pattern);
return new StringsIterator(options, db_, handles_[kMetaCF], pattern);
break;
case 'h':
return new HashesIterator(options, db_, handles_[kHashesMetaCF], pattern);
return new HashesIterator(options, db_, handles_[kMetaCF], pattern);
break;
case 's':
return new SetsIterator(options, db_, handles_[kSetsMetaCF], pattern);
return new SetsIterator(options, db_, handles_[kMetaCF], pattern);
break;
case 'l':
return new ListsIterator(options, db_, handles_[kListsMetaCF], pattern);
return new ListsIterator(options, db_, handles_[kMetaCF], pattern);
break;
case 'z':
return new ZsetsIterator(options, db_, handles_[kZsetsMetaCF], pattern);
return new ZsetsIterator(options, db_, handles_[kMetaCF], pattern);
break;
default:
WARN("Invalid datatype to create iterator");
Expand Down
Loading

0 comments on commit 79a34f8

Please sign in to comment.