diff --git a/base/CMakeLists.txt b/base/CMakeLists.txt index 90e3013..9b75092 100644 --- a/base/CMakeLists.txt +++ b/base/CMakeLists.txt @@ -24,7 +24,8 @@ file(GLOB base-src-files disk_store.cpp serialization.cpp session_local.cpp - thread_support.cpp) + thread_support.cpp + memory.cpp) husky_cache_variable(base-src-files ${base-src-files}) add_library(base-objs OBJECT ${base-src-files}) diff --git a/base/disk_store.cpp b/base/disk_store.cpp index 93369db..8c76f53 100644 --- a/base/disk_store.cpp +++ b/base/disk_store.cpp @@ -40,9 +40,23 @@ BinStream DiskStore::read() { // TODO(legend): Will use `pwrite` or other asio functions instead. bool DiskStore::write(BinStream&& bs) { - if (bs.get_buffer_vector().empty()) + // if (bs.get_buffer_vector().empty()) + // return false; + + std::ofstream file; + file.open(path_, std::ofstream::out | std::ofstream::binary); + if (!file) return false; + std::copy(bs.get_buffer_vector().begin(), bs.get_buffer_vector().end(), std::ostreambuf_iterator(file)); + file.close(); + return true; +} + +bool DiskStore::write(BinStream& bs) { + // if (bs.get_buffer_vector().empty()) + // return false; + std::ofstream file; file.open(path_, std::ofstream::out | std::ofstream::binary); if (!file) diff --git a/base/disk_store.hpp b/base/disk_store.hpp index 340b214..dcf6d75 100644 --- a/base/disk_store.hpp +++ b/base/disk_store.hpp @@ -30,6 +30,7 @@ class DiskStore { BinStream read(); bool write(BinStream&& bs); + bool write(BinStream& bs); private: std::string path_; diff --git a/base/memory.cpp b/base/memory.cpp new file mode 100644 index 0000000..06d3966 --- /dev/null +++ b/base/memory.cpp @@ -0,0 +1,22 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "base/memory.hpp" + +namespace husky { +namespace base { + +struct sysinfo Memory::mem_info; +} +} diff --git a/base/memory.hpp b/base/memory.hpp new file mode 100644 index 0000000..1b5b2e2 --- /dev/null +++ b/base/memory.hpp @@ -0,0 +1,43 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include "sys/sysinfo.h" +#include "sys/types.h" + +namespace husky { +namespace base { +class Memory { + public: + static int64_t total_phys_mem() { + sysinfo(&mem_info); + int64_t total_phys_mem = mem_info.totalram; + total_phys_mem *= mem_info.mem_unit; + return total_phys_mem; + } + + static int64_t total_virtual_mem() { + sysinfo(&mem_info); + int64_t total_vir_mem = mem_info.totalram; + total_vir_mem += mem_info.totalswap; + total_vir_mem *= mem_info.mem_unit; + return total_vir_mem; + } + + private: + static struct sysinfo mem_info; +}; +} // namespace base +} // namespace husky diff --git a/base/serialization.cpp b/base/serialization.cpp index 8cfbe3a..7dbb745 100644 --- a/base/serialization.cpp +++ b/base/serialization.cpp @@ -71,6 +71,21 @@ void BinStream::resize(size_t size) { void BinStream::seek(size_t pos) { front_ = pos; } +BinStream BinStream::sub_stream(size_t s_pos, size_t len) { + int size = buffer_.size(); + auto it_start = buffer_.begin() + front_ + s_pos; + if (front_ + s_pos > size) { + std::vector v; + return BinStream(std::move(v)); + } + auto it_end = it_start + len; + if (front_ + s_pos + len > size) { + it_end = buffer_.end(); + } + std::vector v(it_start, it_end); + return BinStream(std::move(v)); +} + void BinStream::push_back_bytes(const char* src, size_t sz) { buffer_.insert(buffer_.end(), (const char*) src, (const char*) src + sz); } diff --git a/base/serialization.hpp b/base/serialization.hpp index 5962686..5142486 100644 --- a/base/serialization.hpp +++ b/base/serialization.hpp @@ -49,6 +49,7 @@ class BinStream { void purge(); void resize(size_t size); void seek(size_t pos); + BinStream sub_stream(size_t s_pos, size_t len); void append(const BinStream& m); void push_back_bytes(const char* src, size_t sz); @@ -145,8 +146,9 @@ BinStream& operator>>(BinStream& stream, std::vector& v) { stream >> len; v.clear(); v.resize(len); - for (int i = 0; i < v.size(); ++i) + for (int i = 0; i < v.size(); ++i) { stream >> 
v[i]; + } return stream; } diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 9a3f204..0ff8dfd 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -20,6 +20,7 @@ add_subdirectory(channel) file(GLOB core-src-files accessor.cpp accessor_store.cpp + balance.cpp combiner.cpp config.cpp context.cpp @@ -28,12 +29,14 @@ file(GLOB core-src-files job_runner.cpp mailbox.cpp memory_checker.cpp + memory_pool.cpp network.cpp objlist.cpp objlist_store.cpp + page.cpp + page_store.cpp shuffle_combiner_store.cpp - worker_info.cpp - balance.cpp) + worker_info.cpp) husky_cache_variable(core-src-files ${core-src-files}) add_library(core-objs OBJECT ${core-src-files}) diff --git a/core/attrlist_unittest.cpp b/core/attrlist_unittest.cpp index 10f653d..8700ac6 100644 --- a/core/attrlist_unittest.cpp +++ b/core/attrlist_unittest.cpp @@ -28,7 +28,13 @@ class TestAttrList : public testing::Test { ~TestAttrList() {} protected: - void SetUp() {} + void SetUp() { + Config config; + config.set_param("maximum_thread_memory", "134217728"); + config.set_param("page_size", "2097152"); + Context::set_config(std::move(config)); + Context::set_local_tid(0); + } void TearDown() {} }; diff --git a/core/cache.hpp b/core/cache.hpp new file mode 100644 index 0000000..563dcd5 --- /dev/null +++ b/core/cache.hpp @@ -0,0 +1,149 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +#include "boost/optional.hpp" + +namespace husky { + +template +class ICache { + public: + explicit ICache(size_t size) { size_ = size; } + + size_t get_max_size() { return size_; } + + protected: + size_t size_; +}; + +template +class LRUCache : public ICache { + public: + using ListIterator = typename std::list>::iterator; + + explicit LRUCache(size_t size) : ICache(size) {} + + ~LRUCache() = default; + + template + void put(KeyU&& key, ValU&& val) { + auto it_cache = cache_map_.find(key); + cache_list_.push_front(std::make_pair(key, val)); + if (it_cache != cache_map_.end()) { + cache_list_.erase(it_cache->second); + cache_map_.erase(it_cache); + } + cache_map_[key] = cache_list_.begin(); + + if (cache_map_.size() > this->size_) { + del(); + } + } + + boost::optional> del() { + if (cache_map_.size() == 0) { + return boost::none; + } + auto last = cache_list_.end(); + last--; + KeyT key = last->first; + ValT objlist = last->second; + cache_map_.erase(key); + cache_list_.pop_back(); + return *last; + } + + boost::optional> poll() { + if (cache_map_.size() == 0) { + return boost::none; + } + return cache_list_.back(); + } + + size_t get_size() { return cache_map_.size(); } + + template + bool exists(KeyU&& key) { + return cache_map_.find(std::forward(key)) != cache_map_.end(); + } + + private: + std::unordered_map cache_map_; + std::list> cache_list_; +}; + +template +class FIFOCache : public ICache { + public: + using ListIterator = typename std::list>::iterator; + + explicit FIFOCache(size_t size) : ICache(size) {} + + ~FIFOCache() = default; + + template + void put(KeyU&& key, ValU&& val) { + auto it_cache = cache_map_.find(key); + if (it_cache != cache_map_.end() && it_cache->second->second != val) { + cache_list_.erase(it_cache->second); + cache_map_.erase(it_cache); + } else if (it_cache != cache_map_.end() && it_cache->second->second == val) { + return; + } + cache_list_.push_front(std::make_pair(key, val)); 
+ cache_map_[key] = cache_list_.begin(); + + if (cache_map_.size() > this->size_) { + del(); + } + } + + boost::optional> del() { + if (cache_map_.size() == 0) { + return boost::none; + } + auto last = cache_list_.end(); + last--; + KeyT key = last->first; + ValT objlist = last->second; + cache_map_.erase(key); + cache_list_.pop_back(); + return *last; + } + + boost::optional> poll() { + if (cache_map_.size() == 0) { + return boost::none; + } + return cache_list_.back(); + } + + template + bool exists(KeyU&& key) { + return cache_map_.find(key) != cache_map_.end(); + } + + size_t get_size() { return cache_map_.size(); } + + private: + std::unordered_map cache_map_; + std::list> cache_list_; +}; +} // namespace husky diff --git a/core/cache_unittest.cpp b/core/cache_unittest.cpp new file mode 100644 index 0000000..a4ceaee --- /dev/null +++ b/core/cache_unittest.cpp @@ -0,0 +1,189 @@ +#include "core/cache.hpp" + +#include "gtest/gtest.h" + +namespace husky { +namespace { +class TestLRUCache : public testing::Test { + public: + TestLRUCache() {} + ~TestLRUCache() {} + + protected: + void SetUp() {} + void TearDown() {} +}; + +class TestFIFOCache : public testing::Test { + public: + TestFIFOCache() {} + ~TestFIFOCache() {} + + protected: + void SetUp() {} + void TearDown() {} +}; + +TEST_F(TestLRUCache, Basic) { + LRUCache cache(5); + + EXPECT_EQ(cache.poll(), boost::none); + EXPECT_EQ(cache.del(), boost::none); + cache.put(1, 11); + cache.put(2, 12); + cache.put(3, 13); + cache.put(4, 14); + cache.put(5, 15); + + auto kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 1); + EXPECT_EQ(kv_pair.second, 11); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 1); + EXPECT_EQ(kv_pair.second, 11); + + cache.put(2, 16); + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 3); + EXPECT_EQ(kv_pair.second, 13); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 3); + EXPECT_EQ(kv_pair.second, 13); + + cache.put(4, 17); + cache.put(5, 18); + + kv_pair = *cache.poll(); + 
EXPECT_EQ(kv_pair.first, 2); + EXPECT_EQ(kv_pair.second, 16); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 2); + EXPECT_EQ(kv_pair.second, 16); + + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 4); + EXPECT_EQ(kv_pair.second, 17); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 4); + EXPECT_EQ(kv_pair.second, 17); + + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 5); + EXPECT_EQ(kv_pair.second, 18); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 5); + EXPECT_EQ(kv_pair.second, 18); + + EXPECT_EQ(cache.get_size(), 0); + + cache.put(1, 100); + cache.put(2, 200); + cache.put(3, 300); + cache.put(4, 400); + cache.put(5, 500); + EXPECT_EQ(cache.get_size(), 5); + + cache.put(6, 600); + EXPECT_EQ(cache.get_size(), 5); + cache.put(7, 700); + cache.put(8, 800); + cache.put(9, 900); + cache.put(10, 1000); + EXPECT_EQ(cache.get_size(), 5); + + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 6); + EXPECT_EQ(kv_pair.second, 600); +} + +TEST_F(TestFIFOCache, Basic) { + FIFOCache cache(5); + + EXPECT_EQ(cache.poll(), boost::none); + EXPECT_EQ(cache.del(), boost::none); + cache.put(1, 11); + cache.put(2, 12); + cache.put(3, 13); + cache.put(4, 14); + cache.put(5, 15); + + auto kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 1); + EXPECT_EQ(kv_pair.second, 11); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 1); + EXPECT_EQ(kv_pair.second, 11); + + cache.put(2, 12); + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 2); + EXPECT_EQ(kv_pair.second, 12); + + cache.put(2, 16); + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 3); + EXPECT_EQ(kv_pair.second, 13); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 3); + EXPECT_EQ(kv_pair.second, 13); + + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 4); + EXPECT_EQ(kv_pair.second, 14); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 4); + EXPECT_EQ(kv_pair.second, 14); + + cache.put(5, 25); + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 2); + 
EXPECT_EQ(kv_pair.second, 16); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 2); + EXPECT_EQ(kv_pair.second, 16); + + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 5); + EXPECT_EQ(kv_pair.second, 25); + + kv_pair = *cache.del(); + EXPECT_EQ(kv_pair.first, 5); + EXPECT_EQ(kv_pair.second, 25); + + EXPECT_EQ(cache.get_size(), 0); + EXPECT_EQ(cache.poll(), boost::none); + EXPECT_EQ(cache.del(), boost::none); + + cache.put(1, 11); + cache.put(2, 12); + cache.put(3, 13); + cache.put(4, 14); + cache.put(5, 15); + + EXPECT_EQ(cache.get_size(), 5); + + cache.put(6, 600); + cache.put(7, 700); + cache.put(8, 800); + cache.put(9, 900); + cache.put(10, 1000); + + EXPECT_EQ(cache.get_size(), 5); + cache.put(6, 600); + + kv_pair = *cache.poll(); + EXPECT_EQ(kv_pair.first, 6); + EXPECT_EQ(kv_pair.second, 600); +} + +} // namespace +} // namespace husky diff --git a/core/config.cpp b/core/config.cpp index 35656f1..b207aef 100644 --- a/core/config.cpp +++ b/core/config.cpp @@ -23,6 +23,7 @@ #include "boost/program_options.hpp" #include "base/log.hpp" +#include "base/memory.hpp" #include "core/network.hpp" namespace husky { @@ -77,15 +78,27 @@ bool Config::init_with_args(int ac, char** av, const std::vector& c "Worker information.\nFormat is '%worker_hostname:%thread_number'.\nUse " "colon ':' as separator."); + po::options_description optional_options("Optional options"); + optional_options.add_options()("maximum_thread_memory", po::value()->default_value(""), + "Maximum memory for a thread")( + "maximum_process_memory", + po::value()->default_value(std::to_string(base::Memory::total_phys_mem())), + "Maximum memory for a process")("page_size", po::value()->default_value("4194304"), + "Size of a page"); + po::options_description customized_options("Customized options"); if (!customized.empty()) for (auto& arg : customized) customized_options.add_options()(arg.c_str(), po::value(), ""); po::options_description cmdline_options; - 
cmdline_options.add(generic_options).add(config_file_options).add(required_options).add(worker_info_options); + cmdline_options.add(generic_options) + .add(config_file_options) + .add(required_options) + .add(worker_info_options) + .add(optional_options); po::options_description config_options; - config_options.add(required_options).add(worker_info_config); + config_options.add(required_options).add(worker_info_config).add(optional_options); if (!customized_options.options().empty()) { cmdline_options.add(customized_options); config_options.add(customized_options); @@ -185,6 +198,23 @@ bool Config::init_with_args(int ac, char** av, const std::vector& c LOG_E << "arg worker.info is needed"; } + // optional options + if (vm.count("maximum_thread_memory")) { + set_param("maximum_thread_memory", vm["maximum_thread_memory"].as()); + } else { + std::string str_max_proc_mem = vm["maximum_process_memory"].as(); + std::stringstream str_stream(str_max_proc_mem); + int64_t max_proc_mem; + str_stream >> max_proc_mem; + int num_local_threads = 1; + if (worker_info != nullptr) { + num_local_threads = worker_info->get_num_local_workers(); + } + set_param("maximum_thread_memory", std::to_string(max_proc_mem / num_local_threads)); + } + set_param("maximum_process_memory", vm["maximum_process_memory"].as()); + set_param("page_size", vm["page_size"].as()); + if (!customized.empty()) { for (auto& arg : customized) if (vm.count(arg.c_str())) { diff --git a/core/memory_pool.cpp b/core/memory_pool.cpp new file mode 100644 index 0000000..05614b8 --- /dev/null +++ b/core/memory_pool.cpp @@ -0,0 +1,99 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "core/memory_pool.hpp" + +#include + +namespace husky { + +thread_local MemoryPool* MemoryPool::mem_pool_ = nullptr; + +MemoryPool& MemoryPool::get_mem_pool() { + init_mem_pool(); + return *mem_pool_; +} + +void MemoryPool::init_mem_pool() { + if (mem_pool_ == nullptr) { + mem_pool_ = new MemoryPool(); + } +} + +void MemoryPool::free_mem_pool() { + if (mem_pool_ != nullptr) + delete mem_pool_; + mem_pool_ = nullptr; +} + +MemoryPool::MemoryPool() { + std::stringstream str_stream; + std::string tmp_tm = Context::get_param("maximum_thread_memory"); + std::string tmp_ps = Context::get_param("page_size"); + DLOG_I << "MemoryPool maximum_thread_memory: " << tmp_tm; + DLOG_I << "MemoryPool page_size: " << tmp_ps; + str_stream.str(tmp_tm); + int64_t max_thread_mem; + str_stream >> max_thread_mem; + str_stream.clear(); + str_stream.str(tmp_ps); + int64_t page_size; + str_stream >> page_size; + num_pages_ = max_thread_mem / page_size; + DLOG_I << "MemoryPool num_pages:" << num_pages_; + cache_ = new LRUCache(num_pages_); +} + +// true indicates that objlist with 'key' is brought to memory from disk +// false indicates that objlist with 'key' has already existsed in memory +bool MemoryPool::request_page(const typename Page::KeyT& key, Page* page) { + bool live_in_memory = cache_->exists(key); + // first check whether the page we request currently lives in the memory + // if yes, then nothing we need to do, just return + if (live_in_memory) + return false; + // if not, then check out the candidate that 'may' be deleted by the cache in the 'put' 
operation + auto del_candidate1 = cache_->poll(); + cache_->put(key, page); + // check out the candiidate to be deleted again + auto del_candidate2 = cache_->poll(); + // if they are not the same, which means the first candidate just get deleted + if (del_candidate1 != boost::none && del_candidate1 != del_candidate2) { + (*del_candidate1).second->swap_out_memory(); + } + // first swap out the old page, then bring in the new page + page->swap_in_memory(); + return true; +} + +// in case that user want to get some free space without swapping in a page +int64_t MemoryPool::request_space(const int64_t bytes_required) { + int64_t bytes_evicted = 0; + int count = 0; + // check out the number of pages in cache + int num = cache_->get_size(); + while (bytes_evicted < bytes_required && count < num) { + auto del_candidate = cache_->poll(); + typename Page::KeyT key = (*del_candidate).first; + Page* page = (*del_candidate).second; + // currently we use all memory a page can hold to determine the memory used by this page + int64_t page_bytes = page->all_memory(); + page->swap_out_memory(); + bytes_evicted += page_bytes; + cache_->del(); + count++; + } + return bytes_evicted; +} +} // namespace husky diff --git a/core/memory_pool.hpp b/core/memory_pool.hpp new file mode 100644 index 0000000..9a1938c --- /dev/null +++ b/core/memory_pool.hpp @@ -0,0 +1,62 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include + +#include "core/cache.hpp" +#include "core/page.hpp" + +namespace husky { + +class MemoryPool { + public: + static MemoryPool& get_mem_pool(); + + static void init_mem_pool(); + static void free_mem_pool(); + + // we bring the objlist with 'key' into the momery + // ObjListPool is not aware of any memory issue + // It will do the job when caller think it is approriate + bool request_page(const typename Page::KeyT& key, Page* page); + + // bring the objlist chosen according to the cache policy from memory onto disk + // by default, we will evict one objlist + // or the caller has a better idea of how many bytes it needs exactly + int64_t request_space(const int64_t bytes_required = 1); + + inline bool in_memory(const typename Page::KeyT& key) { return cache_->exists(key); } + + inline bool on_disk(const typename Page::KeyT&& key) { return !cache_->exists(key); } + + inline size_t num_pages_in_memory() { return cache_->get_size(); } + + // the maximum number of pages allowed by the pool + inline size_t capacity() { return num_pages_; } + + private: + MemoryPool(); + + LRUCache* cache_; + int64_t num_pages_; + + static thread_local MemoryPool* mem_pool_; +}; + +} // namespace husky diff --git a/core/memory_pool_unittest.cpp b/core/memory_pool_unittest.cpp new file mode 100644 index 0000000..d23a842 --- /dev/null +++ b/core/memory_pool_unittest.cpp @@ -0,0 +1,96 @@ +#include "core/memory_pool.hpp" + +#include +#include + +#include "gtest/gtest.h" + +#include "core/objlist.hpp" +#include "core/objlist_data.hpp" +#include "core/objlist_store.hpp" + +namespace husky { +namespace { + +class TestMemoryPool : public testing::Test { + public: + TestMemoryPool() {} + ~TestMemoryPool() {} + + protected: + void SetUp() { + MemoryPool::free_mem_pool(); + PageStore::drop_all_pages(); + PageStore::free_page_map(); + Config config; + config.set_param("maximum_thread_memory", "134217728"); + config.set_param("page_size", 
"2097152"); + Context::set_config(std::move(config)); + Context::set_local_tid(0); + } + void TearDown() { + MemoryPool::free_mem_pool(); + PageStore::drop_all_pages(); + PageStore::free_page_map(); + } +}; + +class Obj { + public: + using KeyT = int; + KeyT key; + const KeyT& id() const { return key; } + Obj() {} + explicit Obj(const KeyT& k) : key(k) {} +}; + +TEST_F(TestMemoryPool, Functional) { + std::stringstream str_stream(Context::get_param("page_size")); + int64_t page_size; + str_stream >> page_size; + str_stream.clear(); + int64_t max_thread_mem; + str_stream.str(Context::get_param("maximum_thread_memory")); + str_stream >> max_thread_mem; + + auto& mem_pool = MemoryPool::get_mem_pool(); + + size_t num_pages = max_thread_mem / page_size; + EXPECT_EQ(mem_pool.capacity(), num_pages); + EXPECT_EQ(mem_pool.num_pages_in_memory(), 0); + + for (int i = 1; i <= num_pages; i++) { + Page* p = PageStore::create_page(); + EXPECT_EQ(p->all_memory(), page_size); + EXPECT_EQ(mem_pool.request_page(p->get_key(), p), true); + EXPECT_EQ(mem_pool.num_pages_in_memory(), i); + } + + for (int i = 1; i <= num_pages; i++) { + Page* p = PageStore::create_page(); + EXPECT_EQ(p->all_memory(), page_size); + EXPECT_EQ(mem_pool.request_page(p->get_key(), p), true); + EXPECT_EQ(mem_pool.num_pages_in_memory(), num_pages); + } + + EXPECT_EQ(mem_pool.request_space(1), page_size); + EXPECT_EQ(mem_pool.num_pages_in_memory(), num_pages - 1); + + EXPECT_EQ(mem_pool.request_space(page_size), page_size); + EXPECT_EQ(mem_pool.num_pages_in_memory(), num_pages - 2); + + EXPECT_EQ(mem_pool.request_space(page_size + 1), page_size * 2); + EXPECT_EQ(mem_pool.num_pages_in_memory(), num_pages - 4); + + for (int i = 5; i >= 0; i--) { + EXPECT_EQ(mem_pool.request_space(page_size - 1), page_size); + EXPECT_EQ(mem_pool.num_pages_in_memory(), num_pages + i - 10); + } + + EXPECT_EQ(mem_pool.request_space(1), page_size); + EXPECT_EQ(mem_pool.num_pages_in_memory(), num_pages - 11 < 0 ? 
0 : num_pages - 11); + EXPECT_EQ(mem_pool.capacity(), num_pages); +} + +} // namespace +} // namespace husky diff --git a/core/objlist.hpp b/core/objlist.hpp index 54b4a10..95c22a4 100644 --- a/core/objlist.hpp +++ b/core/objlist.hpp @@ -18,17 +18,18 @@ #include #include #include +#include #include #include "boost/random.hpp" #include "base/assert.hpp" -#include "base/disk_store.hpp" #include "base/exception.hpp" #include "base/serialization.hpp" #include "core/attrlist.hpp" #include "core/channel/channel_destination.hpp" #include "core/channel/channel_source.hpp" +#include "core/objlist_data.hpp" namespace husky { @@ -54,6 +55,12 @@ class ObjListBase : public ChannelSource, public ChannelDestination { virtual size_t get_size() const = 0; + virtual void sort() = 0; + + virtual void deletion_finalize() = 0; + + virtual void clear_page_from_memory(Page* page) = 0; + private: size_t id_; @@ -63,7 +70,6 @@ class ObjListBase : public ChannelSource, public ChannelDestination { template class ObjList : public ObjListBase { public: - // TODO(all): should be protected. 
The list should be constructed by possibly Context ObjList() = default; virtual ~ObjList() { @@ -78,17 +84,18 @@ class ObjList : public ObjListBase { ObjList(ObjList&&) = default; ObjList& operator=(ObjList&&) = default; - std::vector& get_data() { return objlist_data_.data_; } - std::vector& get_del_bitmap() { return del_bitmap_; } + std::vector& get_data() { return objlist_data_.get_data(); } + + std::vector& get_del_bitmap() { return objlist_data_.get_del_bitmap(); } - // Sort the objlist void sort() { - auto& data = objlist_data_.data_; + auto& data = objlist_data_.get_data(); if (data.size() == 0) return; std::vector order(this->get_size()); - for (int i = 0; i < order.size(); ++i) + for (int i = 0; i < order.size(); ++i) { order[i] = i; + } // sort the permutation std::sort(order.begin(), order.end(), [&](const size_t a, const size_t b) { return data[a].id() < data[b].id(); }); @@ -96,68 +103,62 @@ class ObjList : public ObjListBase { for (auto& it : this->attrlist_map) it.second->reorder(order); std::sort(data.begin(), data.end(), [](const ObjT& a, const ObjT& b) { return a.id() < b.id(); }); - hashed_objs_.clear(); - sorted_size_ = data.size(); + auto& hashed_objs = objlist_data_.get_hashed_objs(); + hashed_objs.clear(); + auto& sorted_size = objlist_data_.get_sorted_size(); + sorted_size = data.size(); } - // TODO(Fan): This will invalidate the object dict void deletion_finalize() { - auto& data = objlist_data_.data_; + auto& data = objlist_data_.get_data(); + auto& del_bitmap = objlist_data_.get_del_bitmap(); if (data.size() == 0) return; size_t i = 0, j; // move i to the first empty place - while (i < data.size() && !del_bitmap_[i]) + while (i < data.size() && !del_bitmap[i]) i++; if (i == data.size()) return; for (j = data.size() - 1; j > 0; j--) { - if (!del_bitmap_[j]) { + if (!del_bitmap[j]) { data[i] = std::move(data[j]); // move j_th attribute to i_th for all attr lists for (auto& it : this->attrlist_map) it.second->move(i, j); i += 1; // move i to 
the next empty place - while (i < data.size() && !del_bitmap_[i]) + while (i < data.size() && !del_bitmap[i]) i++; } if (i >= j) break; } data.resize(j); - del_bitmap_.resize(j); + del_bitmap.resize(j); for (auto& it : this->attrlist_map) it.second->resize(j); - objlist_data_.num_del_ = 0; - std::fill(del_bitmap_.begin(), del_bitmap_.end(), 0); + auto& num_del = objlist_data_.get_num_del(); + num_del = 0; + std::fill(del_bitmap.begin(), del_bitmap.end(), 0); + BinStream bs; + bs << data; + objlist_data_.byte_size_ = bs.size(); } - // Delete an object - size_t delete_object(const ObjT* const obj_ptr) { - // TODO(all): Decide whether we can remove this - // if (unlikely(del_bitmap_.size() < data.size())) { - // del_bitmap_.resize(data.size()); - // } - // lazy operation - size_t idx = obj_ptr - &objlist_data_.data_[0]; - if (idx < 0 || idx >= objlist_data_.data_.size()) - throw base::HuskyException("ObjList::delete_object error: index out of range"); - del_bitmap_[idx] = true; - objlist_data_.num_del_ += 1; - return idx; - } + size_t delete_object(const ObjT* const obj_ptr) { return objlist_data_.delete_object(obj_ptr); } + + size_t delete_object(const typename ObjT::KeyT& key) { return objlist_data_.delete_object(key); } - // Find obj according to key - // @Return a pointer to obj ObjT* find(const typename ObjT::KeyT& key) { - auto& working_list = objlist_data_.data_; + auto& working_list = objlist_data_.get_data(); + auto& sorted_size = objlist_data_.get_sorted_size(); if (working_list.size() == 0) return nullptr; ObjT* start_addr = &working_list[0]; - int r = this->sorted_size_ - 1; + int r = sorted_size - 1; int l = 0; int m = (r + l) / 2; @@ -180,34 +181,45 @@ class ObjList : public ObjListBase { m = (r + l) / 2; } + auto& data = objlist_data_.get_data(); + auto& hashed_objs = objlist_data_.get_hashed_objs(); // The object to find is not in the sorted part - if ((sorted_size_ < objlist_data_.data_.size()) && (hashed_objs_.find(key) != hashed_objs_.end())) - 
return &(objlist_data_.data_[hashed_objs_[key]]); + if ((sorted_size < data.size()) && (hashed_objs.find(key) != hashed_objs.end())) + return &(data[hashed_objs[key]]); return nullptr; } - // Find the index of an obj - size_t index_of(const ObjT* const obj_ptr) const { return objlist_data_.index_of(obj_ptr); } + size_t index_of(const ObjT* const obj_ptr) { return objlist_data_.index_of(obj_ptr); } - // Add an object - size_t add_object(ObjT&& obj) { - auto& data = objlist_data_.data_; - size_t ret = hashed_objs_[obj.id()] = data.size(); - data.push_back(std::move(obj)); - del_bitmap_.push_back(0); - return ret; - } - size_t add_object(const ObjT& obj) { - auto& data = objlist_data_.data_; - size_t ret = hashed_objs_[obj.id()] = data.size(); - data.push_back(obj); - del_bitmap_.push_back(0); + size_t index_of(const typename ObjT::KeyT& key) { return objlist_data_.index_of(key); } + + template + size_t add_object(ObjU&& obj) { + if (!objlist_data_.check_data_in_memory()) { + DLOG_I << "data not in memory, bring data from disk"; + objlist_data_.read_data_from_disk(); + } + BinStream bs; + bs << obj; + objlist_data_.byte_size_ += bs.size(); + size_t num_pages = objlist_data_.pages_.size(); + while (objlist_data_.byte_size_ > objlist_data_.page_size_ * num_pages) { + DLOG_I << "not enough pages"; + Page* new_page = PageStore::create_page(); + new_page->set_owner(this); + DLOG_I << "request new page"; + MemoryPool::get_mem_pool().request_page(new_page->get_key(), new_page); + DLOG_I << "push_back new page into objlist_data"; + objlist_data_.pages_.push_back(new_page); + num_pages += 1; + } + size_t ret = objlist_data_.get_hashed_objs()[obj.id()] = objlist_data_.get_data().size(); + objlist_data_.get_data().push_back(std::forward(obj)); + objlist_data_.get_del_bitmap().push_back(0); return ret; } - // Check a certain position of del_bitmap_ - // @Return True if it's deleted - bool get_del(size_t idx) const { return del_bitmap_[idx]; } + bool get_del(size_t idx) const { 
return objlist_data_.del_bitmap_[idx]; } // Create AttrList template @@ -219,7 +231,6 @@ class ObjList : public ObjListBase { return (*attrlist); } - // Get AttrList template AttrList& get_attrlist(const std::string& attr_name) { if (attrlist_map.find(attr_name) == attrlist_map.end()) @@ -246,43 +257,6 @@ class ObjList : public ObjListBase { item.second->process_bin(bin, idx); } - inline size_t get_sorted_size() const { return sorted_size_; } - inline size_t get_num_del() const { return objlist_data_.num_del_; } - inline size_t get_hashed_size() const { return hashed_objs_.size(); } - inline size_t get_size() const override { return objlist_data_.get_size(); } - inline size_t get_vector_size() const { return objlist_data_.get_vector_size(); } - inline ObjT& get(size_t i) { return objlist_data_.data_[i]; } - - bool write_to_disk() { - DiskStore ds(id2str()); - BinStream bs; - deletion_finalize(); - sort(); - bs << objlist_data_; - this->clear_from_memory(); - return ds.write(std::move(bs)); - } - - void read_from_disk(const std::string& objlist_path) { - DiskStore ds(objlist_path); - BinStream bs = ds.read(); - objlist_data_.clear(); - bs >> objlist_data_; - sorted_size_ = objlist_data_.data_.size(); - del_bitmap_.clear(); - del_bitmap_.resize(sorted_size_, false); - hashed_objs_.clear(); - } - - void clear_from_memory() { - std::vector tmp_obj; - std::vector& data = this->get_data(); - data.swap(tmp_obj); - - std::vector tmp_bool; - del_bitmap_.swap(tmp_bool); - } - size_t estimated_storage_size(const double sample_rate = 0.005) { if (this->get_vector_size() == 0) return 0; @@ -300,18 +274,44 @@ class ObjList : public ObjListBase { // log the size for (auto iter = sample_container.begin(); iter != sample_container.end(); ++iter) - bs << objlist_data_.data_[*iter]; + bs << objlist_data_.get_data()[*iter]; - std::vector& v = objlist_data_.data_; + std::vector& v = objlist_data_.get_data(); size_t ret = bs.size() * sizeof(char) * v.capacity() / sample_num; return ret; 
} + inline size_t get_sorted_size() const { return objlist_data_.get_sorted_size(); } + inline size_t get_num_del() const { return objlist_data_.get_num_del(); } + inline size_t get_hashed_size() const { return objlist_data_.get_hashed_size(); } + inline size_t get_size() const override { return objlist_data_.get_size(); } + inline size_t get_vector_size() const { return objlist_data_.get_vector_size(); } + inline ObjT& get(size_t i) { return objlist_data_.get(i); } + + bool check_data_in_memory() { return objlist_data_.check_data_in_memory(); } + protected: + // this method is called by page owned by this objlist_data when the page is about to be swapped out of the memory + // since currently only supporting the swapping of the whole objlist_data + // so when the first page is about to be swapped out of the memory + // we write the data vector into these pages + // then these pages write into their corresponding file on the disk + void clear_page_from_memory(Page* page) { + DLOG_I << "objlist " << this->get_id() << " clear page from memory get called"; + bool data_in_mem = objlist_data_.check_data_in_memory(); + bool pages_in_mem = objlist_data_.check_pages_in_memory(); + // check whether this is the first page owned by this objlist to be swapped out of the memory + if (data_in_mem && pages_in_mem) { + DLOG_I << "objlist " << this->get_id() << " clear page from memory get called and we write data to disk"; + deletion_finalize(); + sort(); + objlist_data_.write_to_disk(); + } + } + ObjListData objlist_data_; - size_t sorted_size_ = 0; - std::vector del_bitmap_; - std::unordered_map hashed_objs_; std::unordered_map attrlist_map; + + friend class Page; }; } // namespace husky diff --git a/core/objlist_data.hpp b/core/objlist_data.hpp index 07ad210..0304031 100644 --- a/core/objlist_data.hpp +++ b/core/objlist_data.hpp @@ -14,20 +14,39 @@ #pragma once +#include +#include +#include #include #include "base/exception.hpp" #include "base/serialization.hpp" +#include 
"core/memory_pool.hpp" +#include "core/page.hpp" +#include "core/page_store.hpp" namespace husky { using base::BinStream; +class ObjListDataBase {}; + template -class ObjListData { +class ObjListData : public ObjListDataBase { public: - ObjListData() = default; - ~ObjListData() = default; + ObjListData() { + std::string page_size = Context::get_param("page_size"); + std::stringstream str_stream; + str_stream.str(page_size); + str_stream >> page_size_; + DLOG_I << "ObjListData constructor page size:" << page_size_; + } + + ~ObjListData() { + for (int i = 0; i < pages_.size(); i++) { + PageStore::release_page(pages_[i]); + } + } ObjListData(const ObjListData&) = delete; ObjListData& operator=(const ObjListData&) = delete; @@ -35,30 +54,215 @@ class ObjListData { ObjListData(ObjListData&&) = delete; ObjListData& operator=(ObjListData&&) = delete; - inline size_t get_size() const { return data_.size() - num_del_; } - inline size_t get_vector_size() const { return data_.size(); } + inline size_t get_sorted_size() const { return sorted_size_; } + + inline size_t get_num_del() const { return num_del_; } + + inline size_t get_hashed_size() const { return hashed_objs_.size(); } + + inline size_t get_size() const { + if (!check_data_in_memory()) { + return size_ - num_del_; + } + return data_.size() - num_del_; + } + + inline size_t get_vector_size() const { + if (!check_data_in_memory()) { + return size_; + } + return data_.size(); + } + + inline ObjT& get(size_t idx) { + if (!check_data_in_memory()) { + read_data_from_disk(); + } + return data_[idx]; + } + + inline void clear() { data_.clear(); } + + inline std::vector& get_data() const { + if (!check_data_in_memory()) { + read_data_from_disk(); + } + return data_; + } + + inline std::vector& get_del_bitmap() const { return del_bitmap_; } + + inline std::unordered_map& get_hashed_objs() const { return hashed_objs_; } + + inline size_t& get_sorted_size() { return sorted_size_; } - void clear() { data_.clear(); } + inline 
size_t& get_num_del() { return num_del_; } + + size_t delete_object(const ObjT* const obj_ptr) { + size_t idx = index_of(obj_ptr); + if (del_bitmap_[idx] == false) { + del_bitmap_[idx] = true; + num_del_ += 1; + } + return idx; + } + + size_t delete_object(const typename ObjT::KeyT& key) { + size_t idx; + if (hashed_objs_.find(key) != hashed_objs_.end()) { + idx = hashed_objs_[key]; + } else { + idx = index_of(key); + } + if (del_bitmap_[idx] == false) { + del_bitmap_[idx] = true; + num_del_ += 1; + } + return idx; + } // Find the index of an obj + size_t index_of(const typename ObjT::KeyT& key) const { + if (!check_data_in_memory()) { + read_data_from_disk(); + } + for (int i = 0; i < data_.size(); i++) { + if (data_[i].id() == key) { + return i; + } + } + throw base::HuskyException("ObjListData::index_of error: key out of range"); + } + size_t index_of(const ObjT* const obj_ptr) const { + if (!check_data_in_memory()) { + throw base::HuskyException("ObjListData::index_of error: data is not in memory"); + read_data_from_disk(); + } size_t idx = obj_ptr - &data_[0]; if (idx < 0 || idx >= data_.size()) throw base::HuskyException("ObjListData::index_of error: index out of range"); return idx; } - friend BinStream& operator<<(BinStream& stream, ObjListData& obj_list_data) { - return stream << obj_list_data.data_; + friend BinStream& operator<<(BinStream& stream, ObjListData& objlist_data) { + return stream << objlist_data.data_; } - friend BinStream& operator>>(BinStream& stream, ObjListData& obj_list_data) { - return stream >> obj_list_data.data_; + friend BinStream& operator>>(BinStream& stream, ObjListData& objlist_data) { + return stream >> objlist_data.data_; + } + + // check whether data_ is valid to be manipulated + bool check_data_in_memory() const { return data_in_memory_; } + + // check whether all the pages are in the memory + bool check_pages_in_memory() const { + for (int i = 0; i < pages_.size(); i++) { + if (!pages_[i]->in_memory()) { + return false; + } 
+ } + return true; + } + + protected: + // serialize data_, spread it over the pages, flush each page to its file, then drop data_ from memory + void write_to_disk() { + // serialize the data into binstream + BinStream bs; + bs << data_; + byte_size_ = bs.size(); + + // since deletion finalize may actually delete some data + // we may not need that many pages any more + // release trailing pages while the stream still fits in one page fewer + size_t num_pages = pages_.size(); + DLOG_I << "before we have " << num_pages << " pages"; + while (byte_size_ + page_size_ <= page_size_ * num_pages && num_pages >= 1) { + Page* page = pages_.back(); + pages_.pop_back(); + PageStore::release_page(page); + num_pages -= 1; + } + DLOG_I << "after we have " << num_pages << " pages"; + + // make sure every page is in the memory + read_pages_from_disk(); + size_t start = 0; + size_t size_bin = bs.size(); + DLOG_I << "the size of binstream written to disk is:" << size_bin; + for (int i = 0; i < pages_.size(); i++) { + pages_[i]->clear_bin(); + // write the next slice of at most page_size_ bytes of the binstream to this page + BinStream sub = bs.sub_stream(start, page_size_); + size_t written = sub.size(); + pages_[i]->write(std::move(sub)); + bool ret = pages_[i]->write_to_disk(); + if (!ret) + throw base::HuskyException("when writing data to disk, a page cannot write to its file."); + start += written; + } + // all data already on disk, so clear it from memory + clear_data_from_memory(); + } + + // remember the element count in size_, then swap data_ with an empty vector and mark it as not in memory + void clear_data_from_memory() { + size_ = data_.size(); + std::vector tmp; + data_.swap(tmp); + byte_size_ = 0; + data_in_memory_ = false; + } + + // request every non-resident page from the memory pool; throws if some page is still not in memory afterwards + void read_pages_from_disk() const { + DLOG_I << "read pages from disk"; + if (check_pages_in_memory()) { + DLOG_I << "all pages are in memory"; + return; + } + DLOG_I << "Not all pages are in memory"; + for (int i = 0; i < pages_.size(); i++) { + if (!pages_[i]->in_memory()) { + MemoryPool::get_mem_pool().request_page(pages_[i]->get_key(), pages_[i]); + } + } + if (!check_pages_in_memory()) + throw base::HuskyException( + "read pages from 
disk: The memory pool cannot hold all pages in this objlist_data"); + } + // put all pages into memory pool and turn them into a data vector + void read_data_from_disk() const { + BinStream bs; + read_pages_from_disk(); + for (int i = 0; i < pages_.size(); i++) { + bs.append(pages_[i]->get_bin()); + } + bs >> data_; + byte_size_ = data_.size(); + data_in_memory_ = true; + for (int i = 0; i < pages_.size(); i++) { + pages_[i]->clear_bin(); + } } private: - std::vector data_; - size_t num_del_ = 0; + // info about all the pages which required by this objlist_data + mutable std::vector pages_; + mutable std::vector data_; + mutable std::vector del_bitmap_; + mutable std::unordered_map hashed_objs_; + + mutable bool data_in_memory_ = true; + // number of objects in data vector + // return size_ when user request 'size' without data vector being in the memory + mutable size_t size_ = 0; + mutable size_t sorted_size_ = 0; + mutable size_t num_del_ = 0; + int page_size_ = 0; + mutable int64_t byte_size_ = 0; template friend class ObjList; diff --git a/core/objlist_unittest.cpp b/core/objlist_unittest.cpp index 0d10e3d..d25cf24 100644 --- a/core/objlist_unittest.cpp +++ b/core/objlist_unittest.cpp @@ -1,11 +1,15 @@ #include "core/objlist.hpp" #include +#include #include #include #include "gtest/gtest.h" +#include "core/memory_pool.hpp" +#include "core/page_store.hpp" + namespace husky { namespace { @@ -15,13 +19,27 @@ class TestObjList : public testing::Test { ~TestObjList() {} protected: - void SetUp() {} - void TearDown() {} + void SetUp() { + MemoryPool::free_mem_pool(); + PageStore::drop_all_pages(); + PageStore::free_page_map(); + Config config; + int64_t max_thread_mem = 1024 * 1024 * 32; + config.set_param("maximum_thread_memory", std::to_string(max_thread_mem)); + config.set_param("page_size", "4194304"); + Context::set_config(std::move(config)); + Context::set_local_tid(0); + } + void TearDown() { + MemoryPool::free_mem_pool(); + PageStore::drop_all_pages(); + 
PageStore::free_page_map(); + } }; class Obj { public: - using KeyT = int; + using KeyT = int64_t; KeyT key; const KeyT& id() const { return key; } Obj() {} @@ -32,6 +50,8 @@ class Obj { friend BinStream& operator>>(BinStream& stream, Obj& obj) { return stream >> obj.key; } }; +size_t size__ = 100; + TEST_F(TestObjList, InitAndDelete) { ObjList* obj_list = new ObjList(); ASSERT_TRUE(obj_list != nullptr); @@ -40,41 +60,41 @@ TEST_F(TestObjList, InitAndDelete) { TEST_F(TestObjList, AddObject) { ObjList obj_list; - for (int i = 0; i < 10; ++i) { + for (int64_t i = 0; i < size__; ++i) { Obj obj(i); obj_list.add_object(obj); } std::vector& v = obj_list.get_data(); - for (int i = 0; i < 10; ++i) { + for (int64_t i = 0; i < size__; ++i) { EXPECT_EQ(v[i].key, i); } } TEST_F(TestObjList, AddMoveObject) { ObjList obj_list; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < size__; ++i) { Obj obj(i); obj_list.add_object(std::move(obj)); } std::vector& v = obj_list.get_data(); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < size__; ++i) { EXPECT_EQ(v[i].key, i); } } TEST_F(TestObjList, Sort) { ObjList obj_list; - for (int i = 0; i < 10; ++i) { - Obj obj(10 - i - 1); + for (int i = 0; i < size__; ++i) { + Obj obj(size__ - i - 1); obj_list.add_object(std::move(obj)); } obj_list.sort(); - EXPECT_EQ(obj_list.get_sorted_size(), 10); + EXPECT_EQ(obj_list.get_sorted_size(), size__); EXPECT_EQ(obj_list.get_num_del(), 0); EXPECT_EQ(obj_list.get_hashed_size(), 0); - EXPECT_EQ(obj_list.get_size(), 10); + EXPECT_EQ(obj_list.get_size(), size__); std::vector& v = obj_list.get_data(); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < size__; ++i) { EXPECT_EQ(v[i].key, i); } } @@ -85,11 +105,15 @@ TEST_F(TestObjList, Delete) { Obj obj(i); obj_list.add_object(std::move(obj)); } + EXPECT_EQ(obj_list.get_size(), 10); std::vector& v = obj_list.get_data(); Obj* p = &v[3]; Obj* p2 = &v[7]; obj_list.delete_object(p); + EXPECT_EQ(obj_list.get_num_del(), 1); + 
EXPECT_EQ(obj_list.get_size(), 9); obj_list.delete_object(p2); + EXPECT_EQ(obj_list.get_size(), 8); EXPECT_EQ(obj_list.get_num_del(), 2); EXPECT_EQ(obj_list.get_del(3), 1); EXPECT_EQ(obj_list.get_del(5), 0); @@ -131,47 +155,14 @@ TEST_F(TestObjList, IndexOf) { } } -TEST_F(TestObjList, WriteAndRead) { - ObjList list_to_write; - for (int i = 10; i > 0; --i) { - Obj obj(i); - list_to_write.add_object(std::move(obj)); - } - list_to_write.sort(); - list_to_write.add_object(std::move(Obj(13))); - list_to_write.add_object(std::move(Obj(12))); - list_to_write.add_object(std::move(Obj(11))); - std::vector& v = list_to_write.get_data(); - Obj* p = &v[0]; - Obj* p2 = &v[10]; - list_to_write.delete_object(p); // rm 1 - list_to_write.delete_object(p2); // rm 13 - size_t old_objlist_size = list_to_write.get_size(); - - EXPECT_TRUE(list_to_write.write_to_disk()); - size_t data_capacity_after_write = list_to_write.get_data().capacity(); - size_t bitmap_capacity_after_write = list_to_write.get_del_bitmap().capacity(); - EXPECT_EQ(data_capacity_after_write, 0); - EXPECT_EQ(bitmap_capacity_after_write, 0); - - std::string list_to_write_path = list_to_write.id2str(); - ObjList list_to_read; - list_to_read.read_from_disk(list_to_write_path); - - EXPECT_EQ(list_to_read.get_size(), old_objlist_size); - EXPECT_EQ(list_to_read.get_size(), list_to_read.get_sorted_size()); - EXPECT_EQ(list_to_read.get_size(), list_to_read.get_del_bitmap().size()); - EXPECT_EQ(list_to_read.get_hashed_size(), 0); - EXPECT_EQ(list_to_read.get_num_del(), 0); - - for (size_t i = 0; i < list_to_read.get_size() - 1; i++) - EXPECT_LE(list_to_read.get(i).id(), list_to_read.get(i + 1).id()); - for (size_t i = 0; i < list_to_read.get_size() - 1; i++) - EXPECT_EQ(list_to_read.get_del(i), false); -} - TEST_F(TestObjList, EstimatedStorage) { - const size_t len = 1000 * 1000 * 10; + std::string str = Context::get_param("maximum_thread_memory"); + std::stringstream str_stream(str); + int64_t max_thread_mem; + str_stream >> 
max_thread_mem; + int size_obj = sizeof(Obj); + + const size_t len = max_thread_mem / (size_obj + 4); ObjList test_list; for (size_t i = 0; i < len; ++i) { Obj obj(i); @@ -186,5 +177,58 @@ TEST_F(TestObjList, EstimatedStorage) { EXPECT_EQ(diff_time, 1); } +TEST_F(TestObjList, Large) { + auto& mem_pool = MemoryPool::get_mem_pool(); + + std::string str = Context::get_param("maximum_thread_memory"); + std::stringstream str_stream(str); + int64_t max_thread_mem; + str_stream >> max_thread_mem; + int size_obj = sizeof(Obj); + str = Context::get_param("page_size"); + str_stream.clear(); + str_stream.str(str); + int page_size; + str_stream >> page_size; + + EXPECT_EQ(mem_pool.capacity(), max_thread_mem / page_size); + + const size_t len = max_thread_mem / size_obj; + const size_t del_len = 1024 * 1024; + ObjList list1; + ObjList list2; + ObjList list3; + + for (size_t i = 0; i < len; ++i) { + Obj obj(i); + list1.add_object(obj); + } + + for (size_t i = 0; i < del_len; ++i) { + list1.delete_object(i); + } + + for (size_t i = 0; i < len; ++i) { + Obj obj(i); + list2.add_object(obj); + } + + EXPECT_FALSE(list1.check_data_in_memory()); + EXPECT_EQ(list1.get_size(), len - del_len); + + Obj* ptr = list1.find(1024 * 1024); + size_t idx = list1.index_of(ptr); + EXPECT_EQ(idx, 0); + + EXPECT_FALSE(list2.check_data_in_memory()); + EXPECT_EQ(list1.get_size(), len - del_len); + + list1.delete_object(1024 * 1024); + EXPECT_EQ(list1.get_num_del(), 1); + list1.deletion_finalize(); + EXPECT_EQ(list1.get_num_del(), 0); + EXPECT_EQ(list1.get_size(), len - del_len - 1); +} + } // namespace } // namespace husky diff --git a/core/page.cpp b/core/page.cpp new file mode 100644 index 0000000..740e9bc --- /dev/null +++ b/core/page.cpp @@ -0,0 +1,48 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "page.hpp" + +#include + +#include "objlist.hpp" + +namespace husky { + +void Page::swap_in_memory() { + // set the flag first so that other method can be called + in_memory_ = true; + read_from_disk(); +} + +void Page::swap_out_memory() { + // tell the objlist data that this page owned by it is swapped out of memory. + DLOG_I << "page " + std::to_string(this->id_) + " is about to be swapped out of the memory"; + if (owner_ != nullptr) { + DLOG_I << "owner is not nullptr"; + owner_->clear_page_from_memory(this); + } + bool ret = write_to_disk(); + if (!ret) + throw base::HuskyException("page " + std::to_string(this->id_) + + " fails to write to disk when being swapped out of the memory"); + clear_bin(); + in_memory_ = false; +} + +void Page::set_owner(ObjListBase* owner) { owner_ = owner; } + +ObjListBase* Page::get_owner() { return owner_; } + +} // namespace husky diff --git a/core/page.hpp b/core/page.hpp new file mode 100644 index 0000000..6cc2bf8 --- /dev/null +++ b/core/page.hpp @@ -0,0 +1,179 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "base/disk_store.hpp" +#include "base/serialization.hpp" +#include "core/context.hpp" + +namespace husky { + +class ObjListBase; + +using base::DiskStore; +using base::BinStream; + +class Page { + public: + using KeyT = size_t; + + explicit Page(KeyT id) : id_(id), tid_(Context::get_local_tid()) { + this->file_name_ = "/var/tmp/page-" + std::to_string(tid_) + "-" + std::to_string(id_); + std::string page_size = Context::get_param("page_size"); + std::stringstream str_stream; + str_stream.str(page_size); + str_stream >> this->all_bytes_; + this->free_bytes_ = all_bytes_; + this->used_bytes_ = 0; + this->in_memory_ = false; + this->bin_in_memory_ = false; + } + + std::string get_file_name() { return file_name_; } + + size_t get_key() { return id_; } + + size_t get_id() { return id_; } + + // called by page store when the page is released finally + // remove file created temporarily + void finalize() { + std::ifstream f(this->file_name_.c_str()); + if (!f.good()) + return; + int ret = remove(file_name_.c_str()); + if (ret != 0) { + LOG_I << "Fail to delete a page file " << file_name_; + } + } + + // currently not used + void take_memory(int64_t bytes) { + used_bytes_ += bytes; + free_bytes_ -= bytes; + } + + // currently not used + void release_memory(int64_t bytes) { + free_bytes_ += bytes; + used_bytes_ -= bytes; + } + + // currently not used + size_t free_memory() { return free_bytes_; } + + // currently not used + size_t used_memory() { return used_bytes_; } + + size_t all_memory() { return all_bytes_; } + + const BinStream& get_bin() const { return bs_; } + + BinStream& get_bin() { + if (!in_memory()) + throw base::HuskyException("a page wants to get_bin when it is not in memory"); + if (!check_bin_in_memory()) { + read_from_disk(); + } + return bs_; + } + + bool 
check_bin_in_memory() { return bin_in_memory_; } + // write into the binstream + // buffered so that it will be finally written onto the disk once for all + size_t write(const BinStream& bs) { + size_t ret = bs.size(); + bs_.append(bs); + return ret; + } + + size_t write(const std::vector& v) { + BinStream bs(v); + return write(bs); + } + + size_t write(std::vector&& v) { + BinStream bs(std::move(v)); + return write(bs); + } + + bool write_to_disk() { + if (!in_memory_) + throw base::HuskyException("page " + std::to_string(id_) + " on thread " + std::to_string(tid_) + + " cannot write to disk because it is not in memory."); + DiskStore ds(file_name_); + return ds.write(bs_); + } + + // read the data from disk into binstream + void read_from_disk() { + if (!in_memory_) + throw base::HuskyException("page " + std::to_string(id_) + " on thread " + std::to_string(tid_) + + " cannot read from disk because it is not in memory."); + DiskStore ds(file_name_); + bs_ = ds.read(); + bin_in_memory_ = true; + } + + void clear_bin() { + bs_.clear(); + bin_in_memory_ = false; + } + + bool in_memory() { return in_memory_; } + + bool on_disk() { return !in_memory_; } + + size_t get_bin_size() { + if (!check_bin_in_memory()) { + read_from_disk(); + } + return bs_.size(); + } + + // called by memory pool to swap in the page + // when it put page into the cache owned by memory pool + void swap_in_memory(); + // called by memory pool to swap out the page + // when page is deleted by the cached owned by memory pool + void swap_out_memory(); + + void set_owner(ObjListBase* owner); + + ObjListBase* get_owner(); + + protected: + std::string file_name_; + BinStream bs_; + bool in_memory_; + bool bin_in_memory_; + int64_t all_bytes_; + int64_t free_bytes_; + int64_t used_bytes_; + + private: + size_t id_; + size_t tid_; + ObjListBase* owner_ = nullptr; +}; + +} // namespace husky diff --git a/core/page_store.cpp b/core/page_store.cpp new file mode 100644 index 0000000..3e95d2e --- /dev/null 
+++ b/core/page_store.cpp @@ -0,0 +1,67 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "core/page_store.hpp" + +#include "base/session_local.hpp" + +namespace husky { + +thread_local PageMap* PageStore::page_map_ = nullptr; +thread_local PageSet* PageStore::page_set_ = nullptr; +thread_local size_t PageStore::s_counter = 0; + +// set finalize_all_objlists priority to Level1, the higher the level, the higher the priority +static thread_local base::RegSessionThreadFinalizer finalize_all_objlists(base::SessionLocalPriority::Level1, []() { + PageStore::drop_all_pages(); + PageStore::free_page_map(); +}); + +void PageStore::drop_all_pages() { + if (page_map_ == nullptr) + return; + for (auto& page_pair : (*page_map_)) { + page_pair.second->finalize(); + delete page_pair.second; + } + page_map_->clear(); + page_set_->clear(); +} + +void PageStore::init_page_map() { + if (page_map_ == nullptr) + page_map_ = new PageMap(); + + if (page_set_ == nullptr) + page_set_ = new PageSet(); +} + +void PageStore::free_page_map() { + if (page_map_ != nullptr) + delete page_map_; + page_map_ = nullptr; + + if (page_set_ != nullptr) + delete page_set_; + page_set_ = nullptr; + + s_counter = 0; +} + +PageMap& PageStore::get_page_map() { + init_page_map(); + return *page_map_; +} + +} // namespace husky diff --git a/core/page_store.hpp b/core/page_store.hpp new file mode 100644 index 0000000..26d68de --- /dev/null +++ 
b/core/page_store.hpp @@ -0,0 +1,69 @@ +// Copyright 2016 Husky Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "core/page.hpp" + +namespace husky { + +typedef std::unordered_map PageMap; + +typedef std::unordered_set PageSet; + +class PageStore { + public: + static Page* create_page() { + PageMap& page_map = get_page_map(); + Page* page; + if (page_set_->size() != 0) { + auto it = page_set_->begin(); + page = *it; + page_set_->erase(it); + return page; + } + page = new Page(s_counter); + s_counter++; + page_map.insert({page->get_id(), page}); + DLOG_I << "page store has " << page_map.size() << " pages"; + return page; + } + + static bool release_page(Page* page) { + PageMap& page_map = get_page_map(); + if (page_map.find(page->get_id()) == page_map.end()) + throw base::HuskyException("The page to be released is not created by this page store"); + if (page_set_->find(page) != page_set_->end()) + return false; + page->set_owner(nullptr); + page_set_->insert(page); + return true; + } + + static void drop_all_pages(); + static void init_page_map(); + static void free_page_map(); + static PageMap& get_page_map(); + + protected: + static thread_local PageSet* page_set_; + static thread_local PageMap* page_map_; + static thread_local size_t s_counter; +}; + +} // namespace husky diff --git a/core/page_store_unittest.cpp b/core/page_store_unittest.cpp new file mode 100644 index 0000000..befa623 --- 
/dev/null +++ b/core/page_store_unittest.cpp @@ -0,0 +1,49 @@ +#include "core/page_store.hpp" + +#include + +#include "gtest/gtest.h" + +#include "base/session_local.hpp" +#include "core/memory_pool.hpp" + +namespace husky { +namespace { + +class TestPageStore : public testing::Test { + public: + TestPageStore() {} + ~TestPageStore() {} + + protected: + void SetUp() { + MemoryPool::free_mem_pool(); + PageStore::drop_all_pages(); + PageStore::free_page_map(); + Config config; + int64_t max_thread_mem = 1024 * 1024 * 32; + config.set_param("maximum_thread_memory", std::to_string(max_thread_mem)); + config.set_param("page_size", "4194304"); + Context::set_config(std::move(config)); + Context::set_local_tid(0); + } + void TearDown() { + MemoryPool::free_mem_pool(); + PageStore::drop_all_pages(); + PageStore::free_page_map(); + } +}; + +TEST_F(TestPageStore, Functional) { + Page* p1 = PageStore::create_page(); + EXPECT_EQ(p1->get_id(), 0); + auto p2 = PageStore::create_page(); + EXPECT_EQ(p2->get_id(), 1); + auto p3 = PageStore::create_page(); + EXPECT_EQ(p3->get_id(), 2); + auto p4 = PageStore::create_page(); + EXPECT_EQ(p4->get_id(), 3); +} + +} // namespace +} // namespace husky diff --git a/core/page_unittest.cpp b/core/page_unittest.cpp new file mode 100644 index 0000000..eb52598 --- /dev/null +++ b/core/page_unittest.cpp @@ -0,0 +1,65 @@ +#include "core/page.hpp" + +#include +#include +#include + +#include "gtest/gtest.h" + +#include "core/memory_pool.hpp" +#include "core/page_store.hpp" + +namespace husky { +namespace { + +class TestPage : public testing::Test { + public: + TestPage() {} + ~TestPage() {} + + protected: + void SetUp() { + MemoryPool::free_mem_pool(); + PageStore::drop_all_pages(); + PageStore::free_page_map(); + Config config; + int64_t max_thread_mem = 1024 * 1024 * 32; + config.set_param("maximum_thread_memory", std::to_string(max_thread_mem)); + config.set_param("page_size", "4194304"); + Context::set_config(std::move(config)); + 
Context::set_local_tid(0); + } + void TearDown() { + MemoryPool::free_mem_pool(); + PageStore::drop_all_pages(); + PageStore::free_page_map(); + } +}; + +TEST_F(TestPage, Functional) { + Context::set_local_tid(0); + Page* p1 = PageStore::create_page(); + Page* p2 = PageStore::create_page(); + + EXPECT_TRUE(MemoryPool::get_mem_pool().request_page(p1->get_key(), p1)); + + std::vector vec; + for (int i = 0; i < 26; i++) { + vec.push_back('a' + i); + } + + BinStream bs(vec); + EXPECT_EQ(p1->write(bs), 26); + EXPECT_EQ(p1->write(vec), 26); + + BinStream& inner_bs = p1->get_bin(); + int size = inner_bs.size(); + EXPECT_EQ(size, 52); + + p1->clear_bin(); + size = inner_bs.size(); + EXPECT_EQ(size, 0); +} + +} // namespace +} // namespace husky diff --git a/examples/aggregator.cpp b/examples/aggregator.cpp index 0d9f8f4..77be6c3 100644 --- a/examples/aggregator.cpp +++ b/examples/aggregator.cpp @@ -38,6 +38,16 @@ class Word { KeyT word; int count = 0; + + friend husky::BinStream& operator<<(husky::BinStream& stream, const Word& word) { + stream << word.word << word.count; + return stream; + } + + friend husky::BinStream& operator>>(husky::BinStream& stream, Word& word) { + stream >> word.word >> word.count; + return stream; + } }; void aggregator() { diff --git a/examples/pi.cpp b/examples/pi.cpp index 26f441d..acfeaf6 100644 --- a/examples/pi.cpp +++ b/examples/pi.cpp @@ -23,9 +23,21 @@ class PIObject { typedef int KeyT; int key; + PIObject() = default; + explicit PIObject(KeyT key) { this->key = key; } const int& id() const { return key; } + + // Serialization and deserialization + friend husky::BinStream& operator<<(husky::BinStream& stream, PIObject obj) { + stream << obj.key; + return stream; + } + friend husky::BinStream& operator>>(husky::BinStream& stream, PIObject& obj) { + stream >> obj.key; + return stream; + } }; void pi() { diff --git a/examples/tfidf.cpp b/examples/tfidf.cpp index 70bbf0e..86a893e 100644 --- a/examples/tfidf.cpp +++ b/examples/tfidf.cpp @@ 
-97,6 +97,15 @@ class Term { KeyT termid; double idf; const KeyT& id() const { return termid; } + + friend husky::BinStream& operator<<(husky::BinStream& stream, const Term& term) { + stream << term.termid << term.idf; + return stream; + } + + friend husky::BinStream& operator>>(husky::BinStream& stream, Term term) { + stream >> term.termid >> term.idf; + } }; void tfidf() { diff --git a/examples/wc_mr.cpp b/examples/wc_mr.cpp index b842261..cbd1f58 100644 --- a/examples/wc_mr.cpp +++ b/examples/wc_mr.cpp @@ -30,6 +30,16 @@ class Word { KeyT word; int count = 0; + + friend husky::BinStream& operator<<(husky::BinStream& stream, const Word& word) { + stream << word.word << word.count; + return stream; + } + + friend husky::BinStream& operator>>(husky::BinStream& stream, Word& word) { + stream >> word.word >> word.count; + return stream; + } }; void wc() { diff --git a/examples/wc_mr_flume.cpp b/examples/wc_mr_flume.cpp index d4432f3..a0580cb 100644 --- a/examples/wc_mr_flume.cpp +++ b/examples/wc_mr_flume.cpp @@ -31,6 +31,16 @@ class Word { KeyT word; int count = 0; + + friend husky::BinStream& operator<<(husky::BinStream& stream, const Word& word) { + stream << word.word << word.count; + return stream; + } + + friend husky::BinStream& operator>>(husky::BinStream& stream, Word& word) { + stream >> word.word >> word.count; + return stream; + } }; void test_load() { diff --git a/examples/wc_mr_mongo.cpp b/examples/wc_mr_mongo.cpp index 63021fd..26ca78c 100644 --- a/examples/wc_mr_mongo.cpp +++ b/examples/wc_mr_mongo.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include #include @@ -36,6 +37,14 @@ class Word { KeyT word; int count = 0; + + friend husky::BinStream& operator<<(husky::BinStream& stream, const Word& w) { + return stream << w.word << w.count;  // hand the stream back so writes can be chained + } + + friend husky::BinStream& operator>>(husky::BinStream& stream, Word& w) { + return stream >> w.word >> w.count;  // read in the order operator<< writes + } }; bool operator<(const std::pair& a, const std::pair& b) { @@ -49,7 +58,9 @@ void wc() { infmt.set_query(""); auto& word_list = husky::ObjListStore::create_objlist(); + auto& word_list2 = husky::ObjListStore::create_objlist(); auto& ch = husky::ChannelStore::create_push_combined_channel>(infmt, word_list); + auto& ch2 = husky::ChannelStore::create_push_combined_channel>(infmt, word_list2); auto parse_wc = [&](std::string& chunk) { mongo::BSONObj o = mongo::fromjson(chunk); @@ -59,7 +70,16 @@ void wc() { boost::char_separator sep(" \t"); boost::tokenizer> tok(content, sep); for (auto& w : tok) { + std::random_device rd; + std::mt19937 generator(rd()); + std::uniform_real_distribution distribution(-1.0, 1.0); + double x = distribution(generator); + if (x < 0.4) { + ch.push(1, w+"1"); + ch2.push(1, w+"1"); + } ch.push(1, w); + ch2.push(1, w); } }; @@ -97,6 +117,9 @@ void wc() { unique_topk.update(add_to_topk, std::make_pair(ch.get(word), word.id())); }); + husky::list_execute(word_list2, [](Word& word){ + }); + husky::lib::AggregatorFactory::sync(); if (husky::Context::get_global_tid() == 0) { diff --git a/lib/ml/feature_label.hpp b/lib/ml/feature_label.hpp index 86172b6..f61cd5d 100644 --- a/lib/ml/feature_label.hpp +++ b/lib/ml/feature_label.hpp @@ -35,6 +35,15 @@ class LabeledPointHObj : public LabeledPoint, LabelT explicit LabeledPointHObj(int feature_num) : LabeledPoint() { this->x = FeatureV(feature_num); } LabeledPointHObj(FeatureV& x, LabelT& y) : LabeledPoint(x, y) {} LabeledPointHObj(FeatureV&& x, LabelT&& y) : LabeledPoint(x, y) {} + + friend husky::BinStream& operator<<(husky::BinStream& stream, + const LabeledPointHObj& b) { + return stream
<< b.x << b.y << b.key; + } + + friend husky::BinStream& operator>>(husky::BinStream& stream, LabeledPointHObj& b) { + return stream >> b.x >> b.y >> b.key;  // read in the order operator<< writes and return the stream for chaining + } }; // LabeledPointHObj } // namespace ml