Skip to content

Commit

Permalink
[Core] Add an ObjListPool, LRU Cache, FIFO Cache to support swapping …
Browse files Browse the repository at this point in the history
…of ObjLists later

issue husky-team#256
Open a pull request for more discussion and changes.
  • Loading branch information
lmatz committed Mar 7, 2017
1 parent c0822ed commit f6f8c97
Show file tree
Hide file tree
Showing 4 changed files with 542 additions and 2 deletions.
4 changes: 2 additions & 2 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_subdirectory(channel)
file(GLOB core-src-files
accessor.cpp
accessor_store.cpp
balance.cpp
combiner.cpp
config.cpp
context.cpp
Expand All @@ -32,8 +33,7 @@ file(GLOB core-src-files
objlist.cpp
objlist_store.cpp
shuffle_combiner_store.cpp
worker_info.cpp
balance.cpp)
worker_info.cpp)
husky_cache_variable(core-src-files ${core-src-files})

add_library(core-objs OBJECT ${core-src-files})
Expand Down
6 changes: 6 additions & 0 deletions core/objlist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class ObjListBase : public ChannelSource, public ChannelDestination {

// Size of this list as reported by the concrete subclass.
// NOTE(review): presumably the number of objects held — confirm against implementers.
virtual size_t get_size() const = 0;

// Writes this objlist out to disk.
// The bool result is treated by ObjListPool::request_space as "the write succeeded":
// on false the objlist is put back under in-memory management.
virtual bool write_to_disk() = 0;

// Loads this objlist back from disk.
// ObjListPool::request_objlist passes the result of id2str() as the argument;
// presumably the string identifies the on-disk data of this list — TODO confirm.
virtual void read_from_disk(const std::string&) = 0;

// Estimate of the storage this objlist occupies. ObjListPool::request_space accumulates
// the result as the number of bytes evicted.
// NOTE(review): sample_rate (default 0.005) is presumably the fraction of objects sampled
// to produce the estimate — confirm against implementers.
virtual size_t estimated_storage_size(const double sample_rate = 0.005) = 0;

private:
size_t id_;

Expand Down
304 changes: 304 additions & 0 deletions core/objlist_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstdint>
#include <list>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "core/objlist.hpp"

namespace husky {

// The responsibility of a cache is to execute a policy which determines which objlist
// or (TODO) which part of an objlist should stay in memory.
// A cache is manipulated by ObjListPool only.
// A cache provides 5 interfaces:
// 1 put: hand a pair of key and objlist (in memory) to the cache so that it will be managed by the cache
// 2 del: remove the objlist with a specific 'key' so that the cache doesn't manage it anymore
// 3 exists: check whether the objlist with a specific 'key' lives in memory or on disk
// 4 validate: bring an objlist (on disk; if already in memory, do nothing) into memory
// 5 invalidate: choose an objlist according to the cache policy and put it onto disk
//   (logically; the real job is done by ObjListPool)
template <typename KeyT>
class ICache {
public:
size_t num_objlist_on_disk() { return disk_map_.size(); }

protected:
std::unordered_map<KeyT, ObjListBase*> disk_map_;
};

// Cache policy that evicts the least recently used objlist first.
//
// cache_list_ holds the in-memory objlists ordered from most recently used (front)
// to least recently used (back); cache_map_ maps every in-memory key to its node in
// cache_list_. Objlists that have been invalidated are tracked in disk_map_ of ICache.
// The class does no locking of its own.
template <typename KeyT>
class LRUCache : public ICache<KeyT> {
   public:
    using ListIterator = typename std::list<std::pair<KeyT, ObjListBase*>>::iterator;

    LRUCache() = default;

    ~LRUCache() = default;

    // Start managing 'objlist' under 'key' as the most recently used in-memory entry.
    // Called when an objlist is registered to ObjListPool for the first time: at that
    // moment it contains no object, so it is considered as living in memory.
    template <typename KeyU>
    void put(KeyU&& key, ObjListBase* objlist) {
        // 'key' is a named parameter here, so this copies it; the forward below may move it.
        cache_list_.emplace_front(key, objlist);
        cache_map_[std::forward<KeyU>(key)] = cache_list_.begin();
    }

    // Mark the objlist with 'key' as in memory and as the most recently used one.
    // Returns the objlist. Throws HuskyException if 'key' is neither in memory nor on disk.
    template <typename KeyU>
    ObjListBase* validate(KeyU&& key) {
        auto it_cache = cache_map_.find(key);
        if (it_cache != cache_map_.end()) {
            // Already in memory: just refresh it. splice relinks the node without
            // invalidating the iterator stored in cache_map_, so the map needs no update.
            cache_list_.splice(cache_list_.begin(), cache_list_, it_cache->second);
            return it_cache->second->second;
        }
        // Not in memory, so it has to be on disk; anything else is a caller error.
        auto it_disk = this->disk_map_.find(key);
        if (it_disk == this->disk_map_.end()) {
            throw husky::base::HuskyException("there is no such key in LRUCache");
        }
        ObjListBase* ret = it_disk->second;
        put(std::forward<KeyU>(key), ret);
        // 'key' may have been moved from by the forward above, so the disk entry is
        // erased through its iterator instead of looking the key up again.
        this->disk_map_.erase(it_disk);
        return ret;
    }

    // Remove the least recently used objlist (back of cache_list_) from the in-memory
    // bookkeeping and record it in disk_map_. Returns the (key, objlist) pair that was chosen.
    // Throws HuskyException if no objlist is in memory.
    std::pair<KeyT, ObjListBase*> invalidate() {
        if (cache_list_.empty()) {
            // back() on an empty list is undefined behaviour; report it instead.
            throw husky::base::HuskyException("there is no objlist in memory to invalidate in LRUCache");
        }
        std::pair<KeyT, ObjListBase*> evicted = std::move(cache_list_.back());
        cache_list_.pop_back();
        cache_map_.erase(evicted.first);
        // 'this' is needed because https://isocpp.org/wiki/faq/templates#nondependent-name-lookup-members
        this->disk_map_[evicted.first] = evicted.second;
        return evicted;
    }

    // Stop managing the objlist with 'key', whether it is in memory or on disk.
    // Unknown keys are ignored.
    template <typename KeyU>
    void del(KeyU&& key) {
        this->disk_map_.erase(key);
        auto it_cache = cache_map_.find(key);
        if (it_cache == cache_map_.end()) {
            return;
        }
        cache_list_.erase(it_cache->second);
        cache_map_.erase(it_cache);
    }

    // true indicates that the objlist with 'key' lives in memory;
    // false indicates that it is on disk or not managed by this cache at all.
    template <typename KeyU>
    bool exists(KeyU&& key) const {
        return cache_map_.find(key) != cache_map_.end();
    }

   private:
    std::unordered_map<KeyT, ListIterator> cache_map_;
    std::list<std::pair<KeyT, ObjListBase*>> cache_list_;
};

// Cache policy that evicts objlists in the order they entered memory (first in, first out).
//
// cache_list_ holds the in-memory objlists with the newest at the front and the oldest
// at the back; cache_map_ maps every in-memory key to its node in cache_list_.
// Unlike LRUCache, accessing an in-memory objlist does not change its position.
// The class does no locking of its own.
template <typename KeyT>
class FIFOCache : public ICache<KeyT> {
   public:
    using ListIterator = typename std::list<std::pair<KeyT, ObjListBase*>>::iterator;

    FIFOCache() = default;

    ~FIFOCache() = default;

    // Start managing 'objlist' under 'key' as the newest in-memory entry.
    template <typename KeyU>
    void put(KeyU&& key, ObjListBase* objlist) {
        // 'key' is a named parameter here, so this copies it; the forward below may move it.
        cache_list_.emplace_front(key, objlist);
        cache_map_[std::forward<KeyU>(key)] = cache_list_.begin();
    }

    // Make sure the objlist with 'key' is marked as in memory and return it.
    // An objlist that is already in memory keeps its position in the queue.
    // Throws HuskyException if 'key' is neither in memory nor on disk.
    template <typename KeyU>
    ObjListBase* validate(KeyU&& key) {
        auto it_cache = cache_map_.find(key);
        if (it_cache != cache_map_.end()) {
            return it_cache->second->second;
        }
        // Not in memory, so it has to be on disk; anything else is a caller error.
        auto it_disk = this->disk_map_.find(key);
        if (it_disk == this->disk_map_.end()) {
            throw husky::base::HuskyException("there is no such key in FIFOCache");
        }
        ObjListBase* ret = it_disk->second;
        put(std::forward<KeyU>(key), ret);
        // 'key' may have been moved from by the forward above, so the disk entry is
        // erased through its iterator instead of looking the key up again.
        this->disk_map_.erase(it_disk);
        return ret;
    }

    // Remove the oldest objlist (back of cache_list_, i.e. the first one in) from the
    // in-memory bookkeeping and record it in disk_map_. Returns the (key, objlist) pair
    // that was chosen. Throws HuskyException if no objlist is in memory.
    std::pair<KeyT, ObjListBase*> invalidate() {
        if (cache_list_.empty()) {
            // back() on an empty list is undefined behaviour; report it instead.
            throw husky::base::HuskyException("there is no objlist in memory to invalidate in FIFOCache");
        }
        std::pair<KeyT, ObjListBase*> evicted = std::move(cache_list_.back());
        cache_list_.pop_back();
        cache_map_.erase(evicted.first);
        // 'this' is needed because disk_map_ is a member of a dependent base class
        this->disk_map_[evicted.first] = evicted.second;
        return evicted;
    }

    // Stop managing the objlist with 'key', whether it is in memory or on disk.
    // Unknown keys are ignored.
    template <typename KeyU>
    void del(KeyU&& key) {
        this->disk_map_.erase(key);
        auto it_cache = cache_map_.find(key);
        if (it_cache == cache_map_.end()) {
            return;
        }
        cache_list_.erase(it_cache->second);
        cache_map_.erase(it_cache);
    }

    // true indicates that the objlist with 'key' lives in memory;
    // false indicates that it is on disk or not managed by this cache at all.
    template <typename KeyU>
    bool exists(KeyU&& key) const {
        return cache_map_.find(key) != cache_map_.end();
    }

   private:
    std::unordered_map<KeyT, ListIterator> cache_map_;
    std::list<std::pair<KeyT, ObjListBase*>> cache_list_;
};

// Singleton registry of objlists, one instance per <KeyT, CacheT> combination.
// It records which keys are registered and delegates the memory/disk bookkeeping to
// the cache policy CacheT. Every public member function takes the pool mutex.
template <typename KeyT, template <typename> class CacheT = LRUCache>
class ObjListPool {
   public:
    // Access the pool instance (created on first use).
    static ObjListPool<KeyT, CacheT>& get_instance() {
        static ObjListPool<KeyT, CacheT> instance;
        return instance;
    }

    // Register a freshly created objlist under 'key'.
    // The objlist holds no objects yet, so it starts out as living in memory.
    // Returns false (and changes nothing) if 'key' is already registered.
    template <typename KeyU>
    bool register_objlist(KeyU&& key, ObjListBase* objlist) {
        std::lock_guard<std::mutex> guard(mu);
        const bool newly_registered = register_set_.insert(key).second;
        if (newly_registered) {
            cache_.put(std::forward<KeyU>(key), objlist);
        }
        return newly_registered;
    }

    // Stop managing the objlist registered under 'key'.
    // Returns false if 'key' was never registered.
    template <typename KeyU>
    bool deregister_objlist(KeyU&& key) {
        std::lock_guard<std::mutex> guard(mu);
        const bool was_registered = register_set_.erase(key) != 0;
        if (was_registered) {
            cache_.del(std::forward<KeyU>(key));
        }
        return was_registered;
    }

    // Bring the objlist with 'key' into memory.
    // ObjListPool is not aware of any memory pressure by itself;
    // it does the job whenever the caller thinks it is appropriate.
    template <typename KeyU>
    bool request_objlist(KeyU&& key);

    // Move objlists, chosen by the cache policy, from memory onto disk.
    // With the default argument a single objlist is meant to be evicted;
    // otherwise the caller states how many bytes it needs.
    int64_t request_space(const int64_t bytes_required = 1);

    // Whether the objlist with 'key' currently lives in memory.
    // Throws HuskyException for a key that is not registered.
    template <typename KeyU>
    bool in_memory(KeyU&& key) {
        std::lock_guard<std::mutex> guard(mu);
        if (register_set_.count(key) == 0) {
            throw husky::base::HuskyException("key has not been registered [in memory]");
        }
        const bool cached = cache_.exists(std::forward<KeyU>(key));
        return cached;
    }

    // Whether the objlist with 'key' currently lives on disk.
    // Throws HuskyException for a key that is not registered.
    template <typename KeyU>
    bool on_disk(KeyU&& key) {
        std::lock_guard<std::mutex> guard(mu);
        if (register_set_.count(key) == 0) {
            throw husky::base::HuskyException("key has not been registered [on disk]");
        }
        const bool cached = cache_.exists(std::forward<KeyU>(key));
        return !cached;
    }

   private:
    // Only get_instance() may construct the pool.
    ObjListPool() = default;

    CacheT<KeyT> cache_;                     // memory/disk bookkeeping and eviction policy
    std::unordered_set<KeyT> register_set_;  // every key currently registered
    std::mutex mu;                           // guards cache_ and register_set_
};

// Returns true when the objlist with 'key' had to be brought from disk into memory,
// false when it already existed in memory.
// Throws HuskyException if 'key' has not been registered.
template <typename KeyT, template <typename> class CacheT>
template <typename KeyU>
bool ObjListPool<KeyT, CacheT>::request_objlist(KeyU&& key) {
    std::lock_guard<std::mutex> guard(mu);
    const bool registered = register_set_.count(key) != 0;
    if (!registered) {
        throw husky::base::HuskyException("The objlist has not registered in the ObjListPool");
    }
    if (cache_.exists(key)) {
        // nothing to load
        return false;
    }
    // Let the cache mark the objlist as in memory first, then load its content.
    ObjListBase* const loaded = cache_.validate(std::forward<KeyU>(key));
    loaded->read_from_disk(loaded->id2str());
    return true;
}

// Evict objlists from memory onto disk, in the order chosen by the cache policy, until
// at least 'bytes_required' bytes (by estimated_storage_size) have been evicted or every
// objlist that was in memory has been tried once.
// Returns the estimated number of bytes actually evicted, which may be less than
// 'bytes_required' (including 0) when there is not enough to evict or writes fail.
template <typename KeyT, template <typename> class CacheT>
int64_t ObjListPool<KeyT, CacheT>::request_space(const int64_t bytes_required) {
    std::lock_guard<std::mutex> lock(mu);
    // Every registered objlist is either in memory or on disk, so the number of eviction
    // candidates is the difference. Bounding the loop by it guarantees that invalidate()
    // is never called on an empty cache and that the loop ends even if every write fails.
    const size_t num_registered = register_set_.size();
    const size_t num_on_disk = cache_.num_objlist_on_disk();
    const size_t num_in_memory = num_registered > num_on_disk ? num_registered - num_on_disk : 0;

    int64_t bytes_evicted = 0;
    for (size_t attempts = 0; attempts < num_in_memory && bytes_evicted < bytes_required; ++attempts) {
        auto key_list_pair = cache_.invalidate();
        ObjListBase* objlist = key_list_pair.second;
        // Take the estimate before writing: the size is measured on the in-memory content.
        const int64_t storage_size = objlist->estimated_storage_size();
        if (objlist->write_to_disk()) {
            bytes_evicted += storage_size;
        } else {
            // The write failed, so the objlist is still in memory: undo the invalidation.
            // This re-inserts it at the front of the cache, so the next iteration picks a
            // different objlist. The key is passed as an lvalue on purpose so that the
            // cache never has to look up a moved-from key.
            cache_.validate(key_list_pair.first);
        }
    }
    return bytes_evicted;
}
} // namespace husky
Loading

0 comments on commit f6f8c97

Please sign in to comment.