From f6f8c979cf70db3499ebe1d7cafab735c0562b57 Mon Sep 17 00:00:00 2001 From: lmatz Date: Wed, 8 Mar 2017 02:00:38 +0800 Subject: [PATCH] [Core] Add an ObjListPool, LRU Cache, FIFO Cache to support swapping of ObjLists later issue #256 Open a pull request for more discussion and changes. --- core/CMakeLists.txt | 4 +- core/objlist.hpp | 6 + core/objlist_pool.hpp | 304 +++++++++++++++++++++++++++++++++ core/objlist_pool_unittest.cpp | 230 +++++++++++++++++++++++++ 4 files changed, 542 insertions(+), 2 deletions(-) create mode 100644 core/objlist_pool.hpp create mode 100644 core/objlist_pool_unittest.cpp diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 9a3f204..7139bfd 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -20,6 +20,7 @@ add_subdirectory(channel) file(GLOB core-src-files accessor.cpp accessor_store.cpp + balance.cpp combiner.cpp config.cpp context.cpp @@ -32,8 +33,7 @@ file(GLOB core-src-files objlist.cpp objlist_store.cpp shuffle_combiner_store.cpp - worker_info.cpp - balance.cpp) + worker_info.cpp) husky_cache_variable(core-src-files ${core-src-files}) add_library(core-objs OBJECT ${core-src-files}) diff --git a/core/objlist.hpp b/core/objlist.hpp index 54b4a10..fbe0300 100644 --- a/core/objlist.hpp +++ b/core/objlist.hpp @@ -54,6 +54,12 @@ class ObjListBase : public ChannelSource, public ChannelDestination { virtual size_t get_size() const = 0; + virtual bool write_to_disk() = 0; + + virtual void read_from_disk(const std::string&) = 0; + + virtual size_t estimated_storage_size(const double sample_rate = 0.005) = 0; + private: size_t id_; diff --git a/core/objlist_pool.hpp b/core/objlist_pool.hpp new file mode 100644 index 0000000..53cb714 --- /dev/null +++ b/core/objlist_pool.hpp @@ -0,0 +1,304 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include "core/objlist.hpp" + +namespace husky { + +// the reponsibility of cache is executing some policy which determines which objlist +// or (TODO) which part of a objlist should stay in memory. +// cache is manipulated by ObjListPool only +// cache provides 5 interface: +// 1 put a pair of key and objlist(in memory) into cache so that it will be managed by cache +// 2 del a objlist with specific 'key' so that cache doesn't manage it anymore +// 3 check whether a objlist with specific 'key' lives in memory or disk +// 4 validate a objlist(on disk. if already in memory, do nothing) into memory +// 5 invalidate a objlist according to the cache policy, put it onto disk(logically, real job is done by ObjListPool) +template +class ICache { + public: + size_t num_objlist_on_disk() { return disk_map_.size(); } + + protected: + std::unordered_map disk_map_; +}; + +template +class LRUCache : public ICache { + public: + using ListIterator = typename std::list>::iterator; + + LRUCache() = default; + + ~LRUCache() = default; + + // when objlist with 'key' is registered at the first time to ObjListPool, 'put' should be called + // at this moment, objlist contains no object, so it will be automatically considered as living in memory + template + void put(KeyU&& key, ObjListBase* objlist) { + cache_list_.push_front(std::make_pair(key, objlist)); + cache_map_[std::forward(key)] = cache_list_.begin(); + } + + // refresh the objlist with 'key' + template + ObjListBase* validate(KeyU&& key) { + auto it_cache = cache_map_.find(key); + ObjListBase* ret; + if (it_cache == cache_map_.end()) { + // if objlist with 'key' is not in the memory + auto it_disk = this->disk_map_.find(key); + // if objlist with 'key' is also not on the disk + // impossible + if (it_disk == this->disk_map_.end()) { + throw husky::base::HuskyException("there is no such key in LRUCache"); + } + ret = it_disk->second; + put(std::forward(key), ret); + this->disk_map_.erase(key); + } else { + ret = it_cache->second->second; + cache_list_.erase(it_cache->second); + cache_map_.erase(it_cache); + put(std::forward(key), ret); + } + return ret; + } + + // LRUCache: delete the one at the back of cache_list_ + // and put it into disk_map_ + std::pair invalidate() { + // since it's lru cache, we delete from the last one + auto last_key_list_pair = cache_list_.back(); + KeyT key = last_key_list_pair.first; + ObjListBase* objlist = last_key_list_pair.second; + // invalidate the objlist + cache_map_.erase(key); + cache_list_.pop_back(); + // store the objlist moved to disk + // 'this' is needed because https://isocpp.org/wiki/faq/templates#nondependent-name-lookup-members + this->disk_map_[std::move(key)] = objlist; + return last_key_list_pair; + } + + // delete the objlist with 'key' so that cache won't manage it any more + template + void del(KeyU&& key) { + this->disk_map_.erase(key); + auto it_cache = cache_map_.find(key); + if (it_cache == cache_map_.end()) { + return; + } + cache_list_.erase(it_cache->second); + cache_map_.erase(key); + } + + // true indicates that objlist with 'key' lives in memory + // false indicates that objlist with 'key' lives on disk + template + bool exists(KeyU&& key) { + return cache_map_.find(key) != cache_map_.end(); + } + + private: + std::unordered_map cache_map_; + std::list> cache_list_; +}; + +template +class FIFOCache : public ICache { + public: + using ListIterator = typename std::list>::iterator; + + FIFOCache() = default; + + ~FIFOCache() = default; + + template + void put(KeyU&& key, ObjListBase* objlist) { + cache_list_.push_front(std::make_pair(key, objlist)); + cache_map_[std::forward(key)] = cache_list_.begin(); + } + + template + ObjListBase* validate(KeyU&& key) { + auto it_cache = cache_map_.find(key); + ObjListBase* ret; + if (it_cache == cache_map_.end()) { + // if objlist with 'key' is not in the memory + auto it_disk = this->disk_map_.find(key); + // if objlist with 'key' is also not on the disk + // impossible + if (it_disk == this->disk_map_.end()) { + throw husky::base::HuskyException("there is no such key in FIFOCache"); + } + ret = it_disk->second; + put(std::forward(key), ret); + this->disk_map_.erase(key); + } else { + ret = it_cache->second->second; + } + return ret; + } + + std::pair invalidate() { + // since it's fifo cache, we delete from the back (which is the first in) + auto last_key_list_pair = cache_list_.back(); + KeyT key = last_key_list_pair.first; + ObjListBase* objlist = last_key_list_pair.second; + // invalidate the objlist + cache_map_.erase(key); + cache_list_.pop_back(); + // store the objlist moved to disk + this->disk_map_[std::move(key)] = objlist; + return last_key_list_pair; + } + + template + void del(KeyU&& key) { + this->disk_map_.erase(key); + auto it_cache = cache_map_.find(key); + if (it_cache == cache_map_.end()) { + return; + } + cache_list_.erase(it_cache->second); + cache_map_.erase(key); + } + + template + bool exists(KeyU&& key) { + return cache_map_.find(key) != cache_map_.end(); + } + + private: + std::unordered_map cache_map_; + std::list> cache_list_; +}; + +template class CacheT = LRUCache> +class ObjListPool { + public: + static ObjListPool& get_instance() { + static ObjListPool pool; + return pool; + } + + // register the objlist when it is created + // at this moment, objlist contains no objects so it is considered as living in the memory + template + bool register_objlist(KeyU&& key, ObjListBase* objlist) { + std::lock_guard lock(mu); + if (register_set_.find(key) != register_set_.end()) { + return false; + } + register_set_.insert(key); + cache_.put(std::forward(key), objlist); + return true; + } + + // deregister the objlist so ObjListPool won't manage it anymore + template + bool deregister_objlist(KeyU&& key) { + std::lock_guard lock(mu); + if (register_set_.find(key) == register_set_.end()) { + return false; + } + register_set_.erase(key); + cache_.del(std::forward(key)); + return true; + } + + // we bring the objlist with 'key' into the momery + // ObjListPool is not aware of any memory issue + // It will do the job when caller think it is approriate + template + bool request_objlist(KeyU&& key); + + // bring the objlist chosen according to the cache policy from memory onto disk + // by default, we will evict one objlist + // or the caller has a better idea of how many bytes it needs exactly + int64_t request_space(const int64_t bytes_required = 1); + + template + bool in_memory(KeyU&& key) { + std::lock_guard lock(mu); + if (register_set_.find(key) == register_set_.end()) { + throw husky::base::HuskyException("key has not been registered [in memory]"); + } + return cache_.exists(std::forward(key)); + } + + template + bool on_disk(KeyU&& key) { + std::lock_guard lock(mu); + if (register_set_.find(key) == register_set_.end()) { + throw husky::base::HuskyException("key has not been registered [on disk]"); + } + return !cache_.exists(std::forward(key)); + } + + private: + ObjListPool() = default; + + CacheT cache_; + std::unordered_set register_set_; + std::mutex mu; +}; + +// true indicates that objlist with 'key' is brought to memory from disk +// false indicates that objlist with 'key' has already existsed in memory +template class CacheT> +template +bool ObjListPool::request_objlist(KeyU&& key) { + std::lock_guard lock(mu); + if (register_set_.find(key) == register_set_.end()) { + throw husky::base::HuskyException("The objlist has not registered in the ObjListPool"); + } + bool live_in_memory = cache_.exists(key); + if (live_in_memory) + return false; + ObjListBase* objlist = cache_.validate(std::forward(key)); + objlist->read_from_disk(objlist->id2str()); + return true; +} + +template class CacheT> +int64_t ObjListPool::request_space(const int64_t bytes_required) { + std::lock_guard lock(mu); + int64_t bytes_evicted = 0; + int count = 0; + int num = cache_.num_objlist_on_disk(); + while (bytes_evicted < bytes_required || count < num) { + auto key_list_pair = cache_.invalidate(); + KeyT key = key_list_pair.first; + ObjListBase* objlist = key_list_pair.second; + int64_t storage_size = objlist->estimated_storage_size(); + bool res = objlist->write_to_disk(); + if (res) { + bytes_evicted += storage_size; + } else { + cache_.validate(std::move(key)); + } + count++; + } + return bytes_evicted; +} +} // namespace husky diff --git a/core/objlist_pool_unittest.cpp b/core/objlist_pool_unittest.cpp new file mode 100644 index 0000000..d76d885 --- /dev/null +++ b/core/objlist_pool_unittest.cpp @@ -0,0 +1,230 @@ +#include "core/objlist_pool.hpp" + +#include "gtest/gtest.h" + +#include "core/objlist_store.hpp" + +namespace husky { +namespace { + +class TestObjListPool : public testing::Test { + public: + TestObjListPool() {} + ~TestObjListPool() {} + + protected: + void SetUp() {} + void TearDown() {} +}; + +class TestLRUCache : public testing::Test { + public: + TestLRUCache() {} + ~TestLRUCache() {} + + protected: + void SetUp() {} + void TearDown() {} +}; + +class TestFIFOCache : public testing::Test { + public: + TestFIFOCache() {} + ~TestFIFOCache() {} + + protected: + void SetUp() {} + void TearDown() {} +}; + +class Obj { + public: + using KeyT = int; + KeyT key; + const KeyT& id() const { return key; } + Obj() {} + explicit Obj(const KeyT& k) : key(k) {} +}; + +TEST_F(TestLRUCache, Basic) { + LRUCache cache; + + ObjList& objlist_0 = ObjListStore::create_objlist(); + size_t objlist_0_id = objlist_0.get_id(); + cache.put(objlist_0_id, &objlist_0); + auto p1 = cache.invalidate(); + EXPECT_EQ(objlist_0_id, p1.first); + EXPECT_EQ(&objlist_0, p1.second); + + ObjListBase* objlist_temp = cache.validate(objlist_0_id); + EXPECT_EQ(&objlist_0, objlist_temp); + + ObjList& objlist_1 = ObjListStore::create_objlist(); + ObjList& objlist_2 = ObjListStore::create_objlist(); + ObjList& objlist_3 = ObjListStore::create_objlist(); + size_t objlist_1_id = objlist_1.get_id(); + size_t objlist_2_id = objlist_2.get_id(); + size_t objlist_3_id = objlist_3.get_id(); + cache.put(objlist_1_id, &objlist_1); + cache.put(objlist_2_id, &objlist_2); + cache.put(objlist_3_id, &objlist_3); + + objlist_temp = cache.validate(objlist_1_id); + EXPECT_EQ(&objlist_1, objlist_temp); + + cache.del(objlist_2_id); + + auto p2 = cache.invalidate(); + EXPECT_EQ(objlist_0_id, p2.first); + EXPECT_EQ(&objlist_0, p2.second); + + auto p3 = cache.invalidate(); + EXPECT_EQ(objlist_3_id, p3.first); + EXPECT_EQ(&objlist_3, p3.second); + + auto p4 = cache.invalidate(); + EXPECT_EQ(objlist_1_id, p4.first); + EXPECT_EQ(&objlist_1, p4.second); +} + +TEST_F(TestFIFOCache, Basic) { + FIFOCache cache; + + ObjList& objlist_0 = ObjListStore::create_objlist(); + size_t objlist_0_id = objlist_0.get_id(); + cache.put(objlist_0_id, &objlist_0); + auto p1 = cache.invalidate(); + EXPECT_EQ(objlist_0_id, p1.first); + EXPECT_EQ(&objlist_0, p1.second); + + ObjListBase* objlist_temp = cache.validate(objlist_0_id); + EXPECT_EQ(&objlist_0, objlist_temp); + + ObjList& objlist_1 = ObjListStore::create_objlist(); + ObjList& objlist_2 = ObjListStore::create_objlist(); + ObjList& objlist_3 = ObjListStore::create_objlist(); + size_t objlist_1_id = objlist_1.get_id(); + size_t objlist_2_id = objlist_2.get_id(); + size_t objlist_3_id = objlist_3.get_id(); + cache.put(objlist_1_id, &objlist_1); + cache.put(objlist_2_id, &objlist_2); + cache.put(objlist_3_id, &objlist_3); + + objlist_temp = cache.validate(objlist_1_id); + EXPECT_EQ(&objlist_1, objlist_temp); + + cache.del(objlist_2_id); + + auto p2 = cache.invalidate(); + EXPECT_EQ(objlist_0_id, p2.first); + EXPECT_EQ(&objlist_0, p2.second); + + auto p3 = cache.invalidate(); + EXPECT_EQ(objlist_1_id, p3.first); + EXPECT_EQ(&objlist_1, p3.second); + + auto p4 = cache.invalidate(); + EXPECT_EQ(objlist_3_id, p4.first); + EXPECT_EQ(&objlist_3, p4.second); +} + +TEST_F(TestObjListPool, Register) { + auto& pool = ObjListPool::get_instance(); + + ObjList& objlist_0 = ObjListStore::create_objlist(); + size_t objlist_0_id = objlist_0.get_id(); + + ObjList& objlist_1 = ObjListStore::create_objlist(); + size_t objlist_1_id = objlist_1.get_id(); + + bool ret = pool.register_objlist(objlist_0_id, &objlist_0); + EXPECT_TRUE(ret); + ret = pool.register_objlist(objlist_1_id, &objlist_1); + EXPECT_TRUE(ret); + + ret = pool.register_objlist(objlist_0_id, &objlist_0); + EXPECT_FALSE(ret); + ret = pool.deregister_objlist(objlist_0_id); + EXPECT_TRUE(ret); + ret = pool.deregister_objlist(objlist_0_id); + EXPECT_FALSE(ret); +} + +TEST_F(TestObjListPool, Request) { + auto& pool = ObjListPool::get_instance(); + + ObjList& objlist_0 = ObjListStore::create_objlist(); + size_t id_0 = objlist_0.get_id(); + Obj obj0(0); + Obj obj1(1); + Obj obj2(2); + Obj obj3(3); + Obj obj4(4); + Obj obj5(5); + objlist_0.add_object(obj0); + objlist_0.add_object(obj1); + objlist_0.add_object(obj2); + objlist_0.add_object(obj3); + objlist_0.add_object(obj4); + objlist_0.add_object(obj5); + + ObjList& objlist_1 = ObjListStore::create_objlist(); + size_t id_1 = objlist_1.get_id(); + Obj obj6(6); + Obj obj7(7); + Obj obj8(8); + Obj obj9(9); + Obj obj10(10); + objlist_1.add_object(obj6); + objlist_1.add_object(obj7); + objlist_1.add_object(obj8); + objlist_1.add_object(obj9); + objlist_1.add_object(obj10); + + // EXPECT_TRUE(false) << "after adding objects"; + + bool ret = pool.register_objlist(id_0, &objlist_0); + EXPECT_TRUE(ret); + ret = pool.register_objlist(id_1, &objlist_1); + EXPECT_TRUE(ret); + + // EXPECT_TRUE(false) << "after registering objlist"; + + ret = pool.on_disk(id_0); + EXPECT_FALSE(ret); + ret = pool.on_disk(id_1); + EXPECT_FALSE(ret); + + // EXPECT_TRUE(false) << "after checking on disk"; + + ret = pool.request_objlist(id_0); + EXPECT_FALSE(ret); + + // EXPECT_TRUE(false) << "after requesting objlist id_0"; + + ret = pool.in_memory(id_0); + EXPECT_TRUE(ret); + ret = pool.request_objlist(id_1); + EXPECT_FALSE(ret); + + // EXPECT_TRUE(false) << "after requesting objlist id_1"; + + ret = pool.in_memory(id_1); + EXPECT_TRUE(ret); + + int64_t bytes = pool.request_space(); + EXPECT_GE(bytes, 1); + ret = pool.in_memory(id_0); + EXPECT_FALSE(ret); + ret = pool.in_memory(id_1); + EXPECT_TRUE(ret); + + bytes = pool.request_space(); + EXPECT_GE(bytes, 1); + ret = pool.in_memory(id_0); + EXPECT_FALSE(ret); + ret = pool.in_memory(id_1); + EXPECT_FALSE(ret); +} +} // namespace +} // namespace husky