Skip to content

Commit

Permalink
[Core] Add an ObjListPool, LRU Cache, FIFO Cache to support swapping …
Browse files Browse the repository at this point in the history
…of ObjLists later

issue #256
Open a pull request for more discussion and changes. Temporary version.
  • Loading branch information
lmatz committed Mar 15, 2017
1 parent 218b3db commit 5d969d4
Show file tree
Hide file tree
Showing 20 changed files with 1,182 additions and 152 deletions.
15 changes: 15 additions & 0 deletions base/disk_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,20 @@ bool DiskStore::write(BinStream&& bs) {
return true;
}

bool DiskStore::write(BinStream& bs) {
if (bs.get_buffer_vector().empty())
return false;

std::ofstream file;
file.open(path_, std::ofstream::out | std::ofstream::binary);
if (!file)
return false;

std::copy(bs.get_buffer_vector().begin(), bs.get_buffer_vector().end(), std::ostreambuf_iterator<char>(file));
file.close();
return true;
}


} // namespace base
} // namespace husky
1 change: 1 addition & 0 deletions base/disk_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DiskStore {

BinStream read();
bool write(BinStream&& bs);
bool write(BinStream& bs);

private:
std::string path_;
Expand Down
15 changes: 15 additions & 0 deletions base/serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,21 @@ void BinStream::resize(size_t size) {

void BinStream::seek(size_t pos) { front_ = pos; }

BinStream BinStream::sub_stream(size_t s_pos, size_t len) {
int size = buffer_.size();
auto it_start = buffer_.begin() + front_ + s_pos;
if ( front_+s_pos > size ) {
it_start = buffer_.end();
}
auto it_end = it_start +len;
if ( front_+s_pos+len > size ) {
it_end = buffer_.end();
}
std::vector<char> v(it_start, it_end);
return BinStream(std::move(v));
}


void BinStream::push_back_bytes(const char* src, size_t sz) {
buffer_.insert(buffer_.end(), (const char*) src, (const char*) src + sz);
}
Expand Down
10 changes: 7 additions & 3 deletions base/serialization.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class BinStream {
void purge();
void resize(size_t size);
void seek(size_t pos);
BinStream sub_stream(size_t s_pos, size_t len);

void append(const BinStream& m);
void push_back_bytes(const char* src, size_t sz);
Expand Down Expand Up @@ -144,9 +145,12 @@ BinStream& operator>>(BinStream& stream, std::vector<OutputT>& v) {
size_t len;
stream >> len;
v.clear();
v.resize(len);
for (int i = 0; i < v.size(); ++i)
stream >> v[i];
// v.resize(len);
for (int i = 0; i < v.size(); ++i) {
OutputT tmp;
stream >> tmp;
v.push_back(std::move(tmp));
}
return stream;
}

Expand Down
7 changes: 5 additions & 2 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_subdirectory(channel)
file(GLOB core-src-files
accessor.cpp
accessor_store.cpp
balance.cpp
combiner.cpp
config.cpp
context.cpp
Expand All @@ -28,12 +29,14 @@ file(GLOB core-src-files
job_runner.cpp
mailbox.cpp
memory_checker.cpp
memory_pool.cpp
network.cpp
objlist.cpp
objlist_store.cpp
page.cpp
page_store.cpp
shuffle_combiner_store.cpp
worker_info.cpp
balance.cpp)
worker_info.cpp)
husky_cache_variable(core-src-files ${core-src-files})

add_library(core-objs OBJECT ${core-src-files})
Expand Down
134 changes: 134 additions & 0 deletions core/cache.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <list>
#include <unordered_map>
#include <utility>

namespace husky {

template <typename KeyT, typename ValT>
class ICache {
public:
explicit ICache(size_t size) { size_ = size; }

size_t get_max_size() { return size_; }

protected:
size_t size_;
};

template <typename KeyT, typename ValT>
class LRUCache : public ICache<KeyT, ValT> {
public:
using ListIterator = typename std::list<std::pair<KeyT, ValT>>::iterator;

explicit LRUCache(size_t size) : ICache<KeyT, ValT>(size) {}

~LRUCache() = default;

template <typename KeyU, typename ValU>
void put(KeyU&& key, ValU&& val) {
auto it_cache = cache_map_.find(key);
cache_list_.push_front(std::make_pair(key, val));
if (it_cache != cache_map_.end()) {
cache_list_.erase(it_cache->second);
cache_map_.erase(it_cache);
}
cache_map_[key] = cache_list_.begin();

if (cache_map_.size() > this->size_) {
auto last = cache_list_.end();
last--;
cache_map_.erase(last->first);
cache_list_.pop_back();
}
}

std::pair<KeyT, ValT> del() {
auto last = cache_list_.end();
last--;
KeyT key = last->first;
ValT objlist = last->second;
cache_map_.erase(key);
cache_list_.pop_back();
return *last;
}

std::pair<KeyT, ValT> poll() { return cache_list_.back(); }

size_t get_size() { return cache_map_.size(); }

template <typename KeyU>
bool exists(KeyU&& key) {
return cache_map_.find(key) != cache_map_.end();
}

private:
std::unordered_map<KeyT, ListIterator> cache_map_;
std::list<std::pair<KeyT, ValT>> cache_list_;
};

template <typename KeyT, typename ValT>
class FIFOCache : public ICache<KeyT, ValT> {
public:
using ListIterator = typename std::list<std::pair<KeyT, ValT>>::iterator;

explicit FIFOCache(size_t size) : ICache<KeyT, ValT>(size) {}

~FIFOCache() = default;

template <typename KeyU, typename ValU>
void put(KeyU&& key, ValU&& val) {
auto it_cache = cache_map_.find(key);
if (it_cache != cache_map_.end() && it_cache->second->second != val) {
cache_list_.erase(it_cache->second);
cache_map_.erase(it_cache);
} else if (it_cache != cache_map_.end() && it_cache->second->second == val) {
return;
}
cache_list_.push_front(std::make_pair(key, val));
cache_map_[key] = cache_list_.begin();

if (cache_map_.size() > this->size_) {
del();
}
}

std::pair<KeyT, ValT> del() {
auto last = cache_list_.end();
last--;
KeyT key = last->first;
ValT objlist = last->second;
cache_map_.erase(key);
cache_list_.pop_back();
return *last;
}

std::pair<KeyT, ValT> poll() { return cache_list_.back(); }

template <typename KeyU>
bool exists(KeyU&& key) {
return cache_map_.find(key) != cache_map_.end();
}

size_t get_size() { return cache_map_.size(); }

private:
std::unordered_map<KeyT, ListIterator> cache_map_;
std::list<std::pair<KeyT, ValT>> cache_list_;
};
} // namespace husky
Loading

0 comments on commit 5d969d4

Please sign in to comment.