Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add an ObjListPool, LRU Cache, FIFO Cache to support swapping of ObjLists later #265

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ script:
# Run unit test.
- make -j4 HuskyUnitTest
- ./HuskyUnitTest --gtest_shuffle # To check if the order causes some time-out failures.
# Test if example PI succeeds to run
- ./PI --master_host localhost --master_port 10086 --comm_port 10010 --worker.info localhost:4
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ if(THRIFT_FOUND)
list(APPEND HUSKY_EXTERNAL_DEFINITION ${THRIFT_DEFINITION})
endif(THRIFT_FOUND)

# ORC
if(ORC_FOUND)
list(APPEND HUSKY_EXTERNAL_INCLUDE ${ORC_INCLUDE_DIR})
list(APPEND HUSKY_EXTERNAL_LIB ${ORC_LIBRARY})
list(APPEND HUSKY_EXTERNAL_DEFINITION ${ORC_DEFINITION})
endif(ORC_FOUND)

if(WIN32)
list(APPEND HUSKY_EXTERNAL_LIB wsock32 ws2_32)
endif()
Expand Down
3 changes: 2 additions & 1 deletion base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ file(GLOB base-src-files
disk_store.cpp
serialization.cpp
session_local.cpp
thread_support.cpp)
thread_support.cpp
memory.cpp)
husky_cache_variable(base-src-files ${base-src-files})

add_library(base-objs OBJECT ${base-src-files})
Expand Down
16 changes: 15 additions & 1 deletion base/disk_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,23 @@ BinStream DiskStore::read() {

// TODO(legend): Will use `pwrite` or other asio functions instead.
bool DiskStore::write(BinStream&& bs) {
if (bs.get_buffer_vector().empty())
// if (bs.get_buffer_vector().empty())
// return false;

std::ofstream file;
file.open(path_, std::ofstream::out | std::ofstream::binary);
if (!file)
return false;

std::copy(bs.get_buffer_vector().begin(), bs.get_buffer_vector().end(), std::ostreambuf_iterator<char>(file));
file.close();
return true;
}

bool DiskStore::write(BinStream& bs) {
// if (bs.get_buffer_vector().empty())
// return false;

std::ofstream file;
file.open(path_, std::ofstream::out | std::ofstream::binary);
if (!file)
Expand Down
1 change: 1 addition & 0 deletions base/disk_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DiskStore {

BinStream read();
bool write(BinStream&& bs);
bool write(BinStream& bs);

private:
std::string path_;
Expand Down
22 changes: 22 additions & 0 deletions base/memory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "base/memory.hpp"

namespace husky {
namespace base {

struct sysinfo Memory::mem_info;
}
}
43 changes: 43 additions & 0 deletions base/memory.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "sys/sysinfo.h"
#include "sys/types.h"

namespace husky {
namespace base {
class Memory {
public:
static int64_t total_phys_mem() {
sysinfo(&mem_info);
int64_t total_phys_mem = mem_info.totalram;
total_phys_mem *= mem_info.mem_unit;
return total_phys_mem;
}

static int64_t total_virtual_mem() {
sysinfo(&mem_info);
int64_t total_vir_mem = mem_info.totalram;
total_vir_mem += mem_info.totalswap;
total_vir_mem *= mem_info.mem_unit;
return total_vir_mem;
}

private:
static struct sysinfo mem_info;
};
} // namespace base
} // namespace husky
15 changes: 15 additions & 0 deletions base/serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,21 @@ void BinStream::resize(size_t size) {

void BinStream::seek(size_t pos) { front_ = pos; }

BinStream BinStream::sub_stream(size_t s_pos, size_t len) {
int size = buffer_.size();
auto it_start = buffer_.begin() + front_ + s_pos;
if (front_ + s_pos > size) {
std::vector<char> v;
return BinStream(std::move(v));
}
auto it_end = it_start + len;
if (front_ + s_pos + len > size) {
it_end = buffer_.end();
}
std::vector<char> v(it_start, it_end);
return BinStream(std::move(v));
}

void BinStream::push_back_bytes(const char* src, size_t sz) {
buffer_.insert(buffer_.end(), (const char*) src, (const char*) src + sz);
}
Expand Down
4 changes: 3 additions & 1 deletion base/serialization.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class BinStream {
void purge();
void resize(size_t size);
void seek(size_t pos);
BinStream sub_stream(size_t s_pos, size_t len);

void append(const BinStream& m);
void push_back_bytes(const char* src, size_t sz);
Expand Down Expand Up @@ -145,8 +146,9 @@ BinStream& operator>>(BinStream& stream, std::vector<OutputT>& v) {
stream >> len;
v.clear();
v.resize(len);
for (int i = 0; i < v.size(); ++i)
for (int i = 0; i < v.size(); ++i) {
stream >> v[i];
}
return stream;
}

Expand Down
33 changes: 33 additions & 0 deletions cmake/dep.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,36 @@ if(WITHOUT_THRIFT)
unset(THRIFT_FOUND)
message(STATUS "Not using Thrift due to WITHOUT_THRIFT option")
endif(WITHOUT_THRIFT)

### ORC ###

#NAMES liblz4.a liborc.a libprotobuf.a libsnappy.a libz.a
#NAMES ColumnPrinter.hh Int128.hh MemoryPool.hh orc-config.hh OrcFile.hh Reader.hh Type.hh Vector.hh
find_path(ORC_INCLUDE_DIR NAMES orc/OrcFile.hh)
find_library(ORC_L0 NAMES protobuf NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
find_library(ORC_L1 NAMES z NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
find_library(ORC_L2 NAMES lz4 NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
find_library(ORC_L3 NAMES snappy NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
find_library(ORC_L4 NAMES orc)

if (ORC_INCLUDE_DIR AND ORC_L1 AND ORC_L0 AND ORC_L2 AND ORC_L3 AND ORC_L4)
set(ORC_FOUND true)
endif (ORC_INCLUDE_DIR AND ORC_L1 AND ORC_L0 AND ORC_L2 AND ORC_L3 AND ORC_L4)
if (ORC_FOUND)
set(ORC_DEFINITION "-DWITH_ORC")
# The order is important for dependencies.
set(ORC_LIBRARY ${ORC_L4} ${ORC_L3} ${ORC_L2} ${ORC_L1} ${ORC_L0})
message (STATUS "Found ORC:")
message (STATUS " (Headers) ${ORC_INCLUDE_DIR}")
message (STATUS " (Library) ${ORC_L0}")
message (STATUS " (Library) ${ORC_L1}")
message (STATUS " (Library) ${ORC_L2}")
message (STATUS " (Library) ${ORC_L3}")
message (STATUS " (Library) ${ORC_L4}")
else(ORC_FOUND)
message (STATUS "Could NOT find ORC")
endif(ORC_FOUND)
if(WITHOUT_ORC)
unset(ORC_FOUND)
message(STATUS "Not using ORC due to WITHOUT_ORC option")
endif(WITHOUT_ORC)
7 changes: 5 additions & 2 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_subdirectory(channel)
file(GLOB core-src-files
accessor.cpp
accessor_store.cpp
balance.cpp
combiner.cpp
config.cpp
context.cpp
Expand All @@ -28,12 +29,14 @@ file(GLOB core-src-files
job_runner.cpp
mailbox.cpp
memory_checker.cpp
memory_pool.cpp
network.cpp
objlist.cpp
objlist_store.cpp
page.cpp
page_store.cpp
shuffle_combiner_store.cpp
worker_info.cpp
balance.cpp)
worker_info.cpp)
husky_cache_variable(core-src-files ${core-src-files})

add_library(core-objs OBJECT ${core-src-files})
Expand Down
8 changes: 7 additions & 1 deletion core/attrlist_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ class TestAttrList : public testing::Test {
~TestAttrList() {}

protected:
void SetUp() {}
void SetUp() {
Config config;
config.set_param("maximum_thread_memory", "134217728");
config.set_param("page_size", "2097152");
Context::set_config(std::move(config));
Context::set_local_tid(0);
}
void TearDown() {}
};

Expand Down
149 changes: 149 additions & 0 deletions core/cache.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <list>
#include <unordered_map>
#include <utility>

#include "boost/optional.hpp"

namespace husky {

template <typename KeyT, typename ValT>
class ICache {
public:
explicit ICache(size_t size) { size_ = size; }

size_t get_max_size() { return size_; }

protected:
size_t size_;
};

template <typename KeyT, typename ValT>
class LRUCache : public ICache<KeyT, ValT> {
public:
using ListIterator = typename std::list<std::pair<KeyT, ValT>>::iterator;

explicit LRUCache(size_t size) : ICache<KeyT, ValT>(size) {}

~LRUCache() = default;

template <typename KeyU, typename ValU>
void put(KeyU&& key, ValU&& val) {
auto it_cache = cache_map_.find(key);
cache_list_.push_front(std::make_pair(key, val));
if (it_cache != cache_map_.end()) {
cache_list_.erase(it_cache->second);
cache_map_.erase(it_cache);
}
cache_map_[key] = cache_list_.begin();

if (cache_map_.size() > this->size_) {
del();
}
}

boost::optional<std::pair<KeyT, ValT>> del() {
if (cache_map_.size() == 0) {
return boost::none;
}
auto last = cache_list_.end();
last--;
KeyT key = last->first;
ValT objlist = last->second;
cache_map_.erase(key);
cache_list_.pop_back();
return *last;
}

boost::optional<std::pair<KeyT, ValT>> poll() {
if (cache_map_.size() == 0) {
return boost::none;
}
return cache_list_.back();
}

size_t get_size() { return cache_map_.size(); }

template <typename KeyU>
bool exists(KeyU&& key) {
return cache_map_.find(std::forward<KeyU>(key)) != cache_map_.end();
}

private:
std::unordered_map<KeyT, ListIterator> cache_map_;
std::list<std::pair<KeyT, ValT>> cache_list_;
};

template <typename KeyT, typename ValT>
class FIFOCache : public ICache<KeyT, ValT> {
public:
using ListIterator = typename std::list<std::pair<KeyT, ValT>>::iterator;

explicit FIFOCache(size_t size) : ICache<KeyT, ValT>(size) {}

~FIFOCache() = default;

template <typename KeyU, typename ValU>
void put(KeyU&& key, ValU&& val) {
auto it_cache = cache_map_.find(key);
if (it_cache != cache_map_.end() && it_cache->second->second != val) {
cache_list_.erase(it_cache->second);
cache_map_.erase(it_cache);
} else if (it_cache != cache_map_.end() && it_cache->second->second == val) {
return;
}
cache_list_.push_front(std::make_pair(key, val));
cache_map_[key] = cache_list_.begin();

if (cache_map_.size() > this->size_) {
del();
}
}

boost::optional<std::pair<KeyT, ValT>> del() {
if (cache_map_.size() == 0) {
return boost::none;
}
auto last = cache_list_.end();
last--;
KeyT key = last->first;
ValT objlist = last->second;
cache_map_.erase(key);
cache_list_.pop_back();
return *last;
}

boost::optional<std::pair<KeyT, ValT>> poll() {
if (cache_map_.size() == 0) {
return boost::none;
}
return cache_list_.back();
}

template <typename KeyU>
bool exists(KeyU&& key) {
return cache_map_.find(key) != cache_map_.end();
}

size_t get_size() { return cache_map_.size(); }

private:
std::unordered_map<KeyT, ListIterator> cache_map_;
std::list<std::pair<KeyT, ValT>> cache_list_;
};
} // namespace husky
Loading