Skip to content

Commit

Permalink
fix bug for ordered flush
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoyuHuang committed Sep 29, 2020
1 parent 8c3faa9 commit 29ebb57
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 273 deletions.
4 changes: 2 additions & 2 deletions bench_memtable/memtable_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace leveldb {
auto cmp = new YCSBKeyComparator();
leveldb::InternalKeyComparator *comp = new leveldb::InternalKeyComparator(
cmp);
MemTable *table = new MemTable(*comp, 0, nullptr, 1);
MemTable *table = new MemTable(*comp, 0, nullptr, true);
table->Ref();
active_memtables_.push_back(table);
mutexs_.push_back(new std::mutex);
Expand All @@ -65,7 +65,7 @@ namespace leveldb {
auto cmp = new YCSBKeyComparator();
leveldb::InternalKeyComparator *comp = new leveldb::InternalKeyComparator(
cmp);
table = new MemTable(*comp, 0, nullptr, 1);
table = new MemTable(*comp, 0, nullptr, true);
table->Ref();
active_memtables_[partition_id] = table;
}
Expand Down
4 changes: 2 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
scp nova_server_main [email protected]:~/nova_server_main &
scp nova_server_main [email protected]:~/nova_server_main &
scp nova_server_main [email protected]:~/nova_server_main &
scp nova_server_main [email protected]:~/nova_server_main &
#scp nova_server_main [email protected]:~/nova_server_main &
#scp nova_server_main [email protected]:~/nova_server_main &
sleep 1000
220 changes: 120 additions & 100 deletions db/db_impl.cc

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ namespace leveldb {
std::unordered_map<uint32_t, leveldb::MemTableLogFilePair> *mid_table_map);

void ScheduleFlushMemTableTask(
int thread_id, MemTable *imm,
int thread_id,
uint32_t memtable_id,
MemTable *imm,
uint32_t partition_id, uint32_t imm_slot,
unsigned int *rand_seed, bool merge_memtables_without_flushing);

Expand Down
2 changes: 1 addition & 1 deletion db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace leveldb {
static double MaxBytesForLevel(const Options &options, int level) {
double result = options.l0bytes_start_compaction_trigger;
if (result == 0) {
result = 4.0 * 1024 * 1024 * 1024 / nova::NovaConfig::config->cfgs[0]->fragments.size();
result = 4.0 * 1024 * 1024 * 1024;// / nova::NovaConfig::config->cfgs[0]->fragments.size();
}
while (level > 0) {
result *= 3.2;
Expand Down
160 changes: 88 additions & 72 deletions db/flush_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,100 @@
#include "flush_order.h"

namespace leveldb {
std::string ImpactedDranges::DebugString() const {
return fmt::format("[{},{}]:{}", lower_drange_index, upper_drange_index, generation_id);
}

std::string ImpactedDrangeCollection::DebugString() const {
std::string msg;
for (auto &dranges : impacted_dranges) {
msg = fmt::format("{},{}", msg, dranges.DebugString());
}
return msg;
}

FlushOrder::FlushOrder(std::vector<MemTablePartition *> *partitioned_active_memtables)
: partitioned_active_memtables_(partitioned_active_memtables) {
latest_generation_id = INIT_GEN_ID;
: partitioned_active_memtables_(partitioned_active_memtables), impacted_dranges_(nullptr),
latest_generation_id(INIT_GEN_ID) {
}

bool FlushOrder::IsSafeToFlush(uint32_t drange_idx, MemTable *memtable) {
return true;
// Safe only if older memtables of all overlapping dranges are flushed.
// auto dranges_col = impacted_dranges_.load();
// if (!dranges_col) {
// return true;
// }
// bool safe_to_flush = true;
// for (const auto &dranges : dranges_col->impacted_dranges) {
// if (!safe_to_flush) {
// break;
// }
// if (drange_idx >= dranges.lower_drange_index && drange_idx <= dranges.upper_drange_index &&
// memtable->generation_id_ >= dranges.generation_id) {
// // overlap and this memtable is newer than the generation of the impacted dranges.
// for (uint32_t drange_idx = dranges.lower_drange_index;
// drange_idx <= dranges.upper_drange_index; drange_idx++) {
// auto memtable_partition = (*partitioned_active_memtables_)[drange_idx];
// memtable_partition->mutex.Lock();
// auto it = memtable_partition->generation_num_memtables_.begin();
// while (it != memtable_partition->generation_num_memtables_.end()) {
// if (it->first < memtable->generation_id_) {
// safe_to_flush = false;
// } else {
// break;
// }
// it++;
// }
// memtable_partition->mutex.Unlock();
// if (!safe_to_flush) {
// break;
// }
// }
// }
// }
// return safe_to_flush;

bool FlushOrder::IsSafeToFlush(uint32_t drange_idx, uint64_t generation_id) {
if (!nova::NovaConfig::config->use_ordered_flush) {
return true;
}
// Safe only if older memtables of all overlapping dranges are flushed.
auto dranges_col = impacted_dranges_.load();
if (!dranges_col) {
return true;
}
bool safe_to_flush = true;
for (const auto &dranges : dranges_col->impacted_dranges) {
if (drange_idx >= dranges.lower_drange_index && drange_idx <= dranges.upper_drange_index &&
generation_id >= dranges.generation_id) {
// overlap and this memtable is newer than the generation of the impacted dranges.
for (uint32_t idx = dranges.lower_drange_index; idx <= dranges.upper_drange_index; idx++) {
auto memtable_partition = (*partitioned_active_memtables_)[idx];
memtable_partition->mutex.Lock();
auto it = memtable_partition->generation_num_memtables_.begin();
while (it != memtable_partition->generation_num_memtables_.end()) {
if (it->first < dranges.generation_id) {
safe_to_flush = false;
NOVA_LOG(rdmaio::INFO) << fmt::format("Cannot flush {}, partition:{}", generation_id,
memtable_partition->DebugString());
break;
}
it++;
}
memtable_partition->mutex.Unlock();
if (!safe_to_flush) {
break;
}
}
}
if (!safe_to_flush) {
break;
}
}
return safe_to_flush;
}

void FlushOrder::UpdateImpactedDranges(const ImpactedDranges &impacted_dranges) {
return;
if (!nova::NovaConfig::config->use_ordered_flush) {
return;
}
// Remove dranges where their memtables are in the current/newer generation.
// auto new_col = new ImpactedDrangeCollection;
// auto dranges_col = impacted_dranges_.load();
// if (dranges_col) {
// for (const auto &dranges : dranges_col->impacted_dranges) {
// bool drop = true;
// for (uint32_t drange_idx = dranges.lower_drange_index;
// drange_idx <= dranges.upper_drange_index; drange_idx++) {
// auto memtable_partition = (*partitioned_active_memtables_)[drange_idx];
// memtable_partition->mutex.Lock();
// auto it = memtable_partition->generation_num_memtables_.begin();
// while (it != memtable_partition->generation_num_memtables_.end()) {
// if (it->first < dranges.generation_id) {
// // some older memtables are not flushed.
// drop = false;
// } else {
// break;
// }
// it++;
// }
// memtable_partition->mutex.Unlock();
// if (!drop) {
// break;
// }
// }
// if (!drop) {
// // Keep.
// new_col->impacted_dranges.push_back(dranges);
// }
// }
// }
// new_col->impacted_dranges.push_back(impacted_dranges);
// impacted_dranges_.store(new_col);
// latest_generation_id += 1;
auto new_col = new ImpactedDrangeCollection;
auto dranges_col = impacted_dranges_.load();
if (dranges_col) {
for (const auto &dranges : dranges_col->impacted_dranges) {
bool keep = true;
for (uint32_t idx = dranges.lower_drange_index; idx <= dranges.upper_drange_index; idx++) {
auto memtable_partition = (*partitioned_active_memtables_)[idx];
memtable_partition->mutex.Lock();
auto it = memtable_partition->generation_num_memtables_.begin();
while (it != memtable_partition->generation_num_memtables_.end()) {
if (it->first < dranges.generation_id) {
// some older memtables are not flushed.
keep = false;
break;
}
it++;
}
memtable_partition->mutex.Unlock();
if (!keep) {
break;
}
}
if (keep) {
// Keep.
new_col->impacted_dranges.push_back(dranges);
}
}
}
new_col->impacted_dranges.push_back(impacted_dranges);
NOVA_LOG(rdmaio::INFO) << "Latest flush order: " << new_col->DebugString();
impacted_dranges_.store(new_col);
latest_generation_id += 1;
}
}
5 changes: 4 additions & 1 deletion db/flush_order.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ namespace leveldb {
uint32_t lower_drange_index = 0;
uint32_t upper_drange_index = 0;
uint64_t generation_id;

std::string DebugString() const;
};

struct ImpactedDrangeCollection {
std::vector<ImpactedDranges> impacted_dranges;
std::string DebugString() const;
};

class FlushOrder {
Expand All @@ -32,7 +35,7 @@ namespace leveldb {

void UpdateImpactedDranges(const ImpactedDranges& impacted_dranges);

bool IsSafeToFlush(uint32_t drange_idx, MemTable* memtable);
bool IsSafeToFlush(uint32_t drange_idx, uint64_t generation_id);

std::atomic_uint_fast64_t latest_generation_id;
private:
Expand Down
58 changes: 39 additions & 19 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ namespace leveldb {
MemTable::MemTable(const InternalKeyComparator &comparator,
uint32_t memtable_id,
DBProfiler *db_profiler,
uint64_t generation_id,
bool is_ready)
: comparator_(comparator), memtable_id_(memtable_id), refs_(0),
table_(comparator_, &arena_),
db_profiler_(db_profiler), generation_id_(generation_id), is_ready_(is_ready),
db_profiler_(db_profiler), is_ready_(is_ready),
is_ready_signal_(&is_ready_mutex_) {
}

Expand All @@ -52,23 +51,20 @@ namespace leveldb {
is_ready_mutex_.Unlock();
}

void MemTablePartition::AddMemTable(MemTable *memtable) {
// auto it = generation_num_memtables_.find(memtable->generation_id_);
// if (it == generation_num_memtables_.end()) {
// generation_num_memtables_[memtable->generation_id_] = 1;
// } else {
// generation_num_memtables_[memtable->generation_id_] += 1;
// }
void MemTablePartition::AddMemTable(uint64_t generation_id, uint32_t memtableid) {
NOVA_LOG(rdmaio::DEBUG) << fmt::format("Add memtable {}:{}", generation_id, memtableid);
auto it = generation_num_memtables_.find(generation_id);
generation_num_memtables_[generation_id].insert(memtableid);
}

void MemTablePartition::RemoveMemTable(MemTable *imm) {
// auto it = generation_num_memtables_.find(imm->generation_id_);
// NOVA_ASSERT(it != generation_num_memtables_.end());
// if (it->second == 1) {
// generation_num_memtables_.erase(imm->generation_id_);
// } else {
// generation_num_memtables_[imm->generation_id_] -= 1;
// }
void MemTablePartition::RemoveMemTable(uint64_t generation_id, uint32_t memtableid) {
NOVA_LOG(rdmaio::DEBUG) << fmt::format("Remove memtable {}:{}", generation_id, memtableid);
auto it = generation_num_memtables_.find(generation_id);
NOVA_ASSERT(it != generation_num_memtables_.end());
it->second.erase(memtableid);
if (it->second.empty()) {
generation_num_memtables_.erase(generation_id);
}
}

MemTable::~MemTable() { assert(refs_ == 0); }
Expand Down Expand Up @@ -252,12 +248,13 @@ namespace leveldb {
return false;
}

void AtomicMemTable::SetMemTable(leveldb::MemTable *mem) {
void AtomicMemTable::SetMemTable(uint64_t generation_id, leveldb::MemTable *mem) {
mutex_.lock();
l0_file_numbers_.clear();
is_flushed_ = false;
is_immutable_ = false;
is_scheduled_for_flushing = false;
generation_id_ = generation_id;
memtable_id_ = mem->memtableid();

// NOVA_ASSERT(!memtable_);
Expand All @@ -276,7 +273,8 @@ namespace leveldb {
last_version_id_ = std::max(last_version_id_, version_id);
l0_file_numbers_.insert(l0_file_numbers.begin(), l0_file_numbers.end());
is_flushed_ = true;
NOVA_ASSERT(memtable_);
NOVA_ASSERT(memtable_)
<< fmt::format("{}:{}:{}", memtable_id_, l0_file_numbers.empty() ? 0 : l0_file_numbers[0], version_id);
uint32_t mid = memtable_->memtableid();
uint32_t refs = memtable_->Unref();
if (refs <= 0) {
Expand Down Expand Up @@ -385,4 +383,26 @@ namespace leveldb {
}
mutex_.unlock();
}

std::string MemTablePartition::DebugString() const {
std::string msg = fmt::format("p-{}:", partition_id);
for (auto gen : generation_num_memtables_) {
std::string m;
for (auto id : gen.second) {
m += fmt::format(",{}", id);
}
msg += fmt::format("gen-{}:{},", gen.first, m);
}
msg += "\n";
msg += "immutable slots:";
for (auto slots : immutable_memtable_ids) {
msg += fmt::format("{},", slots);
}
msg += "\n";
msg += "slot-id-map:";
for (auto slots : slot_imm_id) {
msg += fmt::format("s-{}:{},", slots.first, slots.second);
}
return msg;
}
} // namespace leveldb
Loading

0 comments on commit 29ebb57

Please sign in to comment.