Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: floyd supports one key for one data structure #240

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ void CmdRes::SetRes(CmdRes::CmdRet _ret, const std::string& content) {
case kInvalidCursor:
AppendStringRaw("-ERR invalid cursor");
break;
case kmultikey:
AppendStringRaw("-WRONGTYPE Operation against a key holding the wrong kind of value");
break;
default:
break;
}
Expand Down
1 change: 1 addition & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class CmdRes {
kErrOther,
KIncrByOverFlow,
kInvalidCursor,
kmultikey,
};

CmdRes() = default;
Expand Down
3 changes: 2 additions & 1 deletion src/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ void HSetCmd::DoCmd(PClient* client) {
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HSet(client->Key(), field, value, &temp);
if (s.ok()) {
ret += temp;
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kmultikey);
} else {
// FIXME(century): need txn, if bw crashes, it should rollback
client->SetRes(CmdRes::kErrOther);
return;
}
}

client->AppendInteger(ret);
}

Expand Down
15 changes: 6 additions & 9 deletions src/storage/include/storage/storage_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,12 @@ const int kTimestampLength = 8;

enum ColumnFamilyIndex {
kStringsCF = 0,
kHashesMetaCF = 1,
kHashesDataCF = 2,
kSetsMetaCF = 3,
kSetsDataCF = 4,
kListsMetaCF = 5,
kListsDataCF = 6,
kZsetsMetaCF = 7,
kZsetsDataCF = 8,
kZsetsScoreCF = 9,
kHashesDataCF = 1,
kSetsDataCF = 2,
kListsDataCF = 3,
kZsetsDataCF = 4,
kZsetsScoreCF = 5,
kMetaCF = 6,
};

const static char kNeedTransformCharacter = '\u0000';
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,7 @@ using ZSetsMetaFilterFactory = BaseMetaFilterFactory;
using ZSetsDataFilter = BaseDataFilter;
using ZSetsDataFilterFactory = BaseDataFilterFactory;

using MetaFilter = BaseMetaFilter;
using MetaFilterFactory = BaseMetaFilterFactory;
} // namespace storage
#endif // SRC_BASE_FILTER_H_
19 changes: 15 additions & 4 deletions src/storage/src/base_meta_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,26 @@ class ParsedBaseMetaValue : public ParsedInternalValue {
explicit ParsedBaseMetaValue(std::string* internal_value_str) : ParsedInternalValue(internal_value_str) {
if (internal_value_str->size() >= kBaseMetaValueSuffixLength) {
int offset = 0;
user_value_ = Slice(internal_value_str->data(), internal_value_str->size() - kBaseMetaValueSuffixLength);
type_ = Slice(internal_value_str->data(), 1);
offset += 1;
// std::cout << "type: " << type_.ToStringView() << std::endl;
user_value_ = Slice(internal_value_str->data() + 1, internal_value_str->size() - kBaseMetaValueSuffixLength - 1);
// std::cout << "user_value: " << user_value_.ToStringView() << std::endl;
// std::cout << "user_value_size: " << user_value_.size() << std::endl;
offset += user_value_.size();
version_ = DecodeFixed64(internal_value_str->data() + offset);
// std::cout << "version:" << version_ << std::endl;
offset += sizeof(version_);
memcpy(reserve_, internal_value_str->data() + offset, sizeof(reserve_));
offset += sizeof(reserve_);
ctime_ = DecodeFixed64(internal_value_str->data() + offset);
// std::cout << "ctime: " << ctime_ << std::endl;
offset += sizeof(ctime_);
etime_ = DecodeFixed64(internal_value_str->data() + offset);
// std::cout << "etime: " << etime_ << std::endl;
}
count_ = DecodeFixed32(internal_value_str->data());
count_ = DecodeFixed32(internal_value_str->data() + 1);
// std::cout << "count: " << count_ << std::endl;
}

// Use this constructor in rocksdb::CompactionFilter::Filter();
Expand Down Expand Up @@ -134,11 +143,13 @@ class ParsedBaseMetaValue : public ParsedInternalValue {

int32_t Count() { return count_; }

bool IsType(Slice c) { return type_.ToStringView() == c.ToStringView(); }

void SetCount(int32_t count) {
count_ = count;
if (value_) {
char* dst = const_cast<char*>(value_->data());
EncodeFixed32(dst, count_);
EncodeFixed32(dst + 1, count_);
baerwang marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -155,7 +166,7 @@ class ParsedBaseMetaValue : public ParsedInternalValue {
count_ += delta;
if (value_) {
char* dst = const_cast<char*>(value_->data());
EncodeFixed32(dst, count_);
EncodeFixed32(dst + 1, count_);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/base_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class ParsedInternalValue {
virtual void SetEtimeToValue() = 0;
virtual void SetCtimeToValue() = 0;
std::string* value_ = nullptr;
Slice type_;
Slice user_value_;
uint64_t version_ = 0;
uint64_t ctime_ = 0;
Expand Down
9 changes: 9 additions & 0 deletions src/storage/src/coding.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ namespace storage {
static const bool kLittleEndian = STORAGE_PLATFORM_IS_LITTLE_ENDIAN;
#undef STORAGE_PLATFORM_IS_LITTLE_ENDIAN

inline void EncodeFixed8(char* buf, char type) {
if (kLittleEndian) {
memcpy(buf, &type, sizeof(type));
} else {
uint8_t converted_value = static_cast<uint8_t>(type);
buf[0] = converted_value;
}
}

inline void EncodeFixed32(char* buf, uint32_t value) {
if (kLittleEndian) {
memcpy(buf, &value, sizeof(value));
Expand Down
55 changes: 19 additions & 36 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_
rocksdb::DBOptions db_ops(storage_options.options);
db_ops.create_missing_column_families = true;

// Meta column-family options
rocksdb::ColumnFamilyOptions meta_cf_ops(storage_options.options);
rocksdb::BlockBasedTableOptions meta_cf_table_ops(table_ops);
meta_cf_ops.compaction_filter_factory = std::make_shared<MetaFilterFactory>();
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(meta_cf_table_ops));
// string column-family options
rocksdb::ColumnFamilyOptions string_cf_ops(storage_options.options);
string_cf_ops.compaction_filter_factory = std::make_shared<StringsFilterFactory>();
Expand All @@ -71,86 +79,61 @@ Status Redis::Open(const StorageOptions& storage_options, const std::string& db_
string_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(string_table_ops));

// hash column-family options
rocksdb::ColumnFamilyOptions hash_meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions hash_data_cf_ops(storage_options.options);
hash_meta_cf_ops.compaction_filter_factory = std::make_shared<HashesMetaFilterFactory>();
hash_data_cf_ops.compaction_filter_factory =
std::make_shared<HashesDataFilterFactory>(&db_, &handles_, kHashesMetaCF);
rocksdb::BlockBasedTableOptions hash_meta_cf_table_ops(table_ops);
hash_data_cf_ops.compaction_filter_factory = std::make_shared<HashesDataFilterFactory>(&db_, &handles_, kMetaCF);
rocksdb::BlockBasedTableOptions hash_data_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
hash_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
hash_data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
hash_meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(hash_meta_cf_table_ops));
hash_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(hash_data_cf_table_ops));

// list column-family options
rocksdb::ColumnFamilyOptions list_meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions list_data_cf_ops(storage_options.options);
list_meta_cf_ops.compaction_filter_factory = std::make_shared<ListsMetaFilterFactory>();
list_data_cf_ops.compaction_filter_factory = std::make_shared<ListsDataFilterFactory>(&db_, &handles_, kListsMetaCF);
list_data_cf_ops.compaction_filter_factory = std::make_shared<ListsDataFilterFactory>(&db_, &handles_, kMetaCF);
list_data_cf_ops.comparator = ListsDataKeyComparator();
rocksdb::BlockBasedTableOptions list_meta_cf_table_ops(table_ops);
rocksdb::BlockBasedTableOptions list_data_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
list_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
list_data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
list_meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(list_meta_cf_table_ops));
list_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(list_data_cf_table_ops));

// set column-family options
rocksdb::ColumnFamilyOptions set_meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions set_data_cf_ops(storage_options.options);
set_meta_cf_ops.compaction_filter_factory = std::make_shared<SetsMetaFilterFactory>();
set_data_cf_ops.compaction_filter_factory = std::make_shared<SetsMemberFilterFactory>(&db_, &handles_, kSetsMetaCF);
rocksdb::BlockBasedTableOptions set_meta_cf_table_ops(table_ops);
set_data_cf_ops.compaction_filter_factory = std::make_shared<SetsMemberFilterFactory>(&db_, &handles_, kMetaCF);
rocksdb::BlockBasedTableOptions set_data_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
set_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
set_data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
set_meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(set_meta_cf_table_ops));
set_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(set_data_cf_table_ops));

// zset column-family options
rocksdb::ColumnFamilyOptions zset_meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions zset_data_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions zset_score_cf_ops(storage_options.options);
zset_meta_cf_ops.compaction_filter_factory = std::make_shared<ZSetsMetaFilterFactory>();
zset_data_cf_ops.compaction_filter_factory = std::make_shared<ZSetsDataFilterFactory>(&db_, &handles_, kZsetsMetaCF);
zset_score_cf_ops.compaction_filter_factory =
std::make_shared<ZSetsScoreFilterFactory>(&db_, &handles_, kZsetsMetaCF);
zset_data_cf_ops.compaction_filter_factory = std::make_shared<ZSetsDataFilterFactory>(&db_, &handles_, kMetaCF);
zset_score_cf_ops.compaction_filter_factory = std::make_shared<ZSetsScoreFilterFactory>(&db_, &handles_, kMetaCF);
zset_score_cf_ops.comparator = ZSetsScoreKeyComparator();

rocksdb::BlockBasedTableOptions zset_meta_cf_table_ops(table_ops);
rocksdb::BlockBasedTableOptions zset_data_cf_table_ops(table_ops);
rocksdb::BlockBasedTableOptions zset_score_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && (storage_options.block_cache_size > 0)) {
zset_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
zset_data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
zset_meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
zset_meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_meta_cf_table_ops));
zset_data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_data_cf_table_ops));
zset_score_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(zset_score_cf_table_ops));

std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, string_cf_ops);
// hash CF
column_families.emplace_back("hash_meta_cf", hash_meta_cf_ops);
column_families.emplace_back("hash_data_cf", hash_data_cf_ops);
// set CF
column_families.emplace_back("set_meta_cf", set_meta_cf_ops);
column_families.emplace_back("set_data_cf", set_data_cf_ops);
// list CF
column_families.emplace_back("list_meta_cf", list_meta_cf_ops);
column_families.emplace_back("list_data_cf", list_data_cf_ops);
// zset CF
column_families.emplace_back("zset_meta_cf", zset_meta_cf_ops);
column_families.emplace_back("zset_data_cf", zset_data_cf_ops);
column_families.emplace_back("zset_score_cf", zset_score_cf_ops);
// meta CF
column_families.emplace_back("meta_cf", meta_cf_ops);
return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
}

Expand Down Expand Up @@ -194,31 +177,31 @@ Status Redis::CompactRange(const DataType& dtype, const rocksdb::Slice* begin, c
break;
case DataType::kHashes:
if (type == kMeta || type == kMetaAndData) {
s = db_->CompactRange(default_compact_range_options_, handles_[kHashesMetaCF], begin, end);
s = db_->CompactRange(default_compact_range_options_, handles_[kMetaCF], begin, end);
}
if (s.ok() && (type == kData || type == kMetaAndData)) {
s = db_->CompactRange(default_compact_range_options_, handles_[kHashesDataCF], begin, end);
}
break;
case DataType::kSets:
if (type == kMeta || type == kMetaAndData) {
db_->CompactRange(default_compact_range_options_, handles_[kSetsMetaCF], begin, end);
db_->CompactRange(default_compact_range_options_, handles_[kMetaCF], begin, end);
}
if (s.ok() && (type == kData || type == kMetaAndData)) {
db_->CompactRange(default_compact_range_options_, handles_[kSetsDataCF], begin, end);
}
break;
case DataType::kLists:
if (type == kMeta || type == kMetaAndData) {
s = db_->CompactRange(default_compact_range_options_, handles_[kListsMetaCF], begin, end);
s = db_->CompactRange(default_compact_range_options_, handles_[kMetaCF], begin, end);
}
if (s.ok() && (type == kData || type == kMetaAndData)) {
s = db_->CompactRange(default_compact_range_options_, handles_[kListsDataCF], begin, end);
}
break;
case DataType::kZSets:
if (type == kMeta || type == kMetaAndData) {
db_->CompactRange(default_compact_range_options_, handles_[kZsetsMetaCF], begin, end);
db_->CompactRange(default_compact_range_options_, handles_[kMetaCF], begin, end);
}
if (s.ok() && (type == kData || type == kMetaAndData)) {
db_->CompactRange(default_compact_range_options_, handles_[kZsetsDataCF], begin, end);
Expand Down
10 changes: 5 additions & 5 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,19 +308,19 @@ class Redis {
options.iterate_upper_bound = upper_bound;
switch (type) {
case 'k':
return new StringsIterator(options, db_, handles_[kStringsCF], pattern);
return new StringsIterator(options, db_, handles_[kMetaCF], pattern);
break;
case 'h':
return new HashesIterator(options, db_, handles_[kHashesMetaCF], pattern);
return new HashesIterator(options, db_, handles_[kMetaCF], pattern);
break;
case 's':
return new SetsIterator(options, db_, handles_[kSetsMetaCF], pattern);
return new SetsIterator(options, db_, handles_[kMetaCF], pattern);
break;
case 'l':
return new ListsIterator(options, db_, handles_[kListsMetaCF], pattern);
return new ListsIterator(options, db_, handles_[kMetaCF], pattern);
break;
case 'z':
return new ZsetsIterator(options, db_, handles_[kZsetsMetaCF], pattern);
return new ZsetsIterator(options, db_, handles_[kMetaCF], pattern);
break;
default:
WARN("Invalid datatype to create iterator");
Expand Down
Loading
Loading